#
tokens: 48515/50000 24/416 files (page 5/27)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 5 of 27. Use http://codebase.md/basicmachines-co/basic-memory?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/canvas.py:
--------------------------------------------------------------------------------

```python
  1 | """Canvas creation tool for Basic Memory MCP server.
  2 | 
  3 | This tool creates Obsidian canvas files (.canvas) using the JSON Canvas 1.0 spec.
  4 | """
  5 | 
  6 | import json
  7 | from typing import Dict, List, Any, Optional
  8 | 
  9 | from loguru import logger
 10 | from fastmcp import Context
 11 | 
 12 | from basic_memory.mcp.async_client import get_client
 13 | from basic_memory.mcp.project_context import get_active_project
 14 | from basic_memory.mcp.server import mcp
 15 | from basic_memory.mcp.tools.utils import call_put, call_post, resolve_entity_id
 16 | from basic_memory.telemetry import track_mcp_tool
 17 | 
 18 | 
 19 | @mcp.tool(
 20 |     description="Create an Obsidian canvas file to visualize concepts and connections.",
 21 | )
 22 | async def canvas(
 23 |     nodes: List[Dict[str, Any]],
 24 |     edges: List[Dict[str, Any]],
 25 |     title: str,
 26 |     folder: str,
 27 |     project: Optional[str] = None,
 28 |     context: Context | None = None,
 29 | ) -> str:
 30 |     """Create an Obsidian canvas file with the provided nodes and edges.
 31 | 
 32 |     This tool creates a .canvas file compatible with Obsidian's Canvas feature,
 33 |     allowing visualization of relationships between concepts or documents.
 34 | 
 35 |     Project Resolution:
 36 |     Server resolves projects in this order: Single Project Mode → project parameter → default project.
 37 |     If project unknown, use list_memory_projects() or recent_activity() first.
 38 | 
 39 |     For the full JSON Canvas 1.0 specification, see the 'spec://canvas' resource.
 40 | 
 41 |     Args:
 42 |         project: Project name to create canvas in. Optional - server will resolve using hierarchy.
 43 |                 If unknown, use list_memory_projects() to discover available projects.
 44 |         nodes: List of node objects following JSON Canvas 1.0 spec
 45 |         edges: List of edge objects following JSON Canvas 1.0 spec
 46 |         title: The title of the canvas (will be saved as title.canvas)
 47 |         folder: Folder path relative to project root where the canvas should be saved.
 48 |                 Use forward slashes (/) as separators. Examples: "diagrams", "projects/2025", "visual/maps"
 49 |         context: Optional FastMCP context for performance caching.
 50 | 
 51 |     Returns:
 52 |         A summary of the created canvas file
 53 | 
 54 |     Important Notes:
 55 |     - When referencing files, use the exact file path as shown in Obsidian
 56 |       Example: "folder/Document Name.md" (not permalink format)
 57 |     - For file nodes, the "file" attribute must reference an existing file
 58 |     - Nodes require id, type, x, y, width, height properties
 59 |     - Edges require id, fromNode, toNode properties
 60 |     - Position nodes in a logical layout (x,y coordinates in pixels)
 61 |     - Use color attributes ("1"-"6" or hex) for visual organization
 62 | 
 63 |     Basic Structure:
 64 |     ```json
 65 |     {
 66 |       "nodes": [
 67 |         {
 68 |           "id": "node1",
 69 |           "type": "file",  // Options: "file", "text", "link", "group"
 70 |           "file": "folder/Document.md",
 71 |           "x": 0,
 72 |           "y": 0,
 73 |           "width": 400,
 74 |           "height": 300
 75 |         }
 76 |       ],
 77 |       "edges": [
 78 |         {
 79 |           "id": "edge1",
 80 |           "fromNode": "node1",
 81 |           "toNode": "node2",
 82 |           "label": "connects to"
 83 |         }
 84 |       ]
 85 |     }
 86 |     ```
 87 | 
 88 |     Examples:
 89 |         # Create canvas in project
 90 |         canvas("my-project", nodes=[...], edges=[...], title="My Canvas", folder="diagrams")
 91 | 
 92 |         # Create canvas in work project
 93 |         canvas("work-project", nodes=[...], edges=[...], title="Process Flow", folder="visual/maps")
 94 | 
 95 |     Raises:
 96 |         ToolError: If project doesn't exist or folder path is invalid
 97 |     """
 98 |     track_mcp_tool("canvas")
 99 |     async with get_client() as client:
100 |         active_project = await get_active_project(client, project, context)
101 | 
102 |         # Ensure path has .canvas extension
103 |         file_title = title if title.endswith(".canvas") else f"{title}.canvas"
104 |         file_path = f"{folder}/{file_title}"
105 | 
106 |         # Create canvas data structure
107 |         canvas_data = {"nodes": nodes, "edges": edges}
108 | 
109 |         # Convert to JSON
110 |         canvas_json = json.dumps(canvas_data, indent=2)
111 | 
112 |         # Try to create the canvas file first (optimistic create)
113 |         logger.info(f"Creating canvas file: {file_path} in project {project}")
114 |         try:
115 |             response = await call_post(
116 |                 client,
117 |                 f"/v2/projects/{active_project.external_id}/resource",
118 |                 json={"file_path": file_path, "content": canvas_json},
119 |             )
120 |             action = "Created"
121 |         except Exception as e:
122 |             # If creation failed due to conflict (already exists), try to update
123 |             if (
124 |                 "409" in str(e)
125 |                 or "conflict" in str(e).lower()
126 |                 or "already exists" in str(e).lower()
127 |             ):
128 |                 logger.info(f"Canvas file exists, updating instead: {file_path}")
129 |                 try:
130 |                     entity_id = await resolve_entity_id(
131 |                         client, active_project.external_id, file_path
132 |                     )
133 |                     # For update, send content in JSON body
134 |                     response = await call_put(
135 |                         client,
136 |                         f"/v2/projects/{active_project.external_id}/resource/{entity_id}",
137 |                         json={"content": canvas_json},
138 |                     )
139 |                     action = "Updated"
140 |                 except Exception as update_error:  # pragma: no cover
141 |                     # Re-raise the original error if update also fails
142 |                     raise e from update_error  # pragma: no cover
143 |             else:
144 |                 # Re-raise if it's not a conflict error
145 |                 raise  # pragma: no cover
146 | 
147 |         # Parse response
148 |         result = response.json()
149 |         logger.debug(result)
150 | 
151 |         # Build summary
152 |         summary = [f"# {action}: {file_path}", "\nThe canvas is ready to open in Obsidian."]
153 | 
154 |         return "\n".join(summary)
155 | 
```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_content.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for the read_content MCP tool security validation.
  2 | 
  3 | We keep these tests focused on path boundary/security checks, and rely on
  4 | `tests/mcp/test_tool_resource.py` for full-stack content-type behavior.
  5 | """
  6 | 
  7 | from __future__ import annotations
  8 | 
  9 | import pytest
 10 | from mcp.server.fastmcp.exceptions import ToolError
 11 | 
 12 | from basic_memory.mcp.tools import read_content, write_note
 13 | 
 14 | 
 15 | @pytest.mark.asyncio
 16 | async def test_read_content_blocks_path_traversal_unix(client, test_project):
 17 |     attack_paths = [
 18 |         "../secrets.txt",
 19 |         "../../etc/passwd",
 20 |         "../../../root/.ssh/id_rsa",
 21 |         "notes/../../../etc/shadow",
 22 |         "folder/../../outside/file.md",
 23 |         "../../../../etc/hosts",
 24 |         "../../../home/user/.env",
 25 |     ]
 26 | 
 27 |     for attack_path in attack_paths:
 28 |         result = await read_content.fn(project=test_project.name, path=attack_path)
 29 |         assert result["type"] == "error"
 30 |         assert "paths must stay within project boundaries" in result["error"]
 31 |         assert attack_path in result["error"]
 32 | 
 33 | 
 34 | @pytest.mark.asyncio
 35 | async def test_read_content_blocks_path_traversal_windows(client, test_project):
 36 |     attack_paths = [
 37 |         "..\\secrets.txt",
 38 |         "..\\..\\Windows\\System32\\config\\SAM",
 39 |         "notes\\..\\..\\..\\Windows\\System32",
 40 |         "\\\\server\\share\\file.txt",
 41 |         "..\\..\\Users\\user\\.env",
 42 |         "\\\\..\\..\\Windows",
 43 |         "..\\..\\..\\Boot.ini",
 44 |     ]
 45 | 
 46 |     for attack_path in attack_paths:
 47 |         result = await read_content.fn(project=test_project.name, path=attack_path)
 48 |         assert result["type"] == "error"
 49 |         assert "paths must stay within project boundaries" in result["error"]
 50 |         assert attack_path in result["error"]
 51 | 
 52 | 
 53 | @pytest.mark.asyncio
 54 | async def test_read_content_blocks_absolute_paths(client, test_project):
 55 |     attack_paths = [
 56 |         "/etc/passwd",
 57 |         "/home/user/.env",
 58 |         "/var/log/auth.log",
 59 |         "/root/.ssh/id_rsa",
 60 |         "C:\\Windows\\System32\\config\\SAM",
 61 |         "C:\\Users\\user\\.env",
 62 |         "D:\\secrets\\config.json",
 63 |         "/tmp/malicious.txt",
 64 |         "/usr/local/bin/evil",
 65 |     ]
 66 | 
 67 |     for attack_path in attack_paths:
 68 |         result = await read_content.fn(project=test_project.name, path=attack_path)
 69 |         assert result["type"] == "error"
 70 |         assert "paths must stay within project boundaries" in result["error"]
 71 |         assert attack_path in result["error"]
 72 | 
 73 | 
 74 | @pytest.mark.asyncio
 75 | async def test_read_content_blocks_home_directory_access(client, test_project):
 76 |     attack_paths = [
 77 |         "~/secrets.txt",
 78 |         "~/.env",
 79 |         "~/.ssh/id_rsa",
 80 |         "~/Documents/passwords.txt",
 81 |         "~\\AppData\\secrets",
 82 |         "~\\Desktop\\config.ini",
 83 |         "~/.bashrc",
 84 |         "~/Library/Preferences/secret.plist",
 85 |     ]
 86 | 
 87 |     for attack_path in attack_paths:
 88 |         result = await read_content.fn(project=test_project.name, path=attack_path)
 89 |         assert result["type"] == "error"
 90 |         assert "paths must stay within project boundaries" in result["error"]
 91 |         assert attack_path in result["error"]
 92 | 
 93 | 
 94 | @pytest.mark.asyncio
 95 | async def test_read_content_blocks_memory_url_attacks(client, test_project):
 96 |     attack_paths = [
 97 |         "memory://../../etc/passwd",
 98 |         "memory://../../../root/.ssh/id_rsa",
 99 |         "memory://~/.env",
100 |         "memory:///etc/passwd",
101 |     ]
102 | 
103 |     for attack_path in attack_paths:
104 |         result = await read_content.fn(project=test_project.name, path=attack_path)
105 |         assert result["type"] == "error"
106 |         assert "paths must stay within project boundaries" in result["error"]
107 | 
108 | 
109 | @pytest.mark.asyncio
110 | async def test_read_content_unicode_path_attacks(client, test_project):
111 |     unicode_attacks = [
112 |         "notes/文档/../../../etc/passwd",
113 |         "docs/café/../../.env",
114 |         "files/αβγ/../../../secret.txt",
115 |     ]
116 | 
117 |     for attack_path in unicode_attacks:
118 |         result = await read_content.fn(project=test_project.name, path=attack_path)
119 |         assert result["type"] == "error"
120 |         assert "paths must stay within project boundaries" in result["error"]
121 | 
122 | 
123 | @pytest.mark.asyncio
124 | async def test_read_content_very_long_attack_path(client, test_project):
125 |     long_attack = "../" * 1000 + "etc/passwd"
126 |     result = await read_content.fn(project=test_project.name, path=long_attack)
127 |     assert result["type"] == "error"
128 |     assert "paths must stay within project boundaries" in result["error"]
129 | 
130 | 
131 | @pytest.mark.asyncio
132 | async def test_read_content_case_variations_attacks(client, test_project):
133 |     case_attacks = [
134 |         "../ETC/passwd",
135 |         "../Etc/PASSWD",
136 |         "..\\WINDOWS\\system32",
137 |         "~/.SSH/id_rsa",
138 |     ]
139 | 
140 |     for attack_path in case_attacks:
141 |         result = await read_content.fn(project=test_project.name, path=attack_path)
142 |         assert result["type"] == "error"
143 |         assert "paths must stay within project boundaries" in result["error"]
144 | 
145 | 
146 | @pytest.mark.asyncio
147 | async def test_read_content_allows_safe_path_integration(client, test_project):
148 |     await write_note.fn(
149 |         project=test_project.name,
150 |         title="Meeting",
151 |         folder="notes",
152 |         content="This is a safe note for read_content()",
153 |     )
154 | 
155 |     result = await read_content.fn(project=test_project.name, path="notes/meeting")
156 |     assert result["type"] == "text"
157 |     assert "safe note" in result["text"]
158 | 
159 | 
160 | @pytest.mark.asyncio
161 | async def test_read_content_empty_path_does_not_trigger_security_error(client, test_project):
162 |     try:
163 |         result = await read_content.fn(project=test_project.name, path="")
164 |         if isinstance(result, dict) and result.get("type") == "error":
165 |             assert "paths must stay within project boundaries" not in result.get("error", "")
166 |     except ToolError:
167 |         # Acceptable: resource resolution may treat empty path as not-found.
168 |         pass
169 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------

```python
  1 | """Utility functions for formatting prompt responses.
  2 | 
  3 | These utilities help format data from various tools into consistent,
  4 | user-friendly markdown summaries.
  5 | """
  6 | 
  7 | from dataclasses import dataclass
  8 | from textwrap import dedent
  9 | from typing import List
 10 | 
 11 | from basic_memory.schemas.base import TimeFrame
 12 | from basic_memory.schemas.memory import (
 13 |     normalize_memory_url,
 14 |     EntitySummary,
 15 |     RelationSummary,
 16 |     ObservationSummary,
 17 | )
 18 | 
 19 | 
 20 | @dataclass
 21 | class PromptContextItem:
 22 |     primary_results: List[EntitySummary]
 23 |     related_results: List[EntitySummary | RelationSummary | ObservationSummary]
 24 | 
 25 | 
 26 | @dataclass
 27 | class PromptContext:
 28 |     timeframe: TimeFrame
 29 |     topic: str
 30 |     results: List[PromptContextItem]
 31 | 
 32 | 
 33 | def format_prompt_context(context: PromptContext) -> str:
 34 |     """Format continuation context into a helpful summary.
 35 |     Returns:
 36 |         Formatted continuation summary
 37 |     """
 38 |     if not context.results:  # pragma: no cover
 39 |         return dedent(f"""
 40 |             # Continuing conversation on: {context.topic}
 41 | 
 42 |             This is a memory retrieval session. 
 43 |             The supplied query did not return any information specifically on this topic.
 44 |             
 45 |             ## Opportunity to Capture New Knowledge!
 46 |             
 47 |             This is an excellent chance to start documenting this topic:
 48 |             
 49 |             ```python
 50 |             await write_note(
 51 |                 title="{context.topic}",
 52 |                 content=f'''
 53 |                 # {context.topic}
 54 |                 
 55 |                 ## Overview
 56 |                 [Summary of what we know about {context.topic}]
 57 |                 
 58 |                 ## Key Points
 59 |                 [Main aspects or components of {context.topic}]
 60 |                 
 61 |                 ## Observations
 62 |                 - [category] [First important observation about {context.topic}]
 63 |                 - [category] [Second observation about {context.topic}]
 64 |                 
 65 |                 ## Relations
 66 |                 - relates_to [[Related Topic]]
 67 |                 - part_of [[Broader Context]]
 68 |                 '''
 69 |             )
 70 |             ```
 71 |             
 72 |             ## Other Options
 73 |             
 74 |             Please use the available basic-memory tools to gather relevant context before responding.
 75 |             You can also:
 76 |             - Try a different search term
 77 |             - Check recent activity with `recent_activity(timeframe="1w")`
 78 |             """)
 79 | 
 80 |     # Start building our summary with header - add knowledge capture emphasis
 81 |     summary = dedent(f"""
 82 |         # Continuing conversation on: {context.topic}
 83 | 
 84 |         This is a memory retrieval session. 
 85 |         
 86 |         Please use the available basic-memory tools to gather relevant context before responding. 
 87 |         Start by executing one of the suggested commands below to retrieve content.
 88 | 
 89 |         Here's what I found from previous conversations:
 90 |         
 91 |         > **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
 92 |         """)
 93 | 
 94 |     # Track what we've added to avoid duplicates
 95 |     added_permalinks = set()
 96 |     sections = []
 97 | 
 98 |     # Process each context
 99 |     for context in context.results:  # pyright: ignore
100 |         for primary in context.primary_results:  # pyright: ignore
101 |             if primary.permalink not in added_permalinks:
102 |                 primary_permalink = primary.permalink
103 | 
104 |                 added_permalinks.add(primary_permalink)
105 | 
106 |                 # Use permalink if available, otherwise use file_path
107 |                 if primary_permalink:
108 |                     memory_url = normalize_memory_url(primary_permalink)
109 |                     read_command = f'read_note("{primary_permalink}")'
110 |                 else:
111 |                     memory_url = f"file://{primary.file_path}"
112 |                     read_command = f'read_file("{primary.file_path}")'
113 | 
114 |                 section = dedent(f"""
115 |                     --- {memory_url}
116 | 
117 |                     ## {primary.title}
118 |                     - **Type**: {primary.type}
119 |                     """)
120 | 
121 |                 # Add creation date
122 |                 section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"
123 | 
124 |                 # Add content snippet
125 |                 if hasattr(primary, "content") and primary.content:  # pyright: ignore
126 |                     content = primary.content or ""  # pyright: ignore  # pragma: no cover
127 |                     if content:  # pragma: no cover
128 |                         section += f"\n**Excerpt**:\n{content}\n"  # pragma: no cover
129 | 
130 |                 section += dedent(f"""
131 | 
132 |                     You can read this document with: `{read_command}`
133 |                     """)
134 |                 sections.append(section)
135 | 
136 |         if context.related_results:  # pyright: ignore
137 |             section += dedent(  # pyright: ignore
138 |                 """   
139 |                 ## Related Context
140 |                 """
141 |             )
142 | 
143 |             for related in context.related_results:  # pyright: ignore
144 |                 section_content = dedent(f"""
145 |                     - type: **{related.type}**
146 |                     - title: {related.title}
147 |                     """)
148 |                 if related.permalink:  # pragma: no cover
149 |                     section_content += (
150 |                         f'You can view this document with: `read_note("{related.permalink}")`'
151 |                     )
152 |                 else:  # pragma: no cover
153 |                     section_content += (
154 |                         f'You can view this file with: `read_file("{related.file_path}")`'
155 |                     )
156 | 
157 |                 section += section_content
158 |                 sections.append(section)
159 | 
160 |     # Add all sections
161 |     summary += "\n".join(sections)
162 |     return summary
163 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------

```python
  1 | """List directory tool for Basic Memory MCP server."""
  2 | 
  3 | from typing import Optional
  4 | 
  5 | from loguru import logger
  6 | from fastmcp import Context
  7 | 
  8 | from basic_memory.mcp.async_client import get_client
  9 | from basic_memory.mcp.project_context import get_active_project
 10 | from basic_memory.mcp.server import mcp
 11 | from basic_memory.telemetry import track_mcp_tool
 12 | 
 13 | 
 14 | @mcp.tool(
 15 |     description="List directory contents with filtering and depth control.",
 16 | )
 17 | async def list_directory(
 18 |     dir_name: str = "/",
 19 |     depth: int = 1,
 20 |     file_name_glob: Optional[str] = None,
 21 |     project: Optional[str] = None,
 22 |     context: Context | None = None,
 23 | ) -> str:
 24 |     """List directory contents from the knowledge base with optional filtering.
 25 | 
 26 |     This tool provides 'ls' functionality for browsing the knowledge base directory structure.
 27 |     It can list immediate children or recursively explore subdirectories with depth control,
 28 |     and supports glob pattern filtering for finding specific files.
 29 | 
 30 |     Args:
 31 |         dir_name: Directory path to list (default: root "/")
 32 |                  Examples: "/", "/projects", "/research/ml"
 33 |         depth: Recursion depth (1-10, default: 1 for immediate children only)
 34 |                Higher values show subdirectory contents recursively
 35 |         file_name_glob: Optional glob pattern for filtering file names
 36 |                        Examples: "*.md", "*meeting*", "project_*"
 37 |         project: Project name to list directory from. Optional - server will resolve using hierarchy.
 38 |                 If unknown, use list_memory_projects() to discover available projects.
 39 |         context: Optional FastMCP context for performance caching.
 40 | 
 41 |     Returns:
 42 |         Formatted listing of directory contents with file metadata
 43 | 
 44 |     Examples:
 45 |         # List root directory contents
 46 |         list_directory()
 47 | 
 48 |         # List specific folder
 49 |         list_directory(dir_name="/projects")
 50 | 
 51 |         # Find all markdown files
 52 |         list_directory(file_name_glob="*.md")
 53 | 
 54 |         # Deep exploration of research folder
 55 |         list_directory(dir_name="/research", depth=3)
 56 | 
 57 |         # Find meeting notes in projects folder
 58 |         list_directory(dir_name="/projects", file_name_glob="*meeting*")
 59 | 
 60 |         # Explicit project specification
 61 |         list_directory(project="work-docs", dir_name="/projects")
 62 | 
 63 |     Raises:
 64 |         ToolError: If project doesn't exist or directory path is invalid
 65 |     """
 66 |     track_mcp_tool("list_directory")
 67 |     async with get_client() as client:
 68 |         active_project = await get_active_project(client, project, context)
 69 | 
 70 |         logger.debug(
 71 |             f"Listing directory '{dir_name}' in project {project} with depth={depth}, glob='{file_name_glob}'"
 72 |         )
 73 | 
 74 |         # Import here to avoid circular import
 75 |         from basic_memory.mcp.clients import DirectoryClient
 76 | 
 77 |         # Use typed DirectoryClient for API calls
 78 |         directory_client = DirectoryClient(client, active_project.external_id)
 79 |         nodes = await directory_client.list(dir_name, depth=depth, file_name_glob=file_name_glob)
 80 | 
 81 |         if not nodes:
 82 |             filter_desc = ""
 83 |             if file_name_glob:
 84 |                 filter_desc = f" matching '{file_name_glob}'"
 85 |             return f"No files found in directory '{dir_name}'{filter_desc}"
 86 | 
 87 |         # Format the results
 88 |         output_lines = []
 89 |         if file_name_glob:
 90 |             output_lines.append(
 91 |                 f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
 92 |             )
 93 |         else:
 94 |             output_lines.append(f"Contents of '{dir_name}' (depth {depth}):")
 95 |         output_lines.append("")
 96 | 
 97 |         # Group by type and sort
 98 |         directories = [n for n in nodes if n["type"] == "directory"]
 99 |         files = [n for n in nodes if n["type"] == "file"]
100 | 
101 |         # Sort by name
102 |         directories.sort(key=lambda x: x["name"])
103 |         files.sort(key=lambda x: x["name"])
104 | 
105 |         # Display directories first
106 |         for node in directories:
107 |             path_display = node["directory_path"]
108 |             output_lines.append(f"📁 {node['name']:<30} {path_display}")
109 | 
110 |         # Add separator if we have both directories and files
111 |         if directories and files:
112 |             output_lines.append("")
113 | 
114 |         # Display files with metadata
115 |         for node in files:
116 |             path_display = node["directory_path"]
117 |             title = node.get("title", "")
118 |             updated = node.get("updated_at", "")
119 | 
120 |             # Remove leading slash if present, requesting the file via read_note does not use the beginning slash'
121 |             if path_display.startswith("/"):
122 |                 path_display = path_display[1:]
123 | 
124 |             # Format date if available
125 |             date_str = ""
126 |             if updated:
127 |                 try:
128 |                     from datetime import datetime
129 | 
130 |                     dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
131 |                     date_str = dt.strftime("%Y-%m-%d")
132 |                 except Exception:  # pragma: no cover
133 |                     date_str = updated[:10] if len(updated) >= 10 else ""
134 | 
135 |             # Create formatted line
136 |             file_line = f"📄 {node['name']:<30} {path_display}"
137 |             if title and title != node["name"]:
138 |                 file_line += f" | {title}"
139 |             if date_str:
140 |                 file_line += f" | {date_str}"
141 | 
142 |             output_lines.append(file_line)
143 | 
144 |         # Add summary
145 |         output_lines.append("")
146 |         total_count = len(directories) + len(files)
147 |         summary_parts = []
148 |         if directories:
149 |             summary_parts.append(
150 |                 f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
151 |             )
152 |         if files:
153 |             summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")
154 | 
155 |         output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")
156 | 
157 |         return "\n".join(output_lines)
158 | 
```

--------------------------------------------------------------------------------
/tests/cli/cloud/test_cloud_api_client_and_utils.py:
--------------------------------------------------------------------------------

```python
  1 | from contextlib import asynccontextmanager
  2 | import json
  3 | 
  4 | import httpx
  5 | import pytest
  6 | 
  7 | from basic_memory.cli.auth import CLIAuth
  8 | from basic_memory.cli.commands.cloud.api_client import (
  9 |     SubscriptionRequiredError,
 10 |     make_api_request,
 11 | )
 12 | from basic_memory.cli.commands.cloud.cloud_utils import (
 13 |     create_cloud_project,
 14 |     fetch_cloud_projects,
 15 |     project_exists,
 16 | )
 17 | 
 18 | 
 19 | @pytest.mark.asyncio
 20 | async def test_make_api_request_success_injects_auth_and_accept_encoding(
 21 |     config_home, config_manager
 22 | ):
 23 |     # Arrange: create a token on disk so CLIAuth can authenticate without any network.
 24 |     auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
 25 |     auth.token_file.parent.mkdir(parents=True, exist_ok=True)
 26 |     auth.token_file.write_text(
 27 |         '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
 28 |         encoding="utf-8",
 29 |     )
 30 | 
 31 |     async def handler(request: httpx.Request) -> httpx.Response:
 32 |         assert request.headers.get("authorization") == "Bearer token-123"
 33 |         assert request.headers.get("accept-encoding") == "identity"
 34 |         return httpx.Response(200, json={"ok": True})
 35 | 
 36 |     transport = httpx.MockTransport(handler)
 37 | 
 38 |     @asynccontextmanager
 39 |     async def http_client_factory():
 40 |         async with httpx.AsyncClient(transport=transport) as client:
 41 |             yield client
 42 | 
 43 |     # Act
 44 |     resp = await make_api_request(
 45 |         method="GET",
 46 |         url="https://cloud.example.test/proxy/health",
 47 |         auth=auth,
 48 |         http_client_factory=http_client_factory,
 49 |     )
 50 | 
 51 |     # Assert
 52 |     assert resp.json()["ok"] is True
 53 | 
 54 | 
 55 | @pytest.mark.asyncio
 56 | async def test_make_api_request_raises_subscription_required(config_home, config_manager):
 57 |     auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
 58 |     auth.token_file.parent.mkdir(parents=True, exist_ok=True)
 59 |     auth.token_file.write_text(
 60 |         '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
 61 |         encoding="utf-8",
 62 |     )
 63 | 
 64 |     async def handler(_request: httpx.Request) -> httpx.Response:
 65 |         return httpx.Response(
 66 |             403,
 67 |             json={
 68 |                 "detail": {
 69 |                     "error": "subscription_required",
 70 |                     "message": "Need subscription",
 71 |                     "subscribe_url": "https://example.test/subscribe",
 72 |                 }
 73 |             },
 74 |         )
 75 | 
 76 |     transport = httpx.MockTransport(handler)
 77 | 
 78 |     @asynccontextmanager
 79 |     async def http_client_factory():
 80 |         async with httpx.AsyncClient(transport=transport) as client:
 81 |             yield client
 82 | 
 83 |     with pytest.raises(SubscriptionRequiredError) as exc:
 84 |         await make_api_request(
 85 |             method="GET",
 86 |             url="https://cloud.example.test/proxy/health",
 87 |             auth=auth,
 88 |             http_client_factory=http_client_factory,
 89 |         )
 90 | 
 91 |     assert exc.value.subscribe_url == "https://example.test/subscribe"
 92 | 
 93 | 
 94 | @pytest.mark.asyncio
 95 | async def test_cloud_utils_fetch_and_exists_and_create_project(
 96 |     config_home, config_manager, monkeypatch
 97 | ):
 98 |     # Point config.cloud_host at our mocked base URL
 99 |     config = config_manager.load_config()
100 |     config.cloud_host = "https://cloud.example.test"
101 |     config_manager.save_config(config)
102 | 
103 |     auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
104 |     auth.token_file.parent.mkdir(parents=True, exist_ok=True)
105 |     auth.token_file.write_text(
106 |         '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
107 |         encoding="utf-8",
108 |     )
109 | 
110 |     seen = {"create_payload": None}
111 | 
112 |     async def handler(request: httpx.Request) -> httpx.Response:
113 |         if request.method == "GET" and request.url.path == "/proxy/projects/projects":
114 |             return httpx.Response(
115 |                 200,
116 |                 json={
117 |                     "projects": [
118 |                         {"id": 1, "name": "alpha", "path": "alpha", "is_default": True},
119 |                         {"id": 2, "name": "beta", "path": "beta", "is_default": False},
120 |                     ]
121 |                 },
122 |             )
123 | 
124 |         if request.method == "POST" and request.url.path == "/proxy/projects/projects":
125 |             # httpx.Request doesn't have .json(); parse bytes payload.
126 |             seen["create_payload"] = json.loads(request.content.decode("utf-8"))
127 |             return httpx.Response(
128 |                 200,
129 |                 json={
130 |                     "message": "created",
131 |                     "status": "success",
132 |                     "default": False,
133 |                     "old_project": None,
134 |                     "new_project": {
135 |                         "name": seen["create_payload"]["name"],
136 |                         "path": seen["create_payload"]["path"],
137 |                     },
138 |                 },
139 |             )
140 | 
141 |         raise AssertionError(f"Unexpected request: {request.method} {request.url}")
142 | 
143 |     transport = httpx.MockTransport(handler)
144 | 
145 |     @asynccontextmanager
146 |     async def http_client_factory():
147 |         async with httpx.AsyncClient(
148 |             transport=transport, base_url="https://cloud.example.test"
149 |         ) as client:
150 |             yield client
151 | 
152 |     async def api_request(**kwargs):
153 |         return await make_api_request(auth=auth, http_client_factory=http_client_factory, **kwargs)
154 | 
155 |     projects = await fetch_cloud_projects(api_request=api_request)
156 |     assert [p.name for p in projects.projects] == ["alpha", "beta"]
157 | 
158 |     assert await project_exists("alpha", api_request=api_request) is True
159 |     assert await project_exists("missing", api_request=api_request) is False
160 | 
161 |     created = await create_cloud_project("My Project", api_request=api_request)
162 |     assert created.new_project is not None
163 |     assert created.new_project["name"] == "My Project"
164 |     # Path should be permalink-like (kebab)
165 |     assert seen["create_payload"]["path"] == "my-project"
166 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/importer_router.py:
--------------------------------------------------------------------------------

```python
  1 | """V2 Import Router - ID-based data import operations.
  2 | 
  3 | This router uses v2 dependencies for consistent project handling with external_id UUIDs.
  4 | Import endpoints use project_id in the path for consistency with other v2 endpoints.
  5 | """
  6 | 
  7 | import json
  8 | import logging
  9 | 
 10 | from fastapi import APIRouter, Form, HTTPException, UploadFile, status, Path
 11 | 
 12 | from basic_memory.deps import (
 13 |     ChatGPTImporterV2ExternalDep,
 14 |     ClaudeConversationsImporterV2ExternalDep,
 15 |     ClaudeProjectsImporterV2ExternalDep,
 16 |     MemoryJsonImporterV2ExternalDep,
 17 | )
 18 | from basic_memory.importers import Importer
 19 | from basic_memory.schemas.importer import (
 20 |     ChatImportResult,
 21 |     EntityImportResult,
 22 |     ProjectImportResult,
 23 | )
 24 | 
 25 | logger = logging.getLogger(__name__)
 26 | 
 27 | router = APIRouter(prefix="/import", tags=["import-v2"])
 28 | 
 29 | 
 30 | @router.post("/chatgpt", response_model=ChatImportResult)
 31 | async def import_chatgpt(
 32 |     importer: ChatGPTImporterV2ExternalDep,
 33 |     file: UploadFile,
 34 |     project_id: str = Path(..., description="Project external UUID"),
 35 |     folder: str = Form("conversations"),
 36 | ) -> ChatImportResult:
 37 |     """Import conversations from ChatGPT JSON export.
 38 | 
 39 |     Args:
 40 |         project_id: Project external UUID from URL path
 41 |         file: The ChatGPT conversations.json file.
 42 |         folder: The folder to place the files in.
 43 |         importer: ChatGPT importer instance.
 44 | 
 45 |     Returns:
 46 |         ChatImportResult with import statistics.
 47 | 
 48 |     Raises:
 49 |         HTTPException: If import fails.
 50 |     """
 51 |     logger.info(f"V2 Importing ChatGPT conversations for project {project_id}")
 52 |     return await import_file(importer, file, folder)
 53 | 
 54 | 
 55 | @router.post("/claude/conversations", response_model=ChatImportResult)
 56 | async def import_claude_conversations(
 57 |     importer: ClaudeConversationsImporterV2ExternalDep,
 58 |     file: UploadFile,
 59 |     project_id: str = Path(..., description="Project external UUID"),
 60 |     folder: str = Form("conversations"),
 61 | ) -> ChatImportResult:
 62 |     """Import conversations from Claude conversations.json export.
 63 | 
 64 |     Args:
 65 |         project_id: Project external UUID from URL path
 66 |         file: The Claude conversations.json file.
 67 |         folder: The folder to place the files in.
 68 |         importer: Claude conversations importer instance.
 69 | 
 70 |     Returns:
 71 |         ChatImportResult with import statistics.
 72 | 
 73 |     Raises:
 74 |         HTTPException: If import fails.
 75 |     """
 76 |     logger.info(f"V2 Importing Claude conversations for project {project_id}")
 77 |     return await import_file(importer, file, folder)
 78 | 
 79 | 
 80 | @router.post("/claude/projects", response_model=ProjectImportResult)
 81 | async def import_claude_projects(
 82 |     importer: ClaudeProjectsImporterV2ExternalDep,
 83 |     file: UploadFile,
 84 |     project_id: str = Path(..., description="Project external UUID"),
 85 |     folder: str = Form("projects"),
 86 | ) -> ProjectImportResult:
 87 |     """Import projects from Claude projects.json export.
 88 | 
 89 |     Args:
 90 |         project_id: Project external UUID from URL path
 91 |         file: The Claude projects.json file.
 92 |         folder: The base folder to place the files in.
 93 |         importer: Claude projects importer instance.
 94 | 
 95 |     Returns:
 96 |         ProjectImportResult with import statistics.
 97 | 
 98 |     Raises:
 99 |         HTTPException: If import fails.
