This is page 4 of 19. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------
```python
"""List directory tool for Basic Memory MCP server."""
from typing import Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.telemetry import track_mcp_tool
@mcp.tool(
description="List directory contents with filtering and depth control.",
)
async def list_directory(
dir_name: str = "/",
depth: int = 1,
file_name_glob: Optional[str] = None,
project: Optional[str] = None,
context: Context | None = None,
) -> str:
"""List directory contents from the knowledge base with optional filtering.
This tool provides 'ls' functionality for browsing the knowledge base directory structure.
It can list immediate children or recursively explore subdirectories with depth control,
and supports glob pattern filtering for finding specific files.
Args:
dir_name: Directory path to list (default: root "/")
Examples: "/", "/projects", "/research/ml"
depth: Recursion depth (1-10, default: 1 for immediate children only)
Higher values show subdirectory contents recursively
file_name_glob: Optional glob pattern for filtering file names
Examples: "*.md", "*meeting*", "project_*"
project: Project name to list directory from. Optional - server will resolve using hierarchy.
If unknown, use list_memory_projects() to discover available projects.
context: Optional FastMCP context for performance caching.
Returns:
Formatted listing of directory contents with file metadata
Examples:
# List root directory contents
list_directory()
# List specific folder
list_directory(dir_name="/projects")
# Find all markdown files
list_directory(file_name_glob="*.md")
# Deep exploration of research folder
list_directory(dir_name="/research", depth=3)
# Find meeting notes in projects folder
list_directory(dir_name="/projects", file_name_glob="*meeting*")
# Explicit project specification
list_directory(project="work-docs", dir_name="/projects")
Raises:
ToolError: If project doesn't exist or directory path is invalid
"""
track_mcp_tool("list_directory")
async with get_client() as client:
active_project = await get_active_project(client, project, context)
logger.debug(
f"Listing directory '{dir_name}' in project {project} with depth={depth}, glob='{file_name_glob}'"
)
# Import here to avoid circular import
from basic_memory.mcp.clients import DirectoryClient
# Use typed DirectoryClient for API calls
directory_client = DirectoryClient(client, active_project.external_id)
nodes = await directory_client.list(dir_name, depth=depth, file_name_glob=file_name_glob)
if not nodes:
filter_desc = ""
if file_name_glob:
filter_desc = f" matching '{file_name_glob}'"
return f"No files found in directory '{dir_name}'{filter_desc}"
# Format the results
output_lines = []
if file_name_glob:
output_lines.append(
f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
)
else:
output_lines.append(f"Contents of '{dir_name}' (depth {depth}):")
output_lines.append("")
# Group by type and sort
directories = [n for n in nodes if n["type"] == "directory"]
files = [n for n in nodes if n["type"] == "file"]
# Sort by name
directories.sort(key=lambda x: x["name"])
files.sort(key=lambda x: x["name"])
# Display directories first
for node in directories:
path_display = node["directory_path"]
output_lines.append(f"📁 {node['name']:<30} {path_display}")
# Add separator if we have both directories and files
if directories and files:
output_lines.append("")
# Display files with metadata
for node in files:
path_display = node["directory_path"]
title = node.get("title", "")
updated = node.get("updated_at", "")
            # Remove the leading slash if present; read_note paths do not use one
if path_display.startswith("/"):
path_display = path_display[1:]
# Format date if available
date_str = ""
if updated:
try:
from datetime import datetime
dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
date_str = dt.strftime("%Y-%m-%d")
except Exception: # pragma: no cover
date_str = updated[:10] if len(updated) >= 10 else ""
# Create formatted line
file_line = f"📄 {node['name']:<30} {path_display}"
if title and title != node["name"]:
file_line += f" | {title}"
if date_str:
file_line += f" | {date_str}"
output_lines.append(file_line)
# Add summary
output_lines.append("")
total_count = len(directories) + len(files)
summary_parts = []
if directories:
summary_parts.append(
f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
)
if files:
summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")
output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")
return "\n".join(output_lines)
```
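For reference, a minimal sketch of invoking this tool in-process through the FastMCP client, mirroring the integration tests elsewhere in this repo. "work-docs" is the illustrative project name from the docstring, not one that necessarily exists:

```python
import asyncio

from fastmcp import Client

from basic_memory.mcp.server import mcp


async def main() -> None:
    # Call the registered tool by name, as the integration tests do.
    async with Client(mcp) as client:
        result = await client.call_tool(
            "list_directory",
            {"project": "work-docs", "dir_name": "/projects", "file_name_glob": "*meeting*"},
        )
        print(result.content[0].text)


asyncio.run(main())
```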
--------------------------------------------------------------------------------
/tests/cli/cloud/test_cloud_api_client_and_utils.py:
--------------------------------------------------------------------------------
```python
from contextlib import asynccontextmanager
import json
import httpx
import pytest
from basic_memory.cli.auth import CLIAuth
from basic_memory.cli.commands.cloud.api_client import (
SubscriptionRequiredError,
make_api_request,
)
from basic_memory.cli.commands.cloud.cloud_utils import (
create_cloud_project,
fetch_cloud_projects,
project_exists,
)
@pytest.mark.asyncio
async def test_make_api_request_success_injects_auth_and_accept_encoding(
config_home, config_manager
):
# Arrange: create a token on disk so CLIAuth can authenticate without any network.
auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
auth.token_file.parent.mkdir(parents=True, exist_ok=True)
auth.token_file.write_text(
'{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
encoding="utf-8",
)
async def handler(request: httpx.Request) -> httpx.Response:
assert request.headers.get("authorization") == "Bearer token-123"
assert request.headers.get("accept-encoding") == "identity"
return httpx.Response(200, json={"ok": True})
transport = httpx.MockTransport(handler)
@asynccontextmanager
async def http_client_factory():
async with httpx.AsyncClient(transport=transport) as client:
yield client
# Act
resp = await make_api_request(
method="GET",
url="https://cloud.example.test/proxy/health",
auth=auth,
http_client_factory=http_client_factory,
)
# Assert
assert resp.json()["ok"] is True
@pytest.mark.asyncio
async def test_make_api_request_raises_subscription_required(config_home, config_manager):
auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
auth.token_file.parent.mkdir(parents=True, exist_ok=True)
auth.token_file.write_text(
'{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
encoding="utf-8",
)
async def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(
403,
json={
"detail": {
"error": "subscription_required",
"message": "Need subscription",
"subscribe_url": "https://example.test/subscribe",
}
},
)
transport = httpx.MockTransport(handler)
@asynccontextmanager
async def http_client_factory():
async with httpx.AsyncClient(transport=transport) as client:
yield client
with pytest.raises(SubscriptionRequiredError) as exc:
await make_api_request(
method="GET",
url="https://cloud.example.test/proxy/health",
auth=auth,
http_client_factory=http_client_factory,
)
assert exc.value.subscribe_url == "https://example.test/subscribe"
@pytest.mark.asyncio
async def test_cloud_utils_fetch_and_exists_and_create_project(
config_home, config_manager, monkeypatch
):
# Point config.cloud_host at our mocked base URL
config = config_manager.load_config()
config.cloud_host = "https://cloud.example.test"
config_manager.save_config(config)
auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
auth.token_file.parent.mkdir(parents=True, exist_ok=True)
auth.token_file.write_text(
'{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
encoding="utf-8",
)
seen = {"create_payload": None}
async def handler(request: httpx.Request) -> httpx.Response:
if request.method == "GET" and request.url.path == "/proxy/projects/projects":
return httpx.Response(
200,
json={
"projects": [
{"id": 1, "name": "alpha", "path": "alpha", "is_default": True},
{"id": 2, "name": "beta", "path": "beta", "is_default": False},
]
},
)
if request.method == "POST" and request.url.path == "/proxy/projects/projects":
# httpx.Request doesn't have .json(); parse bytes payload.
seen["create_payload"] = json.loads(request.content.decode("utf-8"))
return httpx.Response(
200,
json={
"message": "created",
"status": "success",
"default": False,
"old_project": None,
"new_project": {
"name": seen["create_payload"]["name"],
"path": seen["create_payload"]["path"],
},
},
)
raise AssertionError(f"Unexpected request: {request.method} {request.url}")
transport = httpx.MockTransport(handler)
@asynccontextmanager
async def http_client_factory():
async with httpx.AsyncClient(
transport=transport, base_url="https://cloud.example.test"
) as client:
yield client
async def api_request(**kwargs):
return await make_api_request(auth=auth, http_client_factory=http_client_factory, **kwargs)
projects = await fetch_cloud_projects(api_request=api_request)
assert [p.name for p in projects.projects] == ["alpha", "beta"]
assert await project_exists("alpha", api_request=api_request) is True
assert await project_exists("missing", api_request=api_request) is False
created = await create_cloud_project("My Project", api_request=api_request)
assert created.new_project is not None
assert created.new_project["name"] == "My Project"
# Path should be permalink-like (kebab)
assert seen["create_payload"]["path"] == "my-project"
```
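The injection pattern these tests repeat, an `httpx.MockTransport`-backed client factory passed as `http_client_factory` so no network I/O occurs, can be factored into a small helper. A sketch under the same assumptions (the helper name is illustrative):

```python
from contextlib import asynccontextmanager

import httpx


def make_mock_factory(handler):
    """Build an http_client_factory backed by httpx.MockTransport.

    Passing a factory like this to make_api_request is what keeps the
    tests above entirely network-free.
    """
    transport = httpx.MockTransport(handler)

    @asynccontextmanager
    async def factory():
        async with httpx.AsyncClient(transport=transport) as client:
            yield client

    return factory


async def ok_handler(_request: httpx.Request) -> httpx.Response:
    # Minimal canned response for a health-check style endpoint.
    return httpx.Response(200, json={"ok": True})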
--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/importer_router.py:
--------------------------------------------------------------------------------
```python
"""V2 Import Router - ID-based data import operations.
This router uses v2 dependencies for consistent project handling with external_id UUIDs.
Import endpoints use project_id in the path for consistency with other v2 endpoints.
"""
import json
import logging
from fastapi import APIRouter, Form, HTTPException, UploadFile, status, Path
from basic_memory.deps import (
ChatGPTImporterV2ExternalDep,
ClaudeConversationsImporterV2ExternalDep,
ClaudeProjectsImporterV2ExternalDep,
MemoryJsonImporterV2ExternalDep,
)
from basic_memory.importers import Importer
from basic_memory.schemas.importer import (
ChatImportResult,
EntityImportResult,
ProjectImportResult,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/import", tags=["import-v2"])
@router.post("/chatgpt", response_model=ChatImportResult)
async def import_chatgpt(
importer: ChatGPTImporterV2ExternalDep,
file: UploadFile,
project_id: str = Path(..., description="Project external UUID"),
folder: str = Form("conversations"),
) -> ChatImportResult:
"""Import conversations from ChatGPT JSON export.
Args:
project_id: Project external UUID from URL path
file: The ChatGPT conversations.json file.
folder: The folder to place the files in.
importer: ChatGPT importer instance.
Returns:
ChatImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
logger.info(f"V2 Importing ChatGPT conversations for project {project_id}")
return await import_file(importer, file, folder)
@router.post("/claude/conversations", response_model=ChatImportResult)
async def import_claude_conversations(
importer: ClaudeConversationsImporterV2ExternalDep,
file: UploadFile,
project_id: str = Path(..., description="Project external UUID"),
folder: str = Form("conversations"),
) -> ChatImportResult:
"""Import conversations from Claude conversations.json export.
Args:
project_id: Project external UUID from URL path
file: The Claude conversations.json file.
folder: The folder to place the files in.
importer: Claude conversations importer instance.
Returns:
ChatImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
logger.info(f"V2 Importing Claude conversations for project {project_id}")
return await import_file(importer, file, folder)
@router.post("/claude/projects", response_model=ProjectImportResult)
async def import_claude_projects(
importer: ClaudeProjectsImporterV2ExternalDep,
file: UploadFile,
project_id: str = Path(..., description="Project external UUID"),
folder: str = Form("projects"),
) -> ProjectImportResult:
"""Import projects from Claude projects.json export.
Args:
project_id: Project external UUID from URL path
file: The Claude projects.json file.
folder: The base folder to place the files in.
importer: Claude projects importer instance.
Returns:
ProjectImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
logger.info(f"V2 Importing Claude projects for project {project_id}")
return await import_file(importer, file, folder)
@router.post("/memory-json", response_model=EntityImportResult)
async def import_memory_json(
importer: MemoryJsonImporterV2ExternalDep,
file: UploadFile,
project_id: str = Path(..., description="Project external UUID"),
folder: str = Form("conversations"),
) -> EntityImportResult:
"""Import entities and relations from a memory.json file.
Args:
project_id: Project external UUID from URL path
file: The memory.json file.
folder: Optional destination folder within the project.
importer: Memory JSON importer instance.
Returns:
EntityImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
logger.info(f"V2 Importing memory.json for project {project_id}")
try:
file_data = []
file_bytes = await file.read()
file_str = file_bytes.decode("utf-8")
for line in file_str.splitlines():
json_data = json.loads(line)
file_data.append(json_data)
result = await importer.import_data(file_data, folder)
if not result.success: # pragma: no cover
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message or "Import failed",
)
    except HTTPException:
        # Re-raise the failure response built above instead of re-wrapping it.
        raise
    except Exception as e:
logger.exception("V2 Import failed")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Import failed: {str(e)}",
)
return result
async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
"""Helper function to import a file using an importer instance.
Args:
importer: The importer instance to use
file: The file to import
destination_folder: Destination folder for imported content
Returns:
Import result from the importer
Raises:
HTTPException: If import fails
"""
try:
# Process file
json_data = json.load(file.file)
result = await importer.import_data(json_data, destination_folder)
if not result.success: # pragma: no cover
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message or "Import failed",
)
return result
    except HTTPException:
        # Preserve the "import failed" response raised above.
        raise
    except Exception as e:
logger.exception("V2 Import failed")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Import failed: {str(e)}",
)
```
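Because `project_id` is read from the URL path, this router must be included under a parent prefix that supplies it; the router itself only defines the `/import` suffix. A hedged client-side sketch, assuming an illustrative mount point of `/v2/projects/{project_id}`:

```python
import httpx

# Assumed mount point; the actual prefix is defined where the router is included.
project_id = "00000000-0000-0000-0000-000000000000"  # illustrative external UUID
url = f"http://localhost:8000/v2/projects/{project_id}/import/chatgpt"

with open("conversations.json", "rb") as f:
    response = httpx.post(
        url,
        files={"file": ("conversations.json", f, "application/json")},
        data={"folder": "conversations"},
        timeout=60.0,
    )
response.raise_for_status()
print(response.json())  # ChatImportResult fields
```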
--------------------------------------------------------------------------------
/test-int/cli/test_project_commands_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for project CLI commands."""
import tempfile
from pathlib import Path
from typer.testing import CliRunner
from basic_memory.cli.main import app as cli_app
def test_project_list(app, app_config, test_project, config_manager):
"""Test 'bm project list' command shows projects."""
runner = CliRunner()
result = runner.invoke(cli_app, ["project", "list"])
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
print(f"Exception: {result.exception}")
assert result.exit_code == 0
assert "test-project" in result.stdout
assert "[X]" in result.stdout # default marker
def test_project_info(app, app_config, test_project, config_manager):
"""Test 'bm project info' command shows project details."""
runner = CliRunner()
result = runner.invoke(cli_app, ["project", "info", "test-project"])
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
assert result.exit_code == 0
assert "Basic Memory Project Info" in result.stdout
assert "test-project" in result.stdout
assert "Statistics" in result.stdout
def test_project_info_json(app, app_config, test_project, config_manager):
"""Test 'bm project info --json' command outputs valid JSON."""
import json
runner = CliRunner()
result = runner.invoke(cli_app, ["project", "info", "test-project", "--json"])
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
assert result.exit_code == 0
# Parse JSON to verify it's valid
data = json.loads(result.stdout)
assert data["project_name"] == "test-project"
assert "statistics" in data
assert "system" in data
def test_project_add_and_remove(app, app_config, config_manager):
"""Test adding and removing a project."""
runner = CliRunner()
# Use a separate temporary directory to avoid nested path conflicts
with tempfile.TemporaryDirectory() as temp_dir:
new_project_path = Path(temp_dir) / "new-project"
new_project_path.mkdir()
# Add project
result = runner.invoke(cli_app, ["project", "add", "new-project", str(new_project_path)])
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
assert result.exit_code == 0
assert (
"Project 'new-project' added successfully" in result.stdout
or "added" in result.stdout.lower()
)
# Verify it shows up in list
result = runner.invoke(cli_app, ["project", "list"])
assert result.exit_code == 0
assert "new-project" in result.stdout
# Remove project
result = runner.invoke(cli_app, ["project", "remove", "new-project"])
assert result.exit_code == 0
assert "removed" in result.stdout.lower() or "deleted" in result.stdout.lower()
def test_project_set_default(app, app_config, config_manager):
"""Test setting default project."""
runner = CliRunner()
# Use a separate temporary directory to avoid nested path conflicts
with tempfile.TemporaryDirectory() as temp_dir:
new_project_path = Path(temp_dir) / "another-project"
new_project_path.mkdir()
# Add a second project
result = runner.invoke(
cli_app, ["project", "add", "another-project", str(new_project_path)]
)
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
assert result.exit_code == 0
# Set as default
result = runner.invoke(cli_app, ["project", "default", "another-project"])
if result.exit_code != 0:
print(f"STDOUT: {result.stdout}")
print(f"STDERR: {result.stderr}")
assert result.exit_code == 0
assert "default" in result.stdout.lower()
# Verify in list
result = runner.invoke(cli_app, ["project", "list"])
assert result.exit_code == 0
# The new project should have the [X] marker now
lines = result.stdout.split("\n")
for line in lines:
if "another-project" in line:
assert "[X]" in line
def test_remove_main_project(app, app_config, config_manager):
"""Test that removing main project then listing projects prevents main from reappearing (issue #397)."""
runner = CliRunner()
# Create separate temp dirs for each project
with (
tempfile.TemporaryDirectory() as main_dir,
tempfile.TemporaryDirectory() as new_default_dir,
):
main_path = Path(main_dir)
new_default_path = Path(new_default_dir)
# Ensure main exists
result = runner.invoke(cli_app, ["project", "list"])
if "main" not in result.stdout:
result = runner.invoke(cli_app, ["project", "add", "main", str(main_path)])
print(result.stdout)
assert result.exit_code == 0
# Confirm main is present
result = runner.invoke(cli_app, ["project", "list"])
assert "main" in result.stdout
# Add a second project
result = runner.invoke(cli_app, ["project", "add", "new_default", str(new_default_path)])
assert result.exit_code == 0
# Set new_default as default (if needed)
result = runner.invoke(cli_app, ["project", "default", "new_default"])
assert result.exit_code == 0
# Remove main
result = runner.invoke(cli_app, ["project", "remove", "main"])
assert result.exit_code == 0
# Confirm only new_default exists and main does not
result = runner.invoke(cli_app, ["project", "list"])
assert result.exit_code == 0
assert "main" not in result.stdout
assert "new_default" in result.stdout
```
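The same `CliRunner` pattern works for ad-hoc checks of the `bm project` commands outside pytest. A minimal standalone sketch; note that without the fixtures above it runs against the real user config rather than an isolated one:

```python
from typer.testing import CliRunner

from basic_memory.cli.main import app as cli_app

runner = CliRunner()
result = runner.invoke(cli_app, ["project", "list"])
print(result.exit_code)
print(result.stdout)
```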
--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------
```python
"""Test proper handling of .tmp files during sync."""
import asyncio
from pathlib import Path
import pytest
from watchfiles import Change
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
@pytest.mark.asyncio
async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
"""Test that .tmp files are correctly filtered out."""
# Test filter_changes method directly
tmp_path = Path(test_project.path) / "test.tmp"
assert not watch_service.filter_changes(Change.added, str(tmp_path))
# Test with valid file
valid_path = Path(test_project.path) / "test.md"
assert watch_service.filter_changes(Change.added, str(valid_path))
@pytest.mark.asyncio
async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
"""Test handling of .tmp files during sync process."""
project_dir = Path(test_project.path)
# Create a .tmp file - this simulates a file being written with write_file_atomic
tmp_file = project_dir / "test.tmp"
await create_test_file(tmp_file, "This is a temporary file")
# Create the target final file
final_file = project_dir / "test.md"
await create_test_file(final_file, "This is the final file")
# Setup changes that include both the .tmp and final file
changes = {
(Change.added, str(tmp_file)),
(Change.added, str(final_file)),
}
# Handle changes
await watch_service.handle_changes(test_project, changes)
# Verify only the final file got an entity
tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
final_entity = await sync_service.entity_repository.get_by_file_path("test.md")
assert tmp_entity is None, "Temp file should not have an entity"
assert final_entity is not None, "Final file should have an entity"
@pytest.mark.asyncio
async def test_atomic_write_tmp_file_handling(
watch_service, project_config, test_project, sync_service
):
"""Test handling of file changes during atomic write operations."""
project_dir = project_config.home
# This test simulates the full atomic write process:
# 1. First a .tmp file is created
# 2. Then the .tmp file is renamed to the final file
# 3. Both events are processed by the watch service
# Setup file paths
tmp_path = project_dir / "document.tmp"
final_path = project_dir / "document.md"
# Create mockup of the atomic write process
await create_test_file(tmp_path, "Content for document")
# First batch of changes - .tmp file created
changes1 = {(Change.added, str(tmp_path))}
# Process first batch
await watch_service.handle_changes(test_project, changes1)
# Now "replace" the temp file with the final file
tmp_path.rename(final_path)
# Second batch of changes - .tmp file deleted, final file added
changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}
# Process second batch
await watch_service.handle_changes(test_project, changes2)
# Verify only the final file is in the database
tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
assert tmp_entity is None, "Temp file should not have an entity"
assert final_entity is not None, "Final file should have an entity"
# Check events
new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
assert len(new_events) == 1
assert new_events[0].path == "document.md"
@pytest.mark.asyncio
async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
"""Test handling of rapid atomic writes to the same destination."""
project_dir = Path(test_project.path)
# This test simulates multiple rapid atomic writes to the same file:
# 1. Several .tmp files are created one after another
# 2. Each is then renamed to the same final file
# 3. Events are batched and processed together
# Setup file paths
tmp1_path = project_dir / "document.1.tmp"
tmp2_path = project_dir / "document.2.tmp"
final_path = project_dir / "document.md"
# Create multiple temp files that will be used in sequence
await create_test_file(tmp1_path, "First version")
await create_test_file(tmp2_path, "Second version")
# Simulate the first atomic write
tmp1_path.replace(final_path)
# Brief pause to ensure file system registers the change
await asyncio.sleep(0.1)
# Read content to verify
content1 = final_path.read_text(encoding="utf-8")
assert content1 == "First version"
# Simulate the second atomic write
tmp2_path.replace(final_path)
# Verify content was updated
content2 = final_path.read_text(encoding="utf-8")
assert content2 == "Second version"
# Create a batch of changes that might arrive in mixed order
changes = {
(Change.added, str(tmp1_path)),
(Change.deleted, str(tmp1_path)),
(Change.added, str(tmp2_path)),
(Change.deleted, str(tmp2_path)),
(Change.added, str(final_path)),
(Change.modified, str(final_path)),
}
# Process all changes
await watch_service.handle_changes(test_project, changes)
# Verify only the final file is in the database
final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
assert final_entity is not None
# Also verify no tmp entities were created
tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
assert tmp1_entity is None
assert tmp2_entity is None
```
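The write-then-rename sequence these tests simulate is the standard atomic-write pattern. A minimal standalone sketch; the codebase's actual `write_file_atomic` lives elsewhere and may differ in detail:

```python
from pathlib import Path


def write_file_atomic_sketch(path: Path, content: str) -> None:
    """Write to a sibling .tmp file, then rename it over the destination.

    Path.replace (os.replace) is atomic on POSIX, so a watcher sees either
    the old file or the new one, never a half-written file. This produces
    exactly the added-.tmp / deleted-.tmp / added-final event batches the
    tests above feed to handle_changes.
    """
    tmp_path = path.with_suffix(".tmp")
    tmp_path.write_text(content, encoding="utf-8")
    tmp_path.replace(path)
```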
--------------------------------------------------------------------------------
/src/basic_memory/mcp/project_context.py:
--------------------------------------------------------------------------------
```python
"""Project context utilities for Basic Memory MCP server.
Provides project lookup utilities for MCP tools.
Handles project validation and context management in one place.
Note: This module uses ProjectResolver for unified project resolution.
The resolve_project_parameter function is a thin wrapper for backwards
compatibility with existing MCP tools.
"""
from typing import Optional, List
from httpx import AsyncClient
from httpx._types import (
HeaderTypes,
)
from loguru import logger
from fastmcp import Context
from basic_memory.config import ConfigManager
from basic_memory.project_resolver import ProjectResolver
from basic_memory.schemas.project_info import ProjectItem, ProjectList
from basic_memory.utils import generate_permalink
async def resolve_project_parameter(
project: Optional[str] = None,
allow_discovery: bool = False,
cloud_mode: Optional[bool] = None,
default_project_mode: Optional[bool] = None,
default_project: Optional[str] = None,
) -> Optional[str]:
"""Resolve project parameter using three-tier hierarchy.
This is a thin wrapper around ProjectResolver for backwards compatibility.
New code should consider using ProjectResolver directly for more detailed
resolution information.
if cloud_mode:
project is required (unless allow_discovery=True for tools that support discovery mode)
else:
Resolution order:
1. Single Project Mode (--project cli arg, or BASIC_MEMORY_MCP_PROJECT env var) - highest priority
2. Explicit project parameter - medium priority
3. Default project if default_project_mode=true - lowest priority
Args:
project: Optional explicit project parameter
allow_discovery: If True, allows returning None in cloud mode for discovery mode
(used by tools like recent_activity that can operate across all projects)
cloud_mode: Optional explicit cloud mode. If not provided, reads from ConfigManager.
default_project_mode: Optional explicit default project mode. If not provided, reads from ConfigManager.
default_project: Optional explicit default project. If not provided, reads from ConfigManager.
Returns:
Resolved project name or None if no resolution possible
"""
# Load config for any values not explicitly provided
if cloud_mode is None or default_project_mode is None or default_project is None:
config = ConfigManager().config
if cloud_mode is None:
cloud_mode = config.cloud_mode
if default_project_mode is None:
default_project_mode = config.default_project_mode
if default_project is None:
default_project = config.default_project
# Create resolver with configuration and resolve
resolver = ProjectResolver.from_env(
cloud_mode=cloud_mode,
default_project_mode=default_project_mode,
default_project=default_project,
)
result = resolver.resolve(project=project, allow_discovery=allow_discovery)
return result.project
async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
# Deferred import to avoid circular dependency with tools
from basic_memory.mcp.tools.utils import call_get
response = await call_get(client, "/projects/projects", headers=headers)
project_list = ProjectList.model_validate(response.json())
return [project.name for project in project_list.projects]
async def get_active_project(
client: AsyncClient,
project: Optional[str] = None,
context: Optional[Context] = None,
headers: HeaderTypes | None = None,
) -> ProjectItem:
"""Get and validate project, setting it in context if available.
Args:
client: HTTP client for API calls
project: Optional project name (resolved using hierarchy)
context: Optional FastMCP context to cache the result
Returns:
The validated project item
Raises:
ValueError: If no project can be resolved
HTTPError: If project doesn't exist or is inaccessible
"""
# Deferred import to avoid circular dependency with tools
from basic_memory.mcp.tools.utils import call_get
resolved_project = await resolve_project_parameter(project)
if not resolved_project:
project_names = await get_project_names(client, headers)
raise ValueError(
"No project specified. "
"Either set 'default_project_mode=true' in config, or use 'project' argument.\n"
f"Available projects: {project_names}"
)
project = resolved_project
# Check if already cached in context
if context:
cached_project = context.get_state("active_project")
if cached_project and cached_project.name == project:
logger.debug(f"Using cached project from context: {project}")
return cached_project
# Validate project exists by calling API
logger.debug(f"Validating project: {project}")
permalink = generate_permalink(project)
response = await call_get(client, f"/{permalink}/project/item", headers=headers)
active_project = ProjectItem.model_validate(response.json())
# Cache in context if available
if context:
context.set_state("active_project", active_project)
logger.debug(f"Cached project in context: {project}")
logger.debug(f"Validated project: {active_project.name}")
return active_project
def add_project_metadata(result: str, project_name: str) -> str:
"""Add project context as metadata footer for assistant session tracking.
Provides clear project context to help the assistant remember which
project is being used throughout the conversation session.
Args:
result: The tool result string
project_name: The project name that was used
Returns:
Result with project session tracking metadata
"""
return f"{result}\n\n[Session: Using project '{project_name}']"
```
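A hedged illustration of the three-tier hierarchy, passing every value explicitly so no `ConfigManager` lookup happens. This assumes `BASIC_MEMORY_MCP_PROJECT` is unset, since single-project mode would otherwise take precedence; project names are illustrative:

```python
import asyncio

from basic_memory.mcp.project_context import resolve_project_parameter


async def main() -> None:
    # Per the documented hierarchy, an explicit parameter wins here...
    print(
        await resolve_project_parameter(
            project="work-docs",
            cloud_mode=False,
            default_project_mode=False,
            default_project="main",
        )
    )
    # ...and the configured default applies when no project is passed.
    print(
        await resolve_project_parameter(
            project=None,
            cloud_mode=False,
            default_project_mode=True,
            default_project="main",
        )
    )


asyncio.run(main())
```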
--------------------------------------------------------------------------------
/test-int/test_db_wal_mode.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for WAL mode and Windows-specific SQLite optimizations.
These tests use real filesystem databases (not in-memory) to verify WAL mode
and other SQLite configuration settings work correctly in production scenarios.
"""
import os

import pytest
from sqlalchemy import text
@pytest.mark.asyncio
async def test_wal_mode_enabled(engine_factory, db_backend):
"""Test that WAL mode is enabled on filesystem database connections."""
if db_backend == "postgres":
pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
engine, _ = engine_factory
# Execute a query to verify WAL mode is enabled
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA journal_mode"))
journal_mode = result.fetchone()[0]
# WAL mode should be enabled for filesystem databases
assert journal_mode.upper() == "WAL"
@pytest.mark.asyncio
async def test_busy_timeout_configured(engine_factory, db_backend):
"""Test that busy timeout is configured for database connections."""
if db_backend == "postgres":
pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
engine, _ = engine_factory
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA busy_timeout"))
busy_timeout = result.fetchone()[0]
# Busy timeout should be 10 seconds (10000 milliseconds)
assert busy_timeout == 10000
@pytest.mark.asyncio
async def test_synchronous_mode_configured(engine_factory, db_backend):
"""Test that synchronous mode is set to NORMAL for performance."""
if db_backend == "postgres":
pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
engine, _ = engine_factory
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA synchronous"))
synchronous = result.fetchone()[0]
# Synchronous should be NORMAL (1)
assert synchronous == 1
@pytest.mark.asyncio
async def test_cache_size_configured(engine_factory, db_backend):
"""Test that cache size is configured for performance."""
if db_backend == "postgres":
pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
engine, _ = engine_factory
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA cache_size"))
cache_size = result.fetchone()[0]
# Cache size should be -64000 (64MB)
assert cache_size == -64000
@pytest.mark.asyncio
async def test_temp_store_configured(engine_factory, db_backend):
"""Test that temp_store is set to MEMORY."""
if db_backend == "postgres":
pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
engine, _ = engine_factory
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA temp_store"))
temp_store = result.fetchone()[0]
# temp_store should be MEMORY (2)
assert temp_store == 2
@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
__import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_windows_locking_mode_when_on_windows(tmp_path, monkeypatch, config_manager):
"""Test that Windows-specific locking mode is set when running on Windows."""
from basic_memory.db import engine_session_factory, DatabaseType
from basic_memory.config import DatabaseBackend
# Force SQLite backend for this SQLite-specific test
config_manager.config.database_backend = DatabaseBackend.SQLITE
# Set HOME environment variable
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
db_path = tmp_path / "test_windows.db"
async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
engine,
_,
):
async with engine.connect() as conn:
result = await conn.execute(text("PRAGMA locking_mode"))
locking_mode = result.fetchone()[0]
# Locking mode should be NORMAL on Windows
assert locking_mode.upper() == "NORMAL"
@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
__import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_null_pool_on_windows(tmp_path, monkeypatch):
"""Test that NullPool is used on Windows to avoid connection pooling issues."""
from basic_memory.db import engine_session_factory, DatabaseType
from sqlalchemy.pool import NullPool
# Set HOME environment variable
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
db_path = tmp_path / "test_windows_pool.db"
async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (engine, _):
# Engine should be using NullPool on Windows
assert isinstance(engine.pool, NullPool)
@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
__import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_memory_database_no_null_pool_on_windows(tmp_path, monkeypatch):
"""Test that in-memory databases do NOT use NullPool even on Windows.
NullPool closes connections immediately, which destroys in-memory databases.
This test ensures in-memory databases maintain connection pooling.
"""
from basic_memory.db import engine_session_factory, DatabaseType
from sqlalchemy.pool import NullPool
# Set HOME environment variable
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
db_path = tmp_path / "test_memory.db"
async with engine_session_factory(db_path, DatabaseType.MEMORY) as (engine, _):
# In-memory databases should NOT use NullPool on Windows
assert not isinstance(engine.pool, NullPool)
```
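PRAGMAs like the ones asserted above are typically applied per-connection via a SQLAlchemy connect event. A hedged sketch of that wiring; the project's actual setup lives in `basic_memory/db.py` and may differ:

```python
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///basic-memory.db")


@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragmas(dbapi_connection, _connection_record):
    # Values mirror what the tests above assert.
    cursor = dbapi_connection.cursor()
    for pragma in (
        "PRAGMA journal_mode=WAL",
        "PRAGMA busy_timeout=10000",
        "PRAGMA synchronous=NORMAL",
        "PRAGMA cache_size=-64000",
        "PRAGMA temp_store=MEMORY",
    ):
        cursor.execute(pragma)
    cursor.close()
```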
--------------------------------------------------------------------------------
/src/basic_memory/repository/relation_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing Relation objects."""
from typing import Sequence, List, Optional, Any, cast
from sqlalchemy import and_, delete, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, aliased
from sqlalchemy.orm.interfaces import LoaderOption
from basic_memory import db
from basic_memory.models import Relation, Entity
from basic_memory.repository.repository import Repository
class RelationRepository(Repository[Relation]):
"""Repository for Relation model with memory-specific operations."""
def __init__(self, session_maker: async_sessionmaker, project_id: int):
"""Initialize with session maker and project_id filter.
Args:
session_maker: SQLAlchemy session maker
project_id: Project ID to filter all operations by
"""
super().__init__(session_maker, Relation, project_id=project_id)
async def find_relation(
self, from_permalink: str, to_permalink: str, relation_type: str
) -> Optional[Relation]:
"""Find a relation by its from and to path IDs."""
from_entity = aliased(Entity)
to_entity = aliased(Entity)
query = (
select(Relation)
.join(from_entity, Relation.from_id == from_entity.id)
.join(to_entity, Relation.to_id == to_entity.id)
.where(
and_(
from_entity.permalink == from_permalink,
to_entity.permalink == to_permalink,
Relation.relation_type == relation_type,
)
)
)
return await self.find_one(query)
async def find_by_entities(self, from_id: int, to_id: int) -> Sequence[Relation]:
"""Find all relations between two entities."""
query = select(Relation).where((Relation.from_id == from_id) & (Relation.to_id == to_id))
result = await self.execute_query(query)
return result.scalars().all()
async def find_by_type(self, relation_type: str) -> Sequence[Relation]:
"""Find all relations of a specific type."""
query = select(Relation).filter(Relation.relation_type == relation_type)
result = await self.execute_query(query)
return result.scalars().all()
async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
"""Delete outgoing relations for an entity.
Only deletes relations where this entity is the source (from_id),
as these are the ones owned by this entity's markdown file.
"""
async with db.scoped_session(self.session_maker) as session:
await session.execute(delete(Relation).where(Relation.from_id == entity_id))
async def find_unresolved_relations(self) -> Sequence[Relation]:
"""Find all unresolved relations, where to_id is null."""
query = select(Relation).filter(Relation.to_id.is_(None))
result = await self.execute_query(query)
return result.scalars().all()
async def find_unresolved_relations_for_entity(self, entity_id: int) -> Sequence[Relation]:
"""Find unresolved relations for a specific entity.
Args:
entity_id: The entity whose unresolved outgoing relations to find.
Returns:
List of unresolved relations where this entity is the source.
"""
query = select(Relation).filter(Relation.from_id == entity_id, Relation.to_id.is_(None))
result = await self.execute_query(query)
return result.scalars().all()
async def add_all_ignore_duplicates(self, relations: List[Relation]) -> int:
"""Bulk insert relations, ignoring duplicates.
Uses ON CONFLICT DO NOTHING to skip relations that would violate the
unique constraint on (from_id, to_name, relation_type). This is useful
for bulk operations where the same link may appear multiple times in
a document.
Works with both SQLite and PostgreSQL dialects.
Args:
relations: List of Relation objects to insert
Returns:
Number of relations actually inserted (excludes duplicates)
"""
if not relations:
return 0
# Convert Relation objects to dicts for insert
values = [
{
"project_id": r.project_id if r.project_id else self.project_id,
"from_id": r.from_id,
"to_id": r.to_id,
"to_name": r.to_name,
"relation_type": r.relation_type,
"context": r.context,
}
for r in relations
]
async with db.scoped_session(self.session_maker) as session:
# Check dialect to use appropriate insert
dialect_name = session.bind.dialect.name if session.bind else "sqlite"
if dialect_name == "postgresql": # pragma: no cover
# PostgreSQL: use RETURNING to count inserted rows
# (rowcount is 0 for ON CONFLICT DO NOTHING)
stmt = ( # pragma: no cover
pg_insert(Relation)
.values(values)
.on_conflict_do_nothing()
.returning(Relation.id)
)
result = await session.execute(stmt) # pragma: no cover
return len(result.fetchall()) # pragma: no cover
else:
# SQLite: rowcount works correctly
stmt = sqlite_insert(Relation).values(values)
stmt = stmt.on_conflict_do_nothing()
result = cast(CursorResult[Any], await session.execute(stmt))
return result.rowcount if result.rowcount > 0 else 0
def get_load_options(self) -> List[LoaderOption]:
return [selectinload(Relation.from_entity), selectinload(Relation.to_entity)]
```
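A hedged usage sketch of the duplicate-tolerant bulk insert, assuming `Relation` columns can be set via keyword arguments as the values dict above suggests; the entity IDs are illustrative and must reference existing rows:

```python
from basic_memory.models import Relation


async def demo(relation_repository) -> None:
    # The same wikilink parsed twice from one document: the second row hits
    # the (from_id, to_name, relation_type) unique constraint and is skipped.
    relations = [
        Relation(from_id=1, to_id=2, to_name="parent-entity", relation_type="part_of"),
        Relation(from_id=1, to_id=2, to_name="parent-entity", relation_type="part_of"),
    ]
    inserted = await relation_repository.add_all_ignore_duplicates(relations)
    assert inserted == 1
```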
--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------
```python
"""Integration test for build_context with underscore in memory:// URLs."""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_build_context_underscore_normalization(mcp_server, app, test_project):
"""Test that build_context normalizes underscores in relation types."""
async with Client(mcp_server) as client:
# Create parent note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Parent Entity",
"folder": "testing",
"content": "# Parent Entity\n\nMain entity for testing underscore relations.",
"tags": "test,parent",
},
)
# Create child notes with different relation formats
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Child with Underscore",
"folder": "testing",
"content": """# Child with Underscore
- part_of [[Parent Entity]]
- related_to [[Parent Entity]]
""",
"tags": "test,child",
},
)
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Child with Hyphen",
"folder": "testing",
"content": """# Child with Hyphen
- part-of [[Parent Entity]]
- related-to [[Parent Entity]]
""",
"tags": "test,child",
},
)
# Test 1: Search with underscore format should return results
# Relation permalinks are: source/relation_type/target
# So child-with-underscore/part-of/parent-entity
result_underscore = await client.call_tool(
"build_context",
{
"project": test_project.name,
"url": "memory://testing/*/part_of/*parent*", # Using underscore
},
)
# Parse response
assert len(result_underscore.content) == 1
response_text = result_underscore.content[0].text # pyright: ignore
assert '"results"' in response_text
# Both relations should be found since they both connect to parent-entity
# The system should normalize the underscore to hyphen internally
assert "part-of" in response_text.lower()
# Test 2: Search with hyphen format should also return results
result_hyphen = await client.call_tool(
"build_context",
{
"project": test_project.name,
"url": "memory://testing/*/part-of/*parent*", # Using hyphen
},
)
response_text_hyphen = result_hyphen.content[0].text # pyright: ignore
assert '"results"' in response_text_hyphen
assert "part-of" in response_text_hyphen.lower()
# Test 3: Test with related_to/related-to as well
result_related = await client.call_tool(
"build_context",
{
"project": test_project.name,
"url": "memory://testing/*/related_to/*parent*", # Using underscore
},
)
response_text_related = result_related.content[0].text # pyright: ignore
assert '"results"' in response_text_related
assert "related-to" in response_text_related.lower()
# Test 4: Test exact path (non-wildcard) with underscore
# Exact relation permalink would be child/relation/target
result_exact = await client.call_tool(
"build_context",
{
"project": test_project.name,
"url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
},
)
response_text_exact = result_exact.content[0].text # pyright: ignore
assert '"results"' in response_text_exact
assert "part-of" in response_text_exact.lower()
@pytest.mark.asyncio
async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
"""Test build_context with complex paths containing underscores."""
async with Client(mcp_server) as client:
# Create notes with underscores in titles and relations
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "workflow_manager_agent",
"folder": "specs",
"content": """# Workflow Manager Agent
Specification for the workflow manager agent.
""",
"tags": "spec,workflow",
},
)
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "task_parser",
"folder": "components",
"content": """# Task Parser
- part_of [[workflow_manager_agent]]
- implements_for [[workflow_manager_agent]]
""",
"tags": "component,parser",
},
)
# Test with underscores in all parts of the path
# Relations are created as: task-parser/part-of/workflow-manager-agent
# So search for */part_of/* or */part-of/* to find them
test_cases = [
"memory://components/*/part_of/*workflow*",
"memory://components/*/part-of/*workflow*",
"memory://*/task*/part_of/*",
"memory://*/task*/part-of/*",
]
for url in test_cases:
result = await client.call_tool(
"build_context", {"project": test_project.name, "url": url}
)
# All variations should work and find the related content
assert len(result.content) == 1
response = result.content[0].text # pyright: ignore
assert '"results"' in response
# The relation should be found showing part-of connection
assert "part-of" in response.lower(), f"Failed for URL: {url}"
```
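These tests rely on the server folding `part_of`/`part-of` to a single canonical relation permalink. A hypothetical sketch of that normalization, shown only to make the expectation concrete; the actual implementation lives in basic-memory's memory:// URL handling and may differ:
```python
def normalize_relation_segment(segment: str) -> str:
    """Fold underscores to hyphens and lowercase, mirroring permalink slugs."""
    return segment.lower().replace("_", "-")

# Both spellings map to the canonical permalink segment used in relations.
assert normalize_relation_segment("part_of") == "part-of"
assert normalize_relation_segment("related_to") == "related-to"
```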
--------------------------------------------------------------------------------
/tests/mcp/test_tool_utils.py:
--------------------------------------------------------------------------------
```python
"""Tests for MCP tool utilities."""
import pytest
from httpx import HTTPStatusError
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools.utils import (
call_get,
call_post,
call_put,
call_delete,
get_error_message,
)
@pytest.fixture
def mock_response():
    """Return a mock response class for building test responses."""
class MockResponse:
def __init__(self, status_code=200):
self.status_code = status_code
self.is_success = status_code < 400
self.json = lambda: {}
def raise_for_status(self):
if self.status_code >= 400:
raise HTTPStatusError(
message=f"HTTP Error {self.status_code}", request=None, response=self
)
return MockResponse
class _Client:
def __init__(self):
self.calls: list[tuple[str, tuple, dict]] = []
self._responses: dict[str, object] = {}
def set_response(self, method: str, response):
self._responses[method.lower()] = response
async def get(self, *args, **kwargs):
self.calls.append(("get", args, kwargs))
return self._responses["get"]
async def post(self, *args, **kwargs):
self.calls.append(("post", args, kwargs))
return self._responses["post"]
async def put(self, *args, **kwargs):
self.calls.append(("put", args, kwargs))
return self._responses["put"]
async def delete(self, *args, **kwargs):
self.calls.append(("delete", args, kwargs))
return self._responses["delete"]
@pytest.mark.asyncio
async def test_call_get_success(mock_response):
"""Test successful GET request."""
client = _Client()
client.set_response("get", mock_response())
response = await call_get(client, "http://test.com")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_call_get_error(mock_response):
"""Test GET request with error."""
client = _Client()
client.set_response("get", mock_response(404))
with pytest.raises(ToolError) as exc:
await call_get(client, "http://test.com")
assert "Resource not found" in str(exc.value)
@pytest.mark.asyncio
async def test_call_post_success(mock_response):
"""Test successful POST request."""
client = _Client()
response = mock_response()
response.json = lambda: {"test": "data"}
client.set_response("post", response)
response = await call_post(client, "http://test.com", json={"test": "data"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_call_post_error(mock_response):
"""Test POST request with error."""
client = _Client()
response = mock_response(500)
response.json = lambda: {"test": "error"}
client.set_response("post", response)
with pytest.raises(ToolError) as exc:
await call_post(client, "http://test.com", json={"test": "data"})
assert "Internal server error" in str(exc.value)
@pytest.mark.asyncio
async def test_call_put_success(mock_response):
"""Test successful PUT request."""
client = _Client()
client.set_response("put", mock_response())
response = await call_put(client, "http://test.com", json={"test": "data"})
assert response.status_code == 200
@pytest.mark.asyncio
async def test_call_put_error(mock_response):
"""Test PUT request with error."""
client = _Client()
client.set_response("put", mock_response(400))
with pytest.raises(ToolError) as exc:
await call_put(client, "http://test.com", json={"test": "data"})
assert "Invalid request" in str(exc.value)
@pytest.mark.asyncio
async def test_call_delete_success(mock_response):
"""Test successful DELETE request."""
client = _Client()
client.set_response("delete", mock_response())
response = await call_delete(client, "http://test.com")
assert response.status_code == 200
@pytest.mark.asyncio
async def test_call_delete_error(mock_response):
"""Test DELETE request with error."""
client = _Client()
client.set_response("delete", mock_response(403))
with pytest.raises(ToolError) as exc:
await call_delete(client, "http://test.com")
assert "Access denied" in str(exc.value)
@pytest.mark.asyncio
async def test_call_get_with_params(mock_response):
"""Test GET request with query parameters."""
client = _Client()
client.set_response("get", mock_response())
params = {"key": "value", "test": "data"}
await call_get(client, "http://test.com", params=params)
assert len(client.calls) == 1
method, _args, kwargs = client.calls[0]
assert method == "get"
assert kwargs["params"] == params
@pytest.mark.asyncio
async def test_get_error_message():
"""Test the get_error_message function."""
# Test 400 status code
message = get_error_message(400, "http://test.com/resource", "GET")
assert "Invalid request" in message
assert "resource" in message
# Test 404 status code
message = get_error_message(404, "http://test.com/missing", "GET")
assert "Resource not found" in message
assert "missing" in message
# Test 500 status code
message = get_error_message(500, "http://test.com/server", "POST")
assert "Internal server error" in message
assert "server" in message
# Test URL object handling
from httpx import URL
url = URL("http://test.com/complex/path")
message = get_error_message(403, url, "DELETE")
assert "Access denied" in message
assert "path" in message
@pytest.mark.asyncio
async def test_call_post_with_json(mock_response):
"""Test POST request with JSON payload."""
client = _Client()
response = mock_response()
response.json = lambda: {"test": "data"}
client.set_response("post", response)
json_data = {"key": "value", "nested": {"test": "data"}}
await call_post(client, "http://test.com", json=json_data)
assert len(client.calls) == 1
method, _args, kwargs = client.calls[0]
assert method == "post"
assert kwargs["json"] == json_data
```
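The assertions above pin down a status-code-to-message mapping. A minimal sketch consistent with those assertions (the real `get_error_message` in `basic_memory.mcp.tools.utils` may format its output differently):
```python
from httpx import URL

def sketch_error_message(status_code: int, url, method: str) -> str:
    """Map an HTTP status to a human-readable error, keeping the URL path."""
    path = URL(str(url)).path
    reasons = {400: "Invalid request", 403: "Access denied", 404: "Resource not found"}
    fallback = "Internal server error" if status_code >= 500 else "Request failed"
    return f"{reasons.get(status_code, fallback)}: {method} {path} ({status_code})"

assert "Resource not found" in sketch_error_message(404, "http://test.com/missing", "GET")
assert "Access denied" in sketch_error_message(403, URL("http://test.com/complex/path"), "DELETE")
```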
--------------------------------------------------------------------------------
/test-int/test_disable_permalinks_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for the disable_permalinks configuration."""
import pytest
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.repository import (
EntityRepository,
ObservationRepository,
RelationRepository,
ProjectRepository,
)
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
@pytest.mark.asyncio
async def test_disable_permalinks_create_entity(tmp_path, engine_factory, app_config, test_project):
"""Test that entities created with disable_permalinks=True don't have permalinks."""
from basic_memory.config import DatabaseBackend
engine, session_maker = engine_factory
# Override app config to enable disable_permalinks
app_config.disable_permalinks = True
# Setup repositories
entity_repository = EntityRepository(session_maker, project_id=test_project.id)
observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
relation_repository = RelationRepository(session_maker, project_id=test_project.id)
# Use database-specific search repository
if app_config.database_backend == DatabaseBackend.POSTGRES:
search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
else:
search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
# Setup services
entity_parser = EntityParser(tmp_path)
markdown_processor = MarkdownProcessor(entity_parser)
file_service = FileService(tmp_path, markdown_processor)
search_service = SearchService(search_repository, entity_repository, file_service)
await search_service.init_search_index()
link_resolver = LinkResolver(entity_repository, search_service)
entity_service = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
# Create entity via API
entity_data = EntitySchema(
title="Test Note",
folder="test",
entity_type="note",
content="Test content",
)
created = await entity_service.create_entity(entity_data)
# Verify entity has no permalink
assert created.permalink is None
# Verify file has no permalink in frontmatter
file_path = tmp_path / "test" / "Test Note.md"
assert file_path.exists()
content = file_path.read_text()
assert "permalink:" not in content
assert "Test content" in content
@pytest.mark.asyncio
async def test_disable_permalinks_sync_workflow(tmp_path, engine_factory, app_config, test_project):
"""Test full sync workflow with disable_permalinks enabled."""
from basic_memory.config import DatabaseBackend
engine, session_maker = engine_factory
# Override app config to enable disable_permalinks
app_config.disable_permalinks = True
# Create a test markdown file without frontmatter
test_file = tmp_path / "test_note.md"
test_file.write_text("# Test Note\nThis is test content.")
# Setup repositories
entity_repository = EntityRepository(session_maker, project_id=test_project.id)
observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
relation_repository = RelationRepository(session_maker, project_id=test_project.id)
# Use database-specific search repository
if app_config.database_backend == DatabaseBackend.POSTGRES:
search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
else:
search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
project_repository = ProjectRepository(session_maker)
# Setup services
entity_parser = EntityParser(tmp_path)
markdown_processor = MarkdownProcessor(entity_parser)
file_service = FileService(tmp_path, markdown_processor)
search_service = SearchService(search_repository, entity_repository, file_service)
await search_service.init_search_index()
link_resolver = LinkResolver(entity_repository, search_service)
entity_service = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
sync_service = SyncService(
app_config=app_config,
entity_service=entity_service,
project_repository=project_repository,
entity_parser=entity_parser,
entity_repository=entity_repository,
relation_repository=relation_repository,
search_service=search_service,
file_service=file_service,
)
# Run sync
report = await sync_service.scan(tmp_path)
# Note: scan may pick up database files too, so just check our file is there
assert "test_note.md" in report.new
# Sync the file
await sync_service.sync_file("test_note.md", new=True)
# Verify file has no permalink added
content = test_file.read_text()
assert "permalink:" not in content
assert "# Test Note" in content
# Verify entity in database has no permalink
entities = await entity_repository.find_all()
assert len(entities) == 1
assert entities[0].permalink is None
# Title is extracted from filename when no frontmatter, or from frontmatter when present
assert entities[0].title in ("test_note", "Test Note")
```
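Both tests above hand-wire the same repository/service stack, including the backend-specific search repository. A hypothetical helper (not in the repository) that factors out the backend branch; the names mirror the test bodies:
```python
from basic_memory.config import DatabaseBackend
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository

def build_search_repository(app_config, session_maker, project_id: int):
    """Pick the search repository implementation for the configured backend."""
    if app_config.database_backend == DatabaseBackend.POSTGRES:
        return PostgresSearchRepository(session_maker, project_id=project_id)
    return SQLiteSearchRepository(session_maker, project_id=project_id)
```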
--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------
```python
"""Test repository implementation."""
from datetime import datetime, UTC
import pytest
from sqlalchemy import String, DateTime
from sqlalchemy.orm import Mapped, mapped_column
from basic_memory.models import Base
from basic_memory.repository.repository import Repository
class ModelTest(Base):
"""Test model for repository tests."""
__tablename__ = "test_model"
id: Mapped[str] = mapped_column(String(255), primary_key=True)
name: Mapped[str] = mapped_column(String(255))
description: Mapped[str | None] = mapped_column(String(255), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, default=lambda: datetime.now(UTC).replace(tzinfo=None)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=lambda: datetime.now(UTC).replace(tzinfo=None),
onupdate=lambda: datetime.now(UTC).replace(tzinfo=None),
)
@pytest.fixture
def repository(session_maker):
"""Create a test repository."""
return Repository(session_maker, ModelTest)
@pytest.mark.asyncio
async def test_add(repository):
    """Test adding a single entity."""
# Create test instances
instance = ModelTest(id="test_add", name="Test Add")
await repository.add(instance)
# Verify we can find in db
found = await repository.find_by_id("test_add")
assert found is not None
assert found.name == "Test Add"
@pytest.mark.asyncio
async def test_add_all(repository):
"""Test bulk creation of entities."""
# Create test instances
instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
await repository.add_all(instances)
# Verify we can find them in db
found = await repository.find_by_id("test_0")
assert found is not None
assert found.name == "Test 0"
@pytest.mark.asyncio
async def test_bulk_create(repository):
"""Test bulk creation of entities."""
# Create test instances
instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
# Bulk create
await repository.create_all([instance.__dict__ for instance in instances])
# Verify we can find them in db
found = await repository.find_by_id("test_0")
assert found is not None
assert found.name == "Test 0"
@pytest.mark.asyncio
async def test_find_all(repository):
"""Test finding multiple entities by IDs."""
# Create test data
instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
await repository.create_all([instance.__dict__ for instance in instances])
found = await repository.find_all(limit=3)
assert len(found) == 3
@pytest.mark.asyncio
async def test_find_by_ids(repository):
"""Test finding multiple entities by IDs."""
# Create test data
instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
await repository.create_all([instance.__dict__ for instance in instances])
# Test finding subset of entities
ids_to_find = ["test_0", "test_2", "test_4"]
found = await repository.find_by_ids(ids_to_find)
assert len(found) == 3
assert sorted([e.id for e in found]) == sorted(ids_to_find)
# Test finding with some non-existent IDs
mixed_ids = ["test_0", "nonexistent", "test_4"]
partial_found = await repository.find_by_ids(mixed_ids)
assert len(partial_found) == 2
assert sorted([e.id for e in partial_found]) == ["test_0", "test_4"]
# Test with empty list
empty_found = await repository.find_by_ids([])
assert len(empty_found) == 0
# Test with all non-existent IDs
not_found = await repository.find_by_ids(["fake1", "fake2"])
assert len(not_found) == 0
@pytest.mark.asyncio
async def test_delete_by_ids(repository):
"""Test finding multiple entities by IDs."""
# Create test data
instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
await repository.create_all([instance.__dict__ for instance in instances])
# Test delete subset of entities
ids_to_delete = ["test_0", "test_2", "test_4"]
deleted_count = await repository.delete_by_ids(ids_to_delete)
assert deleted_count == 3
# Test finding subset of entities
ids_to_find = ["test_1", "test_3"]
found = await repository.find_by_ids(ids_to_find)
assert len(found) == 2
assert sorted([e.id for e in found]) == sorted(ids_to_find)
assert await repository.find_by_id(ids_to_delete[0]) is None
assert await repository.find_by_id(ids_to_delete[1]) is None
assert await repository.find_by_id(ids_to_delete[2]) is None
@pytest.mark.asyncio
async def test_update(repository):
"""Test finding entities modified since a timestamp."""
# Create initial test data
instance = ModelTest(id="test_add", name="Test Add")
await repository.add(instance)
instance = ModelTest(id="test_add", name="Updated")
# Find recently modified
modified = await repository.update(instance.id, {"name": "Updated"})
assert modified is not None
assert modified.name == "Updated"
@pytest.mark.asyncio
async def test_update_model(repository):
"""Test finding entities modified since a timestamp."""
# Create initial test data
instance = ModelTest(id="test_add", name="Test Add")
await repository.add(instance)
instance.name = "Updated"
    # Update by passing the modified model instance
modified = await repository.update(instance.id, instance)
assert modified is not None
assert modified.name == "Updated"
@pytest.mark.asyncio
async def test_update_model_not_found(repository):
"""Test finding entities modified since a timestamp."""
# Create initial test data
instance = ModelTest(id="test_add", name="Test Add")
await repository.add(instance)
modified = await repository.update("0", {}) # Use string ID for Postgres compatibility
assert modified is None
@pytest.mark.asyncio
async def test_count(repository):
"""Test bulk creation of entities."""
# Create test instances
instance = ModelTest(id="test_add", name="Test Add")
await repository.add(instance)
# Verify we can count in db
count = await repository.count()
assert count == 1
```
--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for Obsidian-compatible YAML formatting in write_note tool."""
import pytest
from basic_memory.mcp.tools import write_note
@pytest.mark.asyncio
async def test_write_note_tags_yaml_format(app, project_config, test_project):
"""Test that write_note creates files with proper YAML list format for tags."""
# Create a note with tags using write_note
result = await write_note.fn(
project=test_project.name,
title="YAML Format Test",
folder="test",
content="Testing YAML tag formatting",
tags=["system", "overview", "reference"],
)
# Verify the note was created successfully
assert "Created note" in result
assert "file_path: test/YAML Format Test.md" in result
# Read the file directly to check YAML formatting
file_path = project_config.home / "test" / "YAML Format Test.md"
content = file_path.read_text(encoding="utf-8")
# Should use YAML list format
assert "tags:" in content
assert "- system" in content
assert "- overview" in content
assert "- reference" in content
# Should NOT use JSON array format
assert '["system"' not in content
assert '"overview"' not in content
assert '"reference"]' not in content
@pytest.mark.asyncio
async def test_write_note_stringified_json_tags(app, project_config, test_project):
"""Test that stringified JSON arrays are handled correctly."""
# This simulates the issue where AI assistants pass tags as stringified JSON
result = await write_note.fn(
project=test_project.name,
title="Stringified JSON Test",
folder="test",
content="Testing stringified JSON tag input",
tags='["python", "testing", "json"]', # Stringified JSON array
)
# Verify the note was created successfully
assert "Created note" in result
# Read the file to check formatting
file_path = project_config.home / "test" / "Stringified JSON Test.md"
content = file_path.read_text(encoding="utf-8")
# Should properly parse the JSON and format as YAML list
assert "tags:" in content
assert "- python" in content
assert "- testing" in content
assert "- json" in content
# Should NOT have the original stringified format issues
assert '["python"' not in content
assert '"testing"' not in content
assert '"json"]' not in content
@pytest.mark.asyncio
async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
"""Test that single tags are still formatted as YAML lists."""
await write_note.fn(
project=test_project.name,
title="Single Tag Test",
folder="test",
content="Testing single tag formatting",
tags=["solo-tag"],
)
file_path = project_config.home / "test" / "Single Tag Test.md"
content = file_path.read_text(encoding="utf-8")
# Single tag should still use list format
assert "tags:" in content
assert "- solo-tag" in content
@pytest.mark.asyncio
async def test_write_note_no_tags(app, project_config, test_project):
"""Test that notes without tags work normally."""
await write_note.fn(
project=test_project.name,
title="No Tags Test",
folder="test",
content="Testing note without tags",
tags=None,
)
file_path = project_config.home / "test" / "No Tags Test.md"
content = file_path.read_text(encoding="utf-8")
# Should not have tags field in frontmatter
assert "tags:" not in content
assert "title: No Tags Test" in content
@pytest.mark.asyncio
async def test_write_note_empty_tags_list(app, project_config, test_project):
"""Test that empty tag lists are handled properly."""
await write_note.fn(
project=test_project.name,
title="Empty Tags Test",
folder="test",
content="Testing empty tag list",
tags=[],
)
file_path = project_config.home / "test" / "Empty Tags Test.md"
content = file_path.read_text(encoding="utf-8")
# Should not add tags field to frontmatter for empty lists
assert "tags:" not in content
@pytest.mark.asyncio
async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
"""Test that updating a note preserves the YAML list format."""
# First, create the note
await write_note.fn(
project=test_project.name,
title="Update Format Test",
folder="test",
content="Initial content",
tags=["initial", "tag"],
)
# Then update it with new tags
result = await write_note.fn(
project=test_project.name,
title="Update Format Test",
folder="test",
content="Updated content",
tags=["updated", "new-tag", "format"],
)
# Should be an update, not a new creation
assert "Updated note" in result
# Check the file format
file_path = project_config.home / "test" / "Update Format Test.md"
content = file_path.read_text(encoding="utf-8")
# Should have proper YAML formatting for updated tags
assert "tags:" in content
assert "- updated" in content
assert "- new-tag" in content
assert "- format" in content
# Old tags should be gone
assert "- initial" not in content
assert "- tag" not in content
# Content should be updated
assert "Updated content" in content
assert "Initial content" not in content
@pytest.mark.asyncio
async def test_complex_tags_yaml_format(app, project_config, test_project):
"""Test that complex tags with special characters format correctly."""
await write_note.fn(
project=test_project.name,
title="Complex Tags Test",
folder="test",
content="Testing complex tag formats",
tags=["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"],
)
file_path = project_config.home / "test" / "Complex Tags Test.md"
content = file_path.read_text(encoding="utf-8")
# All complex tags should format correctly
assert "- python-3.9" in content
assert "- api_integration" in content
assert "- v2.0" in content
assert "- nested/category" in content
assert "- under_score" in content
```
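These tests distinguish YAML block-style lists (`- system`) from JSON/flow-style arrays (`["system", ...]`). A standalone PyYAML sketch of that distinction; this is illustrative and not necessarily the serializer basic-memory uses for frontmatter:
```python
import yaml

frontmatter = {"title": "YAML Format Test", "tags": ["system", "overview", "reference"]}
# default_flow_style=False forces block style, matching the assertions above.
print(yaml.safe_dump(frontmatter, default_flow_style=False, sort_keys=False))
# title: YAML Format Test
# tags:
# - system
# - overview
# - reference
```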
--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-11: Basic Memory API Performance Optimization'
type: spec
permalink: specs/spec-11-basic-memory-api-performance-optimization
tags:
- performance
- api
- mcp
- database
- cloud
---
# SPEC-11: Basic Memory API Performance Optimization
## Why
The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.
**Root Cause Analysis:**
- GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
- Each MCP tool call triggers full database initialization + project reconciliation
- `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
- `reconcile_projects_with_config()` runs expensive sync operations repeatedly
**Performance Impact:**
- Database connection setup: ~50-100ms per request
- Migration checks: ~100-500ms per request
- Project reconciliation: ~200ms-2s per request
- **Total overhead**: ~350ms-2.6s per MCP tool call
This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.
## What
This optimization affects the **core basic-memory repository** components:
1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
- Cache database connections in app state during startup
- Avoid repeated expensive initialization
2. **Dependency Injection** (`src/basic_memory/deps.py`)
- Modify `get_engine_factory()` to use cached connections
- Eliminate per-request database setup
3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
- Add caching/throttling to project reconciliation
- Skip expensive operations when appropriate
4. **Configuration** (`src/basic_memory/config.py`)
- Add optional performance flags for cloud environments
**Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.
## How (High Level)
### Phase 1: Cache Database Connections (Critical - 80% of gains)
**Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
**Solution**: Cache database engine/session in app state during lifespan
1. **Modify API Lifespan** (`api/app.py`):
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
app_config = ConfigManager().config
await initialize_app(app_config)
# Cache database connection in app state
engine, session_maker = await db.get_or_create_db(app_config.database_path)
app.state.engine = engine
app.state.session_maker = session_maker
# ... rest of startup logic
```
2. **Modify Dependency Injection** (`deps.py`):
```python
async def get_engine_factory(
request: Request
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
"""Get cached engine and session maker from app state."""
return request.app.state.engine, request.app.state.session_maker
```
### Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)
**Problem**: `reconcile_projects_with_config()` runs expensive sync operations repeatedly
**Solution**: Add module-level caching with time-based throttling
1. **Add Reconciliation Cache** (`services/initialization.py`):
```python
import time
_project_reconciliation_completed = False
_last_reconciliation_time = 0.0
async def reconcile_projects_with_config(app_config, force=False):
    global _last_reconciliation_time
    # Skip if recently completed (within 60 seconds) unless forced
    recently_completed = (time.monotonic() - _last_reconciliation_time) < 60
    if recently_completed and not force:
        return
    _last_reconciliation_time = time.monotonic()
    # ... existing logic
```
### Phase 3: Cloud Environment Flags (Optional)
**Problem**: Expensive initialization is forced even in stateless production environments
**Solution**: Add skip flags for cloud/stateless deployments
1. **Add Config Flag** (`config.py`):
```python
skip_initialization_sync: bool = Field(default=False)
```
2. **Configure in Cloud** (basic-memory-cloud integration):
```bash
BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true
```
## How to Evaluate
### Success Criteria
1. **Performance Metrics** (Primary):
   - MCP tool response time reduced by 50%+ (measure before/after)
   - Database connection overhead eliminated (0ms vs 50-100ms)
   - Migration check overhead eliminated (0ms vs 100-500ms)
   - Project reconciliation overhead reduced by 90%+
2. **Load Testing**:
   - Concurrent MCP tool calls maintain performance
   - No memory leaks in cached connections
   - Database connection pool behaves correctly
3. **Functional Correctness**:
   - All existing API endpoints work identically
   - MCP tools maintain full functionality
   - CLI operations unaffected
   - Database migrations still execute properly
4. **Backwards Compatibility**:
   - No breaking changes to existing APIs
   - Config changes are optional with safe defaults
   - Non-cloud deployments work unchanged
### Testing Strategy
**Performance Testing:**
```bash
# Before optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Measure: ~1-3 seconds
# After optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Target: <500ms
```
**Load Testing:**
```bash
# Multiple concurrent MCP tool calls
for i in {1..10}; do
  basic-memory-mcp-tools search "test" &
done
wait
# Verify: No degradation, consistent response times
```
**Regression Testing:**
```bash
# Full basic-memory test suite
just test
# All tests must pass
# Integration tests with cloud deployment: verify the
# MCP gateway → API → database flow works
```
### Validation Checklist
- [ ] **Phase 1 Complete**: Database connections cached, dependency injection optimized
- [ ] **Performance Benchmark**: 50%+ improvement in MCP tool response times
- [ ] **Memory Usage**: No leaks in cached connections over 24h+ periods
- [ ] **Stress Testing**: 100+ concurrent requests maintain performance
- [ ] **Backwards Compatibility**: All existing functionality preserved
- [ ] **Documentation**: Performance optimization documented in README
- [ ] **Cloud Integration**: basic-memory-cloud sees performance benefits
## Notes
**Implementation Priority:**
- Phase 1 provides 80% of the performance gains and should be implemented first
- Phase 2 provides the remaining 20% and addresses edge cases
- Phase 3 is optional for maximum cloud optimization
**Risk Mitigation:**
- All changes are backwards compatible
- Gradual rollout is possible (Phase 1 → 2 → 3)
- Easy rollback via configuration flags
**Cloud Integration:**
- This optimization directly addresses basic-memory-cloud issue #82
- Changes in core basic-memory will benefit all cloud tenants
- No changes needed in basic-memory-cloud itself
```
--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-1: Specification-Driven Development Process'
type: spec
permalink: specs/spec-1-specification-driven-development-process
tags:
- process
- specification
- development
- meta
---
# SPEC-1: Specification-Driven Development Process
## Why
We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process.
Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.
The default approach of ad-hoc development with AI agents tends to result in:
- Circular refactoring cycles
- Fighting framework complexity
- Lost context between sessions
- Unclear requirements and scope
## What
This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud.
We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.
**Affected Areas:**
- All future component development
- Architecture decisions
- Agent collaboration workflows
- Knowledge management and context preservation
## How (High Level)
### Specification Structure
Name: Spec names should be numbered sequentially, followed by a description, e.g. `SPEC-X - Simple Description.md`.
See: [[Spec-2: Slash Commands Reference]]
Every spec is a complete thought containing:
- **Why**: The reasoning and problem being solved
- **What**: What is affected or changed
- **How**: High-level approach to implementation
- **How to Evaluate**: Testing/validation procedure
- Additional context as needed
### Living Specification Format
Specifications are **living documents** that evolve throughout implementation:
**Progress Tracking:**
- **Completed items**: Use ✅ checkmark emoji for implemented features
- **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
- **In-progress items**: Use `- [x]` when work is actively underway
**Status Philosophy:**
- **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
- **Use checklists within content** to show granular implementation progress
- **Keep specs informative** while providing clear progress visibility
- **Update continuously** as understanding and implementation evolve
**Example Format:**
```markdown
### ComponentName
- ✅ Basic functionality implemented
- ✅ Props and events defined
- [ ] Add sorting controls
- [ ] Improve accessibility
- [x] Currently implementing responsive design
```
This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.
## Claude Code
We will leverage Claude Code capabilities to make the process semi-automated.
- Slash commands: define repeatable steps in the process (create spec, implement, review, etc.)
- Agents: define roles to carry out instructions (frontend developer, backend developer, etc.)
- MCP tools: enable agents to implement specs via actions (write code, test, etc.)
### Workflow
1. **Create**: Write spec as complete thought in `/specs` folder
2. **Discuss**: Iterate and refine through agent collaboration
3. **Implement**: Hand spec to appropriate specialist agent
4. **Validate**: Review implementation against spec criteria
5. **Document**: Update spec with learnings and decisions
### Slash Commands
Claude slash commands are used to manage the flow.
These are simple instructions to help make the process uniform.
They can be updated and refined as needed.
- `/spec create [name]` - Create new specification
- `/spec status` - Show current spec states
- `/spec implement [name]` - Hand to appropriate agent
- `/spec review [name]` - Validate implementation
### Agent Orchestration
Agents are defined with clear roles, for instance:
- **system-architect**: Creates high-level specs, ADRs, architectural decisions
- **vue-developer**: Component specs, UI patterns, frontend architecture
- **python-developer**: Implementation specs, technical details, backend logic
Each agent reads/updates specs through basic-memory tools.
## How to Evaluate
### Success Criteria
- Specs provide clear, actionable guidance for implementation
- Reduced circular refactoring and scope creep
- Persistent context across development sessions
- Clean separation between "what/why" and implementation details
- Specs record a history of what happened and why for historical context
### Testing Procedure
1. Create a spec for an existing problematic component
2. Have an agent implement following only the spec
3. Compare result quality and development speed vs. ad-hoc approach
4. Measure context preservation across sessions
5. Evaluate spec clarity and completeness
### Metrics
- Time from spec to working implementation
- Number of refactoring cycles required
- Agent understanding of requirements
- Spec reusability for similar components
## Notes
- Start simple: specs are just complete thoughts, not heavy processes
- Use basic-memory's knowledge graph to link specs, decisions, components
- Let the process evolve naturally based on what works
- Focus on solving the actual problem: Manage complexity in development
## Observations
- [problem] Web development without clear goals and documentation leads to circular refactoring cycles #complexity
- [solution] Specification-driven development reduces scope creep and context loss #process-improvement
- [pattern] basic-memory as specification engine creates recursive development loop #meta-development
- [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
- [tool] Slash commands provide uniform process automation #automation
- [agent-pattern] Three specialized agents handle different implementation domains #specialization
- [success-metric] Time from spec to working implementation measures process efficiency #measurement
- [learning] Process should evolve naturally based on what works in practice #adaptation
- [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
- [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement
## Relations
- spec [[Spec-2: Slash Commands Reference]]
- spec [[Spec-3: Agent Definitions]]
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------
```python
"""Recent activity prompts for Basic Memory MCP server.
These prompts help users see what has changed in their knowledge base recently.
"""
from typing import Annotated, Optional
from loguru import logger
from pydantic import Field
from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.recent_activity import recent_activity
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
from basic_memory.schemas.search import SearchItemType
@mcp.prompt(
name="recent_activity",
description="Get recent activity from a specific project or across all projects",
)
async def recent_activity_prompt(
timeframe: Annotated[
TimeFrame,
Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
] = "7d",
project: Annotated[
Optional[str],
Field(
description="Specific project to get activity from (None for discovery across all projects)"
),
] = None,
) -> str:
"""Get recent activity from a specific project or across all projects.
This prompt helps you see what's changed recently in the knowledge base.
In discovery mode (project=None), it shows activity across all projects.
In project-specific mode, it shows detailed activity for one project.
Args:
timeframe: How far back to look for activity (e.g. '1d', '1 week')
project: Specific project to get activity from (None for discovery across all projects)
Returns:
Formatted summary of recent activity
"""
logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")
recent = await recent_activity.fn(
project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
)
# Extract primary results from the hierarchical structure
primary_results = []
related_results = []
if isinstance(recent, ProjectActivitySummary):
# Discovery mode - extract results from all projects
for _, project_activity in recent.projects.items():
if project_activity.activity.results:
# Take up to 2 primary results per project
for item in project_activity.activity.results[:2]:
primary_results.append(item.primary_result)
# Add up to 1 related result per primary item
if item.related_results:
related_results.extend(item.related_results[:1]) # pragma: no cover
# Limit total results for readability
primary_results = primary_results[:8]
related_results = related_results[:6]
elif isinstance(recent, GraphContext):
# Project-specific mode - use existing logic
if recent.results:
# Take up to 5 primary results
for item in recent.results[:5]:
primary_results.append(item.primary_result)
# Add up to 2 related results per primary item
if item.related_results:
related_results.extend(item.related_results[:2]) # pragma: no cover
# Set topic based on mode
if project:
topic = f"Recent Activity in {project} ({timeframe})"
else:
topic = f"Recent Activity Across All Projects ({timeframe})"
prompt_context = format_prompt_context(
PromptContext(
topic=topic,
timeframe=timeframe,
results=[
PromptContextItem(
primary_results=primary_results,
related_results=related_results[:10], # Limit total related results
)
],
)
)
# Add mode-specific suggestions
first_title = "Recent Topic"
if primary_results and len(primary_results) > 0:
first_title = primary_results[0].title
if project:
# Project-specific suggestions
capture_suggestions = f"""
## Opportunity to Capture Activity Summary
Consider creating a summary note of recent activity in {project}:
```python
await write_note(
"{project}",
title="Activity Summary {timeframe}",
content='''
# Activity Summary for {project} ({timeframe})
## Overview
[Summary of key changes and developments in this project over this period]
## Key Updates
[List main updates and their significance within this project]
## Observations
- [trend] [Observation about patterns in recent activity]
- [insight] [Connection between different activities]
## Relations
- summarizes [[{first_title}]]
- relates_to [[{project} Overview]]
''',
folder="summaries"
)
```
Summarizing periodic activity helps create high-level insights and connections within the project.
"""
else:
# Discovery mode suggestions
project_count = len(recent.projects) if isinstance(recent, ProjectActivitySummary) else 0
most_active = (
getattr(recent.summary, "most_active_project", "Unknown")
if isinstance(recent, ProjectActivitySummary)
else "Unknown"
)
capture_suggestions = f"""
## Cross-Project Activity Discovery
Found activity across {project_count} projects. Most active: **{most_active}**
Consider creating a cross-project summary:
```python
await write_note(
"{most_active if most_active != "Unknown" else "main"}",
title="Cross-Project Activity Summary {timeframe}",
content='''
# Cross-Project Activity Summary ({timeframe})
## Overview
Activity found across {project_count} projects, with {most_active} showing the most activity.
## Key Developments
[Summarize important changes across all projects]
## Project Insights
[Note patterns or connections between projects]
## Observations
- [trend] [Cross-project patterns observed]
- [insight] [Connections between different project activities]
## Relations
- summarizes [[{first_title}]]
- relates_to [[Project Portfolio Overview]]
''',
folder="summaries"
)
```
Cross-project summaries help identify broader trends and project interconnections.
"""
return prompt_context + capture_suggestions
```
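A usage sketch of the two modes described in the docstring. This assumes the decorated prompt exposes its wrapped function as `.fn`, the same pattern the MCP tool objects in this codebase use, and that a configured basic-memory environment is available; outside one, these calls would fail:
```python
import asyncio
from basic_memory.mcp.prompts.recent_activity import recent_activity_prompt

async def main() -> None:
    # Discovery mode: activity across all projects over the last 3 days
    print(await recent_activity_prompt.fn(timeframe="3d"))
    # Project-specific mode ("main" is a hypothetical project name)
    print(await recent_activity_prompt.fn(timeframe="1 week", project="main"))

asyncio.run(main())
```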
--------------------------------------------------------------------------------
/src/basic_memory/deps/importers.py:
--------------------------------------------------------------------------------
```python
"""Importer dependency injection for basic-memory.
This module provides importer dependencies:
- ChatGPTImporter
- ClaudeConversationsImporter
- ClaudeProjectsImporter
- MemoryJsonImporter
"""
from typing import Annotated
from fastapi import Depends
from basic_memory.deps.projects import (
ProjectConfigDep,
ProjectConfigV2Dep,
ProjectConfigV2ExternalDep,
)
from basic_memory.deps.services import (
FileServiceDep,
FileServiceV2Dep,
FileServiceV2ExternalDep,
MarkdownProcessorDep,
MarkdownProcessorV2Dep,
MarkdownProcessorV2ExternalDep,
)
from basic_memory.importers import (
ChatGPTImporter,
ClaudeConversationsImporter,
ClaudeProjectsImporter,
MemoryJsonImporter,
)
# --- ChatGPT Importer ---
async def get_chatgpt_importer(
project_config: ProjectConfigDep,
markdown_processor: MarkdownProcessorDep,
file_service: FileServiceDep,
) -> ChatGPTImporter:
"""Create ChatGPTImporter with dependencies."""
return ChatGPTImporter(project_config.home, markdown_processor, file_service)
ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]
async def get_chatgpt_importer_v2( # pragma: no cover
project_config: ProjectConfigV2Dep,
markdown_processor: MarkdownProcessorV2Dep,
file_service: FileServiceV2Dep,
) -> ChatGPTImporter:
"""Create ChatGPTImporter with v2 dependencies."""
return ChatGPTImporter(project_config.home, markdown_processor, file_service)
ChatGPTImporterV2Dep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2)]
async def get_chatgpt_importer_v2_external(
project_config: ProjectConfigV2ExternalDep,
markdown_processor: MarkdownProcessorV2ExternalDep,
file_service: FileServiceV2ExternalDep,
) -> ChatGPTImporter:
"""Create ChatGPTImporter with v2 external_id dependencies."""
return ChatGPTImporter(project_config.home, markdown_processor, file_service)
ChatGPTImporterV2ExternalDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2_external)]
# --- Claude Conversations Importer ---
async def get_claude_conversations_importer(
project_config: ProjectConfigDep,
markdown_processor: MarkdownProcessorDep,
file_service: FileServiceDep,
) -> ClaudeConversationsImporter:
"""Create ClaudeConversationsImporter with dependencies."""
return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
ClaudeConversationsImporterDep = Annotated[
ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
]
async def get_claude_conversations_importer_v2( # pragma: no cover
project_config: ProjectConfigV2Dep,
markdown_processor: MarkdownProcessorV2Dep,
file_service: FileServiceV2Dep,
) -> ClaudeConversationsImporter:
"""Create ClaudeConversationsImporter with v2 dependencies."""
return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
ClaudeConversationsImporterV2Dep = Annotated[
ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2)
]
async def get_claude_conversations_importer_v2_external(
project_config: ProjectConfigV2ExternalDep,
markdown_processor: MarkdownProcessorV2ExternalDep,
file_service: FileServiceV2ExternalDep,
) -> ClaudeConversationsImporter:
"""Create ClaudeConversationsImporter with v2 external_id dependencies."""
return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
ClaudeConversationsImporterV2ExternalDep = Annotated[
ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2_external)
]
# --- Claude Projects Importer ---
async def get_claude_projects_importer(
project_config: ProjectConfigDep,
markdown_processor: MarkdownProcessorDep,
file_service: FileServiceDep,
) -> ClaudeProjectsImporter:
"""Create ClaudeProjectsImporter with dependencies."""
return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]
async def get_claude_projects_importer_v2( # pragma: no cover
project_config: ProjectConfigV2Dep,
markdown_processor: MarkdownProcessorV2Dep,
file_service: FileServiceV2Dep,
) -> ClaudeProjectsImporter:
"""Create ClaudeProjectsImporter with v2 dependencies."""
return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
ClaudeProjectsImporterV2Dep = Annotated[
ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2)
]
async def get_claude_projects_importer_v2_external(
project_config: ProjectConfigV2ExternalDep,
markdown_processor: MarkdownProcessorV2ExternalDep,
file_service: FileServiceV2ExternalDep,
) -> ClaudeProjectsImporter:
"""Create ClaudeProjectsImporter with v2 external_id dependencies."""
return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
ClaudeProjectsImporterV2ExternalDep = Annotated[
ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2_external)
]
# --- Memory JSON Importer ---
async def get_memory_json_importer(
project_config: ProjectConfigDep,
markdown_processor: MarkdownProcessorDep,
file_service: FileServiceDep,
) -> MemoryJsonImporter:
"""Create MemoryJsonImporter with dependencies."""
return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]
async def get_memory_json_importer_v2( # pragma: no cover
project_config: ProjectConfigV2Dep,
markdown_processor: MarkdownProcessorV2Dep,
file_service: FileServiceV2Dep,
) -> MemoryJsonImporter:
"""Create MemoryJsonImporter with v2 dependencies."""
return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
MemoryJsonImporterV2Dep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer_v2)]
async def get_memory_json_importer_v2_external(
project_config: ProjectConfigV2ExternalDep,
markdown_processor: MarkdownProcessorV2ExternalDep,
file_service: FileServiceV2ExternalDep,
) -> MemoryJsonImporter:
"""Create MemoryJsonImporter with v2 external_id dependencies."""
return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
MemoryJsonImporterV2ExternalDep = Annotated[
MemoryJsonImporter, Depends(get_memory_json_importer_v2_external)
]
```
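A hypothetical router (not part of this module) showing how the `Annotated` aliases are consumed; FastAPI resolves the whole chain (project config → markdown processor → file service → importer) per request. The route path and payload shape are assumptions:
```python
from fastapi import APIRouter
from basic_memory.deps.importers import ChatGPTImporterDep

router = APIRouter()

@router.post("/import/chatgpt")
async def import_chatgpt(importer: ChatGPTImporterDep, data: list[dict]):
    # The importer arrives fully constructed; import_data follows the
    # Importer interface shown elsewhere in this codebase.
    return await importer.import_data(data, "conversations")
```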
--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------
```python
"""Claude conversations import service for Basic Memory."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ChatImportResult
from basic_memory.importers.utils import clean_filename, format_timestamp
logger = logging.getLogger(__name__)
class ClaudeConversationsImporter(Importer[ChatImportResult]):
"""Service for importing Claude conversations."""
def handle_error( # pragma: no cover
self, message: str, error: Optional[Exception] = None
) -> ChatImportResult:
"""Return a failed ChatImportResult with an error message."""
error_msg = f"{message}: {error}" if error else message
return ChatImportResult(
import_count={},
success=False,
error_message=error_msg,
conversations=0,
messages=0,
)
async def import_data(
self, source_data, destination_folder: str, **kwargs: Any
) -> ChatImportResult:
"""Import conversations from Claude JSON export.
Args:
            source_data: Parsed JSON data from the Claude conversations.json export.
destination_folder: Destination folder within the project.
**kwargs: Additional keyword arguments.
Returns:
ChatImportResult containing statistics and status of the import.
"""
try:
# Ensure the destination folder exists
await self.ensure_folder_exists(destination_folder)
conversations = source_data
# Process each conversation
messages_imported = 0
chats_imported = 0
for chat in conversations:
# Get name, providing default for unnamed conversations
chat_name = chat.get("name") or f"Conversation {chat.get('uuid', 'untitled')}"
# Convert to entity
entity = self._format_chat_content(
folder=destination_folder,
name=chat_name,
messages=chat["chat_messages"],
created_at=chat["created_at"],
modified_at=chat["updated_at"],
)
# Write file using relative path - FileService handles base_path
file_path = f"{entity.frontmatter.metadata['permalink']}.md"
await self.write_entity(entity, file_path)
chats_imported += 1
messages_imported += len(chat["chat_messages"])
return ChatImportResult(
import_count={"conversations": chats_imported, "messages": messages_imported},
success=True,
conversations=chats_imported,
messages=messages_imported,
)
except Exception as e: # pragma: no cover
logger.exception("Failed to import Claude conversations")
return self.handle_error("Failed to import Claude conversations", e)
def _format_chat_content(
self,
folder: str,
name: str,
messages: List[Dict[str, Any]],
created_at: str,
modified_at: str,
) -> EntityMarkdown:
"""Convert chat messages to Basic Memory entity format.
Args:
folder: Destination folder name (relative path).
name: Chat name.
messages: List of chat messages.
created_at: Creation timestamp.
modified_at: Modification timestamp.
Returns:
EntityMarkdown instance representing the conversation.
"""
# Generate permalink using folder name (relative path)
date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
clean_title = clean_filename(name)
permalink = f"{folder}/{date_prefix}-{clean_title}"
# Format content
content = self._format_chat_markdown(
name=name,
messages=messages,
created_at=created_at,
modified_at=modified_at,
permalink=permalink,
)
# Create entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": "conversation",
"title": name,
"created": created_at,
"modified": modified_at,
"permalink": permalink,
}
),
content=content,
)
return entity
def _format_chat_markdown(
self,
name: str,
messages: List[Dict[str, Any]],
created_at: str,
modified_at: str,
permalink: str,
) -> str:
"""Format chat as clean markdown.
Args:
name: Chat name.
messages: List of chat messages.
created_at: Creation timestamp.
modified_at: Modification timestamp.
permalink: Permalink for the entity.
Returns:
Formatted markdown content.
"""
# Start with frontmatter and title
lines = [
f"# {name}\n",
]
# Add messages
for msg in messages:
# Format timestamp
ts = format_timestamp(msg["created_at"])
# Add message header
lines.append(f"### {msg['sender'].title()} ({ts})")
# Handle message content
content = msg.get("text", "")
if msg.get("content"):
# Filter out None values before joining
content = " ".join(
str(c.get("text", ""))
for c in msg["content"]
if c and c.get("text") is not None
)
lines.append(content)
# Handle attachments
attachments = msg.get("attachments", [])
for attachment in attachments:
if "file_name" in attachment:
lines.append(f"\n**Attachment: {attachment['file_name']}**")
if "extracted_content" in attachment:
lines.append("```")
lines.append(attachment["extracted_content"])
lines.append("```")
# Add spacing between messages
lines.append("")
return "\n".join(lines)
```
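For reference, a minimal sketch of how `_format_chat_content` derives the permalink's date prefix and slug. The `clean_filename` below is a simplified stand-in for `basic_memory.importers.utils.clean_filename`, not the real implementation:

```python
from datetime import datetime

def clean_filename(name: str) -> str:
    # Simplified stand-in for basic_memory.importers.utils.clean_filename
    return "_".join(name.split())

created_at = "2025-01-11T16:49:54.240540Z"  # ISO timestamp as found in the export
date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
permalink = f"conversations/{date_prefix}-{clean_filename('Test Conversation')}"
print(permalink)  # conversations/20250111-Test_Conversation
```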
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------
```python
"""Status command for basic-memory CLI."""
from typing import Annotated, Dict, Optional, Set
from mcp.server.fastmcp.exceptions import ToolError
import typer
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree
from basic_memory.cli.app import app
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_post
from basic_memory.schemas import SyncReportResponse
from basic_memory.mcp.project_context import get_active_project
# Create rich console
console = Console()
def add_files_to_tree(
tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
):
"""Add files to tree, grouped by directory."""
# Group by directory
by_dir = {}
for path in sorted(paths):
parts = path.split("/", 1)
dir_name = parts[0] if len(parts) > 1 else ""
file_name = parts[1] if len(parts) > 1 else parts[0]
by_dir.setdefault(dir_name, []).append((file_name, path))
# Add to tree
for dir_name, files in sorted(by_dir.items()):
if dir_name:
branch = tree.add(f"[bold]{dir_name}/[/bold]")
else:
branch = tree
for file_name, full_path in sorted(files):
if checksums and full_path in checksums:
checksum_short = checksums[full_path][:8]
branch.add(f"[{style}]{file_name}[/{style}] ({checksum_short})")
else:
branch.add(f"[{style}]{file_name}[/{style}]")
def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
"""Group changes by directory for summary view."""
by_dir = {}
for change_type, paths in [
("new", changes.new),
("modified", changes.modified),
("deleted", changes.deleted),
]:
for path in paths:
dir_name = path.split("/", 1)[0]
by_dir.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
by_dir[dir_name][change_type] += 1
# Handle moves - count in both source and destination directories
for old_path, new_path in changes.moves.items():
old_dir = old_path.split("/", 1)[0]
new_dir = new_path.split("/", 1)[0]
by_dir.setdefault(old_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
by_dir.setdefault(new_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
by_dir[old_dir]["moved"] += 1
if old_dir != new_dir:
by_dir[new_dir]["moved"] += 1
return by_dir
def build_directory_summary(counts: Dict[str, int]) -> str:
"""Build summary string for directory changes."""
parts = []
if counts["new"]:
parts.append(f"[green]+{counts['new']} new[/green]")
if counts["modified"]:
parts.append(f"[yellow]~{counts['modified']} modified[/yellow]")
if counts["moved"]:
parts.append(f"[blue]↔{counts['moved']} moved[/blue]")
if counts["deleted"]:
parts.append(f"[red]-{counts['deleted']} deleted[/red]")
return " ".join(parts)
def display_changes(
project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
):
"""Display changes using Rich for better visualization."""
tree = Tree(f"{project_name}: {title}")
if changes.total == 0 and not changes.skipped_files:
tree.add("No changes")
console.print(Panel(tree, expand=False))
return
if verbose:
# Full file listing with checksums
if changes.new:
new_branch = tree.add("[green]New Files[/green]")
add_files_to_tree(new_branch, changes.new, "green", changes.checksums)
if changes.modified:
mod_branch = tree.add("[yellow]Modified[/yellow]")
add_files_to_tree(mod_branch, changes.modified, "yellow", changes.checksums)
if changes.moves:
move_branch = tree.add("[blue]Moved[/blue]")
for old_path, new_path in sorted(changes.moves.items()):
move_branch.add(f"[blue]{old_path}[/blue] → [blue]{new_path}[/blue]")
if changes.deleted:
del_branch = tree.add("[red]Deleted[/red]")
add_files_to_tree(del_branch, changes.deleted, "red")
if changes.skipped_files:
skip_branch = tree.add("[red]! Skipped (Circuit Breaker)[/red]")
for skipped in sorted(changes.skipped_files, key=lambda x: x.path):
skip_branch.add(
f"[red]{skipped.path}[/red] "
f"(failures: {skipped.failure_count}, reason: {skipped.reason})"
)
else:
# Show directory summaries
by_dir = group_changes_by_directory(changes)
for dir_name, counts in sorted(by_dir.items()):
summary = build_directory_summary(counts)
if summary: # Only show directories with changes
tree.add(f"[bold]{dir_name}/[/bold] {summary}")
# Show skipped files summary in non-verbose mode
if changes.skipped_files:
skip_count = len(changes.skipped_files)
tree.add(
f"[red]! {skip_count} file{'s' if skip_count != 1 else ''} "
f"skipped due to repeated failures[/red]"
)
console.print(Panel(tree, expand=False))
async def run_status(project: Optional[str] = None, verbose: bool = False): # pragma: no cover
"""Check sync status of files vs database."""
try:
async with get_client() as client:
project_item = await get_active_project(client, project, None)
response = await call_post(client, f"{project_item.project_url}/project/status")
sync_report = SyncReportResponse.model_validate(response.json())
display_changes(project_item.name, "Status", sync_report, verbose)
except (ValueError, ToolError) as e:
console.print(f"[red]Error: {e}[/red]")
raise typer.Exit(1)
@app.command()
def status(
project: Annotated[
Optional[str],
typer.Option(help="The project name."),
] = None,
verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
):
"""Show sync status between files and database."""
from basic_memory.cli.commands.command_utils import run_with_cleanup
try:
run_with_cleanup(run_status(project, verbose)) # pragma: no cover
except Exception as e:
logger.error(f"Error checking status: {e}")
typer.echo(f"Error checking status: {e}", err=True)
raise typer.Exit(code=1) # pragma: no cover
```
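As a quick illustration of the grouping logic in `add_files_to_tree`, here is a standalone sketch of how paths split into top-level directory buckets (files at the root land in the `""` bucket):

```python
# Group paths by top-level directory, mirroring add_files_to_tree
paths = {"notes/meeting.md", "notes/ideas.md", "todo.md"}
by_dir: dict[str, list[str]] = {}
for path in sorted(paths):
    parts = path.split("/", 1)
    dir_name = parts[0] if len(parts) > 1 else ""
    file_name = parts[1] if len(parts) > 1 else parts[0]
    by_dir.setdefault(dir_name, []).append(file_name)
print(by_dir)  # {'notes': ['ideas.md', 'meeting.md'], '': ['todo.md']}
```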
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/g9a0b3c4d5e6_add_external_id_to_project_and_entity.py:
--------------------------------------------------------------------------------
```python
"""Add external_id UUID column to project and entity tables
Revision ID: g9a0b3c4d5e6
Revises: f8a9b2c3d4e5
Create Date: 2025-12-29 10:00:00.000000
"""
import uuid
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy import text
def column_exists(connection, table: str, column: str) -> bool:
"""Check if a column exists in a table (idempotent migration support)."""
if connection.dialect.name == "postgresql":
result = connection.execute(
text(
"SELECT 1 FROM information_schema.columns "
"WHERE table_name = :table AND column_name = :column"
),
{"table": table, "column": column},
)
return result.fetchone() is not None
else:
# SQLite
result = connection.execute(text(f"PRAGMA table_info({table})"))
columns = [row[1] for row in result]
return column in columns
def index_exists(connection, index_name: str) -> bool:
"""Check if an index exists (idempotent migration support)."""
if connection.dialect.name == "postgresql":
result = connection.execute(
text("SELECT 1 FROM pg_indexes WHERE indexname = :index_name"),
{"index_name": index_name},
)
return result.fetchone() is not None
else:
# SQLite
result = connection.execute(
text("SELECT 1 FROM sqlite_master WHERE type='index' AND name = :index_name"),
{"index_name": index_name},
)
return result.fetchone() is not None
# revision identifiers, used by Alembic.
revision: str = "g9a0b3c4d5e6"
down_revision: Union[str, None] = "f8a9b2c3d4e5"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Add external_id UUID column to project and entity tables.
This migration:
1. Adds external_id column to project table
2. Adds external_id column to entity table
3. Generates UUIDs for existing rows
4. Creates unique indexes on both columns
"""
connection = op.get_bind()
dialect = connection.dialect.name
# -------------------------------------------------------------------------
# Add external_id to project table
# -------------------------------------------------------------------------
if not column_exists(connection, "project", "external_id"):
# Step 1: Add external_id column as nullable first
op.add_column("project", sa.Column("external_id", sa.String(), nullable=True))
# Step 2: Generate UUIDs for existing rows
if dialect == "postgresql":
# Postgres has gen_random_uuid() function
op.execute("""
UPDATE project
SET external_id = gen_random_uuid()::text
WHERE external_id IS NULL
""")
else:
# SQLite: need to generate UUIDs in Python
result = connection.execute(text("SELECT id FROM project WHERE external_id IS NULL"))
for row in result:
new_uuid = str(uuid.uuid4())
connection.execute(
text("UPDATE project SET external_id = :uuid WHERE id = :id"),
{"uuid": new_uuid, "id": row[0]},
)
# Step 3: Make external_id NOT NULL
if dialect == "postgresql":
op.alter_column("project", "external_id", nullable=False)
else:
# SQLite requires batch operations for ALTER COLUMN
with op.batch_alter_table("project") as batch_op:
batch_op.alter_column("external_id", nullable=False)
# Step 4: Create unique index on project.external_id (idempotent)
if not index_exists(connection, "ix_project_external_id"):
op.create_index("ix_project_external_id", "project", ["external_id"], unique=True)
# -------------------------------------------------------------------------
# Add external_id to entity table
# -------------------------------------------------------------------------
if not column_exists(connection, "entity", "external_id"):
# Step 1: Add external_id column as nullable first
op.add_column("entity", sa.Column("external_id", sa.String(), nullable=True))
# Step 2: Generate UUIDs for existing rows
if dialect == "postgresql":
# Postgres has gen_random_uuid() function
op.execute("""
UPDATE entity
SET external_id = gen_random_uuid()::text
WHERE external_id IS NULL
""")
else:
# SQLite: need to generate UUIDs in Python
result = connection.execute(text("SELECT id FROM entity WHERE external_id IS NULL"))
for row in result:
new_uuid = str(uuid.uuid4())
connection.execute(
text("UPDATE entity SET external_id = :uuid WHERE id = :id"),
{"uuid": new_uuid, "id": row[0]},
)
# Step 3: Make external_id NOT NULL
if dialect == "postgresql":
op.alter_column("entity", "external_id", nullable=False)
else:
# SQLite requires batch operations for ALTER COLUMN
with op.batch_alter_table("entity") as batch_op:
batch_op.alter_column("external_id", nullable=False)
# Step 4: Create unique index on entity.external_id (idempotent)
if not index_exists(connection, "ix_entity_external_id"):
op.create_index("ix_entity_external_id", "entity", ["external_id"], unique=True)
def downgrade() -> None:
"""Remove external_id columns from project and entity tables."""
connection = op.get_bind()
dialect = connection.dialect.name
# Drop from entity table
if index_exists(connection, "ix_entity_external_id"):
op.drop_index("ix_entity_external_id", table_name="entity")
if column_exists(connection, "entity", "external_id"):
if dialect == "postgresql":
op.drop_column("entity", "external_id")
else:
with op.batch_alter_table("entity") as batch_op:
batch_op.drop_column("external_id")
# Drop from project table
if index_exists(connection, "ix_project_external_id"):
op.drop_index("ix_project_external_id", table_name="project")
if column_exists(connection, "project", "external_id"):
if dialect == "postgresql":
op.drop_column("project", "external_id")
else:
with op.batch_alter_table("project") as batch_op:
batch_op.drop_column("external_id")
```
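The SQLite branch of `column_exists` relies on `PRAGMA table_info`; a quick standalone demonstration of that technique using only the standard library:

```python
import sqlite3

# PRAGMA table_info returns one row per column; index 1 is the column name
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE project (id INTEGER PRIMARY KEY, external_id TEXT)")
columns = [row[1] for row in conn.execute("PRAGMA table_info(project)")]
print("external_id" in columns)  # True
```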
--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for search router."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from sqlalchemy import text
from basic_memory import db
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.schemas.search import SearchItemType, SearchResponse
@pytest_asyncio.fixture
async def indexed_entity(full_entity, search_service):
"""Create an entity and index it."""
await search_service.index_entity(full_entity)
return full_entity
@pytest.mark.asyncio
async def test_search_basic(client, indexed_entity, project_url):
"""Test basic text search."""
response = await client.post(f"{project_url}/search/", json={"text": "search"})
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 3
found = False
for r in search_results.results:
if r.type == SearchItemType.ENTITY.value:
assert r.permalink == indexed_entity.permalink
found = True
assert found, "Expected to find indexed entity in results"
@pytest.mark.asyncio
async def test_search_basic_pagination(client, indexed_entity, project_url):
"""Test basic text search."""
response = await client.post(
f"{project_url}/search/?page=3&page_size=1", json={"text": "search"}
)
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 1
assert search_results.current_page == 3
assert search_results.page_size == 1
@pytest.mark.asyncio
async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
"""Test search with type filter."""
# Should find with correct type
response = await client.post(
f"{project_url}/search/",
json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
)
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) > 0
# Should find with relation type
response = await client.post(
f"{project_url}/search/",
json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
)
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 2
@pytest.mark.asyncio
async def test_search_with_type_filter(client, indexed_entity, project_url):
"""Test search with entity type filter."""
# Should find with correct entity type
response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["test"]})
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 1
# Should not find with wrong entity type
response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["note"]})
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 0
@pytest.mark.asyncio
async def test_search_with_date_filter(client, indexed_entity, project_url):
"""Test search with date filter."""
# Should find with past date
past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
response = await client.post(
f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
)
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
# Should not find with future date
future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
response = await client.post(
f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
)
assert response.status_code == 200
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 0
@pytest.mark.asyncio
async def test_search_empty(search_service, client, project_url):
"""Test search with no matches."""
response = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
assert response.status_code == 200
search_result = SearchResponse.model_validate(response.json())
assert len(search_result.results) == 0
@pytest.mark.asyncio
async def test_reindex(
client, search_service, entity_service, session_maker, project_url, app_config
):
"""Test reindex endpoint."""
# Skip for Postgres - needs investigation of database connection isolation
from basic_memory.config import DatabaseBackend
if app_config.database_backend == DatabaseBackend.POSTGRES:
pytest.skip("Not yet supported for Postgres - database connection isolation issue")
# Create test entity and document
await entity_service.create_entity(
EntitySchema(
title="TestEntity1",
folder="test",
entity_type="test",
),
)
# Clear search index
async with db.scoped_session(session_maker) as session:
await session.execute(text("DELETE FROM search_index"))
await session.commit()
# Verify nothing is searchable
response = await client.post(f"{project_url}/search/", json={"text": "test"})
search_results = SearchResponse.model_validate(response.json())
assert len(search_results.results) == 0
# Trigger reindex
reindex_response = await client.post(f"{project_url}/search/reindex")
assert reindex_response.status_code == 200
assert reindex_response.json()["status"] == "ok"
# Verify content is searchable again
search_response = await client.post(f"{project_url}/search/", json={"text": "test"})
search_results = SearchResponse.model_validate(search_response.json())
assert len(search_results.results) == 1
@pytest.mark.asyncio
async def test_multiple_filters(client, indexed_entity, project_url):
"""Test search with multiple filters combined."""
response = await client.post(
f"{project_url}/search/",
json={
"text": "test",
"entity_types": [SearchItemType.ENTITY.value],
"types": ["test"],
"after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
},
)
assert response.status_code == 200
search_result = SearchResponse.model_validate(response.json())
assert len(search_result.results) == 1
result = search_result.results[0]
assert result.permalink == indexed_entity.permalink
assert result.type == SearchItemType.ENTITY.value
assert result.metadata["entity_type"] == "test"
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/format.py:
--------------------------------------------------------------------------------
```python
"""Format command for basic-memory CLI."""
from pathlib import Path
from typing import Annotated, Optional
import typer
from loguru import logger
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from basic_memory.cli.app import app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.file_utils import format_file
console = Console()
def is_markdown_extension(path: Path) -> bool:
"""Check if file has a markdown extension."""
return path.suffix.lower() in (".md", ".markdown")
async def format_single_file(file_path: Path, app_config) -> tuple[Path, bool, Optional[str]]:
"""Format a single file.
Returns:
Tuple of (path, success, error_message)
"""
try:
result = await format_file(
file_path, app_config, is_markdown=is_markdown_extension(file_path)
)
if result is not None:
return (file_path, True, None)
else:
return (file_path, False, "No formatter configured or formatting skipped")
except Exception as e:
return (file_path, False, str(e))
async def format_files(
paths: list[Path], app_config, show_progress: bool = True
) -> tuple[int, int, list[tuple[Path, str]]]:
"""Format multiple files.
Returns:
Tuple of (formatted_count, skipped_count, errors)
"""
formatted = 0
skipped = 0
errors: list[tuple[Path, str]] = []
if show_progress:
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Formatting files...", total=len(paths))
for file_path in paths:
path, success, error = await format_single_file(file_path, app_config)
if success:
formatted += 1
elif error and "No formatter configured" not in error:
errors.append((path, error))
else:
skipped += 1
progress.update(task, advance=1)
else:
for file_path in paths:
path, success, error = await format_single_file(file_path, app_config)
if success:
formatted += 1
elif error and "No formatter configured" not in error:
errors.append((path, error))
else:
skipped += 1
return formatted, skipped, errors
async def run_format(
path: Optional[Path] = None,
project: Optional[str] = None,
) -> None:
"""Run the format command."""
app_config = ConfigManager().config
# Check if formatting is enabled
if (
not app_config.format_on_save
and not app_config.formatter_command
and not app_config.formatters
):
console.print(
"[yellow]No formatters configured. Set format_on_save=true and "
"formatter_command or formatters in your config.[/yellow]"
)
console.print(
"\nExample config (~/.basic-memory/config.json):\n"
' "format_on_save": true,\n'
' "formatter_command": "prettier --write {file}"\n'
)
raise typer.Exit(1)
# Temporarily enable format_on_save for this command
# (so format_file actually runs the formatter)
original_format_on_save = app_config.format_on_save
app_config.format_on_save = True
try:
# Determine which files to format
if path:
# Format specific file or directory
if path.is_file():
files = [path]
elif path.is_dir():
                # Find all markdown, json, and canvas files
files = (
list(path.rglob("*.md"))
+ list(path.rglob("*.json"))
+ list(path.rglob("*.canvas"))
)
else:
console.print(f"[red]Path not found: {path}[/red]")
raise typer.Exit(1)
else:
# Format all files in project
project_config = get_project_config(project)
project_path = Path(project_config.home)
if not project_path.exists():
console.print(f"[red]Project path not found: {project_path}[/red]")
raise typer.Exit(1)
            # Find all markdown, json, and canvas files
files = (
list(project_path.rglob("*.md"))
+ list(project_path.rglob("*.json"))
+ list(project_path.rglob("*.canvas"))
)
if not files:
console.print("[yellow]No files found to format.[/yellow]")
return
console.print(f"Found {len(files)} file(s) to format...")
formatted, skipped, errors = await format_files(files, app_config)
# Print summary
console.print()
if formatted > 0:
console.print(f"[green]Formatted: {formatted} file(s)[/green]")
if skipped > 0:
console.print(f"[dim]Skipped: {skipped} file(s) (no formatter for extension)[/dim]")
if errors:
console.print(f"[red]Errors: {len(errors)} file(s)[/red]")
for path, error in errors:
console.print(f" [red]{path}[/red]: {error}")
finally:
# Restore original setting
app_config.format_on_save = original_format_on_save
@app.command()
def format(
path: Annotated[
Optional[Path],
typer.Argument(help="File or directory to format. Defaults to current project."),
] = None,
project: Annotated[
Optional[str],
typer.Option("--project", "-p", help="Project name to format."),
] = None,
) -> None:
"""Format files using configured formatters.
Uses the formatter_command or formatters settings from your config.
By default, formats all .md, .json, and .canvas files in the current project.
Examples:
basic-memory format # Format all files in current project
basic-memory format --project research # Format files in specific project
basic-memory format notes/meeting.md # Format a specific file
basic-memory format notes/ # Format all files in directory
"""
try:
run_with_cleanup(run_format(path, project))
except Exception as e:
if not isinstance(e, typer.Exit):
logger.error(f"Error formatting files: {e}")
console.print(f"[red]Error formatting files: {e}[/red]")
raise typer.Exit(code=1)
raise
```
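For context, a minimal sketch of the file discovery `run_format` performs over a project root. `Path.cwd()` here is a stand-in for the configured project home:

```python
from pathlib import Path

project_path = Path.cwd()  # stand-in for the configured project home
files = (
    list(project_path.rglob("*.md"))
    + list(project_path.rglob("*.json"))
    + list(project_path.rglob("*.canvas"))
)
print(f"Found {len(files)} file(s) to format...")
```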
--------------------------------------------------------------------------------
/tests/cli/test_import_chatgpt.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_chatgpt command."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import app, import_app
from basic_memory.cli.commands import import_chatgpt # noqa
from basic_memory.config import get_project_config
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_conversation():
"""Sample ChatGPT conversation data for testing."""
return {
"title": "Test Conversation",
"create_time": 1736616594.24054, # Example timestamp
"update_time": 1736616603.164995,
"mapping": {
"root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
"msg1": {
"id": "msg1",
"message": {
"id": "msg1",
"author": {"role": "user", "name": None, "metadata": {}},
"create_time": 1736616594.24054,
"content": {"content_type": "text", "parts": ["Hello, this is a test message"]},
"status": "finished_successfully",
"metadata": {},
},
"parent": "root",
"children": ["msg2"],
},
"msg2": {
"id": "msg2",
"message": {
"id": "msg2",
"author": {"role": "assistant", "name": None, "metadata": {}},
"create_time": 1736616603.164995,
"content": {"content_type": "text", "parts": ["This is a test response"]},
"status": "finished_successfully",
"metadata": {},
},
"parent": "msg1",
"children": [],
},
},
}
@pytest.fixture
def sample_conversation_with_code():
"""Sample conversation with code block."""
conversation = {
"title": "Code Test",
"create_time": 1736616594.24054,
"update_time": 1736616603.164995,
"mapping": {
"root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
"msg1": {
"id": "msg1",
"message": {
"id": "msg1",
"author": {"role": "assistant", "name": None, "metadata": {}},
"create_time": 1736616594.24054,
"content": {
"content_type": "code",
"language": "python",
"text": "def hello():\n print('Hello world!')",
},
"status": "finished_successfully",
"metadata": {},
},
"parent": "root",
"children": [],
},
"msg2": {
"id": "msg2",
"message": {
"id": "msg2",
"author": {"role": "assistant", "name": None, "metadata": {}},
"create_time": 1736616594.24054,
"status": "finished_successfully",
"metadata": {},
},
"parent": "root",
"children": [],
},
},
}
return conversation
@pytest.fixture
def sample_conversation_with_hidden():
"""Sample conversation with hidden messages."""
conversation = {
"title": "Hidden Test",
"create_time": 1736616594.24054,
"update_time": 1736616603.164995,
"mapping": {
"root": {
"id": "root",
"message": None,
"parent": None,
"children": ["visible", "hidden"],
},
"visible": {
"id": "visible",
"message": {
"id": "visible",
"author": {"role": "user", "name": None, "metadata": {}},
"create_time": 1736616594.24054,
"content": {"content_type": "text", "parts": ["Visible message"]},
"status": "finished_successfully",
"metadata": {},
},
"parent": "root",
"children": [],
},
"hidden": {
"id": "hidden",
"message": {
"id": "hidden",
"author": {"role": "system", "name": None, "metadata": {}},
"create_time": 1736616594.24054,
"content": {"content_type": "text", "parts": ["Hidden message"]},
"status": "finished_successfully",
"metadata": {"is_visually_hidden_from_conversation": True},
},
"parent": "root",
"children": [],
},
},
}
return conversation
@pytest.fixture
def sample_chatgpt_json(tmp_path, sample_conversation):
"""Create a sample ChatGPT JSON file."""
json_file = tmp_path / "conversations.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([sample_conversation], f)
return json_file
def test_import_chatgpt_command_success(tmp_path, sample_chatgpt_json, monkeypatch):
"""Test successful conversation import via command."""
# Set up test environment
monkeypatch.setenv("HOME", str(tmp_path))
# Run import
result = runner.invoke(import_app, ["chatgpt", str(sample_chatgpt_json)])
assert result.exit_code == 0
assert "Import complete" in result.output
assert "Imported 1 conversations" in result.output
assert "Containing 2 messages" in result.output
def test_import_chatgpt_command_invalid_json(tmp_path):
"""Test error handling for invalid JSON."""
# Create invalid JSON file
invalid_file = tmp_path / "invalid.json"
invalid_file.write_text("not json")
result = runner.invoke(import_app, ["chatgpt", str(invalid_file)])
assert result.exit_code == 1
assert "Error during import" in result.output
def test_import_chatgpt_with_custom_folder(tmp_path, sample_chatgpt_json, monkeypatch):
"""Test import with custom conversations folder."""
# Set up test environment
config = get_project_config()
config.home = tmp_path
conversations_folder = "chats"
# Run import
result = runner.invoke(
app,
[
"import",
"chatgpt",
str(sample_chatgpt_json),
"--folder",
conversations_folder,
],
)
assert result.exit_code == 0
# Check files in custom folder
conv_path = tmp_path / conversations_folder / "20250111-Test_Conversation.md"
assert conv_path.exists()
```
--------------------------------------------------------------------------------
/tests/api/v2/test_prompt_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for V2 prompt router endpoints (ID-based)."""
import pytest
import pytest_asyncio
from httpx import AsyncClient
from basic_memory.models import Project
from basic_memory.services.context_service import ContextService
@pytest_asyncio.fixture
async def context_service(entity_repository, search_service, observation_repository):
"""Create a real context service for testing."""
return ContextService(entity_repository, search_service, observation_repository)
@pytest.mark.asyncio
async def test_continue_conversation_endpoint(
client: AsyncClient,
entity_service,
search_service,
context_service,
entity_repository,
test_graph,
v2_project_url: str,
):
"""Test the v2 continue_conversation endpoint with real services."""
# Create request data
request_data = {
"topic": "Root", # This should match our test entity in test_graph
"timeframe": "7d",
"depth": 1,
"related_items_limit": 2,
}
# Call the endpoint
response = await client.post(
f"{v2_project_url}/prompt/continue-conversation", json=request_data
)
# Verify response
assert response.status_code == 200
result = response.json()
assert "prompt" in result
assert "context" in result
# Check content of context
context = result["context"]
assert context["topic"] == "Root"
assert context["timeframe"] == "7d"
assert context["has_results"] is True
assert len(context["hierarchical_results"]) > 0
# Check content of prompt
prompt = result["prompt"]
assert "Continuing conversation on: Root" in prompt
assert "memory retrieval session" in prompt
@pytest.mark.asyncio
async def test_continue_conversation_without_topic(
client: AsyncClient,
entity_service,
search_service,
context_service,
entity_repository,
test_graph,
v2_project_url: str,
):
"""Test v2 continue_conversation without topic - should use recent activity."""
request_data = {"timeframe": "1d", "depth": 1, "related_items_limit": 2}
response = await client.post(
f"{v2_project_url}/prompt/continue-conversation", json=request_data
)
assert response.status_code == 200
result = response.json()
assert "Recent Activity" in result["context"]["topic"]
@pytest.mark.asyncio
async def test_search_prompt_endpoint(
client: AsyncClient, entity_service, search_service, test_graph, v2_project_url: str
):
"""Test the v2 search_prompt endpoint with real services."""
# Create request data
request_data = {
"query": "Root", # This should match our test entity
"timeframe": "7d",
}
# Call the endpoint
response = await client.post(f"{v2_project_url}/prompt/search", json=request_data)
# Verify response
assert response.status_code == 200
result = response.json()
assert "prompt" in result
assert "context" in result
# Check content of context
context = result["context"]
assert context["query"] == "Root"
assert context["timeframe"] == "7d"
assert context["has_results"] is True
assert len(context["results"]) > 0
# Check content of prompt
prompt = result["prompt"]
assert 'Search Results for: "Root"' in prompt
assert "This is a memory search session" in prompt
@pytest.mark.asyncio
async def test_search_prompt_no_results(
client: AsyncClient, entity_service, search_service, v2_project_url: str
):
"""Test the v2 search_prompt endpoint with a query that returns no results."""
# Create request data with a query that shouldn't match anything
request_data = {"query": "NonExistentQuery12345", "timeframe": "7d"}
# Call the endpoint
response = await client.post(f"{v2_project_url}/prompt/search", json=request_data)
# Verify response
assert response.status_code == 200
result = response.json()
# Check content of context
context = result["context"]
assert context["query"] == "NonExistentQuery12345"
assert context["has_results"] is False
assert len(context["results"]) == 0
# Check content of prompt
prompt = result["prompt"]
assert 'Search Results for: "NonExistentQuery12345"' in prompt
assert "I couldn't find any results for this query" in prompt
assert "Opportunity to Capture Knowledge" in prompt
@pytest.mark.asyncio
async def test_error_handling(client: AsyncClient, monkeypatch, v2_project_url: str):
"""Test error handling in v2 endpoints by breaking the template loader."""
# Patch the template loader to raise an exception
def mock_render(*args, **kwargs):
raise Exception("Template error")
# Apply the patch
monkeypatch.setattr("basic_memory.api.template_loader.TemplateLoader.render", mock_render)
# Test continue_conversation error handling
response = await client.post(
f"{v2_project_url}/prompt/continue-conversation",
json={"topic": "test error", "timeframe": "7d"},
)
assert response.status_code == 500
assert "detail" in response.json()
assert "Template error" in response.json()["detail"]
# Test search_prompt error handling
response = await client.post(
f"{v2_project_url}/prompt/search", json={"query": "test error", "timeframe": "7d"}
)
assert response.status_code == 500
assert "detail" in response.json()
assert "Template error" in response.json()["detail"]
@pytest.mark.asyncio
async def test_v2_prompt_endpoints_use_project_id_not_name(
client: AsyncClient, test_project: Project
):
"""Verify v2 prompt endpoints require project ID, not name."""
# Try using project name instead of ID - should fail
response = await client.post(
f"/v2/projects/{test_project.name}/prompt/continue-conversation",
json={"topic": "test", "timeframe": "7d"},
)
# Should get validation error or 404 because name is not a valid integer
assert response.status_code in [404, 422]
# Also test search endpoint
response = await client.post(
f"/v2/projects/{test_project.name}/prompt/search",
json={"query": "test", "timeframe": "7d"},
)
assert response.status_code in [404, 422]
@pytest.mark.asyncio
async def test_prompt_invalid_project_id(client: AsyncClient):
"""Test prompt endpoints with invalid project ID return 404."""
# Test continue-conversation
response = await client.post(
"/v2/projects/999999/prompt/continue-conversation",
json={"topic": "test", "timeframe": "7d"},
)
assert response.status_code == 404
# Test search
response = await client.post(
"/v2/projects/999999/prompt/search",
json={"query": "test", "timeframe": "7d"},
)
assert response.status_code == 404
```
--------------------------------------------------------------------------------
/.claude/commands/release/release.md:
--------------------------------------------------------------------------------
```markdown
# /release - Create Stable Release
Create a stable release using the automated justfile target with comprehensive validation.
## Usage
```
/release <version>
```
**Parameters:**
- `version` (required): Release version like `v0.13.2`
## Implementation
You are an expert release manager for the Basic Memory project. When the user runs `/release`, execute the following steps:
### Step 1: Pre-flight Validation
#### Version Check
1. Check current version in `src/basic_memory/__init__.py`
2. Verify new version format matches `v\d+\.\d+\.\d+` pattern
3. Confirm version is higher than current version (a sketch of this check follows)
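A minimal sketch of this validation, assuming both versions use the `v<major>.<minor>.<patch>` form (illustrative only; the real checks run inside the justfile automation):

```python
import re

VERSION_RE = re.compile(r"^v\d+\.\d+\.\d+$")

def is_valid_bump(current: str, new: str) -> bool:
    """Both versions must match v<major>.<minor>.<patch>; new must be higher."""
    if not (VERSION_RE.match(current) and VERSION_RE.match(new)):
        return False
    def as_tuple(v: str) -> tuple[int, ...]:
        return tuple(int(part) for part in v[1:].split("."))
    return as_tuple(new) > as_tuple(current)

print(is_valid_bump("v0.13.1", "v0.13.2"))  # True
```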
#### Git Status
1. Check current git status for uncommitted changes
2. Verify we're on the `main` branch
3. Confirm no existing tag with this version
#### Documentation Validation
1. **Changelog Check**
- CHANGELOG.md contains entry for target version
- Entry includes all major features and fixes
- Breaking changes are documented
### Step 2: Use Justfile Automation
Execute the automated release process:
```bash
just release <version>
```
The justfile target handles:
- ✅ Version format validation
- ✅ Git status and branch checks
- ✅ Quality checks (`just check` - lint, format, type-check, tests)
- ✅ Version update in `src/basic_memory/__init__.py`
- ✅ Automatic commit with proper message
- ✅ Tag creation and pushing to GitHub
- ✅ Release workflow trigger (automatic on tag push)
The GitHub Actions workflow (`.github/workflows/release.yml`) then:
- ✅ Builds the package using `uv build`
- ✅ Creates GitHub release with auto-generated notes
- ✅ Publishes to PyPI
- ✅ Updates Homebrew formula (stable releases only)
### Step 3: Monitor Release Process
1. Verify tag push triggered the workflow (should start automatically within seconds)
2. Monitor workflow progress at: https://github.com/basicmachines-co/basic-memory/actions
3. Watch for successful completion of both jobs:
- `release` - Builds package and publishes to PyPI
- `homebrew` - Updates Homebrew formula (stable releases only)
4. Check for any workflow failures and investigate logs if needed
### Step 4: Post-Release Validation
#### GitHub Release
1. Verify GitHub release is created at: https://github.com/basicmachines-co/basic-memory/releases/tag/<version>
2. Check that release notes are auto-generated from commits
3. Validate release assets (`.whl` and `.tar.gz` files are attached)
#### PyPI Publication
1. Verify package published at: https://pypi.org/project/basic-memory/<version>/
2. Test installation: `uv tool install basic-memory`
3. Verify installed version: `basic-memory --version`
#### Homebrew Formula (Stable Releases Only)
1. Check formula update at: https://github.com/basicmachines-co/homebrew-basic-memory
2. Verify formula version matches release
3. Test Homebrew installation: `brew install basicmachines-co/basic-memory/basic-memory`
#### Website Updates
**1. basicmachines.co** (`/Users/drew/code/basicmachines.co`)
- **Goal**: Update version number displayed on the homepage
- **Location**: Search for "Basic Memory v0." in the codebase to find version displays
- **What to update**:
- Hero section heading that shows "Basic Memory v{VERSION}"
- "What's New in v{VERSION}" section heading
- Feature highlights array (look for array of features with title/description)
- **Process**:
1. Pull latest from GitHub: `git pull origin main`
2. Create release branch: `git checkout -b release/v{VERSION}`
3. Search codebase for current version number (e.g., "v0.16.1")
4. Update version numbers to new release version
5. Update feature highlights with 3-5 key features from this release (extract from CHANGELOG.md)
6. Commit changes: `git commit -m "chore: update to v{VERSION}"`
7. Push branch: `git push origin release/v{VERSION}`
- **Deploy**: Follow deployment process for basicmachines.co
**2. docs.basicmemory.com** (`/Users/drew/code/docs.basicmemory.com`)
- **Goal**: Add new release notes section to the latest-releases page
- **File**: `src/pages/latest-releases.mdx`
- **What to do**:
1. Pull latest from GitHub: `git pull origin main`
2. Create release branch: `git checkout -b release/v{VERSION}`
3. Read the existing file to understand the format and structure
4. Read `/Users/drew/code/basic-memory/CHANGELOG.md` to get release content
5. Add new release section **at the top** (after MDX imports, before other releases)
6. Follow the existing pattern:
- Heading: `## [v{VERSION}](github-link) — YYYY-MM-DD`
- Focus statement if applicable
- `<Info>` block with highlights (3-5 key items)
- Sections for Features, Bug Fixes, Breaking Changes, etc.
- Link to full changelog at the end
- Separator `---` between releases
7. Commit changes: `git commit -m "docs: add v{VERSION} release notes"`
8. Push branch: `git push origin release/v{VERSION}`
- **Source content**: Extract and format sections from CHANGELOG.md for this version
- **Deploy**: Follow deployment process for docs.basicmemory.com
**3. Announce Release**
- Post to Discord community if significant changes
- Update social media if major release
- Notify users via appropriate channels
## Pre-conditions Check
Before starting, verify:
- [ ] All beta testing is complete
- [ ] Critical bugs are fixed
- [ ] Breaking changes are documented
- [ ] CHANGELOG.md is updated (if needed)
- [ ] Version number follows semantic versioning
## Error Handling
- If `just release` fails, examine the error output for specific issues
- If quality checks fail, fix issues and retry
- If changelog entry missing, update CHANGELOG.md and commit before retrying
- If GitHub Actions fail, check workflow logs for debugging
## Success Output
```
🎉 Stable Release v0.13.2 Created Successfully!
🏷️ Tag: v0.13.2
📋 GitHub Release: https://github.com/basicmachines-co/basic-memory/releases/tag/v0.13.2
📦 PyPI: https://pypi.org/project/basic-memory/0.13.2/
🍺 Homebrew: https://github.com/basicmachines-co/homebrew-basic-memory
🚀 GitHub Actions: Completed
Install with pip/uv:
uv tool install basic-memory
Install with Homebrew:
brew install basicmachines-co/basic-memory/basic-memory
Users can now upgrade:
uv tool upgrade basic-memory
brew upgrade basic-memory
```
## Context
- This creates production releases used by end users
- Must pass all quality gates before proceeding
- Uses the automated justfile target for consistency
- Version is automatically updated in `__init__.py`
- Triggers automated GitHub release with changelog
- Package is published to PyPI for `pip` and `uv` users
- Homebrew formula is automatically updated for stable releases
- Supports multiple installation methods (uv, pip, Homebrew)
```
--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_projects_importer.py:
--------------------------------------------------------------------------------
```python
"""Claude projects import service for Basic Memory."""
import logging
from typing import Any, Dict, Optional
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ProjectImportResult
from basic_memory.importers.utils import clean_filename
logger = logging.getLogger(__name__)
class ClaudeProjectsImporter(Importer[ProjectImportResult]):
"""Service for importing Claude projects."""
def handle_error( # pragma: no cover
self, message: str, error: Optional[Exception] = None
) -> ProjectImportResult:
"""Return a failed ProjectImportResult with an error message."""
error_msg = f"{message}: {error}" if error else message
return ProjectImportResult(
import_count={},
success=False,
error_message=error_msg,
documents=0,
prompts=0,
)
async def import_data(
self, source_data, destination_folder: str, **kwargs: Any
) -> ProjectImportResult:
"""Import projects from Claude JSON export.
Args:
            source_data: Parsed project data from the Claude projects.json export.
destination_folder: Base folder for projects within the project.
**kwargs: Additional keyword arguments.
Returns:
ProjectImportResult containing statistics and status of the import.
"""
try:
# Ensure the base folder exists
if destination_folder:
await self.ensure_folder_exists(destination_folder)
projects = source_data
# Process each project
docs_imported = 0
prompts_imported = 0
for project in projects:
project_dir = clean_filename(project["name"])
# Create project directories using FileService with relative path
docs_dir = (
f"{destination_folder}/{project_dir}/docs"
if destination_folder
else f"{project_dir}/docs"
)
await self.file_service.ensure_directory(docs_dir)
# Import prompt template if it exists
if prompt_entity := self._format_prompt_markdown(project, destination_folder):
# Write file using relative path - FileService handles base_path
file_path = f"{prompt_entity.frontmatter.metadata['permalink']}.md"
await self.write_entity(prompt_entity, file_path)
prompts_imported += 1
# Import project documents
for doc in project.get("docs", []):
entity = self._format_project_markdown(project, doc, destination_folder)
# Write file using relative path - FileService handles base_path
file_path = f"{entity.frontmatter.metadata['permalink']}.md"
await self.write_entity(entity, file_path)
docs_imported += 1
return ProjectImportResult(
import_count={"documents": docs_imported, "prompts": prompts_imported},
success=True,
documents=docs_imported,
prompts=prompts_imported,
)
except Exception as e: # pragma: no cover
logger.exception("Failed to import Claude projects")
return self.handle_error("Failed to import Claude projects", e)
def _format_project_markdown(
self, project: Dict[str, Any], doc: Dict[str, Any], destination_folder: str = ""
) -> EntityMarkdown:
"""Format a project document as a Basic Memory entity.
Args:
project: Project data.
doc: Document data.
destination_folder: Optional destination folder prefix.
Returns:
EntityMarkdown instance representing the document.
"""
# Extract timestamps
created_at = doc.get("created_at") or project["created_at"]
modified_at = project["updated_at"]
# Generate clean names for organization
project_dir = clean_filename(project["name"])
doc_file = clean_filename(doc["filename"])
# Build permalink with optional destination folder prefix
permalink = (
f"{destination_folder}/{project_dir}/docs/{doc_file}"
if destination_folder
else f"{project_dir}/docs/{doc_file}"
)
# Create entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": "project_doc",
"title": doc["filename"],
"created": created_at,
"modified": modified_at,
"permalink": permalink,
"project_name": project["name"],
"project_uuid": project["uuid"],
"doc_uuid": doc["uuid"],
}
),
content=doc["content"],
)
return entity
def _format_prompt_markdown(
self, project: Dict[str, Any], destination_folder: str = ""
) -> Optional[EntityMarkdown]:
"""Format project prompt template as a Basic Memory entity.
Args:
project: Project data.
destination_folder: Optional destination folder prefix.
Returns:
EntityMarkdown instance representing the prompt template, or None if
no prompt template exists.
"""
if not project.get("prompt_template"):
return None
# Extract timestamps
created_at = project["created_at"]
modified_at = project["updated_at"]
# Generate clean project directory name
project_dir = clean_filename(project["name"])
# Build permalink with optional destination folder prefix
permalink = (
f"{destination_folder}/{project_dir}/prompt-template"
if destination_folder
else f"{project_dir}/prompt-template"
)
# Create entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": "prompt_template",
"title": f"Prompt Template: {project['name']}",
"created": created_at,
"modified": modified_at,
"permalink": permalink,
"project_name": project["name"],
"project_uuid": project["uuid"],
}
),
content=f"# Prompt Template: {project['name']}\n\n{project['prompt_template']}",
)
return entity
```
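To make the resulting layout concrete, a sketch of the relative paths this importer writes for a hypothetical project named "Research Notes" with one doc and a prompt template. As above, `clean_filename` is a simplified stand-in:

```python
def clean_filename(name: str) -> str:
    # Simplified stand-in for basic_memory.importers.utils.clean_filename
    return "_".join(name.split())

destination_folder = "imports"  # hypothetical destination prefix
project_dir = clean_filename("Research Notes")
print(f"{destination_folder}/{project_dir}/docs/{clean_filename('Reading List')}.md")
print(f"{destination_folder}/{project_dir}/prompt-template.md")
# imports/Research_Notes/docs/Reading_List.md
# imports/Research_Notes/prompt-template.md
```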
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/utils.py:
--------------------------------------------------------------------------------
```python
from typing import Optional, List
from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import (
EntitySummary,
ObservationSummary,
RelationSummary,
MemoryMetadata,
GraphContext,
ContextResult,
)
from basic_memory.schemas.search import SearchItemType, SearchResult
from basic_memory.services import EntityService
from basic_memory.services.context_service import (
ContextResultRow,
ContextResult as ServiceContextResult,
)
async def to_graph_context(
context_result: ServiceContextResult,
entity_repository: EntityRepository,
page: Optional[int] = None,
page_size: Optional[int] = None,
):
# First pass: collect all entity IDs needed for relations
entity_ids_needed: set[int] = set()
for context_item in context_result.results:
for item in (
[context_item.primary_result] + context_item.observations + context_item.related_results
):
if item.type == SearchItemType.RELATION:
if item.from_id: # pyright: ignore
entity_ids_needed.add(item.from_id) # pyright: ignore
if item.to_id:
entity_ids_needed.add(item.to_id)
# Batch fetch all entities at once
entity_lookup: dict[int, str] = {}
if entity_ids_needed:
entities = await entity_repository.find_by_ids(list(entity_ids_needed))
entity_lookup = {e.id: e.title for e in entities}
# Helper function to convert items to summaries
def to_summary(item: SearchIndexRow | ContextResultRow):
match item.type:
case SearchItemType.ENTITY:
return EntitySummary(
entity_id=item.id,
title=item.title, # pyright: ignore
permalink=item.permalink,
content=item.content,
file_path=item.file_path,
created_at=item.created_at,
)
case SearchItemType.OBSERVATION:
return ObservationSummary(
observation_id=item.id,
entity_id=item.entity_id, # pyright: ignore
title=item.title, # pyright: ignore
file_path=item.file_path,
category=item.category, # pyright: ignore
content=item.content, # pyright: ignore
permalink=item.permalink, # pyright: ignore
created_at=item.created_at,
)
case SearchItemType.RELATION:
from_title = entity_lookup.get(item.from_id) if item.from_id else None # pyright: ignore
to_title = entity_lookup.get(item.to_id) if item.to_id else None
return RelationSummary(
relation_id=item.id,
entity_id=item.entity_id, # pyright: ignore
title=item.title, # pyright: ignore
file_path=item.file_path,
permalink=item.permalink, # pyright: ignore
relation_type=item.relation_type, # pyright: ignore
from_entity=from_title,
from_entity_id=item.from_id, # pyright: ignore
to_entity=to_title,
to_entity_id=item.to_id,
created_at=item.created_at,
)
case _: # pragma: no cover
raise ValueError(f"Unexpected type: {item.type}")
# Process the hierarchical results
hierarchical_results = []
for context_item in context_result.results:
# Process primary result
primary_result = to_summary(context_item.primary_result)
# Process observations (always ObservationSummary, validated by context_service)
observations = [to_summary(obs) for obs in context_item.observations]
# Process related results
related = [to_summary(rel) for rel in context_item.related_results]
# Add to hierarchical results
hierarchical_results.append(
ContextResult(
primary_result=primary_result,
observations=observations, # pyright: ignore[reportArgumentType]
related_results=related,
)
)
# Create schema metadata from service metadata
metadata = MemoryMetadata(
uri=context_result.metadata.uri,
types=context_result.metadata.types,
depth=context_result.metadata.depth,
timeframe=context_result.metadata.timeframe,
generated_at=context_result.metadata.generated_at,
primary_count=context_result.metadata.primary_count,
related_count=context_result.metadata.related_count,
total_results=context_result.metadata.primary_count + context_result.metadata.related_count,
total_relations=context_result.metadata.total_relations,
total_observations=context_result.metadata.total_observations,
)
# Return new GraphContext with just hierarchical results
return GraphContext(
results=hierarchical_results,
metadata=metadata,
page=page,
page_size=page_size,
)
async def to_search_results(entity_service: EntityService, results: List[SearchIndexRow]):
search_results = []
for r in results:
entities = await entity_service.get_entities_by_id([r.entity_id, r.from_id, r.to_id]) # pyright: ignore
# Determine which IDs to set based on type
entity_id = None
observation_id = None
relation_id = None
if r.type == SearchItemType.ENTITY:
entity_id = r.id
elif r.type == SearchItemType.OBSERVATION:
observation_id = r.id
entity_id = r.entity_id # Parent entity
elif r.type == SearchItemType.RELATION:
relation_id = r.id
entity_id = r.entity_id # Parent entity
search_results.append(
SearchResult(
title=r.title, # pyright: ignore
type=r.type, # pyright: ignore
permalink=r.permalink,
score=r.score, # pyright: ignore
entity=entities[0].permalink if entities else None,
content=r.content,
file_path=r.file_path,
metadata=r.metadata,
entity_id=entity_id,
observation_id=observation_id,
relation_id=relation_id,
category=r.category,
from_entity=entities[0].permalink if entities else None,
to_entity=entities[1].permalink if len(entities) > 1 else None,
relation_type=r.relation_type,
)
)
return search_results
```
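The two-pass pattern at the top of `to_graph_context` (collect IDs, fetch once, resolve titles from a dict) is what avoids an N+1 query per relation. A self-contained sketch of that pattern with a stand-in repository:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Entity:
    id: int
    title: str

async def find_by_ids(ids: list[int]) -> list[Entity]:
    # Stand-in for EntityRepository.find_by_ids (one batched query)
    return [Entity(i, f"Entity {i}") for i in ids]

async def main() -> None:
    entity_ids_needed = {1, 2, 3}  # hypothetical relation endpoints
    entities = await find_by_ids(list(entity_ids_needed))
    entity_lookup = {e.id: e.title for e in entities}
    print(entity_lookup[2])  # Entity 2

asyncio.run(main())
```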
--------------------------------------------------------------------------------
/tests/repository/test_postgres_search_repository.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for PostgresSearchRepository.
These tests only run in Postgres mode (testcontainers) and ensure that the
Postgres tsvector-backed search implementation remains well covered.
"""
from datetime import datetime, timedelta, timezone
import pytest
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.search_index_row import SearchIndexRow
from basic_memory.schemas.search import SearchItemType
pytestmark = pytest.mark.postgres
@pytest.fixture(autouse=True)
def _require_postgres_backend(db_backend):
"""Ensure these tests never run under SQLite."""
if db_backend != "postgres":
pytest.skip("PostgresSearchRepository tests require BASIC_MEMORY_TEST_POSTGRES=1")
@pytest.mark.asyncio
async def test_postgres_search_repository_index_and_search(session_maker, test_project):
repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
await repo.init_search_index() # no-op but should be exercised
now = datetime.now(timezone.utc)
row = SearchIndexRow(
project_id=test_project.id,
id=1,
title="Coffee Brewing",
content_stems="coffee brewing pour over",
content_snippet="coffee brewing snippet",
permalink="docs/coffee-brewing",
file_path="docs/coffee-brewing.md",
type="entity",
metadata={"entity_type": "note"},
created_at=now,
updated_at=now,
)
await repo.index_item(row)
# Basic full-text search
results = await repo.search(search_text="coffee")
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Boolean query path
results = await repo.search(search_text="coffee AND brewing")
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Title-only search path
results = await repo.search(title="Coffee Brewing")
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Exact permalink search
results = await repo.search(permalink="docs/coffee-brewing")
assert len(results) == 1
# Permalink pattern match (LIKE)
results = await repo.search(permalink_match="docs/coffee*")
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Item type filter
results = await repo.search(search_item_types=[SearchItemType.ENTITY])
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Entity type filter via metadata JSONB containment
results = await repo.search(types=["note"])
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Date filter (also exercises order_by_clause)
results = await repo.search(after_date=now - timedelta(days=1))
assert any(r.permalink == "docs/coffee-brewing" for r in results)
# Limit/offset
results = await repo.search(limit=1, offset=0)
assert len(results) == 1
@pytest.mark.asyncio
async def test_postgres_search_repository_bulk_index_items_and_prepare_terms(
session_maker, test_project
):
repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
# Empty batch is a no-op
await repo.bulk_index_items([])
# Exercise term preparation helpers
assert "&" in repo._prepare_search_term("coffee AND brewing")
assert repo._prepare_search_term("coff*") == "coff:*"
assert repo._prepare_search_term("()&!:") == "NOSPECIALCHARS:*"
assert repo._prepare_search_term("coffee brewing") == "coffee:* & brewing:*"
assert repo._prepare_single_term(" ") == " "
assert repo._prepare_single_term("coffee", is_prefix=False) == "coffee"
now = datetime.now(timezone.utc)
rows = [
SearchIndexRow(
project_id=test_project.id,
id=10,
title="Pour Over",
content_stems="pour over coffee",
content_snippet="pour over snippet",
permalink="docs/pour-over",
file_path="docs/pour-over.md",
type="entity",
metadata={"entity_type": "note"},
created_at=now,
updated_at=now,
),
SearchIndexRow(
project_id=test_project.id,
id=11,
title="French Press",
content_stems="french press coffee",
content_snippet="french press snippet",
permalink="docs/french-press",
file_path="docs/french-press.md",
type="entity",
metadata={"entity_type": "note"},
created_at=now,
updated_at=now,
),
]
await repo.bulk_index_items(rows)
results = await repo.search(search_text="coffee")
permalinks = {r.permalink for r in results}
assert "docs/pour-over" in permalinks
assert "docs/french-press" in permalinks
@pytest.mark.asyncio
async def test_postgres_search_repository_wildcard_text_and_permalink_match_exact(
session_maker, test_project
):
repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
now = datetime.now(timezone.utc)
await repo.index_item(
SearchIndexRow(
project_id=test_project.id,
id=1,
title="X",
content_stems="x",
content_snippet="x",
permalink="docs/x",
file_path="docs/x.md",
type="entity",
metadata={"entity_type": "note"},
created_at=now,
updated_at=now,
)
)
# search_text="*" should not add tsquery conditions (covers the pass branch)
results = await repo.search(search_text="*")
assert results
# permalink_match without '*' uses exact match branch
results = await repo.search(permalink_match="docs/x")
assert len(results) == 1
@pytest.mark.asyncio
async def test_postgres_search_repository_tsquery_syntax_error_returns_empty(
session_maker, test_project
):
repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
# Trailing boolean operator creates an invalid tsquery; repository should return []
results = await repo.search(search_text="coffee AND")
assert results == []
@pytest.mark.asyncio
async def test_postgres_search_repository_reraises_non_tsquery_db_errors(
session_maker, test_project
):
"""Dropping the search_index table triggers a non-tsquery DB error which should be re-raised."""
repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
from sqlalchemy import text
from basic_memory import db
async with db.scoped_session(session_maker) as session:
await session.execute(text("DROP TABLE search_index"))
await session.commit()
with pytest.raises(Exception):
# Use a non-text query so the generated SQL doesn't include to_tsquery(),
# ensuring we hit the generic "re-raise other db errors" branch.
await repo.search(permalink="docs/anything")
```
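The `_prepare_search_term` assertions in the tests above pin down how free-text queries are translated into Postgres `to_tsquery` syntax. A minimal re-implementation sketched purely from those assertions (the real helpers live on `PostgresSearchRepository` and may differ in detail):
```python
# Hypothetical sketch reconstructed from the test assertions above; not the
# actual repository implementation.
import re
def prepare_single_term(term: str, is_prefix: bool = True) -> str:
    if not term or not term.strip():
        return term  # whitespace-only input passes through unchanged
    cleaned = re.sub(r"[^\w]", "", term.rstrip("*"))
    if not cleaned:
        cleaned = "NOSPECIALCHARS"  # nothing searchable remains after stripping
    return f"{cleaned}:*" if is_prefix else cleaned
def prepare_search_term(query: str) -> str:
    if " AND " in query:  # explicit boolean operators map to tsquery '&'
        return " & ".join(prepare_search_term(p) for p in query.split(" AND "))
    return " & ".join(prepare_single_term(t) for t in query.split())
assert "&" in prepare_search_term("coffee AND brewing")
assert prepare_search_term("coff*") == "coff:*"
assert prepare_search_term("()&!:") == "NOSPECIALCHARS:*"
assert prepare_search_term("coffee brewing") == "coffee:* & brewing:*"
```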
--------------------------------------------------------------------------------
/src/basic_memory/schemas/project_info.py:
--------------------------------------------------------------------------------
```python
"""Schema for project info response."""
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from pydantic import Field, BaseModel
from basic_memory.utils import generate_permalink
class ProjectStatistics(BaseModel):
"""Statistics about the current project."""
# Basic counts
total_entities: int = Field(description="Total number of entities in the knowledge base")
total_observations: int = Field(description="Total number of observations across all entities")
total_relations: int = Field(description="Total number of relations between entities")
total_unresolved_relations: int = Field(
description="Number of relations with unresolved targets"
)
# Entity counts by type
entity_types: Dict[str, int] = Field(
description="Count of entities by type (e.g., note, conversation)"
)
# Observation counts by category
observation_categories: Dict[str, int] = Field(
description="Count of observations by category (e.g., tech, decision)"
)
# Relation counts by type
relation_types: Dict[str, int] = Field(
description="Count of relations by type (e.g., implements, relates_to)"
)
# Graph metrics
most_connected_entities: List[Dict[str, Any]] = Field(
description="Entities with the most relations, including their titles and permalinks"
)
isolated_entities: int = Field(description="Number of entities with no relations")
class ActivityMetrics(BaseModel):
"""Activity metrics for the current project."""
# Recent activity
recently_created: List[Dict[str, Any]] = Field(
description="Recently created entities with timestamps"
)
recently_updated: List[Dict[str, Any]] = Field(
description="Recently updated entities with timestamps"
)
# Growth over time (last 6 months)
monthly_growth: Dict[str, Dict[str, int]] = Field(
description="Monthly growth statistics for entities, observations, and relations"
)
class SystemStatus(BaseModel):
"""System status information."""
# Version information
version: str = Field(description="Basic Memory version")
# Database status
database_path: str = Field(description="Path to the SQLite database")
database_size: str = Field(description="Size of the database in human-readable format")
# Watch service status
watch_status: Optional[Dict[str, Any]] = Field(
default=None, description="Watch service status information (if running)"
)
# System information
timestamp: datetime = Field(description="Timestamp when the information was collected")
class ProjectInfoResponse(BaseModel):
"""Response for the project_info tool."""
# Project configuration
project_name: str = Field(description="Name of the current project")
project_path: str = Field(description="Path to the current project files")
available_projects: Dict[str, Dict[str, Any]] = Field(
description="Map of configured project names to detailed project information"
)
default_project: str = Field(description="Name of the default project")
# Statistics
statistics: ProjectStatistics = Field(description="Statistics about the knowledge base")
# Activity metrics
activity: ActivityMetrics = Field(description="Activity and growth metrics")
# System status
system: SystemStatus = Field(description="System and service status information")
class ProjectInfoRequest(BaseModel):
"""Request model for switching projects."""
name: str = Field(..., description="Name of the project to switch to")
path: str = Field(..., description="Path to the project directory")
set_default: bool = Field(..., description="Set the project as the default")
class WatchEvent(BaseModel):
timestamp: datetime
path: str
    action: str  # action type, e.g. "new" or "delete"
    status: str  # "success" or "error"
checksum: Optional[str]
error: Optional[str] = None
class WatchServiceState(BaseModel):
# Service status
running: bool = False
    start_time: datetime = datetime.now()  # evaluated once, at class definition time
    pid: int = os.getpid()  # evaluated once, at class definition time
# Stats
error_count: int = 0
last_error: Optional[datetime] = None
last_scan: Optional[datetime] = None
# File counts
synced_files: int = 0
# Recent activity
    recent_events: List[WatchEvent] = []  # safe default: Pydantic copies mutable defaults per instance
def add_event(
self,
path: str,
action: str,
status: str,
checksum: Optional[str] = None,
error: Optional[str] = None,
) -> WatchEvent: # pragma: no cover
event = WatchEvent(
timestamp=datetime.now(),
path=path,
action=action,
status=status,
checksum=checksum,
error=error,
)
self.recent_events.insert(0, event)
self.recent_events = self.recent_events[:100] # Keep last 100
return event
def record_error(self, error: str): # pragma: no cover
self.error_count += 1
self.add_event(path="", action="sync", status="error", error=error)
self.last_error = datetime.now()
class ProjectWatchStatus(BaseModel):
"""Project with its watch status."""
name: str = Field(..., description="Name of the project")
path: str = Field(..., description="Path to the project")
watch_status: Optional[WatchServiceState] = Field(
None, description="Watch status information for the project"
)
class ProjectItem(BaseModel):
"""Simple representation of a project."""
id: int
external_id: str # UUID string for API references (required after migration)
name: str
path: str
is_default: bool = False
@property
def permalink(self) -> str: # pragma: no cover
return generate_permalink(self.name)
@property
def home(self) -> Path: # pragma: no cover
return Path(self.path).expanduser()
@property
def project_url(self) -> str: # pragma: no cover
return f"/{generate_permalink(self.name)}"
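# Example (illustrative; assumes generate_permalink slugifies the name, e.g.
# "My Notes" -> "my-notes"): ProjectItem(..., name="My Notes").project_url
# would then be "/my-notes".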
class ProjectList(BaseModel):
"""Response model for listing projects."""
projects: List[ProjectItem]
default_project: str
class ProjectStatusResponse(BaseModel):
"""Response model for switching projects."""
message: str = Field(..., description="Status message about the project switch")
status: str = Field(..., description="Status of the switch (success or error)")
default: bool = Field(..., description="True if the project was set as the default")
old_project: Optional[ProjectItem] = Field(
None, description="Information about the project being switched from"
)
new_project: Optional[ProjectItem] = Field(
None, description="Information about the project being switched to"
)
```
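A minimal usage sketch for the watch-state models above, showing the front-inserted event list and the error bookkeeping (illustrative only; assumes the import path from the file header):
```python
# Exercise WatchServiceState's event list and error counter.
from basic_memory.schemas.project_info import WatchServiceState
state = WatchServiceState(running=True)
state.add_event(path="docs/coffee.md", action="new", status="success", checksum="abc123")
state.record_error("permission denied")
assert state.error_count == 1
assert state.recent_events[0].status == "error"  # newest event sits at the front
assert len(state.recent_events) <= 100           # add_event trims to the last 100
```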
--------------------------------------------------------------------------------
/src/basic_memory/alembic/env.py:
--------------------------------------------------------------------------------
```python
"""Alembic environment configuration."""
import asyncio
import os
from logging.config import fileConfig
# Allow nested event loops (needed for pytest-asyncio and other async contexts)
# Note: nest_asyncio doesn't work with uvloop or Python 3.14+, so we handle those cases separately
import sys
if sys.version_info < (3, 14):
try:
import nest_asyncio
nest_asyncio.apply()
except (ImportError, ValueError):
# nest_asyncio not available or can't patch this loop type (e.g., uvloop)
pass
# For Python 3.14+, we rely on the thread-based fallback in run_migrations_online()
from sqlalchemy import engine_from_config, pool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from alembic import context
from basic_memory.config import ConfigManager
# Trigger: only set test env when actually running under pytest
# Why: alembic/env.py is imported during normal operations (MCP server startup, migrations)
# but we only want test behavior during actual test runs
# Outcome: prevents is_test_env from returning True in production, enabling watch service
if os.getenv("PYTEST_CURRENT_TEST") is not None:
os.environ["BASIC_MEMORY_ENV"] = "test"
# Import after setting environment variable # noqa: E402
from basic_memory.models import Base # noqa: E402
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Load app config - this will read environment variables (BASIC_MEMORY_DATABASE_BACKEND, etc.)
# due to Pydantic's env_prefix="BASIC_MEMORY_" setting
app_config = ConfigManager().config
# Set the SQLAlchemy URL based on database backend configuration
# If the URL is already set in config (e.g., from run_migrations), use that
# Otherwise, get it from app config
# Note: alembic.ini has a placeholder URL "driver://user:pass@localhost/dbname" that we need to override
current_url = config.get_main_option("sqlalchemy.url")
if not current_url or current_url == "driver://user:pass@localhost/dbname":
from basic_memory.db import DatabaseType
sqlalchemy_url = DatabaseType.get_db_url(
app_config.database_path, DatabaseType.FILESYSTEM, app_config
)
config.set_main_option("sqlalchemy.url", sqlalchemy_url)
# Interpret the config file for Python logging.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = Base.metadata
# Add this function to tell Alembic what to include/exclude
def include_object(object, name, type_, reflected, compare_to):
# Ignore SQLite FTS tables
if type_ == "table" and name.startswith("search_index"):
return False
return True
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_object=include_object,
render_as_batch=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
"""Execute migrations with the given connection."""
context.configure(
connection=connection,
target_metadata=target_metadata,
include_object=include_object,
render_as_batch=True,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations(connectable):
"""Run migrations asynchronously with AsyncEngine."""
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
Supports both sync engines (SQLite) and async engines (PostgreSQL with asyncpg).
"""
# Check if a connection/engine was provided (e.g., from run_migrations)
connectable = context.config.attributes.get("connection", None)
if connectable is None:
# No connection provided, create engine from config
url = context.config.get_main_option("sqlalchemy.url")
# Check if it's an async URL (sqlite+aiosqlite or postgresql+asyncpg)
if url and ("+asyncpg" in url or "+aiosqlite" in url):
# Create async engine for asyncpg or aiosqlite
connectable = create_async_engine(
url,
poolclass=pool.NullPool,
future=True,
)
else:
# Create sync engine for regular sqlite or postgresql
connectable = engine_from_config(
context.config.get_section(context.config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
# Handle async engines (PostgreSQL with asyncpg)
if isinstance(connectable, AsyncEngine):
# Try to run async migrations
# nest_asyncio allows asyncio.run() from within event loops, but doesn't work with uvloop
try:
asyncio.run(run_async_migrations(connectable))
except RuntimeError as e:
if "cannot be called from a running event loop" in str(e):
# We're in a running event loop (likely uvloop) - need to use a different approach
# Create a new thread to run the async migrations
import concurrent.futures
def run_in_thread():
"""Run async migrations in a new event loop in a separate thread."""
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(run_async_migrations(connectable))
finally:
new_loop.close()
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_thread)
future.result() # Wait for completion and re-raise any exceptions
else:
raise
else:
# Handle sync engines (SQLite) or sync connections
if hasattr(connectable, "connect"):
# It's an engine, get a connection
with connectable.connect() as connection:
do_run_migrations(connection)
else:
# It's already a connection
do_run_migrations(connectable)
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
```
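The fallback branch in `run_migrations_online()` is the subtle part: when an event loop is already running (e.g. under uvloop, where `nest_asyncio` cannot patch it), the coroutine is driven to completion on a fresh loop in a worker thread. A standalone sketch of that pattern:
```python
# Standalone sketch of the thread-based fallback used above (illustrative).
import asyncio
import concurrent.futures
async def work() -> str:
    await asyncio.sleep(0)
    return "done"
def run_coro_blocking(coro):
    try:
        return asyncio.run(coro)  # fine when no event loop is running
    except RuntimeError as e:
        if "cannot be called from a running event loop" not in str(e):
            raise
        def in_thread():
            loop = asyncio.new_event_loop()  # fresh loop in a fresh thread
            asyncio.set_event_loop(loop)
            try:
                return loop.run_until_complete(coro)
            finally:
                loop.close()
        with concurrent.futures.ThreadPoolExecutor() as pool:
            return pool.submit(in_thread).result()  # block; re-raises exceptions
print(run_coro_blocking(work()))  # -> done
```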
--------------------------------------------------------------------------------
/tests/cli/test_import_claude_conversations.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_claude command (chat conversations)."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import app
from basic_memory.cli.commands import import_claude_conversations # noqa
from basic_memory.config import get_project_config
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_conversation():
"""Sample conversation data for testing."""
return {
"uuid": "test-uuid",
"name": "Test Conversation",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"updated_at": "2025-01-05T20:56:39.477600+00:00",
"chat_messages": [
{
"uuid": "msg-1",
"text": "Hello, this is a test",
"sender": "human",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"content": [{"type": "text", "text": "Hello, this is a test"}],
},
{
"uuid": "msg-2",
"text": "Response to test",
"sender": "assistant",
"created_at": "2025-01-05T20:55:40.123456+00:00",
"content": [{"type": "text", "text": "Response to test"}],
},
],
}
@pytest.fixture
def sample_conversations_json(tmp_path, sample_conversation):
"""Create a sample conversations.json file."""
json_file = tmp_path / "conversations.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([sample_conversation], f)
return json_file
def test_import_conversations_command_file_not_found(tmp_path):
"""Test error handling for nonexistent file."""
nonexistent = tmp_path / "nonexistent.json"
result = runner.invoke(app, ["import", "claude", "conversations", str(nonexistent)])
assert result.exit_code == 1
assert "File not found" in result.output
def test_import_conversations_command_success(tmp_path, sample_conversations_json, monkeypatch):
"""Test successful conversation import via command."""
# Set up test environment
monkeypatch.setenv("HOME", str(tmp_path))
# Run import
result = runner.invoke(
app, ["import", "claude", "conversations", str(sample_conversations_json)]
)
assert result.exit_code == 0
assert "Import complete" in result.output
assert "Imported 1 conversations" in result.output
assert "Containing 2 messages" in result.output
def test_import_conversations_command_invalid_json(tmp_path):
"""Test error handling for invalid JSON."""
# Create invalid JSON file
invalid_file = tmp_path / "invalid.json"
invalid_file.write_text("not json")
result = runner.invoke(app, ["import", "claude", "conversations", str(invalid_file)])
assert result.exit_code == 1
assert "Error during import" in result.output
def test_import_conversations_with_custom_folder(tmp_path, sample_conversations_json, monkeypatch):
"""Test import with custom conversations folder."""
# Set up test environment
config = get_project_config()
config.home = tmp_path
conversations_folder = "chats"
# Run import
result = runner.invoke(
app,
[
"import",
"claude",
"conversations",
str(sample_conversations_json),
"--folder",
conversations_folder,
],
)
assert result.exit_code == 0
# Check files in custom folder
conv_path = tmp_path / conversations_folder / "20250105-Test_Conversation.md"
assert conv_path.exists()
def test_import_conversation_with_attachments(tmp_path):
"""Test importing conversation with attachments."""
# Create conversation with attachments
conversation = {
"uuid": "test-uuid",
"name": "Test With Attachments",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"updated_at": "2025-01-05T20:56:39.477600+00:00",
"chat_messages": [
{
"uuid": "msg-1",
"text": "Here's a file",
"sender": "human",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"content": [{"type": "text", "text": "Here's a file"}],
"attachments": [
{"file_name": "test.txt", "extracted_content": "Test file content"}
],
}
],
}
json_file = tmp_path / "with_attachments.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([conversation], f)
config = get_project_config()
# Set up environment
config.home = tmp_path
# Run import
result = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
assert result.exit_code == 0
# Check attachment formatting
conv_path = tmp_path / "conversations/20250105-Test_With_Attachments.md"
content = conv_path.read_text(encoding="utf-8")
assert "**Attachment: test.txt**" in content
assert "```" in content
assert "Test file content" in content
def test_import_conversation_with_none_text_values(tmp_path):
"""Test importing conversation with None text values in content array (issue #236)."""
# Create conversation with None text values
conversation = {
"uuid": "test-uuid",
"name": "Test With None Text",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"updated_at": "2025-01-05T20:56:39.477600+00:00",
"chat_messages": [
{
"uuid": "msg-1",
"text": None,
"sender": "human",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"content": [
{"type": "text", "text": "Valid text here"},
{"type": "text", "text": None}, # This caused the TypeError
{"type": "text", "text": "More valid text"},
],
},
{
"uuid": "msg-2",
"text": None,
"sender": "assistant",
"created_at": "2025-01-05T20:55:40.123456+00:00",
"content": [
{"type": "text", "text": None}, # All None case
{"type": "text", "text": None},
],
},
],
}
json_file = tmp_path / "with_none_text.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([conversation], f)
config = get_project_config()
config.home = tmp_path
# Run import - should not fail with TypeError
result = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
assert result.exit_code == 0
# Check that valid text is preserved and None values are filtered out
conv_path = tmp_path / "conversations/20250105-Test_With_None_Text.md"
assert conv_path.exists()
content = conv_path.read_text(encoding="utf-8")
assert "Valid text here" in content
assert "More valid text" in content
```
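The expected paths in these tests encode the importer's naming convention: a `YYYYMMDD` prefix taken from the conversation's `created_at`, followed by the name with spaces replaced by underscores. A hypothetical helper reconstructing just that convention (the real logic lives in the importer):
```python
# Hypothetical: rebuilds the filename convention asserted in the tests above.
from datetime import datetime
def conversation_filename(name: str, created_at: str) -> str:
    date = datetime.fromisoformat(created_at).strftime("%Y%m%d")
    return f"{date}-{name.replace(' ', '_')}.md"
assert (
    conversation_filename("Test Conversation", "2025-01-05T20:55:32.499880+00:00")
    == "20250105-Test_Conversation.md"
)
```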
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/chatgpt_tools.py:
--------------------------------------------------------------------------------
```python
"""ChatGPT-compatible MCP tools for Basic Memory.
These adapters expose Basic Memory's search/fetch functionality using the exact
tool names and response structure OpenAI's MCP clients expect: each call returns
a list containing a single `{"type": "text", "text": "{...json...}"}` item.
"""
import json
from typing import Any, Dict, List, Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.search import search_notes
from basic_memory.mcp.tools.read_note import read_note
from basic_memory.schemas.search import SearchResponse
from basic_memory.config import ConfigManager
from basic_memory.telemetry import track_mcp_tool
def _format_search_results_for_chatgpt(results: SearchResponse) -> List[Dict[str, Any]]:
"""Format search results according to ChatGPT's expected schema.
Returns a list of result objects with id, title, and url fields.
"""
formatted_results = []
for result in results.results:
formatted_result = {
"id": result.permalink or f"doc-{len(formatted_results)}",
"title": result.title if result.title and result.title.strip() else "Untitled",
"url": result.permalink or "",
}
formatted_results.append(formatted_result)
return formatted_results
def _format_document_for_chatgpt(
content: str, identifier: str, title: Optional[str] = None
) -> Dict[str, Any]:
"""Format document content according to ChatGPT's expected schema.
Returns a document object with id, title, text, url, and metadata fields.
"""
# Extract title from markdown content if not provided
if not title and isinstance(content, str):
lines = content.split("\n")
if lines and lines[0].startswith("# "):
title = lines[0][2:].strip()
else:
title = identifier.split("/")[-1].replace("-", " ").title()
# Ensure title is never None
if not title:
title = "Untitled Document"
# Handle error cases
if isinstance(content, str) and content.lstrip().startswith("# Note Not Found"):
return {
"id": identifier,
"title": title or "Document Not Found",
"text": content,
"url": identifier,
"metadata": {"error": "Document not found"},
}
return {
"id": identifier,
"title": title or "Untitled Document",
"text": content,
"url": identifier,
"metadata": {"format": "markdown"},
}
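# Example (illustrative): a body starting with "# Coffee Brewing" yields the
# title "Coffee Brewing"; an identifier like "docs/pour-over" with no heading
# falls back to the derived title "Pour Over".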
@mcp.tool(description="Search for content across the knowledge base")
async def search(
query: str,
context: Context | None = None,
) -> List[Dict[str, Any]]:
"""ChatGPT/OpenAI MCP search adapter returning a single text content item.
Args:
query: Search query (full-text syntax supported by `search_notes`)
context: Optional FastMCP context passed through for auth/session data
Returns:
List with one dict: `{ "type": "text", "text": "{...JSON...}" }`
where the JSON body contains `results`, `total_count`, and echo of `query`.
"""
track_mcp_tool("search")
logger.info(f"ChatGPT search request: query='{query}'")
try:
# ChatGPT tools don't expose project parameter, so use default project
config = ConfigManager().config
default_project = config.default_project
# Call underlying search_notes with sensible defaults for ChatGPT
results = await search_notes.fn(
query=query,
project=default_project, # Use default project for ChatGPT
page=1,
page_size=10, # Reasonable default for ChatGPT consumption
search_type="text", # Default to full-text search
context=context,
)
# Handle string error responses from search_notes
if isinstance(results, str):
logger.warning(f"Search failed with error: {results[:100]}...")
search_results = {
"results": [],
"error": "Search failed",
"error_details": results[:500], # Truncate long error messages
}
else:
# Format successful results for ChatGPT
formatted_results = _format_search_results_for_chatgpt(results)
search_results = {
"results": formatted_results,
"total_count": len(results.results), # Use actual count from results
"query": query,
}
logger.info(f"Search completed: {len(formatted_results)} results returned")
# Return in MCP content array format as required by OpenAI
return [{"type": "text", "text": json.dumps(search_results, ensure_ascii=False)}]
except Exception as e:
logger.error(f"ChatGPT search failed for query '{query}': {e}")
error_results = {
"results": [],
"error": "Internal search error",
"error_message": str(e)[:200],
}
return [{"type": "text", "text": json.dumps(error_results, ensure_ascii=False)}]
@mcp.tool(description="Fetch the full contents of a search result document")
async def fetch(
id: str,
context: Context | None = None,
) -> List[Dict[str, Any]]:
"""ChatGPT/OpenAI MCP fetch adapter returning a single text content item.
Args:
id: Document identifier (permalink, title, or memory URL)
context: Optional FastMCP context passed through for auth/session data
Returns:
List with one dict: `{ "type": "text", "text": "{...JSON...}" }`
where the JSON body includes `id`, `title`, `text`, `url`, and metadata.
"""
track_mcp_tool("fetch")
logger.info(f"ChatGPT fetch request: id='{id}'")
try:
# ChatGPT tools don't expose project parameter, so use default project
config = ConfigManager().config
default_project = config.default_project
# Call underlying read_note function
content = await read_note.fn(
identifier=id,
project=default_project, # Use default project for ChatGPT
page=1,
page_size=10, # Default pagination
context=context,
)
# Format the document for ChatGPT
document = _format_document_for_chatgpt(content, id)
logger.info(f"Fetch completed: id='{id}', content_length={len(document.get('text', ''))}")
# Return in MCP content array format as required by OpenAI
return [{"type": "text", "text": json.dumps(document, ensure_ascii=False)}]
except Exception as e:
logger.error(f"ChatGPT fetch failed for id '{id}': {e}")
error_document = {
"id": id,
"title": "Fetch Error",
"text": f"Failed to fetch document: {str(e)[:200]}",
"url": id,
"metadata": {"error": "Fetch failed"},
}
return [{"type": "text", "text": json.dumps(error_document, ensure_ascii=False)}]
```
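For shape reference, a sketch of the envelope both adapters return; clients parse the single text item back into JSON (field values below are invented for the example):
```python
# Illustrative response envelope matching the adapters above.
import json
content = [{"type": "text", "text": json.dumps({
    "results": [{"id": "docs/coffee-brewing", "title": "Coffee Brewing", "url": "docs/coffee-brewing"}],
    "total_count": 1,
    "query": "coffee",
}, ensure_ascii=False)}]
payload = json.loads(content[0]["text"])  # one text item wrapping a JSON body
assert payload["total_count"] == 1
```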