100 |     """
101 |     logger.info(f"V2 Importing Claude projects for project {project_id}")
102 |     return await import_file(importer, file, folder)
103 | 
104 | 
105 | @router.post("/memory-json", response_model=EntityImportResult)
106 | async def import_memory_json(
107 |     importer: MemoryJsonImporterV2ExternalDep,
108 |     file: UploadFile,
109 |     project_id: str = Path(..., description="Project external UUID"),
110 |     folder: str = Form("conversations"),
111 | ) -> EntityImportResult:
112 |     """Import entities and relations from a memory.json file.
113 | 
114 |     Args:
115 |         project_id: Project external UUID from URL path
116 |         file: The memory.json file.
117 |         folder: Optional destination folder within the project.
118 |         importer: Memory JSON importer instance.
119 | 
120 |     Returns:
121 |         EntityImportResult with import statistics.
122 | 
123 |     Raises:
124 |         HTTPException: If import fails.
125 |     """
126 |     logger.info(f"V2 Importing memory.json for project {project_id}")
127 |     try:
128 |         file_data = []
129 |         file_bytes = await file.read()
130 |         file_str = file_bytes.decode("utf-8")
131 |         for line in file_str.splitlines():
132 |             json_data = json.loads(line)
133 |             file_data.append(json_data)
134 | 
135 |         result = await importer.import_data(file_data, folder)
136 |         if not result.success:  # pragma: no cover
137 |             raise HTTPException(
138 |                 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
139 |                 detail=result.error_message or "Import failed",
140 |             )
141 |     except Exception as e:
142 |         logger.exception("V2 Import failed")
143 |         raise HTTPException(
144 |             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
145 |             detail=f"Import failed: {str(e)}",
146 |         )
147 |     return result
148 | 
149 | 
150 | async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
151 |     """Helper function to import a file using an importer instance.
152 | 
153 |     Args:
154 |         importer: The importer instance to use
155 |         file: The file to import
156 |         destination_folder: Destination folder for imported content
157 | 
158 |     Returns:
159 |         Import result from the importer
160 | 
161 |     Raises:
162 |         HTTPException: If import fails
163 |     """
164 |     try:
165 |         # Process file
166 |         json_data = json.load(file.file)
167 |         result = await importer.import_data(json_data, destination_folder)
168 |         if not result.success:  # pragma: no cover
169 |             raise HTTPException(
170 |                 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
171 |                 detail=result.error_message or "Import failed",
172 |             )
173 | 
174 |         return result
175 | 
176 |     except Exception as e:
177 |         logger.exception("V2 Import failed")
178 |         raise HTTPException(
179 |             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
180 |             detail=f"Import failed: {str(e)}",
181 |         )
182 | 
```

--------------------------------------------------------------------------------
/test-int/cli/test_project_commands_integration.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration tests for project CLI commands."""
  2 | 
  3 | import tempfile
  4 | from pathlib import Path
  5 | 
  6 | from typer.testing import CliRunner
  7 | 
  8 | from basic_memory.cli.main import app as cli_app
  9 | 
 10 | 
 11 | def test_project_list(app, app_config, test_project, config_manager):
 12 |     """Test 'bm project list' command shows projects."""
 13 |     runner = CliRunner()
 14 |     result = runner.invoke(cli_app, ["project", "list"])
 15 | 
 16 |     if result.exit_code != 0:
 17 |         print(f"STDOUT: {result.stdout}")
 18 |         print(f"STDERR: {result.stderr}")
 19 |         print(f"Exception: {result.exception}")
 20 |     assert result.exit_code == 0
 21 |     assert "test-project" in result.stdout
 22 |     assert "[X]" in result.stdout  # default marker
 23 | 
 24 | 
 25 | def test_project_info(app, app_config, test_project, config_manager):
 26 |     """Test 'bm project info' command shows project details."""
 27 |     runner = CliRunner()
 28 |     result = runner.invoke(cli_app, ["project", "info", "test-project"])
 29 | 
 30 |     if result.exit_code != 0:
 31 |         print(f"STDOUT: {result.stdout}")
 32 |         print(f"STDERR: {result.stderr}")
 33 |     assert result.exit_code == 0
 34 |     assert "Basic Memory Project Info" in result.stdout
 35 |     assert "test-project" in result.stdout
 36 |     assert "Statistics" in result.stdout
 37 | 
 38 | 
 39 | def test_project_info_json(app, app_config, test_project, config_manager):
 40 |     """Test 'bm project info --json' command outputs valid JSON."""
 41 |     import json
 42 | 
 43 |     runner = CliRunner()
 44 |     result = runner.invoke(cli_app, ["project", "info", "test-project", "--json"])
 45 | 
 46 |     if result.exit_code != 0:
 47 |         print(f"STDOUT: {result.stdout}")
 48 |         print(f"STDERR: {result.stderr}")
 49 |     assert result.exit_code == 0
 50 | 
 51 |     # Parse JSON to verify it's valid
 52 |     data = json.loads(result.stdout)
 53 |     assert data["project_name"] == "test-project"
 54 |     assert "statistics" in data
 55 |     assert "system" in data
 56 | 
 57 | 
 58 | def test_project_add_and_remove(app, app_config, config_manager):
 59 |     """Test adding and removing a project."""
 60 |     runner = CliRunner()
 61 | 
 62 |     # Use a separate temporary directory to avoid nested path conflicts
 63 |     with tempfile.TemporaryDirectory() as temp_dir:
 64 |         new_project_path = Path(temp_dir) / "new-project"
 65 |         new_project_path.mkdir()
 66 | 
 67 |         # Add project
 68 |         result = runner.invoke(cli_app, ["project", "add", "new-project", str(new_project_path)])
 69 | 
 70 |         if result.exit_code != 0:
 71 |             print(f"STDOUT: {result.stdout}")
 72 |             print(f"STDERR: {result.stderr}")
 73 |         assert result.exit_code == 0
 74 |         assert (
 75 |             "Project 'new-project' added successfully" in result.stdout
 76 |             or "added" in result.stdout.lower()
 77 |         )
 78 | 
 79 |         # Verify it shows up in list
 80 |         result = runner.invoke(cli_app, ["project", "list"])
 81 |         assert result.exit_code == 0
 82 |         assert "new-project" in result.stdout
 83 | 
 84 |         # Remove project
 85 |         result = runner.invoke(cli_app, ["project", "remove", "new-project"])
 86 |         assert result.exit_code == 0
 87 |         assert "removed" in result.stdout.lower() or "deleted" in result.stdout.lower()
 88 | 
 89 | 
 90 | def test_project_set_default(app, app_config, config_manager):
 91 |     """Test setting default project."""
 92 |     runner = CliRunner()
 93 | 
 94 |     # Use a separate temporary directory to avoid nested path conflicts
 95 |     with tempfile.TemporaryDirectory() as temp_dir:
 96 |         new_project_path = Path(temp_dir) / "another-project"
 97 |         new_project_path.mkdir()
 98 | 
 99 |         # Add a second project
100 |         result = runner.invoke(
101 |             cli_app, ["project", "add", "another-project", str(new_project_path)]
102 |         )
103 |         if result.exit_code != 0:
104 |             print(f"STDOUT: {result.stdout}")
105 |             print(f"STDERR: {result.stderr}")
106 |         assert result.exit_code == 0
107 | 
108 |         # Set as default
109 |         result = runner.invoke(cli_app, ["project", "default", "another-project"])
110 |         if result.exit_code != 0:
111 |             print(f"STDOUT: {result.stdout}")
112 |             print(f"STDERR: {result.stderr}")
113 |         assert result.exit_code == 0
114 |         assert "default" in result.stdout.lower()
115 | 
116 |         # Verify in list
117 |         result = runner.invoke(cli_app, ["project", "list"])
118 |         assert result.exit_code == 0
119 |         # The new project should have the [X] marker now
120 |         lines = result.stdout.split("\n")
121 |         for line in lines:
122 |             if "another-project" in line:
123 |                 assert "[X]" in line
124 | 
125 | 
126 | def test_remove_main_project(app, app_config, config_manager):
127 |     """Test that removing main project then listing projects prevents main from reappearing (issue #397)."""
128 |     runner = CliRunner()
129 | 
130 |     # Create separate temp dirs for each project
131 |     with (
132 |         tempfile.TemporaryDirectory() as main_dir,
133 |         tempfile.TemporaryDirectory() as new_default_dir,
134 |     ):
135 |         main_path = Path(main_dir)
136 |         new_default_path = Path(new_default_dir)
137 | 
138 |         # Ensure main exists
139 |         result = runner.invoke(cli_app, ["project", "list"])
140 |         if "main" not in result.stdout:
141 |             result = runner.invoke(cli_app, ["project", "add", "main", str(main_path)])
142 |             print(result.stdout)
143 |             assert result.exit_code == 0
144 | 
145 |         # Confirm main is present
146 |         result = runner.invoke(cli_app, ["project", "list"])
147 |         assert "main" in result.stdout
148 | 
149 |         # Add a second project
150 |         result = runner.invoke(cli_app, ["project", "add", "new_default", str(new_default_path)])
151 |         assert result.exit_code == 0
152 | 
153 |         # Set new_default as default (if needed)
154 |         result = runner.invoke(cli_app, ["project", "default", "new_default"])
155 |         assert result.exit_code == 0
156 | 
157 |         # Remove main
158 |         result = runner.invoke(cli_app, ["project", "remove", "main"])
159 |         assert result.exit_code == 0
160 | 
161 |         # Confirm only new_default exists and main does not
162 |         result = runner.invoke(cli_app, ["project", "list"])
163 |         assert result.exit_code == 0
164 |         assert "main" not in result.stdout
165 |         assert "new_default" in result.stdout
166 | 
```

--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------

```python
  1 | """Test proper handling of .tmp files during sync."""
  2 | 
  3 | import asyncio
  4 | from pathlib import Path
  5 | 
  6 | import pytest
  7 | from watchfiles import Change
  8 | 
  9 | 
 10 | async def create_test_file(path: Path, content: str = "test content") -> None:
 11 |     """Create a test file with given content."""
 12 |     path.parent.mkdir(parents=True, exist_ok=True)
 13 |     path.write_text(content)
 14 | 
 15 | 
 16 | @pytest.mark.asyncio
 17 | async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
 18 |     """Test that .tmp files are correctly filtered out."""
 19 |     # Test filter_changes method directly
 20 |     tmp_path = Path(test_project.path) / "test.tmp"
 21 |     assert not watch_service.filter_changes(Change.added, str(tmp_path))
 22 | 
 23 |     # Test with valid file
 24 |     valid_path = Path(test_project.path) / "test.md"
 25 |     assert watch_service.filter_changes(Change.added, str(valid_path))
 26 | 
 27 | 
 28 | @pytest.mark.asyncio
 29 | async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
 30 |     """Test handling of .tmp files during sync process."""
 31 |     project_dir = Path(test_project.path)
 32 | 
 33 |     # Create a .tmp file - this simulates a file being written with write_file_atomic
 34 |     tmp_file = project_dir / "test.tmp"
 35 |     await create_test_file(tmp_file, "This is a temporary file")
 36 | 
 37 |     # Create the target final file
 38 |     final_file = project_dir / "test.md"
 39 |     await create_test_file(final_file, "This is the final file")
 40 | 
 41 |     # Setup changes that include both the .tmp and final file
 42 |     changes = {
 43 |         (Change.added, str(tmp_file)),
 44 |         (Change.added, str(final_file)),
 45 |     }
 46 | 
 47 |     # Handle changes
 48 |     await watch_service.handle_changes(test_project, changes)
 49 | 
 50 |     # Verify only the final file got an entity
 51 |     tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
 52 |     final_entity = await sync_service.entity_repository.get_by_file_path("test.md")
 53 | 
 54 |     assert tmp_entity is None, "Temp file should not have an entity"
 55 |     assert final_entity is not None, "Final file should have an entity"
 56 | 
 57 | 
 58 | @pytest.mark.asyncio
 59 | async def test_atomic_write_tmp_file_handling(
 60 |     watch_service, project_config, test_project, sync_service
 61 | ):
 62 |     """Test handling of file changes during atomic write operations."""
 63 |     project_dir = project_config.home
 64 | 
 65 |     # This test simulates the full atomic write process:
 66 |     # 1. First a .tmp file is created
 67 |     # 2. Then the .tmp file is renamed to the final file
 68 |     # 3. Both events are processed by the watch service
 69 | 
 70 |     # Setup file paths
 71 |     tmp_path = project_dir / "document.tmp"
 72 |     final_path = project_dir / "document.md"
 73 | 
 74 |     # Create mockup of the atomic write process
 75 |     await create_test_file(tmp_path, "Content for document")
 76 | 
 77 |     # First batch of changes - .tmp file created
 78 |     changes1 = {(Change.added, str(tmp_path))}
 79 | 
 80 |     # Process first batch
 81 |     await watch_service.handle_changes(test_project, changes1)
 82 | 
 83 |     # Now "replace" the temp file with the final file
 84 |     tmp_path.rename(final_path)
 85 | 
 86 |     # Second batch of changes - .tmp file deleted, final file added
 87 |     changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}
 88 | 
 89 |     # Process second batch
 90 |     await watch_service.handle_changes(test_project, changes2)
 91 | 
 92 |     # Verify only the final file is in the database
 93 |     tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
 94 |     final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
 95 | 
 96 |     assert tmp_entity is None, "Temp file should not have an entity"
 97 |     assert final_entity is not None, "Final file should have an entity"
 98 | 
 99 |     # Check events
100 |     new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
101 |     assert len(new_events) == 1
102 |     assert new_events[0].path == "document.md"
103 | 
104 | 
105 | @pytest.mark.asyncio
106 | async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
107 |     """Test handling of rapid atomic writes to the same destination."""
108 |     project_dir = Path(test_project.path)
109 | 
110 |     # This test simulates multiple rapid atomic writes to the same file:
111 |     # 1. Several .tmp files are created one after another
112 |     # 2. Each is then renamed to the same final file
113 |     # 3. Events are batched and processed together
114 | 
115 |     # Setup file paths
116 |     tmp1_path = project_dir / "document.1.tmp"
117 |     tmp2_path = project_dir / "document.2.tmp"
118 |     final_path = project_dir / "document.md"
119 | 
120 |     # Create multiple temp files that will be used in sequence
121 |     await create_test_file(tmp1_path, "First version")
122 |     await create_test_file(tmp2_path, "Second version")
123 | 
124 |     # Simulate the first atomic write
125 |     tmp1_path.replace(final_path)
126 | 
127 |     # Brief pause to ensure file system registers the change
128 |     await asyncio.sleep(0.1)
129 | 
130 |     # Read content to verify
131 |     content1 = final_path.read_text(encoding="utf-8")
132 |     assert content1 == "First version"
133 | 
134 |     # Simulate the second atomic write
135 |     tmp2_path.replace(final_path)
136 | 
137 |     # Verify content was updated
138 |     content2 = final_path.read_text(encoding="utf-8")
139 |     assert content2 == "Second version"
140 | 
141 |     # Create a batch of changes that might arrive in mixed order
142 |     changes = {
143 |         (Change.added, str(tmp1_path)),
144 |         (Change.deleted, str(tmp1_path)),
145 |         (Change.added, str(tmp2_path)),
146 |         (Change.deleted, str(tmp2_path)),
147 |         (Change.added, str(final_path)),
148 |         (Change.modified, str(final_path)),
149 |     }
150 | 
151 |     # Process all changes
152 |     await watch_service.handle_changes(test_project, changes)
153 | 
154 |     # Verify only the final file is in the database
155 |     final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
156 |     assert final_entity is not None
157 | 
158 |     # Also verify no tmp entities were created
159 |     tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
160 |     tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
161 |     assert tmp1_entity is None
162 |     assert tmp2_entity is None
163 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/project_context.py:
--------------------------------------------------------------------------------

```python
  1 | """Project context utilities for Basic Memory MCP server.
  2 | 
  3 | Provides project lookup utilities for MCP tools.
  4 | Handles project validation and context management in one place.
  5 | 
  6 | Note: This module uses ProjectResolver for unified project resolution.
  7 | The resolve_project_parameter function is a thin wrapper for backwards
  8 | compatibility with existing MCP tools.
  9 | """
 10 | 
 11 | from typing import Optional, List
 12 | from httpx import AsyncClient
 13 | from httpx._types import (
 14 |     HeaderTypes,
 15 | )
 16 | from loguru import logger
 17 | from fastmcp import Context
 18 | 
 19 | from basic_memory.config import ConfigManager
 20 | from basic_memory.project_resolver import ProjectResolver
 21 | from basic_memory.schemas.project_info import ProjectItem, ProjectList
 22 | from basic_memory.utils import generate_permalink
 23 | 
 24 | 
 25 | async def resolve_project_parameter(
 26 |     project: Optional[str] = None,
 27 |     allow_discovery: bool = False,
 28 |     cloud_mode: Optional[bool] = None,
 29 |     default_project_mode: Optional[bool] = None,
 30 |     default_project: Optional[str] = None,
 31 | ) -> Optional[str]:
 32 |     """Resolve project parameter using three-tier hierarchy.
 33 | 
 34 |     This is a thin wrapper around ProjectResolver for backwards compatibility.
 35 |     New code should consider using ProjectResolver directly for more detailed
 36 |     resolution information.
 37 | 
 38 |     if cloud_mode:
 39 |         project is required (unless allow_discovery=True for tools that support discovery mode)
 40 |     else:
 41 |         Resolution order:
 42 |         1. Single Project Mode  (--project cli arg, or BASIC_MEMORY_MCP_PROJECT env var) - highest priority
 43 |         2. Explicit project parameter - medium priority
 44 |         3. Default project if default_project_mode=true - lowest priority
 45 | 
 46 |     Args:
 47 |         project: Optional explicit project parameter
 48 |         allow_discovery: If True, allows returning None in cloud mode for discovery mode
 49 |             (used by tools like recent_activity that can operate across all projects)
 50 |         cloud_mode: Optional explicit cloud mode. If not provided, reads from ConfigManager.
 51 |         default_project_mode: Optional explicit default project mode. If not provided, reads from ConfigManager.
 52 |         default_project: Optional explicit default project. If not provided, reads from ConfigManager.
 53 | 
 54 |     Returns:
 55 |         Resolved project name or None if no resolution possible
 56 |     """
 57 |     # Load config for any values not explicitly provided
 58 |     if cloud_mode is None or default_project_mode is None or default_project is None:
 59 |         config = ConfigManager().config
 60 |         if cloud_mode is None:
 61 |             cloud_mode = config.cloud_mode
 62 |         if default_project_mode is None:
 63 |             default_project_mode = config.default_project_mode
 64 |         if default_project is None:
 65 |             default_project = config.default_project
 66 | 
 67 |     # Create resolver with configuration and resolve
 68 |     resolver = ProjectResolver.from_env(
 69 |         cloud_mode=cloud_mode,
 70 |         default_project_mode=default_project_mode,
 71 |         default_project=default_project,
 72 |     )
 73 |     result = resolver.resolve(project=project, allow_discovery=allow_discovery)
 74 |     return result.project
 75 | 
 76 | 
 77 | async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
 78 |     # Deferred import to avoid circular dependency with tools
 79 |     from basic_memory.mcp.tools.utils import call_get
 80 | 
 81 |     response = await call_get(client, "/projects/projects", headers=headers)
 82 |     project_list = ProjectList.model_validate(response.json())
 83 |     return [project.name for project in project_list.projects]
 84 | 
 85 | 
 86 | async def get_active_project(
 87 |     client: AsyncClient,
 88 |     project: Optional[str] = None,
 89 |     context: Optional[Context] = None,
 90 |     headers: HeaderTypes | None = None,
 91 | ) -> ProjectItem:
 92 |     """Get and validate project, setting it in context if available.
 93 | 
 94 |     Args:
 95 |         client: HTTP client for API calls
 96 |         project: Optional project name (resolved using hierarchy)
 97 |         context: Optional FastMCP context to cache the result
 98 | 
 99 |     Returns:
100 |         The validated project item
101 | 
102 |     Raises:
103 |         ValueError: If no project can be resolved
104 |         HTTPError: If project doesn't exist or is inaccessible
105 |     """
106 |     # Deferred import to avoid circular dependency with tools
107 |     from basic_memory.mcp.tools.utils import call_get
108 | 
109 |     resolved_project = await resolve_project_parameter(project)
110 |     if not resolved_project:
111 |         project_names = await get_project_names(client, headers)
112 |         raise ValueError(
113 |             "No project specified. "
114 |             "Either set 'default_project_mode=true' in config, or use 'project' argument.\n"
115 |             f"Available projects: {project_names}"
116 |         )
117 | 
118 |     project = resolved_project
119 | 
120 |     # Check if already cached in context
121 |     if context:
122 |         cached_project = context.get_state("active_project")
123 |         if cached_project and cached_project.name == project:
124 |             logger.debug(f"Using cached project from context: {project}")
125 |             return cached_project
126 | 
127 |     # Validate project exists by calling API
128 |     logger.debug(f"Validating project: {project}")
129 |     permalink = generate_permalink(project)
130 |     response = await call_get(client, f"/{permalink}/project/item", headers=headers)
131 |     active_project = ProjectItem.model_validate(response.json())
132 | 
133 |     # Cache in context if available
134 |     if context:
135 |         context.set_state("active_project", active_project)
136 |         logger.debug(f"Cached project in context: {project}")
137 | 
138 |     logger.debug(f"Validated project: {active_project.name}")
139 |     return active_project
140 | 
141 | 
142 | def add_project_metadata(result: str, project_name: str) -> str:
143 |     """Add project context as metadata footer for assistant session tracking.
144 | 
145 |     Provides clear project context to help the assistant remember which
146 |     project is being used throughout the conversation session.
147 | 
148 |     Args:
149 |         result: The tool result string
150 |         project_name: The project name that was used
151 | 
152 |     Returns:
153 |         Result with project session tracking metadata
154 |     """
155 |     return f"{result}\n\n[Session: Using project '{project_name}']"
156 | 
```

--------------------------------------------------------------------------------
/test-int/test_db_wal_mode.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration tests for WAL mode and Windows-specific SQLite optimizations.
  2 | 
  3 | These tests use real filesystem databases (not in-memory) to verify WAL mode
  4 | and other SQLite configuration settings work correctly in production scenarios.
  5 | """
  6 | 
  7 | import pytest
  8 | from sqlalchemy import text
  9 | 
 10 | 
 11 | @pytest.mark.asyncio
 12 | async def test_wal_mode_enabled(engine_factory, db_backend):
 13 |     """Test that WAL mode is enabled on filesystem database connections."""
 14 |     if db_backend == "postgres":
 15 |         pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
 16 | 
 17 |     engine, _ = engine_factory
 18 | 
 19 |     # Execute a query to verify WAL mode is enabled
 20 |     async with engine.connect() as conn:
 21 |         result = await conn.execute(text("PRAGMA journal_mode"))
 22 |         journal_mode = result.fetchone()[0]
 23 | 
 24 |         # WAL mode should be enabled for filesystem databases
 25 |         assert journal_mode.upper() == "WAL"
 26 | 
 27 | 
 28 | @pytest.mark.asyncio
 29 | async def test_busy_timeout_configured(engine_factory, db_backend):
 30 |     """Test that busy timeout is configured for database connections."""
 31 |     if db_backend == "postgres":
 32 |         pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
 33 | 
 34 |     engine, _ = engine_factory
 35 | 
 36 |     async with engine.connect() as conn:
 37 |         result = await conn.execute(text("PRAGMA busy_timeout"))
 38 |         busy_timeout = result.fetchone()[0]
 39 | 
 40 |         # Busy timeout should be 10 seconds (10000 milliseconds)
 41 |         assert busy_timeout == 10000
 42 | 
 43 | 
 44 | @pytest.mark.asyncio
 45 | async def test_synchronous_mode_configured(engine_factory, db_backend):
 46 |     """Test that synchronous mode is set to NORMAL for performance."""
 47 |     if db_backend == "postgres":
 48 |         pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
 49 | 
 50 |     engine, _ = engine_factory
 51 | 
 52 |     async with engine.connect() as conn:
 53 |         result = await conn.execute(text("PRAGMA synchronous"))
 54 |         synchronous = result.fetchone()[0]
 55 | 
 56 |         # Synchronous should be NORMAL (1)
 57 |         assert synchronous == 1
 58 | 
 59 | 
 60 | @pytest.mark.asyncio
 61 | async def test_cache_size_configured(engine_factory, db_backend):
 62 |     """Test that cache size is configured for performance."""
 63 |     if db_backend == "postgres":
 64 |         pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
 65 | 
 66 |     engine, _ = engine_factory
 67 | 
 68 |     async with engine.connect() as conn:
 69 |         result = await conn.execute(text("PRAGMA cache_size"))
 70 |         cache_size = result.fetchone()[0]
 71 | 
 72 |         # Cache size should be -64000 (64MB)
 73 |         assert cache_size == -64000
 74 | 
 75 | 
 76 | @pytest.mark.asyncio
 77 | async def test_temp_store_configured(engine_factory, db_backend):
 78 |     """Test that temp_store is set to MEMORY."""
 79 |     if db_backend == "postgres":
 80 |         pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
 81 | 
 82 |     engine, _ = engine_factory
 83 | 
 84 |     async with engine.connect() as conn:
 85 |         result = await conn.execute(text("PRAGMA temp_store"))
 86 |         temp_store = result.fetchone()[0]
 87 | 
 88 |         # temp_store should be MEMORY (2)
 89 |         assert temp_store == 2
 90 | 
 91 | 
 92 | @pytest.mark.asyncio
 93 | @pytest.mark.windows
 94 | @pytest.mark.skipif(
 95 |     __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
 96 | )
 97 | async def test_windows_locking_mode_when_on_windows(tmp_path, monkeypatch, config_manager):
 98 |     """Test that Windows-specific locking mode is set when running on Windows."""
 99 |     from basic_memory.db import engine_session_factory, DatabaseType
100 |     from basic_memory.config import DatabaseBackend
101 | 
102 |     # Force SQLite backend for this SQLite-specific test
103 |     config_manager.config.database_backend = DatabaseBackend.SQLITE
104 | 
105 |     # Set HOME environment variable
106 |     monkeypatch.setenv("HOME", str(tmp_path))
107 |     monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
108 | 
109 |     db_path = tmp_path / "test_windows.db"
110 | 
111 |     async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
112 |         engine,
113 |         _,
114 |     ):
115 |         async with engine.connect() as conn:
116 |             result = await conn.execute(text("PRAGMA locking_mode"))
117 |             locking_mode = result.fetchone()[0]
118 | 
119 |             # Locking mode should be NORMAL on Windows
120 |             assert locking_mode.upper() == "NORMAL"
121 | 
122 | 
123 | @pytest.mark.asyncio
124 | @pytest.mark.windows
125 | @pytest.mark.skipif(
126 |     __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
127 | )
128 | async def test_null_pool_on_windows(tmp_path, monkeypatch):
129 |     """Test that NullPool is used on Windows to avoid connection pooling issues."""
130 |     from basic_memory.db import engine_session_factory, DatabaseType
131 |     from sqlalchemy.pool import NullPool
132 | 
133 |     # Set HOME environment variable
134 |     monkeypatch.setenv("HOME", str(tmp_path))
135 |     monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
136 | 
137 |     db_path = tmp_path / "test_windows_pool.db"
138 | 
139 |     async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (engine, _):
140 |         # Engine should be using NullPool on Windows
141 |         assert isinstance(engine.pool, NullPool)
142 | 
143 | 
144 | @pytest.mark.asyncio
145 | @pytest.mark.windows
146 | @pytest.mark.skipif(
147 |     __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
148 | )
149 | async def test_memory_database_no_null_pool_on_windows(tmp_path, monkeypatch):
150 |     """Test that in-memory databases do NOT use NullPool even on Windows.
151 | 
152 |     NullPool closes connections immediately, which destroys in-memory databases.
153 |     This test ensures in-memory databases maintain connection pooling.
154 |     """
155 |     from basic_memory.db import engine_session_factory, DatabaseType
156 |     from sqlalchemy.pool import NullPool
157 | 
158 |     # Set HOME environment variable
159 |     monkeypatch.setenv("HOME", str(tmp_path))
160 |     monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
161 | 
162 |     db_path = tmp_path / "test_memory.db"
163 | 
164 |     async with engine_session_factory(db_path, DatabaseType.MEMORY) as (engine, _):
165 |         # In-memory databases should NOT use NullPool on Windows
166 |         assert not isinstance(engine.pool, NullPool)
167 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/repository/relation_repository.py:
--------------------------------------------------------------------------------

```python
  1 | """Repository for managing Relation objects."""
  2 | 
  3 | from typing import Sequence, List, Optional, Any, cast
  4 | 
  5 | from sqlalchemy import and_, delete, select
  6 | from sqlalchemy.engine import CursorResult
  7 | from sqlalchemy.dialects.postgresql import insert as pg_insert
  8 | from sqlalchemy.dialects.sqlite import insert as sqlite_insert
  9 | from sqlalchemy.ext.asyncio import async_sessionmaker
 10 | from sqlalchemy.orm import selectinload, aliased
 11 | from sqlalchemy.orm.interfaces import LoaderOption
 12 | 
 13 | from basic_memory import db
 14 | from basic_memory.models import Relation, Entity
 15 | from basic_memory.repository.repository import Repository
 16 | 
 17 | 
 18 | class RelationRepository(Repository[Relation]):
 19 |     """Repository for Relation model with memory-specific operations."""
 20 | 
 21 |     def __init__(self, session_maker: async_sessionmaker, project_id: int):
 22 |         """Initialize with session maker and project_id filter.
 23 | 
 24 |         Args:
 25 |             session_maker: SQLAlchemy session maker
 26 |             project_id: Project ID to filter all operations by
 27 |         """
 28 |         super().__init__(session_maker, Relation, project_id=project_id)
 29 | 
 30 |     async def find_relation(
 31 |         self, from_permalink: str, to_permalink: str, relation_type: str
 32 |     ) -> Optional[Relation]:
 33 |         """Find a relation by its from and to path IDs."""
 34 |         from_entity = aliased(Entity)
 35 |         to_entity = aliased(Entity)
 36 | 
 37 |         query = (
 38 |             select(Relation)
 39 |             .join(from_entity, Relation.from_id == from_entity.id)
 40 |             .join(to_entity, Relation.to_id == to_entity.id)
 41 |             .where(
 42 |                 and_(
 43 |                     from_entity.permalink == from_permalink,
 44 |                     to_entity.permalink == to_permalink,
 45 |                     Relation.relation_type == relation_type,
 46 |                 )
 47 |             )
 48 |         )
 49 |         return await self.find_one(query)
 50 | 
 51 |     async def find_by_entities(self, from_id: int, to_id: int) -> Sequence[Relation]:
 52 |         """Find all relations between two entities."""
 53 |         query = select(Relation).where((Relation.from_id == from_id) & (Relation.to_id == to_id))
 54 |         result = await self.execute_query(query)
 55 |         return result.scalars().all()
 56 | 
 57 |     async def find_by_type(self, relation_type: str) -> Sequence[Relation]:
 58 |         """Find all relations of a specific type."""
 59 |         query = select(Relation).filter(Relation.relation_type == relation_type)
 60 |         result = await self.execute_query(query)
 61 |         return result.scalars().all()
 62 | 
 63 |     async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
 64 |         """Delete outgoing relations for an entity.
 65 | 
 66 |         Only deletes relations where this entity is the source (from_id),
 67 |         as these are the ones owned by this entity's markdown file.
 68 |         """
 69 |         async with db.scoped_session(self.session_maker) as session:
 70 |             await session.execute(delete(Relation).where(Relation.from_id == entity_id))
 71 | 
 72 |     async def find_unresolved_relations(self) -> Sequence[Relation]:
 73 |         """Find all unresolved relations, where to_id is null."""
 74 |         query = select(Relation).filter(Relation.to_id.is_(None))
 75 |         result = await self.execute_query(query)
 76 |         return result.scalars().all()
 77 | 
 78 |     async def find_unresolved_relations_for_entity(self, entity_id: int) -> Sequence[Relation]:
 79 |         """Find unresolved relations for a specific entity.
 80 | 
 81 |         Args:
 82 |             entity_id: The entity whose unresolved outgoing relations to find.
 83 | 
 84 |         Returns:
 85 |             List of unresolved relations where this entity is the source.
 86 |         """
 87 |         query = select(Relation).filter(Relation.from_id == entity_id, Relation.to_id.is_(None))
 88 |         result = await self.execute_query(query)
 89 |         return result.scalars().all()
 90 | 
 91 |     async def add_all_ignore_duplicates(self, relations: List[Relation]) -> int:
 92 |         """Bulk insert relations, ignoring duplicates.
 93 | 
 94 |         Uses ON CONFLICT DO NOTHING to skip relations that would violate the
 95 |         unique constraint on (from_id, to_name, relation_type). This is useful
 96 |         for bulk operations where the same link may appear multiple times in
 97 |         a document.
 98 | 
 99 |         Works with both SQLite and PostgreSQL dialects.
100 | 
101 |         Args:
102 |             relations: List of Relation objects to insert
103 | 
104 |         Returns:
105 |             Number of relations actually inserted (excludes duplicates)
106 |         """
107 |         if not relations:
108 |             return 0
109 | 
110 |         # Convert Relation objects to dicts for insert
111 |         values = [
112 |             {
113 |                 "project_id": r.project_id if r.project_id else self.project_id,
114 |                 "from_id": r.from_id,
115 |                 "to_id": r.to_id,
116 |                 "to_name": r.to_name,
117 |                 "relation_type": r.relation_type,
118 |                 "context": r.context,
119 |             }
120 |             for r in relations
121 |         ]
122 | 
123 |         async with db.scoped_session(self.session_maker) as session:
124 |             # Check dialect to use appropriate insert
125 |             dialect_name = session.bind.dialect.name if session.bind else "sqlite"
126 | 
127 |             if dialect_name == "postgresql":  # pragma: no cover
128 |                 # PostgreSQL: use RETURNING to count inserted rows
129 |                 # (rowcount is 0 for ON CONFLICT DO NOTHING)
130 |                 stmt = (  # pragma: no cover
131 |                     pg_insert(Relation)
132 |                     .values(values)
133 |                     .on_conflict_do_nothing()
134 |                     .returning(Relation.id)
135 |                 )
136 |                 result = await session.execute(stmt)  # pragma: no cover
137 |                 return len(result.fetchall())  # pragma: no cover
138 |             else:
139 |                 # SQLite: rowcount works correctly
140 |                 stmt = sqlite_insert(Relation).values(values)
141 |                 stmt = stmt.on_conflict_do_nothing()
142 |                 result = cast(CursorResult[Any], await session.execute(stmt))
143 |                 return result.rowcount if result.rowcount > 0 else 0
144 | 
145 |     def get_load_options(self) -> List[LoaderOption]:
146 |         return [selectinload(Relation.from_entity), selectinload(Relation.to_entity)]
147 | 
```

--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration test for build_context with underscore in memory:// URLs."""
  2 | 
  3 | import pytest
  4 | from fastmcp import Client
  5 | 
  6 | 
  7 | @pytest.mark.asyncio
  8 | async def test_build_context_underscore_normalization(mcp_server, app, test_project):
  9 |     """Test that build_context normalizes underscores in relation types."""
 10 | 
 11 |     async with Client(mcp_server) as client:
 12 |         # Create parent note
 13 |         await client.call_tool(
 14 |             "write_note",
 15 |             {
 16 |                 "project": test_project.name,
 17 |                 "title": "Parent Entity",
 18 |                 "folder": "testing",
 19 |                 "content": "# Parent Entity\n\nMain entity for testing underscore relations.",
 20 |                 "tags": "test,parent",
 21 |             },
 22 |         )
 23 | 
 24 |         # Create child notes with different relation formats
 25 |         await client.call_tool(
 26 |             "write_note",
 27 |             {
 28 |                 "project": test_project.name,
 29 |                 "title": "Child with Underscore",
 30 |                 "folder": "testing",
 31 |                 "content": """# Child with Underscore
 32 | 
 33 | - part_of [[Parent Entity]]
 34 | - related_to [[Parent Entity]]
 35 |                 """,
 36 |                 "tags": "test,child",
 37 |             },
 38 |         )
 39 | 
 40 |         await client.call_tool(
 41 |             "write_note",
 42 |             {
 43 |                 "project": test_project.name,
 44 |                 "title": "Child with Hyphen",
 45 |                 "folder": "testing",
 46 |                 "content": """# Child with Hyphen
 47 | 
 48 | - part-of [[Parent Entity]]
 49 | - related-to [[Parent Entity]]
 50 |                 """,
 51 |                 "tags": "test,child",
 52 |             },
 53 |         )
 54 | 
 55 |         # Test 1: Search with underscore format should return results
 56 |         # Relation permalinks are: source/relation_type/target
 57 |         # So child-with-underscore/part-of/parent-entity
 58 |         result_underscore = await client.call_tool(
 59 |             "build_context",
 60 |             {
 61 |                 "project": test_project.name,
 62 |                 "url": "memory://testing/*/part_of/*parent*",  # Using underscore
 63 |             },
 64 |         )
 65 | 
 66 |         # Parse response
 67 |         assert len(result_underscore.content) == 1
 68 |         response_text = result_underscore.content[0].text  # pyright: ignore
 69 |         assert '"results"' in response_text
 70 | 
 71 |         # Both relations should be found since they both connect to parent-entity
 72 |         # The system should normalize the underscore to hyphen internally
 73 |         assert "part-of" in response_text.lower()
 74 | 
 75 |         # Test 2: Search with hyphen format should also return results
 76 |         result_hyphen = await client.call_tool(
 77 |             "build_context",
 78 |             {
 79 |                 "project": test_project.name,
 80 |                 "url": "memory://testing/*/part-of/*parent*",  # Using hyphen
 81 |             },
 82 |         )
 83 | 
 84 |         response_text_hyphen = result_hyphen.content[0].text  # pyright: ignore
 85 |         assert '"results"' in response_text_hyphen
 86 |         assert "part-of" in response_text_hyphen.lower()
 87 | 
 88 |         # Test 3: Test with related_to/related-to as well
 89 |         result_related = await client.call_tool(
 90 |             "build_context",
 91 |             {
 92 |                 "project": test_project.name,
 93 |                 "url": "memory://testing/*/related_to/*parent*",  # Using underscore
 94 |             },
 95 |         )
 96 | 
 97 |         response_text_related = result_related.content[0].text  # pyright: ignore
 98 |         assert '"results"' in response_text_related
 99 |         assert "related-to" in response_text_related.lower()
100 | 
101 |         # Test 4: Test exact path (non-wildcard) with underscore
102 |         # Exact relation permalink would be child/relation/target
103 |         result_exact = await client.call_tool(
104 |             "build_context",
105 |             {
106 |                 "project": test_project.name,
107 |                 "url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
108 |             },
109 |         )
110 | 
111 |         response_text_exact = result_exact.content[0].text  # pyright: ignore
112 |         assert '"results"' in response_text_exact
113 |         assert "part-of" in response_text_exact.lower()
114 | 
115 | 
116 | @pytest.mark.asyncio
117 | async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
118 |     """Test build_context with complex paths containing underscores."""
119 | 
120 |     async with Client(mcp_server) as client:
121 |         # Create notes with underscores in titles and relations
122 |         await client.call_tool(
123 |             "write_note",
124 |             {
125 |                 "project": test_project.name,
126 |                 "title": "workflow_manager_agent",
127 |                 "folder": "specs",
128 |                 "content": """# Workflow Manager Agent
129 | 
130 | Specification for the workflow manager agent.
131 |                 """,
132 |                 "tags": "spec,workflow",
133 |             },
134 |         )
135 | 
136 |         await client.call_tool(
137 |             "write_note",
138 |             {
139 |                 "project": test_project.name,
140 |                 "title": "task_parser",
141 |                 "folder": "components",
142 |                 "content": """# Task Parser
143 | 
144 | - part_of [[workflow_manager_agent]]
145 | - implements_for [[workflow_manager_agent]]
146 |                 """,
147 |                 "tags": "component,parser",
148 |             },
149 |         )
150 | 
151 |         # Test with underscores in all parts of the path
152 |         # Relations are created as: task-parser/part-of/workflow-manager-agent
153 |         # So search for */part_of/* or */part-of/* to find them
154 |         test_cases = [
155 |             "memory://components/*/part_of/*workflow*",
156 |             "memory://components/*/part-of/*workflow*",
157 |             "memory://*/task*/part_of/*",
158 |             "memory://*/task*/part-of/*",
159 |         ]
160 | 
161 |         for url in test_cases:
162 |             result = await client.call_tool(
163 |                 "build_context", {"project": test_project.name, "url": url}
164 |             )
165 | 
166 |             # All variations should work and find the related content
167 |             assert len(result.content) == 1
168 |             response = result.content[0].text  # pyright: ignore
169 |             assert '"results"' in response
170 |             # The relation should be found showing part-of connection
171 |             assert "part-of" in response.lower(), f"Failed for URL: {url}"
172 | 
```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_utils.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for MCP tool utilities."""
  2 | 
  3 | import pytest
  4 | from httpx import HTTPStatusError
  5 | from mcp.server.fastmcp.exceptions import ToolError
  6 | 
  7 | from basic_memory.mcp.tools.utils import (
  8 |     call_get,
  9 |     call_post,
 10 |     call_put,
 11 |     call_delete,
 12 |     get_error_message,
 13 | )
 14 | 
 15 | 
 16 | @pytest.fixture
 17 | def mock_response(monkeypatch):
 18 |     """Create a mock response."""
 19 | 
 20 |     class MockResponse:
 21 |         def __init__(self, status_code=200):
 22 |             self.status_code = status_code
 23 |             self.is_success = status_code < 400
 24 |             self.json = lambda: {}
 25 | 
 26 |         def raise_for_status(self):
 27 |             if self.status_code >= 400:
 28 |                 raise HTTPStatusError(
 29 |                     message=f"HTTP Error {self.status_code}", request=None, response=self
 30 |                 )
 31 | 
 32 |     return MockResponse
 33 | 
 34 | 
 35 | class _Client:
 36 |     def __init__(self):
 37 |         self.calls: list[tuple[str, tuple, dict]] = []
 38 |         self._responses: dict[str, object] = {}
 39 | 
 40 |     def set_response(self, method: str, response):
 41 |         self._responses[method.lower()] = response
 42 | 
 43 |     async def get(self, *args, **kwargs):
 44 |         self.calls.append(("get", args, kwargs))
 45 |         return self._responses["get"]
 46 | 
 47 |     async def post(self, *args, **kwargs):
 48 |         self.calls.append(("post", args, kwargs))
 49 |         return self._responses["post"]
 50 | 
 51 |     async def put(self, *args, **kwargs):
 52 |         self.calls.append(("put", args, kwargs))
 53 |         return self._responses["put"]
 54 | 
 55 |     async def delete(self, *args, **kwargs):
 56 |         self.calls.append(("delete", args, kwargs))
 57 |         return self._responses["delete"]
 58 | 
 59 | 
 60 | @pytest.mark.asyncio
 61 | async def test_call_get_success(mock_response):
 62 |     """Test successful GET request."""
 63 |     client = _Client()
 64 |     client.set_response("get", mock_response())
 65 | 
 66 |     response = await call_get(client, "http://test.com")
 67 |     assert response.status_code == 200
 68 | 
 69 | 
 70 | @pytest.mark.asyncio
 71 | async def test_call_get_error(mock_response):
 72 |     """Test GET request with error."""
 73 |     client = _Client()
 74 |     client.set_response("get", mock_response(404))
 75 | 
 76 |     with pytest.raises(ToolError) as exc:
 77 |         await call_get(client, "http://test.com")
 78 |     assert "Resource not found" in str(exc.value)
 79 | 
 80 | 
 81 | @pytest.mark.asyncio
 82 | async def test_call_post_success(mock_response):
 83 |     """Test successful POST request."""
 84 |     client = _Client()
 85 |     response = mock_response()
 86 |     response.json = lambda: {"test": "data"}
 87 |     client.set_response("post", response)
 88 | 
 89 |     response = await call_post(client, "http://test.com", json={"test": "data"})
 90 |     assert response.status_code == 200
 91 | 
 92 | 
 93 | @pytest.mark.asyncio
 94 | async def test_call_post_error(mock_response):
 95 |     """Test POST request with error."""
 96 |     client = _Client()
 97 |     response = mock_response(500)
 98 |     response.json = lambda: {"test": "error"}
 99 | 
100 |     client.set_response("post", response)
101 | 
102 |     with pytest.raises(ToolError) as exc:
103 |         await call_post(client, "http://test.com", json={"test": "data"})
104 |     assert "Internal server error" in str(exc.value)
105 | 
106 | 
107 | @pytest.mark.asyncio
108 | async def test_call_put_success(mock_response):
109 |     """Test successful PUT request."""
110 |     client = _Client()
111 |     client.set_response("put", mock_response())
112 | 
113 |     response = await call_put(client, "http://test.com", json={"test": "data"})
114 |     assert response.status_code == 200
115 | 
116 | 
117 | @pytest.mark.asyncio
118 | async def test_call_put_error(mock_response):
119 |     """Test PUT request with error."""
120 |     client = _Client()
121 |     client.set_response("put", mock_response(400))
122 | 
123 |     with pytest.raises(ToolError) as exc:
124 |         await call_put(client, "http://test.com", json={"test": "data"})
125 |     assert "Invalid request" in str(exc.value)
126 | 
127 | 
128 | @pytest.mark.asyncio
129 | async def test_call_delete_success(mock_response):
130 |     """Test successful DELETE request."""
131 |     client = _Client()
132 |     client.set_response("delete", mock_response())
133 | 
134 |     response = await call_delete(client, "http://test.com")
135 |     assert response.status_code == 200
136 | 
137 | 
138 | @pytest.mark.asyncio
139 | async def test_call_delete_error(mock_response):
140 |     """Test DELETE request with error."""
141 |     client = _Client()
142 |     client.set_response("delete", mock_response(403))
143 | 
144 |     with pytest.raises(ToolError) as exc:
145 |         await call_delete(client, "http://test.com")
146 |     assert "Access denied" in str(exc.value)
147 | 
148 | 
149 | @pytest.mark.asyncio
150 | async def test_call_get_with_params(mock_response):
151 |     """Test GET request with query parameters."""
152 |     client = _Client()
153 |     client.set_response("get", mock_response())
154 | 
155 |     params = {"key": "value", "test": "data"}
156 |     await call_get(client, "http://test.com", params=params)
157 | 
158 |     assert len(client.calls) == 1
159 |     method, _args, kwargs = client.calls[0]
160 |     assert method == "get"
161 |     assert kwargs["params"] == params
162 | 
163 | 
164 | @pytest.mark.asyncio
165 | async def test_get_error_message():
166 |     """Test the get_error_message function."""
167 | 
168 |     # Test 400 status code
169 |     message = get_error_message(400, "http://test.com/resource", "GET")
170 |     assert "Invalid request" in message
171 |     assert "resource" in message
172 | 
173 |     # Test 404 status code
174 |     message = get_error_message(404, "http://test.com/missing", "GET")
175 |     assert "Resource not found" in message
176 |     assert "missing" in message
177 | 
178 |     # Test 500 status code
179 |     message = get_error_message(500, "http://test.com/server", "POST")
180 |     assert "Internal server error" in message
181 |     assert "server" in message
182 | 
183 |     # Test URL object handling
184 |     from httpx import URL
185 | 
186 |     url = URL("http://test.com/complex/path")
187 |     message = get_error_message(403, url, "DELETE")
188 |     assert "Access denied" in message
189 |     assert "path" in message
190 | 
191 | 
192 | @pytest.mark.asyncio
193 | async def test_call_post_with_json(mock_response):
194 |     """Test POST request with JSON payload."""
195 |     client = _Client()
196 |     response = mock_response()
197 |     response.json = lambda: {"test": "data"}
198 | 
199 |     client.set_response("post", response)
200 | 
201 |     json_data = {"key": "value", "nested": {"test": "data"}}
202 |     await call_post(client, "http://test.com", json=json_data)
203 | 
204 |     assert len(client.calls) == 1
205 |     method, _args, kwargs = client.calls[0]
206 |     assert method == "post"
207 |     assert kwargs["json"] == json_data
208 | 
```

--------------------------------------------------------------------------------
/test-int/test_disable_permalinks_integration.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration tests for the disable_permalinks configuration."""
  2 | 
  3 | import pytest
  4 | 
  5 | from basic_memory.markdown import EntityParser, MarkdownProcessor
  6 | from basic_memory.repository import (
  7 |     EntityRepository,
  8 |     ObservationRepository,
  9 |     RelationRepository,
 10 |     ProjectRepository,
 11 | )
 12 | from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
 13 | from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
 14 | from basic_memory.schemas import Entity as EntitySchema
 15 | from basic_memory.services import FileService
 16 | from basic_memory.services.entity_service import EntityService
 17 | from basic_memory.services.link_resolver import LinkResolver
 18 | from basic_memory.services.search_service import SearchService
 19 | from basic_memory.sync.sync_service import SyncService
 20 | 
 21 | 
 22 | @pytest.mark.asyncio
 23 | async def test_disable_permalinks_create_entity(tmp_path, engine_factory, app_config, test_project):
 24 |     """Test that entities created with disable_permalinks=True don't have permalinks."""
 25 |     from basic_memory.config import DatabaseBackend
 26 | 
 27 |     engine, session_maker = engine_factory
 28 | 
 29 |     # Override app config to enable disable_permalinks
 30 |     app_config.disable_permalinks = True
 31 | 
 32 |     # Setup repositories
 33 |     entity_repository = EntityRepository(session_maker, project_id=test_project.id)
 34 |     observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
 35 |     relation_repository = RelationRepository(session_maker, project_id=test_project.id)
 36 | 
 37 |     # Use database-specific search repository
 38 |     if app_config.database_backend == DatabaseBackend.POSTGRES:
 39 |         search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
 40 |     else:
 41 |         search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
 42 | 
 43 |     # Setup services
 44 |     entity_parser = EntityParser(tmp_path)
 45 |     markdown_processor = MarkdownProcessor(entity_parser)
 46 |     file_service = FileService(tmp_path, markdown_processor)
 47 |     search_service = SearchService(search_repository, entity_repository, file_service)
 48 |     await search_service.init_search_index()
 49 |     link_resolver = LinkResolver(entity_repository, search_service)
 50 | 
 51 |     entity_service = EntityService(
 52 |         entity_parser=entity_parser,
 53 |         entity_repository=entity_repository,
 54 |         observation_repository=observation_repository,
 55 |         relation_repository=relation_repository,
 56 |         file_service=file_service,
 57 |         link_resolver=link_resolver,
 58 |         app_config=app_config,
 59 |     )
 60 | 
 61 |     # Create entity via API
 62 |     entity_data = EntitySchema(
 63 |         title="Test Note",
 64 |         folder="test",
 65 |         entity_type="note",
 66 |         content="Test content",
 67 |     )
 68 | 
 69 |     created = await entity_service.create_entity(entity_data)
 70 | 
 71 |     # Verify entity has no permalink
 72 |     assert created.permalink is None
 73 | 
 74 |     # Verify file has no permalink in frontmatter
 75 |     file_path = tmp_path / "test" / "Test Note.md"
 76 |     assert file_path.exists()
 77 |     content = file_path.read_text()
 78 |     assert "permalink:" not in content
 79 |     assert "Test content" in content
 80 | 
 81 | 
 82 | @pytest.mark.asyncio
 83 | async def test_disable_permalinks_sync_workflow(tmp_path, engine_factory, app_config, test_project):
 84 |     """Test full sync workflow with disable_permalinks enabled."""
 85 |     from basic_memory.config import DatabaseBackend
 86 | 
 87 |     engine, session_maker = engine_factory
 88 | 
 89 |     # Override app config to enable disable_permalinks
 90 |     app_config.disable_permalinks = True
 91 | 
 92 |     # Create a test markdown file without frontmatter
 93 |     test_file = tmp_path / "test_note.md"
 94 |     test_file.write_text("# Test Note\nThis is test content.")
 95 | 
 96 |     # Setup repositories
 97 |     entity_repository = EntityRepository(session_maker, project_id=test_project.id)
 98 |     observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
 99 |     relation_repository = RelationRepository(session_maker, project_id=test_project.id)
100 | 
101 |     # Use database-specific search repository
102 |     if app_config.database_backend == DatabaseBackend.POSTGRES:
103 |         search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
104 |     else:
105 |         search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
106 | 
107 |     project_repository = ProjectRepository(session_maker)
108 | 
109 |     # Setup services
110 |     entity_parser = EntityParser(tmp_path)
111 |     markdown_processor = MarkdownProcessor(entity_parser)
112 |     file_service = FileService(tmp_path, markdown_processor)
113 |     search_service = SearchService(search_repository, entity_repository, file_service)
114 |     await search_service.init_search_index()
115 |     link_resolver = LinkResolver(entity_repository, search_service)
116 | 
117 |     entity_service = EntityService(
118 |         entity_parser=entity_parser,
119 |         entity_repository=entity_repository,
120 |         observation_repository=observation_repository,
121 |         relation_repository=relation_repository,
122 |         file_service=file_service,
123 |         link_resolver=link_resolver,
124 |         app_config=app_config,
125 |     )
126 | 
127 |     sync_service = SyncService(
128 |         app_config=app_config,
129 |         entity_service=entity_service,
130 |         project_repository=project_repository,
131 |         entity_parser=entity_parser,
132 |         entity_repository=entity_repository,
133 |         relation_repository=relation_repository,
134 |         search_service=search_service,
135 |         file_service=file_service,
136 |     )
137 | 
138 |     # Run sync
139 |     report = await sync_service.scan(tmp_path)
140 |     # Note: scan may pick up database files too, so just check our file is there
141 |     assert "test_note.md" in report.new
142 | 
143 |     # Sync the file
144 |     await sync_service.sync_file("test_note.md", new=True)
145 | 
146 |     # Verify file has no permalink added
147 |     content = test_file.read_text()
148 |     assert "permalink:" not in content
149 |     assert "# Test Note" in content
150 | 
151 |     # Verify entity in database has no permalink
152 |     entities = await entity_repository.find_all()
153 |     assert len(entities) == 1
154 |     assert entities[0].permalink is None
155 |     # Title is extracted from filename when no frontmatter, or from frontmatter when present
156 |     assert entities[0].title in ("test_note", "Test Note")
157 | 
```

--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------

```python
  1 | """Test repository implementation."""
  2 | 
  3 | from datetime import datetime, UTC
  4 | import pytest
  5 | from sqlalchemy import String, DateTime
  6 | from sqlalchemy.orm import Mapped, mapped_column
  7 | 
  8 | from basic_memory.models import Base
  9 | from basic_memory.repository.repository import Repository
 10 | 
 11 | 
 12 | class ModelTest(Base):
 13 |     """Test model for repository tests."""
 14 | 
 15 |     __tablename__ = "test_model"
 16 | 
 17 |     id: Mapped[str] = mapped_column(String(255), primary_key=True)
 18 |     name: Mapped[str] = mapped_column(String(255))
 19 |     description: Mapped[str | None] = mapped_column(String(255), nullable=True)
 20 |     created_at: Mapped[datetime] = mapped_column(
 21 |         DateTime, default=lambda: datetime.now(UTC).replace(tzinfo=None)
 22 |     )
 23 |     updated_at: Mapped[datetime] = mapped_column(
 24 |         DateTime,
 25 |         default=lambda: datetime.now(UTC).replace(tzinfo=None),
 26 |         onupdate=lambda: datetime.now(UTC).replace(tzinfo=None),
 27 |     )
 28 | 
 29 | 
 30 | @pytest.fixture
 31 | def repository(session_maker):
 32 |     """Create a test repository."""
 33 |     return Repository(session_maker, ModelTest)
 34 | 
 35 | 
 36 | @pytest.mark.asyncio
 37 | async def test_add(repository):
 38 |     """Test bulk creation of entities."""
 39 |     # Create test instances
 40 |     instance = ModelTest(id="test_add", name="Test Add")
 41 |     await repository.add(instance)
 42 | 
 43 |     # Verify we can find in db
 44 |     found = await repository.find_by_id("test_add")
 45 |     assert found is not None
 46 |     assert found.name == "Test Add"
 47 | 
 48 | 
 49 | @pytest.mark.asyncio
 50 | async def test_add_all(repository):
 51 |     """Test bulk creation of entities."""
 52 |     # Create test instances
 53 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
 54 |     await repository.add_all(instances)
 55 | 
 56 |     # Verify we can find them in db
 57 |     found = await repository.find_by_id("test_0")
 58 |     assert found is not None
 59 |     assert found.name == "Test 0"
 60 | 
 61 | 
 62 | @pytest.mark.asyncio
 63 | async def test_bulk_create(repository):
 64 |     """Test bulk creation of entities."""
 65 |     # Create test instances
 66 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
 67 | 
 68 |     # Bulk create
 69 |     await repository.create_all([instance.__dict__ for instance in instances])
 70 | 
 71 |     # Verify we can find them in db
 72 |     found = await repository.find_by_id("test_0")
 73 |     assert found is not None
 74 |     assert found.name == "Test 0"
 75 | 
 76 | 
 77 | @pytest.mark.asyncio
 78 | async def test_find_all(repository):
 79 |     """Test finding multiple entities by IDs."""
 80 |     # Create test data
 81 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
 82 |     await repository.create_all([instance.__dict__ for instance in instances])
 83 | 
 84 |     found = await repository.find_all(limit=3)
 85 |     assert len(found) == 3
 86 | 
 87 | 
 88 | @pytest.mark.asyncio
 89 | async def test_find_by_ids(repository):
 90 |     """Test finding multiple entities by IDs."""
 91 |     # Create test data
 92 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
 93 |     await repository.create_all([instance.__dict__ for instance in instances])
 94 | 
 95 |     # Test finding subset of entities
 96 |     ids_to_find = ["test_0", "test_2", "test_4"]
 97 |     found = await repository.find_by_ids(ids_to_find)
 98 |     assert len(found) == 3
 99 |     assert sorted([e.id for e in found]) == sorted(ids_to_find)
100 | 
101 |     # Test finding with some non-existent IDs
102 |     mixed_ids = ["test_0", "nonexistent", "test_4"]
103 |     partial_found = await repository.find_by_ids(mixed_ids)
104 |     assert len(partial_found) == 2
105 |     assert sorted([e.id for e in partial_found]) == ["test_0", "test_4"]
106 | 
107 |     # Test with empty list
108 |     empty_found = await repository.find_by_ids([])
109 |     assert len(empty_found) == 0
110 | 
111 |     # Test with all non-existent IDs
112 |     not_found = await repository.find_by_ids(["fake1", "fake2"])
113 |     assert len(not_found) == 0
114 | 
115 | 
116 | @pytest.mark.asyncio
117 | async def test_delete_by_ids(repository):
118 |     """Test finding multiple entities by IDs."""
119 |     # Create test data
120 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
121 |     await repository.create_all([instance.__dict__ for instance in instances])
122 | 
123 |     # Test delete subset of entities
124 |     ids_to_delete = ["test_0", "test_2", "test_4"]
125 |     deleted_count = await repository.delete_by_ids(ids_to_delete)
126 |     assert deleted_count == 3
127 | 
128 |     # Test finding subset of entities
129 |     ids_to_find = ["test_1", "test_3"]
130 |     found = await repository.find_by_ids(ids_to_find)
131 |     assert len(found) == 2
132 |     assert sorted([e.id for e in found]) == sorted(ids_to_find)
133 | 
134 |     assert await repository.find_by_id(ids_to_delete[0]) is None
135 |     assert await repository.find_by_id(ids_to_delete[1]) is None
136 |     assert await repository.find_by_id(ids_to_delete[2]) is None
137 | 
138 | 
139 | @pytest.mark.asyncio
140 | async def test_update(repository):
141 |     """Test finding entities modified since a timestamp."""
142 |     # Create initial test data
143 |     instance = ModelTest(id="test_add", name="Test Add")
144 |     await repository.add(instance)
145 | 
146 |     instance = ModelTest(id="test_add", name="Updated")
147 | 
148 |     # Find recently modified
149 |     modified = await repository.update(instance.id, {"name": "Updated"})
150 |     assert modified is not None
151 |     assert modified.name == "Updated"
152 | 
153 | 
154 | @pytest.mark.asyncio
155 | async def test_update_model(repository):
156 |     """Test finding entities modified since a timestamp."""
157 |     # Create initial test data
158 |     instance = ModelTest(id="test_add", name="Test Add")
159 |     await repository.add(instance)
160 | 
161 |     instance.name = "Updated"
162 | 
163 |     # Find recently modified
164 |     modified = await repository.update(instance.id, instance)
165 |     assert modified is not None
166 |     assert modified.name == "Updated"
167 | 
168 | 
169 | @pytest.mark.asyncio
170 | async def test_update_model_not_found(repository):
171 |     """Test finding entities modified since a timestamp."""
172 |     # Create initial test data
173 |     instance = ModelTest(id="test_add", name="Test Add")
174 |     await repository.add(instance)
175 | 
176 |     modified = await repository.update("0", {})  # Use string ID for Postgres compatibility
177 |     assert modified is None
178 | 
179 | 
180 | @pytest.mark.asyncio
181 | async def test_count(repository):
182 |     """Test bulk creation of entities."""
183 |     # Create test instances
184 |     instance = ModelTest(id="test_add", name="Test Add")
185 |     await repository.add(instance)
186 | 
187 |     # Verify we can count in db
188 |     count = await repository.count()
189 |     assert count == 1
190 | 
```

--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration tests for Obsidian-compatible YAML formatting in write_note tool."""
  2 | 
  3 | import pytest
  4 | 
  5 | from basic_memory.mcp.tools import write_note
  6 | 
  7 | 
  8 | @pytest.mark.asyncio
  9 | async def test_write_note_tags_yaml_format(app, project_config, test_project):
 10 |     """Test that write_note creates files with proper YAML list format for tags."""
 11 |     # Create a note with tags using write_note
 12 |     result = await write_note.fn(
 13 |         project=test_project.name,
 14 |         title="YAML Format Test",
 15 |         folder="test",
 16 |         content="Testing YAML tag formatting",
 17 |         tags=["system", "overview", "reference"],
 18 |     )
 19 | 
 20 |     # Verify the note was created successfully
 21 |     assert "Created note" in result
 22 |     assert "file_path: test/YAML Format Test.md" in result
 23 | 
 24 |     # Read the file directly to check YAML formatting
 25 |     file_path = project_config.home / "test" / "YAML Format Test.md"
 26 |     content = file_path.read_text(encoding="utf-8")
 27 | 
 28 |     # Should use YAML list format
 29 |     assert "tags:" in content
 30 |     assert "- system" in content
 31 |     assert "- overview" in content
 32 |     assert "- reference" in content
 33 | 
 34 |     # Should NOT use JSON array format
 35 |     assert '["system"' not in content
 36 |     assert '"overview"' not in content
 37 |     assert '"reference"]' not in content
 38 | 
 39 | 
 40 | @pytest.mark.asyncio
 41 | async def test_write_note_stringified_json_tags(app, project_config, test_project):
 42 |     """Test that stringified JSON arrays are handled correctly."""
 43 |     # This simulates the issue where AI assistants pass tags as stringified JSON
 44 |     result = await write_note.fn(
 45 |         project=test_project.name,
 46 |         title="Stringified JSON Test",
 47 |         folder="test",
 48 |         content="Testing stringified JSON tag input",
 49 |         tags='["python", "testing", "json"]',  # Stringified JSON array
 50 |     )
 51 | 
 52 |     # Verify the note was created successfully
 53 |     assert "Created note" in result
 54 | 
 55 |     # Read the file to check formatting
 56 |     file_path = project_config.home / "test" / "Stringified JSON Test.md"
 57 |     content = file_path.read_text(encoding="utf-8")
 58 | 
 59 |     # Should properly parse the JSON and format as YAML list
 60 |     assert "tags:" in content
 61 |     assert "- python" in content
 62 |     assert "- testing" in content
 63 |     assert "- json" in content
 64 | 
 65 |     # Should NOT have the original stringified format issues
 66 |     assert '["python"' not in content
 67 |     assert '"testing"' not in content
 68 |     assert '"json"]' not in content
 69 | 
 70 | 
 71 | @pytest.mark.asyncio
 72 | async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
 73 |     """Test that single tags are still formatted as YAML lists."""
 74 |     await write_note.fn(
 75 |         project=test_project.name,
 76 |         title="Single Tag Test",
 77 |         folder="test",
 78 |         content="Testing single tag formatting",
 79 |         tags=["solo-tag"],
 80 |     )
 81 | 
 82 |     file_path = project_config.home / "test" / "Single Tag Test.md"
 83 |     content = file_path.read_text(encoding="utf-8")
 84 | 
 85 |     # Single tag should still use list format
 86 |     assert "tags:" in content
 87 |     assert "- solo-tag" in content
 88 | 
 89 | 
 90 | @pytest.mark.asyncio
 91 | async def test_write_note_no_tags(app, project_config, test_project):
 92 |     """Test that notes without tags work normally."""
 93 |     await write_note.fn(
 94 |         project=test_project.name,
 95 |         title="No Tags Test",
 96 |         folder="test",
 97 |         content="Testing note without tags",
 98 |         tags=None,
 99 |     )
100 | 
101 |     file_path = project_config.home / "test" / "No Tags Test.md"
102 |     content = file_path.read_text(encoding="utf-8")
103 | 
104 |     # Should not have tags field in frontmatter
105 |     assert "tags:" not in content
106 |     assert "title: No Tags Test" in content
107 | 
108 | 
109 | @pytest.mark.asyncio
110 | async def test_write_note_empty_tags_list(app, project_config, test_project):
111 |     """Test that empty tag lists are handled properly."""
112 |     await write_note.fn(
113 |         project=test_project.name,
114 |         title="Empty Tags Test",
115 |         folder="test",
116 |         content="Testing empty tag list",
117 |         tags=[],
118 |     )
119 | 
120 |     file_path = project_config.home / "test" / "Empty Tags Test.md"
121 |     content = file_path.read_text(encoding="utf-8")
122 | 
123 |     # Should not add tags field to frontmatter for empty lists
124 |     assert "tags:" not in content
125 | 
126 | 
127 | @pytest.mark.asyncio
128 | async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
129 |     """Test that updating a note preserves the YAML list format."""
130 |     # First, create the note
131 |     await write_note.fn(
132 |         project=test_project.name,
133 |         title="Update Format Test",
134 |         folder="test",
135 |         content="Initial content",
136 |         tags=["initial", "tag"],
137 |     )
138 | 
139 |     # Then update it with new tags
140 |     result = await write_note.fn(
141 |         project=test_project.name,
142 |         title="Update Format Test",
143 |         folder="test",
144 |         content="Updated content",
145 |         tags=["updated", "new-tag", "format"],
146 |     )
147 | 
148 |     # Should be an update, not a new creation
149 |     assert "Updated note" in result
150 | 
151 |     # Check the file format
152 |     file_path = project_config.home / "test" / "Update Format Test.md"
153 |     content = file_path.read_text(encoding="utf-8")
154 | 
155 |     # Should have proper YAML formatting for updated tags
156 |     assert "tags:" in content
157 |     assert "- updated" in content
158 |     assert "- new-tag" in content
159 |     assert "- format" in content
160 | 
161 |     # Old tags should be gone
162 |     assert "- initial" not in content
163 |     assert "- tag" not in content
164 | 
165 |     # Content should be updated
166 |     assert "Updated content" in content
167 |     assert "Initial content" not in content
168 | 
169 | 
170 | @pytest.mark.asyncio
171 | async def test_complex_tags_yaml_format(app, project_config, test_project):
172 |     """Test that complex tags with special characters format correctly."""
173 |     await write_note.fn(
174 |         project=test_project.name,
175 |         title="Complex Tags Test",
176 |         folder="test",
177 |         content="Testing complex tag formats",
178 |         tags=["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"],
179 |     )
180 | 
181 |     file_path = project_config.home / "test" / "Complex Tags Test.md"
182 |     content = file_path.read_text(encoding="utf-8")
183 | 
184 |     # All complex tags should format correctly
185 |     assert "- python-3.9" in content
186 |     assert "- api_integration" in content
187 |     assert "- v2.0" in content
188 |     assert "- nested/category" in content
189 |     assert "- under_score" in content
190 | 
```

--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | title: 'SPEC-11: Basic Memory API Performance Optimization'
  3 | type: spec
  4 | permalink: specs/spec-11-basic-memory-api-performance-optimization
  5 | tags:
  6 | - performance
  7 | - api
  8 | - mcp
  9 | - database
 10 | - cloud
 11 | ---
 12 | 
 13 | # SPEC-11: Basic Memory API Performance Optimization
 14 | 
 15 | ## Why
 16 | 
 17 | The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
 18 | HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.
 19 | 
 20 | **Root Cause Analysis:**
 21 | - GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
 22 | - Each MCP tool call triggers full database initialization + project reconciliation
 23 | - `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
 24 | - `reconcile_projects_with_config()` runs expensive sync operations repeatedly
 25 | 
 26 | **Performance Impact:**
 27 | - Database connection setup: ~50-100ms per request
 28 | - Migration checks: ~100-500ms per request
 29 | - Project reconciliation: ~200ms-2s per request
 30 | - **Total overhead**: ~350ms-2.6s per MCP tool call
 31 | 
 32 | This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.
 33 | 
 34 | ## What
 35 | 
 36 | This optimization affects the **core basic-memory repository** components:
 37 | 
 38 | 1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
 39 |  - Cache database connections in app state during startup
 40 |  - Avoid repeated expensive initialization
 41 | 
 42 | 2. **Dependency Injection** (`src/basic_memory/deps.py`)
 43 |  - Modify `get_engine_factory()` to use cached connections
 44 |  - Eliminate per-request database setup
 45 | 
 46 | 3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
 47 |  - Add caching/throttling to project reconciliation
 48 |  - Skip expensive operations when appropriate
 49 | 
 50 | 4. **Configuration** (`src/basic_memory/config.py`)
 51 |  - Add optional performance flags for cloud environments
 52 | 
 53 | **Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.
 54 | 
 55 | ## How (High Level)
 56 | 
 57 | ### Phase 1: Cache Database Connections (Critical - 80% of gains)
 58 | 
 59 | **Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
 60 | **Solution**: Cache database engine/session in app state during lifespan
 61 | 
 62 | 1. **Modify API Lifespan** (`api/app.py`):
 63 |  ```python
 64 |  @asynccontextmanager
 65 |  async def lifespan(app: FastAPI):
 66 |      app_config = ConfigManager().config
 67 |      await initialize_app(app_config)
 68 | 
 69 |      # Cache database connection in app state
 70 |      engine, session_maker = await db.get_or_create_db(app_config.database_path)
 71 |      app.state.engine = engine
 72 |      app.state.session_maker = session_maker
 73 | 
 74 |      # ... rest of startup logic
 75 | ```
 76 | 
 77 | 2. Modify Dependency Injection (deps.py):
 78 | ```python
 79 | async def get_engine_factory(
 80 |   request: Request
 81 | ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
 82 |   """Get cached engine and session maker from app state."""
 83 |   return request.app.state.engine, request.app.state.session_maker
 84 | ```
 85 | Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)
 86 | 
 87 | Problem: reconcile_projects_with_config() runs expensive sync repeatedly
 88 | Solution: Add module-level caching with time-based throttling
 89 | 
 90 | 1. Add Reconciliation Cache (services/initialization.py):
 91 | ```ptyhon
 92 | _project_reconciliation_completed = False
 93 | _last_reconciliation_time = 0
 94 | 
 95 | async def reconcile_projects_with_config(app_config, force=False):
 96 |   # Skip if recently completed (within 60 seconds) unless forced
 97 |   if recently_completed and not force:
 98 |       return
 99 |   # ... existing logic
100 | ```
101 | Phase 3: Cloud Environment Flags (Optional)
102 | 
103 | Problem: Force expensive initialization in production environments
104 | Solution: Add skip flags for cloud/stateless deployments
105 | 
106 | 1. Add Config Flag (config.py):
107 | skip_initialization_sync: bool = Field(default=False)
108 | 2. Configure in Cloud (basic-memory-cloud integration):
109 | BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true
110 | 
111 | How to Evaluate
112 | 
113 | Success Criteria
114 | 
115 | 1. Performance Metrics (Primary):
116 | - MCP tool response time reduced by 50%+ (measure before/after)
117 | - Database connection overhead eliminated (0ms vs 50-100ms)
118 | - Migration check overhead eliminated (0ms vs 100-500ms)
119 | - Project reconciliation overhead reduced by 90%+
120 | 2. Load Testing:
121 | - Concurrent MCP tool calls maintain performance
122 | - No memory leaks in cached connections
123 | - Database connection pool behaves correctly
124 | 3. Functional Correctness:
125 | - All existing API endpoints work identically
126 | - MCP tools maintain full functionality
127 | - CLI operations unaffected
128 | - Database migrations still execute properly
129 | 4. Backwards Compatibility:
130 | - No breaking changes to existing APIs
131 | - Config changes are optional with safe defaults
132 | - Non-cloud deployments work unchanged
133 | 
134 | Testing Strategy
135 | 
136 | Performance Testing:
137 | # Before optimization
138 | time basic-memory-mcp-tools write_note "test" "content" "folder"
139 | # Measure: ~1-3 seconds
140 | 
141 | # After optimization  
142 | time basic-memory-mcp-tools write_note "test" "content" "folder"
143 | # Target: <500ms
144 | 
145 | Load Testing:
146 | # Multiple concurrent MCP tool calls
147 | for i in {1..10}; do
148 | basic-memory-mcp-tools search "test" &
149 | done
150 | wait
151 | # Verify: No degradation, consistent response times
152 | 
153 | Regression Testing:
154 | # Full basic-memory test suite
155 | just test
156 | # All tests must pass
157 | 
158 | # Integration tests with cloud deployment
159 | # Verify MCP gateway → API → database flow works
160 | 
161 | Validation Checklist
162 | 
163 | - Phase 1 Complete: Database connections cached, dependency injection optimized
164 | - Performance Benchmark: 50%+ improvement in MCP tool response times
165 | - Memory Usage: No leaks in cached connections over 24h+ periods
166 | - Stress Testing: 100+ concurrent requests maintain performance
167 | - Backwards Compatibility: All existing functionality preserved
168 | - Documentation: Performance optimization documented in README
169 | - Cloud Integration: basic-memory-cloud sees performance benefits
170 | 
171 | Notes
172 | 
173 | Implementation Priority:
174 | - Phase 1 provides 80% of performance gains and should be implemented first
175 | - Phase 2 provides remaining 20% and addresses edge cases
176 | - Phase 3 is optional for maximum cloud optimization
177 | 
178 | Risk Mitigation:
179 | - All changes backwards compatible
180 | - Gradual rollout possible (Phase 1 → 2 → 3)
181 | - Easy rollback via configuration flags
182 | 
183 | Cloud Integration:
184 | - This optimization directly addresses basic-memory-cloud issue #82
185 | - Changes in core basic-memory will benefit all cloud tenants
186 | - No changes needed in basic-memory-cloud itself
187 | 
```

--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | title: 'SPEC-1: Specification-Driven Development Process'
  3 | type: spec
  4 | permalink: specs/spec-1-specification-driven-development-process
  5 | tags:
  6 | - process
  7 | - specification
  8 | - development
  9 | - meta
 10 | ---
 11 | 
 12 | # SPEC-1: Specification-Driven Development Process
 13 | 
 14 | ## Why
 15 | We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process. 
 16 | Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.
 17 | 
 18 | The default approach of adhoc development with AI agents tends to result in:
 19 | - Circular refactoring cycles
 20 | - Fighting framework complexity
 21 | - Lost context between sessions
 22 | - Unclear requirements and scope
 23 | 
 24 | ## What
 25 | This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud. 
 26 | We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.
 27 | 
 28 | **Affected Areas:**
 29 | - All future component development
 30 | - Architecture decisions
 31 | - Agent collaboration workflows
 32 | - Knowledge management and context preservation
 33 | 
 34 | ## How (High Level)
 35 | 
 36 | ### Specification Structure
 37 | 
 38 | Name: Spec names should be numbered sequentially, followed by a description eg. `SPEC-X - Simple Description.md`.
 39 | See: [[Spec-2: Slash Commands Reference]]
 40 | 
 41 | Every spec is a complete thought containing:
 42 | - **Why**: The reasoning and problem being solved
 43 | - **What**: What is affected or changed
 44 | - **How**: High-level approach to implementation
 45 | - **How to Evaluate**: Testing/validation procedure
 46 | - Additional context as needed
 47 | 
 48 | ### Living Specification Format
 49 | 
 50 | Specifications are **living documents** that evolve throughout implementation:
 51 | 
 52 | **Progress Tracking:**
 53 | - **Completed items**: Use ✅ checkmark emoji for implemented features
 54 | - **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
 55 | - **In-progress items**: Use `- [x]` when work is actively underway
 56 | 
 57 | **Status Philosophy:**
 58 | - **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
 59 | - **Use checklists within content** to show granular implementation progress
 60 | - **Keep specs informative** while providing clear progress visibility
 61 | - **Update continuously** as understanding and implementation evolve
 62 | 
 63 | **Example Format:**
 64 | ```markdown
 65 | ### ComponentName
 66 | - ✅ Basic functionality implemented
 67 | - ✅ Props and events defined
 68 | - - [ ] Add sorting controls
 69 | - - [ ] Improve accessibility
 70 | - - [x] Currently implementing responsive design
 71 | ```
 72 | 
 73 | This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.
 74 | 
 75 | 
 76 | ## Claude Code 
 77 | 
 78 | We will leverage Claude Code capabilities to make the process semi-automated. 
 79 | 
 80 | - Slash commands: define repeatable steps in the process (create spec, implement, review, etc)
 81 | - Agents: define roles to carry out instructions  (front end developer, baskend developer, etc)
 82 | - MCP tools: enable agents to implement specs via actions (write code, test, etc)
 83 | 
 84 | ### Workflow
 85 | 1. **Create**: Write spec as complete thought in `/specs` folder
 86 | 2. **Discuss**: Iterate and refine through agent collaboration
 87 | 3. **Implement**: Hand spec to appropriate specialist agent
 88 | 4. **Validate**: Review implementation against spec criteria
 89 | 5. **Document**: Update spec with learnings and decisions
 90 | 
 91 | ### Slash Commands
 92 | 
 93 | Claude slash commands are used to manage the flow.
 94 | These are simple instructions to help make the process uniform. 
 95 | They can be updated and refined as needed. 
 96 | 
 97 | - `/spec create [name]` - Create new specification
 98 | - `/spec status` - Show current spec states
 99 | - `/spec implement [name]` - Hand to appropriate agent
100 | - `/spec review [name]` - Validate implementation
101 | 
102 | ### Agent Orchestration
103 | 
104 | Agents are defined with clear roles, for instance:
105 | 
106 | - **system-architect**: Creates high-level specs, ADRs, architectural decisions
107 | - **vue-developer**: Component specs, UI patterns, frontend architecture
108 | - **python-developer**: Implementation specs, technical details, backend logic
109 | - 
110 | - Each agent reads/updates specs through basic-memory tools. 
111 | 
112 | ## How to Evaluate
113 | 
114 | ### Success Criteria
115 | - Specs provide clear, actionable guidance for implementation
116 | - Reduced circular refactoring and scope creep
117 | - Persistent context across development sessions
118 | - Clean separation between "what/why" and implementation details
119 | - Specs record a history of what happened and why for historical context
120 | 
121 | ### Testing Procedure
122 | 1. Create a spec for an existing problematic component
123 | 2. Have an agent implement following only the spec
124 | 3. Compare result quality and development speed vs. ad-hoc approach
125 | 4. Measure context preservation across sessions
126 | 5. Evaluate spec clarity and completeness
127 | 
128 | ### Metrics
129 | - Time from spec to working implementation
130 | - Number of refactoring cycles required
131 | - Agent understanding of requirements
132 | - Spec reusability for similar components
133 | 
134 | ## Notes
135 | - Start simple: specs are just complete thoughts, not heavy processes
136 | - Use basic-memory's knowledge graph to link specs, decisions, components
137 | - Let the process evolve naturally based on what works
138 | - Focus on solving the actual problem: Manage complexity in development
139 | 
140 | ## Observations
141 | 
142 | - [problem] Web development without clear goals and documentation circular refactoring cycles #complexity
143 | - [solution] Specification-driven development reduces scope creep and context loss #process-improvement  
144 | - [pattern] basic-memory as specification engine creates recursive development loop #meta-development
145 | - [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
146 | - [tool] Slash commands provide uniform process automation #automation
147 | - [agent-pattern] Three specialized agents handle different implementation domains #specialization
148 | - [success-metric] Time from spec to working implementation measures process efficiency #measurement
149 | - [learning] Process should evolve naturally based on what works in practice #adaptation
150 | - [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
151 | - [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement
152 | 
153 | ## Relations
154 | 
155 | - spec [[Spec-2: Slash Commands Reference]]
156 | - spec [[Spec-3: Agent Definitions]]
157 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------

```python
  1 | """Recent activity prompts for Basic Memory MCP server.
  2 | 
  3 | These prompts help users see what has changed in their knowledge base recently.
  4 | """
  5 | 
  6 | from typing import Annotated, Optional
  7 | 
  8 | from loguru import logger
  9 | from pydantic import Field
 10 | 
 11 | from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
 12 | from basic_memory.mcp.server import mcp
 13 | from basic_memory.mcp.tools.recent_activity import recent_activity
 14 | from basic_memory.schemas.base import TimeFrame
 15 | from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
 16 | from basic_memory.schemas.search import SearchItemType
 17 | 
 18 | 
 19 | @mcp.prompt(
 20 |     name="recent_activity",
 21 |     description="Get recent activity from a specific project or across all projects",
 22 | )
 23 | async def recent_activity_prompt(
 24 |     timeframe: Annotated[
 25 |         TimeFrame,
 26 |         Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
 27 |     ] = "7d",
 28 |     project: Annotated[
 29 |         Optional[str],
 30 |         Field(
 31 |             description="Specific project to get activity from (None for discovery across all projects)"
 32 |         ),
 33 |     ] = None,
 34 | ) -> str:
 35 |     """Get recent activity from a specific project or across all projects.
 36 | 
 37 |     This prompt helps you see what's changed recently in the knowledge base.
 38 |     In discovery mode (project=None), it shows activity across all projects.
 39 |     In project-specific mode, it shows detailed activity for one project.
 40 | 
 41 |     Args:
 42 |         timeframe: How far back to look for activity (e.g. '1d', '1 week')
 43 |         project: Specific project to get activity from (None for discovery across all projects)
 44 | 
 45 |     Returns:
 46 |         Formatted summary of recent activity
 47 |     """
 48 |     logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")
 49 | 
 50 |     recent = await recent_activity.fn(
 51 |         project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
 52 |     )
 53 | 
 54 |     # Extract primary results from the hierarchical structure
 55 |     primary_results = []
 56 |     related_results = []
 57 | 
 58 |     if isinstance(recent, ProjectActivitySummary):
 59 |         # Discovery mode - extract results from all projects
 60 |         for _, project_activity in recent.projects.items():
 61 |             if project_activity.activity.results:
 62 |                 # Take up to 2 primary results per project
 63 |                 for item in project_activity.activity.results[:2]:
 64 |                     primary_results.append(item.primary_result)
 65 |                     # Add up to 1 related result per primary item
 66 |                     if item.related_results:
 67 |                         related_results.extend(item.related_results[:1])  # pragma: no cover
 68 | 
 69 |         # Limit total results for readability
 70 |         primary_results = primary_results[:8]
 71 |         related_results = related_results[:6]
 72 | 
 73 |     elif isinstance(recent, GraphContext):
 74 |         # Project-specific mode - use existing logic
 75 |         if recent.results:
 76 |             # Take up to 5 primary results
 77 |             for item in recent.results[:5]:
 78 |                 primary_results.append(item.primary_result)
 79 |                 # Add up to 2 related results per primary item
 80 |                 if item.related_results:
 81 |                     related_results.extend(item.related_results[:2])  # pragma: no cover
 82 | 
 83 |     # Set topic based on mode
 84 |     if project:
 85 |         topic = f"Recent Activity in {project} ({timeframe})"
 86 |     else:
 87 |         topic = f"Recent Activity Across All Projects ({timeframe})"
 88 | 
 89 |     prompt_context = format_prompt_context(
 90 |         PromptContext(
 91 |             topic=topic,
 92 |             timeframe=timeframe,
 93 |             results=[
 94 |                 PromptContextItem(
 95 |                     primary_results=primary_results,
 96 |                     related_results=related_results[:10],  # Limit total related results
 97 |                 )
 98 |             ],
 99 |         )
100 |     )
101 | 
102 |     # Add mode-specific suggestions
103 |     first_title = "Recent Topic"
104 |     if primary_results and len(primary_results) > 0:
105 |         first_title = primary_results[0].title
106 | 
107 |     if project:
108 |         # Project-specific suggestions
109 |         capture_suggestions = f"""
110 |     ## Opportunity to Capture Activity Summary
111 | 
112 |     Consider creating a summary note of recent activity in {project}:
113 | 
114 |     ```python
115 |     await write_note(
116 |         "{project}",
117 |         title="Activity Summary {timeframe}",
118 |         content='''
119 |         # Activity Summary for {project} ({timeframe})
120 | 
121 |         ## Overview
122 |         [Summary of key changes and developments in this project over this period]
123 | 
124 |         ## Key Updates
125 |         [List main updates and their significance within this project]
126 | 
127 |         ## Observations
128 |         - [trend] [Observation about patterns in recent activity]
129 |         - [insight] [Connection between different activities]
130 | 
131 |         ## Relations
132 |         - summarizes [[{first_title}]]
133 |         - relates_to [[{project} Overview]]
134 |         ''',
135 |         folder="summaries"
136 |     )
137 |     ```
138 | 
139 |     Summarizing periodic activity helps create high-level insights and connections within the project.
140 |     """
141 |     else:
142 |         # Discovery mode suggestions
143 |         project_count = len(recent.projects) if isinstance(recent, ProjectActivitySummary) else 0
144 |         most_active = (
145 |             getattr(recent.summary, "most_active_project", "Unknown")
146 |             if isinstance(recent, ProjectActivitySummary)
147 |             else "Unknown"
148 |         )
149 | 
150 |         capture_suggestions = f"""
151 |     ## Cross-Project Activity Discovery
152 | 
153 |     Found activity across {project_count} projects. Most active: **{most_active}**
154 | 
155 |     Consider creating a cross-project summary:
156 | 
157 |     ```python
158 |     await write_note(
159 |         "{most_active if most_active != "Unknown" else "main"}",
160 |         title="Cross-Project Activity Summary {timeframe}",
161 |         content='''
162 |         # Cross-Project Activity Summary ({timeframe})
163 | 
164 |         ## Overview
165 |         Activity found across {project_count} projects, with {most_active} showing the most activity.
166 | 
167 |         ## Key Developments
168 |         [Summarize important changes across all projects]
169 | 
170 |         ## Project Insights
171 |         [Note patterns or connections between projects]
172 | 
173 |         ## Observations
174 |         - [trend] [Cross-project patterns observed]
175 |         - [insight] [Connections between different project activities]
176 | 
177 |         ## Relations
178 |         - summarizes [[{first_title}]]
179 |         - relates_to [[Project Portfolio Overview]]
180 |         ''',
181 |         folder="summaries"
182 |     )
183 |     ```
184 | 
185 |     Cross-project summaries help identify broader trends and project interconnections.
186 |     """
187 | 
188 |     return prompt_context + capture_suggestions
189 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/deps/importers.py:
--------------------------------------------------------------------------------

```python
  1 | """Importer dependency injection for basic-memory.
  2 | 
  3 | This module provides importer dependencies:
  4 | - ChatGPTImporter
  5 | - ClaudeConversationsImporter
  6 | - ClaudeProjectsImporter
  7 | - MemoryJsonImporter
  8 | """
  9 | 
 10 | from typing import Annotated
 11 | 
 12 | from fastapi import Depends
 13 | 
 14 | from basic_memory.deps.projects import (
 15 |     ProjectConfigDep,
 16 |     ProjectConfigV2Dep,
 17 |     ProjectConfigV2ExternalDep,
 18 | )
 19 | from basic_memory.deps.services import (
 20 |     FileServiceDep,
 21 |     FileServiceV2Dep,
 22 |     FileServiceV2ExternalDep,
 23 |     MarkdownProcessorDep,
 24 |     MarkdownProcessorV2Dep,
 25 |     MarkdownProcessorV2ExternalDep,
 26 | )
 27 | from basic_memory.importers import (
 28 |     ChatGPTImporter,
 29 |     ClaudeConversationsImporter,
 30 |     ClaudeProjectsImporter,
 31 |     MemoryJsonImporter,
 32 | )
 33 | 
 34 | 
 35 | # --- ChatGPT Importer ---
 36 | 
 37 | 
 38 | async def get_chatgpt_importer(
 39 |     project_config: ProjectConfigDep,
 40 |     markdown_processor: MarkdownProcessorDep,
 41 |     file_service: FileServiceDep,
 42 | ) -> ChatGPTImporter:
 43 |     """Create ChatGPTImporter with dependencies."""
 44 |     return ChatGPTImporter(project_config.home, markdown_processor, file_service)
 45 | 
 46 | 
 47 | ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]
 48 | 
 49 | 
 50 | async def get_chatgpt_importer_v2(  # pragma: no cover
 51 |     project_config: ProjectConfigV2Dep,
 52 |     markdown_processor: MarkdownProcessorV2Dep,
 53 |     file_service: FileServiceV2Dep,
 54 | ) -> ChatGPTImporter:
 55 |     """Create ChatGPTImporter with v2 dependencies."""
 56 |     return ChatGPTImporter(project_config.home, markdown_processor, file_service)
 57 | 
 58 | 
 59 | ChatGPTImporterV2Dep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2)]
 60 | 
 61 | 
 62 | async def get_chatgpt_importer_v2_external(
 63 |     project_config: ProjectConfigV2ExternalDep,
 64 |     markdown_processor: MarkdownProcessorV2ExternalDep,
 65 |     file_service: FileServiceV2ExternalDep,
 66 | ) -> ChatGPTImporter:
 67 |     """Create ChatGPTImporter with v2 external_id dependencies."""
 68 |     return ChatGPTImporter(project_config.home, markdown_processor, file_service)
 69 | 
 70 | 
 71 | ChatGPTImporterV2ExternalDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2_external)]
 72 | 
 73 | 
 74 | # --- Claude Conversations Importer ---
 75 | 
 76 | 
 77 | async def get_claude_conversations_importer(
 78 |     project_config: ProjectConfigDep,
 79 |     markdown_processor: MarkdownProcessorDep,
 80 |     file_service: FileServiceDep,
 81 | ) -> ClaudeConversationsImporter:
 82 |     """Create ClaudeConversationsImporter with dependencies."""
 83 |     return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
 84 | 
 85 | 
 86 | ClaudeConversationsImporterDep = Annotated[
 87 |     ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
 88 | ]
 89 | 
 90 | 
 91 | async def get_claude_conversations_importer_v2(  # pragma: no cover
 92 |     project_config: ProjectConfigV2Dep,
 93 |     markdown_processor: MarkdownProcessorV2Dep,
 94 |     file_service: FileServiceV2Dep,
 95 | ) -> ClaudeConversationsImporter:
 96 |     """Create ClaudeConversationsImporter with v2 dependencies."""
 97 |     return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
 98 | 
 99 | 
100 | ClaudeConversationsImporterV2Dep = Annotated[
101 |     ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2)
102 | ]
103 | 
104 | 
105 | async def get_claude_conversations_importer_v2_external(
106 |     project_config: ProjectConfigV2ExternalDep,
107 |     markdown_processor: MarkdownProcessorV2ExternalDep,
108 |     file_service: FileServiceV2ExternalDep,
109 | ) -> ClaudeConversationsImporter:
110 |     """Create ClaudeConversationsImporter with v2 external_id dependencies."""
111 |     return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
112 | 
113 | 
114 | ClaudeConversationsImporterV2ExternalDep = Annotated[
115 |     ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2_external)
116 | ]
117 | 
118 | 
119 | # --- Claude Projects Importer ---
120 | 
121 | 
122 | async def get_claude_projects_importer(
123 |     project_config: ProjectConfigDep,
124 |     markdown_processor: MarkdownProcessorDep,
125 |     file_service: FileServiceDep,
126 | ) -> ClaudeProjectsImporter:
127 |     """Create ClaudeProjectsImporter with dependencies."""
128 |     return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
129 | 
130 | 
131 | ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]
132 | 
133 | 
134 | async def get_claude_projects_importer_v2(  # pragma: no cover
135 |     project_config: ProjectConfigV2Dep,
136 |     markdown_processor: MarkdownProcessorV2Dep,
137 |     file_service: FileServiceV2Dep,
138 | ) -> ClaudeProjectsImporter:
139 |     """Create ClaudeProjectsImporter with v2 dependencies."""
140 |     return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
141 | 
142 | 
143 | ClaudeProjectsImporterV2Dep = Annotated[
144 |     ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2)
145 | ]
146 | 
147 | 
148 | async def get_claude_projects_importer_v2_external(
149 |     project_config: ProjectConfigV2ExternalDep,
150 |     markdown_processor: MarkdownProcessorV2ExternalDep,
151 |     file_service: FileServiceV2ExternalDep,
152 | ) -> ClaudeProjectsImporter:
153 |     """Create ClaudeProjectsImporter with v2 external_id dependencies."""
154 |     return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
155 | 
156 | 
157 | ClaudeProjectsImporterV2ExternalDep = Annotated[
158 |     ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2_external)
159 | ]
160 | 
161 | 
162 | # --- Memory JSON Importer ---
163 | 
164 | 
165 | async def get_memory_json_importer(
166 |     project_config: ProjectConfigDep,
167 |     markdown_processor: MarkdownProcessorDep,
168 |     file_service: FileServiceDep,
169 | ) -> MemoryJsonImporter:
170 |     """Create MemoryJsonImporter with dependencies."""
171 |     return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
172 | 
173 | 
174 | MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]
175 | 
176 | 
177 | async def get_memory_json_importer_v2(  # pragma: no cover
178 |     project_config: ProjectConfigV2Dep,
179 |     markdown_processor: MarkdownProcessorV2Dep,
180 |     file_service: FileServiceV2Dep,
181 | ) -> MemoryJsonImporter:
182 |     """Create MemoryJsonImporter with v2 dependencies."""
183 |     return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
184 | 
185 | 
186 | MemoryJsonImporterV2Dep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer_v2)]
187 | 
188 | 
189 | async def get_memory_json_importer_v2_external(
190 |     project_config: ProjectConfigV2ExternalDep,
191 |     markdown_processor: MarkdownProcessorV2ExternalDep,
192 |     file_service: FileServiceV2ExternalDep,
193 | ) -> MemoryJsonImporter:
194 |     """Create MemoryJsonImporter with v2 external_id dependencies."""
195 |     return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
196 | 
197 | 
198 | MemoryJsonImporterV2ExternalDep = Annotated[
199 |     MemoryJsonImporter, Depends(get_memory_json_importer_v2_external)
200 | ]
201 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------

```python
  1 | """Claude conversations import service for Basic Memory."""
  2 | 
  3 | import logging
  4 | from datetime import datetime
  5 | from typing import Any, Dict, List, Optional
  6 | 
  7 | from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
  8 | from basic_memory.importers.base import Importer
  9 | from basic_memory.schemas.importer import ChatImportResult
 10 | from basic_memory.importers.utils import clean_filename, format_timestamp
 11 | 
 12 | logger = logging.getLogger(__name__)
 13 | 
 14 | 
 15 | class ClaudeConversationsImporter(Importer[ChatImportResult]):
 16 |     """Service for importing Claude conversations."""
 17 | 
 18 |     def handle_error(  # pragma: no cover
 19 |         self, message: str, error: Optional[Exception] = None
 20 |     ) -> ChatImportResult:
 21 |         """Return a failed ChatImportResult with an error message."""
 22 |         error_msg = f"{message}: {error}" if error else message
 23 |         return ChatImportResult(
 24 |             import_count={},
 25 |             success=False,
 26 |             error_message=error_msg,
 27 |             conversations=0,
 28 |             messages=0,
 29 |         )
 30 | 
 31 |     async def import_data(
 32 |         self, source_data, destination_folder: str, **kwargs: Any
 33 |     ) -> ChatImportResult:
 34 |         """Import conversations from Claude JSON export.
 35 | 
 36 |         Args:
 37 |             source_data: Path to the Claude conversations.json file.
 38 |             destination_folder: Destination folder within the project.
 39 |             **kwargs: Additional keyword arguments.
 40 | 
 41 |         Returns:
 42 |             ChatImportResult containing statistics and status of the import.
 43 |         """
 44 |         try:
 45 |             # Ensure the destination folder exists
 46 |             await self.ensure_folder_exists(destination_folder)
 47 | 
 48 |             conversations = source_data
 49 | 
 50 |             # Process each conversation
 51 |             messages_imported = 0
 52 |             chats_imported = 0
 53 | 
 54 |             for chat in conversations:
 55 |                 # Get name, providing default for unnamed conversations
 56 |                 chat_name = chat.get("name") or f"Conversation {chat.get('uuid', 'untitled')}"
 57 | 
 58 |                 # Convert to entity
 59 |                 entity = self._format_chat_content(
 60 |                     folder=destination_folder,
 61 |                     name=chat_name,
 62 |                     messages=chat["chat_messages"],
 63 |                     created_at=chat["created_at"],
 64 |                     modified_at=chat["updated_at"],
 65 |                 )
 66 | 
 67 |                 # Write file using relative path - FileService handles base_path
 68 |                 file_path = f"{entity.frontmatter.metadata['permalink']}.md"
 69 |                 await self.write_entity(entity, file_path)
 70 | 
 71 |                 chats_imported += 1
 72 |                 messages_imported += len(chat["chat_messages"])
 73 | 
 74 |             return ChatImportResult(
 75 |                 import_count={"conversations": chats_imported, "messages": messages_imported},
 76 |                 success=True,
 77 |                 conversations=chats_imported,
 78 |                 messages=messages_imported,
 79 |             )
 80 | 
 81 |         except Exception as e:  # pragma: no cover
 82 |             logger.exception("Failed to import Claude conversations")
 83 |             return self.handle_error("Failed to import Claude conversations", e)
 84 | 
 85 |     def _format_chat_content(
 86 |         self,
 87 |         folder: str,
 88 |         name: str,
 89 |         messages: List[Dict[str, Any]],
 90 |         created_at: str,
 91 |         modified_at: str,
 92 |     ) -> EntityMarkdown:
 93 |         """Convert chat messages to Basic Memory entity format.
 94 | 
 95 |         Args:
 96 |             folder: Destination folder name (relative path).
 97 |             name: Chat name.
 98 |             messages: List of chat messages.
 99 |             created_at: Creation timestamp.
100 |             modified_at: Modification timestamp.
101 | 
102 |         Returns:
103 |             EntityMarkdown instance representing the conversation.
104 |         """
105 |         # Generate permalink using folder name (relative path)
106 |         date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
107 |         clean_title = clean_filename(name)
108 |         permalink = f"{folder}/{date_prefix}-{clean_title}"
109 | 
110 |         # Format content
111 |         content = self._format_chat_markdown(
112 |             name=name,
113 |             messages=messages,
114 |             created_at=created_at,
115 |             modified_at=modified_at,
116 |             permalink=permalink,
117 |         )
118 | 
119 |         # Create entity
120 |         entity = EntityMarkdown(
121 |             frontmatter=EntityFrontmatter(
122 |                 metadata={
123 |                     "type": "conversation",
124 |                     "title": name,
125 |                     "created": created_at,
126 |                     "modified": modified_at,
127 |                     "permalink": permalink,
128 |                 }
129 |             ),
130 |             content=content,
131 |         )
132 | 
133 |         return entity
134 | 
135 |     def _format_chat_markdown(
136 |         self,
137 |         name: str,
138 |         messages: List[Dict[str, Any]],
139 |         created_at: str,
140 |         modified_at: str,
141 |         permalink: str,
142 |     ) -> str:
143 |         """Format chat as clean markdown.
144 | 
145 |         Args:
146 |             name: Chat name.
147 |             messages: List of chat messages.
148 |             created_at: Creation timestamp.
149 |             modified_at: Modification timestamp.
150 |             permalink: Permalink for the entity.
151 | 
152 |         Returns:
153 |             Formatted markdown content.
154 |         """
155 |         # Start with frontmatter and title
156 |         lines = [
157 |             f"# {name}\n",
158 |         ]
159 | 
160 |         # Add messages
161 |         for msg in messages:
162 |             # Format timestamp
163 |             ts = format_timestamp(msg["created_at"])
164 | 
165 |             # Add message header
166 |             lines.append(f"### {msg['sender'].title()} ({ts})")
167 | 
168 |             # Handle message content
169 |             content = msg.get("text", "")
170 |             if msg.get("content"):
171 |                 # Filter out None values before joining
172 |                 content = " ".join(
173 |                     str(c.get("text", ""))
174 |                     for c in msg["content"]
175 |                     if c and c.get("text") is not None
176 |                 )
177 |             lines.append(content)
178 | 
179 |             # Handle attachments
180 |             attachments = msg.get("attachments", [])
181 |             for attachment in attachments:
182 |                 if "file_name" in attachment:
183 |                     lines.append(f"\n**Attachment: {attachment['file_name']}**")
184 |                     if "extracted_content" in attachment:
185 |                         lines.append("```")
186 |                         lines.append(attachment["extracted_content"])
187 |                         lines.append("```")
188 | 
189 |             # Add spacing between messages
190 |             lines.append("")
191 | 
192 |         return "\n".join(lines)
193 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------

```python
  1 | """Status command for basic-memory CLI."""
  2 | 
  3 | from typing import Set, Dict
  4 | from typing import Annotated, Optional
  5 | 
  6 | from mcp.server.fastmcp.exceptions import ToolError
  7 | import typer
  8 | from loguru import logger
  9 | from rich.console import Console
 10 | from rich.panel import Panel
 11 | from rich.tree import Tree
 12 | 
 13 | from basic_memory.cli.app import app
 14 | from basic_memory.mcp.async_client import get_client
 15 | from basic_memory.mcp.tools.utils import call_post
 16 | from basic_memory.schemas import SyncReportResponse
 17 | from basic_memory.mcp.project_context import get_active_project
 18 | 
 19 | # Create rich console
 20 | console = Console()
 21 | 
 22 | 
 23 | def add_files_to_tree(
 24 |     tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
 25 | ):
 26 |     """Add files to tree, grouped by directory."""
 27 |     # Group by directory
 28 |     by_dir = {}
 29 |     for path in sorted(paths):
 30 |         parts = path.split("/", 1)
 31 |         dir_name = parts[0] if len(parts) > 1 else ""
 32 |         file_name = parts[1] if len(parts) > 1 else parts[0]
 33 |         by_dir.setdefault(dir_name, []).append((file_name, path))
 34 | 
 35 |     # Add to tree
 36 |     for dir_name, files in sorted(by_dir.items()):
 37 |         if dir_name:
 38 |             branch = tree.add(f"[bold]{dir_name}/[/bold]")
 39 |         else:
 40 |             branch = tree
 41 | 
 42 |         for file_name, full_path in sorted(files):
 43 |             if checksums and full_path in checksums:
 44 |                 checksum_short = checksums[full_path][:8]
 45 |                 branch.add(f"[{style}]{file_name}[/{style}] ({checksum_short})")
 46 |             else:
 47 |                 branch.add(f"[{style}]{file_name}[/{style}]")
 48 | 
 49 | 
 50 | def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
 51 |     """Group changes by directory for summary view."""
 52 |     by_dir = {}
 53 |     for change_type, paths in [
 54 |         ("new", changes.new),
 55 |         ("modified", changes.modified),
 56 |         ("deleted", changes.deleted),
 57 |     ]:
 58 |         for path in paths:
 59 |             dir_name = path.split("/", 1)[0]
 60 |             by_dir.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 61 |             by_dir[dir_name][change_type] += 1
 62 | 
 63 |     # Handle moves - count in both source and destination directories
 64 |     for old_path, new_path in changes.moves.items():
 65 |         old_dir = old_path.split("/", 1)[0]
 66 |         new_dir = new_path.split("/", 1)[0]
 67 |         by_dir.setdefault(old_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 68 |         by_dir.setdefault(new_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 69 |         by_dir[old_dir]["moved"] += 1
 70 |         if old_dir != new_dir:
 71 |             by_dir[new_dir]["moved"] += 1
 72 | 
 73 |     return by_dir
 74 | 
 75 | 
 76 | def build_directory_summary(counts: Dict[str, int]) -> str:
 77 |     """Build summary string for directory changes."""
 78 |     parts = []
 79 |     if counts["new"]:
 80 |         parts.append(f"[green]+{counts['new']} new[/green]")
 81 |     if counts["modified"]:
 82 |         parts.append(f"[yellow]~{counts['modified']} modified[/yellow]")
 83 |     if counts["moved"]:
 84 |         parts.append(f"[blue]↔{counts['moved']} moved[/blue]")
 85 |     if counts["deleted"]:
 86 |         parts.append(f"[red]-{counts['deleted']} deleted[/red]")
 87 |     return " ".join(parts)
 88 | 
 89 | 
 90 | def display_changes(
 91 |     project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
 92 | ):
 93 |     """Display changes using Rich for better visualization."""
 94 |     tree = Tree(f"{project_name}: {title}")
 95 | 
 96 |     if changes.total == 0 and not changes.skipped_files:
 97 |         tree.add("No changes")
 98 |         console.print(Panel(tree, expand=False))
 99 |         return
100 | 
101 |     if verbose:
102 |         # Full file listing with checksums
103 |         if changes.new:
104 |             new_branch = tree.add("[green]New Files[/green]")
105 |             add_files_to_tree(new_branch, changes.new, "green", changes.checksums)
106 |         if changes.modified:
107 |             mod_branch = tree.add("[yellow]Modified[/yellow]")
108 |             add_files_to_tree(mod_branch, changes.modified, "yellow", changes.checksums)
109 |         if changes.moves:
110 |             move_branch = tree.add("[blue]Moved[/blue]")
111 |             for old_path, new_path in sorted(changes.moves.items()):
112 |                 move_branch.add(f"[blue]{old_path}[/blue] → [blue]{new_path}[/blue]")
113 |         if changes.deleted:
114 |             del_branch = tree.add("[red]Deleted[/red]")
115 |             add_files_to_tree(del_branch, changes.deleted, "red")
116 |         if changes.skipped_files:
117 |             skip_branch = tree.add("[red]! Skipped (Circuit Breaker)[/red]")
118 |             for skipped in sorted(changes.skipped_files, key=lambda x: x.path):
119 |                 skip_branch.add(
120 |                     f"[red]{skipped.path}[/red] "
121 |                     f"(failures: {skipped.failure_count}, reason: {skipped.reason})"
122 |                 )
123 |     else:
124 |         # Show directory summaries
125 |         by_dir = group_changes_by_directory(changes)
126 |         for dir_name, counts in sorted(by_dir.items()):
127 |             summary = build_directory_summary(counts)
128 |             if summary:  # Only show directories with changes
129 |                 tree.add(f"[bold]{dir_name}/[/bold] {summary}")
130 | 
131 |         # Show skipped files summary in non-verbose mode
132 |         if changes.skipped_files:
133 |             skip_count = len(changes.skipped_files)
134 |             tree.add(
135 |                 f"[red]! {skip_count} file{'s' if skip_count != 1 else ''} "
136 |                 f"skipped due to repeated failures[/red]"
137 |             )
138 | 
139 |     console.print(Panel(tree, expand=False))
140 | 
141 | 
142 | async def run_status(project: Optional[str] = None, verbose: bool = False):  # pragma: no cover
143 |     """Check sync status of files vs database."""
144 | 
145 |     try:
146 |         async with get_client() as client:
147 |             project_item = await get_active_project(client, project, None)
148 |             response = await call_post(client, f"{project_item.project_url}/project/status")
149 |             sync_report = SyncReportResponse.model_validate(response.json())
150 | 
151 |             display_changes(project_item.name, "Status", sync_report, verbose)
152 | 
153 |     except (ValueError, ToolError) as e:
154 |         console.print(f"[red]Error: {e}[/red]")
155 |         raise typer.Exit(1)
156 | 
157 | 
158 | @app.command()
159 | def status(
160 |     project: Annotated[
161 |         Optional[str],
162 |         typer.Option(help="The project name."),
163 |     ] = None,
164 |     verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
165 | ):
166 |     """Show sync status between files and database."""
167 |     from basic_memory.cli.commands.command_utils import run_with_cleanup
168 | 
169 |     try:
170 |         run_with_cleanup(run_status(project, verbose))  # pragma: no cover
171 |     except Exception as e:
172 |         logger.error(f"Error checking status: {e}")
173 |         typer.echo(f"Error checking status: {e}", err=True)
174 |         raise typer.Exit(code=1)  # pragma: no cover
175 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/g9a0b3c4d5e6_add_external_id_to_project_and_entity.py:
--------------------------------------------------------------------------------

```python
  1 | """Add external_id UUID column to project and entity tables
  2 | 
  3 | Revision ID: g9a0b3c4d5e6
  4 | Revises: f8a9b2c3d4e5
  5 | Create Date: 2025-12-29 10:00:00.000000
  6 | 
  7 | """
  8 | 
  9 | import uuid
 10 | from typing import Sequence, Union
 11 | 
 12 | import sqlalchemy as sa
 13 | from alembic import op
 14 | from sqlalchemy import text
 15 | 
 16 | 
 17 | def column_exists(connection, table: str, column: str) -> bool:
 18 |     """Check if a column exists in a table (idempotent migration support)."""
 19 |     if connection.dialect.name == "postgresql":
 20 |         result = connection.execute(
 21 |             text(
 22 |                 "SELECT 1 FROM information_schema.columns "
 23 |                 "WHERE table_name = :table AND column_name = :column"
 24 |             ),
 25 |             {"table": table, "column": column},
 26 |         )
 27 |         return result.fetchone() is not None
 28 |     else:
 29 |         # SQLite
 30 |         result = connection.execute(text(f"PRAGMA table_info({table})"))
 31 |         columns = [row[1] for row in result]
 32 |         return column in columns
 33 | 
 34 | 
 35 | def index_exists(connection, index_name: str) -> bool:
 36 |     """Check if an index exists (idempotent migration support)."""
 37 |     if connection.dialect.name == "postgresql":
 38 |         result = connection.execute(
 39 |             text("SELECT 1 FROM pg_indexes WHERE indexname = :index_name"),
 40 |             {"index_name": index_name},
 41 |         )
 42 |         return result.fetchone() is not None
 43 |     else:
 44 |         # SQLite
 45 |         result = connection.execute(
 46 |             text("SELECT 1 FROM sqlite_master WHERE type='index' AND name = :index_name"),
 47 |             {"index_name": index_name},
 48 |         )
 49 |         return result.fetchone() is not None
 50 | 
 51 | 
 52 | # revision identifiers, used by Alembic.
 53 | revision: str = "g9a0b3c4d5e6"
 54 | down_revision: Union[str, None] = "f8a9b2c3d4e5"
 55 | branch_labels: Union[str, Sequence[str], None] = None
 56 | depends_on: Union[str, Sequence[str], None] = None
 57 | 
 58 | 
 59 | def upgrade() -> None:
 60 |     """Add external_id UUID column to project and entity tables.
 61 | 
 62 |     This migration:
 63 |     1. Adds external_id column to project table
 64 |     2. Adds external_id column to entity table
 65 |     3. Generates UUIDs for existing rows
 66 |     4. Creates unique indexes on both columns
 67 |     """
 68 |     connection = op.get_bind()
 69 |     dialect = connection.dialect.name
 70 | 
 71 |     # -------------------------------------------------------------------------
 72 |     # Add external_id to project table
 73 |     # -------------------------------------------------------------------------
 74 | 
 75 |     if not column_exists(connection, "project", "external_id"):
 76 |         # Step 1: Add external_id column as nullable first
 77 |         op.add_column("project", sa.Column("external_id", sa.String(), nullable=True))
 78 | 
 79 |         # Step 2: Generate UUIDs for existing rows
 80 |         if dialect == "postgresql":
 81 |             # Postgres has gen_random_uuid() function
 82 |             op.execute("""
 83 |                 UPDATE project
 84 |                 SET external_id = gen_random_uuid()::text
 85 |                 WHERE external_id IS NULL
 86 |             """)
 87 |         else:
 88 |             # SQLite: need to generate UUIDs in Python
 89 |             result = connection.execute(text("SELECT id FROM project WHERE external_id IS NULL"))
 90 |             for row in result:
 91 |                 new_uuid = str(uuid.uuid4())
 92 |                 connection.execute(
 93 |                     text("UPDATE project SET external_id = :uuid WHERE id = :id"),
 94 |                     {"uuid": new_uuid, "id": row[0]},
 95 |                 )
 96 | 
 97 |         # Step 3: Make external_id NOT NULL
 98 |         if dialect == "postgresql":
 99 |             op.alter_column("project", "external_id", nullable=False)
100 |         else:
101 |             # SQLite requires batch operations for ALTER COLUMN
102 |             with op.batch_alter_table("project") as batch_op:
103 |                 batch_op.alter_column("external_id", nullable=False)
104 | 
105 |     # Step 4: Create unique index on project.external_id (idempotent)
106 |     if not index_exists(connection, "ix_project_external_id"):
107 |         op.create_index("ix_project_external_id", "project", ["external_id"], unique=True)
108 | 
109 |     # -------------------------------------------------------------------------
110 |     # Add external_id to entity table
111 |     # -------------------------------------------------------------------------
112 | 
113 |     if not column_exists(connection, "entity", "external_id"):
114 |         # Step 1: Add external_id column as nullable first
115 |         op.add_column("entity", sa.Column("external_id", sa.String(), nullable=True))
116 | 
117 |         # Step 2: Generate UUIDs for existing rows
118 |         if dialect == "postgresql":
119 |             # Postgres has gen_random_uuid() function
120 |             op.execute("""
121 |                 UPDATE entity
122 |                 SET external_id = gen_random_uuid()::text
123 |                 WHERE external_id IS NULL
124 |             """)
125 |         else:
126 |             # SQLite: need to generate UUIDs in Python
127 |             result = connection.execute(text("SELECT id FROM entity WHERE external_id IS NULL"))
128 |             for row in result:
129 |                 new_uuid = str(uuid.uuid4())
130 |                 connection.execute(
131 |                     text("UPDATE entity SET external_id = :uuid WHERE id = :id"),
132 |                     {"uuid": new_uuid, "id": row[0]},
133 |                 )
134 | 
135 |         # Step 3: Make external_id NOT NULL
136 |         if dialect == "postgresql":
137 |             op.alter_column("entity", "external_id", nullable=False)
138 |         else:
139 |             # SQLite requires batch operations for ALTER COLUMN
140 |             with op.batch_alter_table("entity") as batch_op:
141 |                 batch_op.alter_column("external_id", nullable=False)
142 | 
143 |     # Step 4: Create unique index on entity.external_id (idempotent)
144 |     if not index_exists(connection, "ix_entity_external_id"):
145 |         op.create_index("ix_entity_external_id", "entity", ["external_id"], unique=True)
146 | 
147 | 
148 | def downgrade() -> None:
149 |     """Remove external_id columns from project and entity tables."""
150 |     connection = op.get_bind()
151 |     dialect = connection.dialect.name
152 | 
153 |     # Drop from entity table
154 |     if index_exists(connection, "ix_entity_external_id"):
155 |         op.drop_index("ix_entity_external_id", table_name="entity")
156 | 
157 |     if column_exists(connection, "entity", "external_id"):
158 |         if dialect == "postgresql":
159 |             op.drop_column("entity", "external_id")
160 |         else:
161 |             with op.batch_alter_table("entity") as batch_op:
162 |                 batch_op.drop_column("external_id")
163 | 
164 |     # Drop from project table
165 |     if index_exists(connection, "ix_project_external_id"):
166 |         op.drop_index("ix_project_external_id", table_name="project")
167 | 
168 |     if column_exists(connection, "project", "external_id"):
169 |         if dialect == "postgresql":
170 |             op.drop_column("project", "external_id")
171 |         else:
172 |             with op.batch_alter_table("project") as batch_op:
173 |                 batch_op.drop_column("external_id")
174 | 
```

--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for search router."""
  2 | 
  3 | from datetime import datetime, timezone
  4 | 
  5 | import pytest
  6 | import pytest_asyncio
  7 | from sqlalchemy import text
  8 | 
  9 | from basic_memory import db
 10 | from basic_memory.schemas import Entity as EntitySchema
 11 | from basic_memory.schemas.search import SearchItemType, SearchResponse
 12 | 
 13 | 
 14 | @pytest_asyncio.fixture
 15 | async def indexed_entity(full_entity, search_service):
 16 |     """Create an entity and index it."""
 17 |     await search_service.index_entity(full_entity)
 18 |     return full_entity
 19 | 
 20 | 
 21 | @pytest.mark.asyncio
 22 | async def test_search_basic(client, indexed_entity, project_url):
 23 |     """Test basic text search."""
 24 |     response = await client.post(f"{project_url}/search/", json={"text": "search"})
 25 |     assert response.status_code == 200
 26 |     search_results = SearchResponse.model_validate(response.json())
 27 |     assert len(search_results.results) == 3
 28 | 
 29 |     found = False
 30 |     for r in search_results.results:
 31 |         if r.type == SearchItemType.ENTITY.value:
 32 |             assert r.permalink == indexed_entity.permalink
 33 |             found = True
 34 | 
 35 |     assert found, "Expected to find indexed entity in results"
 36 | 
 37 | 
 38 | @pytest.mark.asyncio
 39 | async def test_search_basic_pagination(client, indexed_entity, project_url):
 40 |     """Test basic text search."""
 41 |     response = await client.post(
 42 |         f"{project_url}/search/?page=3&page_size=1", json={"text": "search"}
 43 |     )
 44 |     assert response.status_code == 200
 45 |     search_results = SearchResponse.model_validate(response.json())
 46 |     assert len(search_results.results) == 1
 47 | 
 48 |     assert search_results.current_page == 3
 49 |     assert search_results.page_size == 1
 50 | 
 51 | 
 52 | @pytest.mark.asyncio
 53 | async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
 54 |     """Test search with type filter."""
 55 |     # Should find with correct type
 56 |     response = await client.post(
 57 |         f"{project_url}/search/",
 58 |         json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
 59 |     )
 60 |     assert response.status_code == 200
 61 |     search_results = SearchResponse.model_validate(response.json())
 62 |     assert len(search_results.results) > 0
 63 | 
 64 |     # Should find with relation type
 65 |     response = await client.post(
 66 |         f"{project_url}/search/",
 67 |         json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
 68 |     )
 69 |     assert response.status_code == 200
 70 |     search_results = SearchResponse.model_validate(response.json())
 71 |     assert len(search_results.results) == 2
 72 | 
 73 | 
 74 | @pytest.mark.asyncio
 75 | async def test_search_with_type_filter(client, indexed_entity, project_url):
 76 |     """Test search with entity type filter."""
 77 |     # Should find with correct entity type
 78 |     response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["test"]})
 79 |     assert response.status_code == 200
 80 |     search_results = SearchResponse.model_validate(response.json())
 81 |     assert len(search_results.results) == 1
 82 | 
 83 |     # Should not find with wrong entity type
 84 |     response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["note"]})
 85 |     assert response.status_code == 200
 86 |     search_results = SearchResponse.model_validate(response.json())
 87 |     assert len(search_results.results) == 0
 88 | 
 89 | 
 90 | @pytest.mark.asyncio
 91 | async def test_search_with_date_filter(client, indexed_entity, project_url):
 92 |     """Test search with date filter."""
 93 |     # Should find with past date
 94 |     past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
 95 |     response = await client.post(
 96 |         f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
 97 |     )
 98 |     assert response.status_code == 200
 99 |     search_results = SearchResponse.model_validate(response.json())
100 | 
101 |     # Should not find with future date
102 |     future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
103 |     response = await client.post(
104 |         f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
105 |     )
106 |     assert response.status_code == 200
107 |     search_results = SearchResponse.model_validate(response.json())
108 |     assert len(search_results.results) == 0
109 | 
110 | 
111 | @pytest.mark.asyncio
112 | async def test_search_empty(search_service, client, project_url):
113 |     """Test search with no matches."""
114 |     response = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
115 |     assert response.status_code == 200
116 |     search_result = SearchResponse.model_validate(response.json())
117 |     assert len(search_result.results) == 0
118 | 
119 | 
120 | @pytest.mark.asyncio
121 | async def test_reindex(
122 |     client, search_service, entity_service, session_maker, project_url, app_config
123 | ):
124 |     """Test reindex endpoint."""
125 |     # Skip for Postgres - needs investigation of database connection isolation
126 |     from basic_memory.config import DatabaseBackend
127 | 
128 |     if app_config.database_backend == DatabaseBackend.POSTGRES:
129 |         pytest.skip("Not yet supported for Postgres - database connection isolation issue")
130 | 
131 |     # Create test entity and document
132 |     await entity_service.create_entity(
133 |         EntitySchema(
134 |             title="TestEntity1",
135 |             folder="test",
136 |             entity_type="test",
137 |         ),
138 |     )
139 | 
140 |     # Clear search index
141 |     async with db.scoped_session(session_maker) as session:
142 |         await session.execute(text("DELETE FROM search_index"))
143 |         await session.commit()
144 | 
145 |     # Verify nothing is searchable
146 |     response = await client.post(f"{project_url}/search/", json={"text": "test"})
147 |     search_results = SearchResponse.model_validate(response.json())
148 |     assert len(search_results.results) == 0
149 | 
150 |     # Trigger reindex
151 |     reindex_response = await client.post(f"{project_url}/search/reindex")
152 |     assert reindex_response.status_code == 200
153 |     assert reindex_response.json()["status"] == "ok"
154 | 
155 |     # Verify content is searchable again
156 |     search_response = await client.post(f"{project_url}/search/", json={"text": "test"})
157 |     search_results = SearchResponse.model_validate(search_response.json())
158 |     assert len(search_results.results) == 1
159 | 
160 | 
161 | @pytest.mark.asyncio
162 | async def test_multiple_filters(client, indexed_entity, project_url):
163 |     """Test search with multiple filters combined."""
164 |     response = await client.post(
165 |         f"{project_url}/search/",
166 |         json={
167 |             "text": "test",
168 |             "entity_types": [SearchItemType.ENTITY.value],
169 |             "types": ["test"],
170 |             "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
171 |         },
172 |     )
173 |     assert response.status_code == 200
174 |     search_result = SearchResponse.model_validate(response.json())
175 |     assert len(search_result.results) == 1
176 |     result = search_result.results[0]
177 |     assert result.permalink == indexed_entity.permalink
178 |     assert result.type == SearchItemType.ENTITY.value
179 |     assert result.metadata["entity_type"] == "test"
180 | 
```
Page 5/27FirstPrevNextLast