This is page 5 of 27. Use http://codebase.md/basicmachines-co/basic-memory?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/canvas.py:
--------------------------------------------------------------------------------
```python
1 | """Canvas creation tool for Basic Memory MCP server.
2 |
3 | This tool creates Obsidian canvas files (.canvas) using the JSON Canvas 1.0 spec.
4 | """
5 |
6 | import json
7 | from typing import Dict, List, Any, Optional
8 |
9 | from loguru import logger
10 | from fastmcp import Context
11 |
12 | from basic_memory.mcp.async_client import get_client
13 | from basic_memory.mcp.project_context import get_active_project
14 | from basic_memory.mcp.server import mcp
15 | from basic_memory.mcp.tools.utils import call_put, call_post, resolve_entity_id
16 | from basic_memory.telemetry import track_mcp_tool
17 |
18 |
19 | @mcp.tool(
20 | description="Create an Obsidian canvas file to visualize concepts and connections.",
21 | )
22 | async def canvas(
23 | nodes: List[Dict[str, Any]],
24 | edges: List[Dict[str, Any]],
25 | title: str,
26 | folder: str,
27 | project: Optional[str] = None,
28 | context: Context | None = None,
29 | ) -> str:
30 | """Create an Obsidian canvas file with the provided nodes and edges.
31 |
32 | This tool creates a .canvas file compatible with Obsidian's Canvas feature,
33 | allowing visualization of relationships between concepts or documents.
34 |
35 | Project Resolution:
36 | Server resolves projects in this order: Single Project Mode → project parameter → default project.
37 | If project unknown, use list_memory_projects() or recent_activity() first.
38 |
39 | For the full JSON Canvas 1.0 specification, see the 'spec://canvas' resource.
40 |
41 | Args:
42 | project: Project name to create canvas in. Optional - server will resolve using hierarchy.
43 | If unknown, use list_memory_projects() to discover available projects.
44 | nodes: List of node objects following JSON Canvas 1.0 spec
45 | edges: List of edge objects following JSON Canvas 1.0 spec
46 | title: The title of the canvas (will be saved as title.canvas)
47 | folder: Folder path relative to project root where the canvas should be saved.
48 | Use forward slashes (/) as separators. Examples: "diagrams", "projects/2025", "visual/maps"
49 | context: Optional FastMCP context for performance caching.
50 |
51 | Returns:
52 | A summary of the created canvas file
53 |
54 | Important Notes:
55 | - When referencing files, use the exact file path as shown in Obsidian
56 | Example: "folder/Document Name.md" (not permalink format)
57 | - For file nodes, the "file" attribute must reference an existing file
58 | - Nodes require id, type, x, y, width, height properties
59 | - Edges require id, fromNode, toNode properties
60 | - Position nodes in a logical layout (x,y coordinates in pixels)
61 | - Use color attributes ("1"-"6" or hex) for visual organization
62 |
63 | Basic Structure:
64 | ```json
65 | {
66 | "nodes": [
67 | {
68 | "id": "node1",
69 | "type": "file", // Options: "file", "text", "link", "group"
70 | "file": "folder/Document.md",
71 | "x": 0,
72 | "y": 0,
73 | "width": 400,
74 | "height": 300
75 | }
76 | ],
77 | "edges": [
78 | {
79 | "id": "edge1",
80 | "fromNode": "node1",
81 | "toNode": "node2",
82 | "label": "connects to"
83 | }
84 | ]
85 | }
86 | ```
87 |
88 | Examples:
89 | # Create canvas in project
90 | canvas("my-project", nodes=[...], edges=[...], title="My Canvas", folder="diagrams")
91 |
92 | # Create canvas in work project
93 | canvas("work-project", nodes=[...], edges=[...], title="Process Flow", folder="visual/maps")
94 |
95 | Raises:
96 | ToolError: If project doesn't exist or folder path is invalid
97 | """
98 | track_mcp_tool("canvas")
99 | async with get_client() as client:
100 | active_project = await get_active_project(client, project, context)
101 |
102 | # Ensure path has .canvas extension
103 | file_title = title if title.endswith(".canvas") else f"{title}.canvas"
104 | file_path = f"{folder}/{file_title}"
105 |
106 | # Create canvas data structure
107 | canvas_data = {"nodes": nodes, "edges": edges}
108 |
109 | # Convert to JSON
110 | canvas_json = json.dumps(canvas_data, indent=2)
111 |
112 | # Try to create the canvas file first (optimistic create)
113 | logger.info(f"Creating canvas file: {file_path} in project {project}")
114 | try:
115 | response = await call_post(
116 | client,
117 | f"/v2/projects/{active_project.external_id}/resource",
118 | json={"file_path": file_path, "content": canvas_json},
119 | )
120 | action = "Created"
121 | except Exception as e:
122 | # If creation failed due to conflict (already exists), try to update
123 | if (
124 | "409" in str(e)
125 | or "conflict" in str(e).lower()
126 | or "already exists" in str(e).lower()
127 | ):
128 | logger.info(f"Canvas file exists, updating instead: {file_path}")
129 | try:
130 | entity_id = await resolve_entity_id(
131 | client, active_project.external_id, file_path
132 | )
133 | # For update, send content in JSON body
134 | response = await call_put(
135 | client,
136 | f"/v2/projects/{active_project.external_id}/resource/{entity_id}",
137 | json={"content": canvas_json},
138 | )
139 | action = "Updated"
140 | except Exception as update_error: # pragma: no cover
141 | # Re-raise the original error if update also fails
142 | raise e from update_error # pragma: no cover
143 | else:
144 | # Re-raise if it's not a conflict error
145 | raise # pragma: no cover
146 |
147 | # Parse response
148 | result = response.json()
149 | logger.debug(result)
150 |
151 | # Build summary
152 | summary = [f"# {action}: {file_path}", "\nThe canvas is ready to open in Obsidian."]
153 |
154 | return "\n".join(summary)
155 |
```
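For illustration, a minimal sketch of invoking this tool with the keyword signature above — the project name, file path, and node contents are hypothetical:

```python
# Hypothetical nodes/edges; the dict shapes follow the JSON Canvas 1.0
# structure quoted in the docstring above.
nodes = [
    {"id": "node1", "type": "file", "file": "notes/Design.md",
     "x": 0, "y": 0, "width": 400, "height": 300},
    {"id": "node2", "type": "text", "text": "Open question",
     "x": 500, "y": 0, "width": 300, "height": 200},
]
edges = [{"id": "edge1", "fromNode": "node1", "toNode": "node2", "label": "raises"}]

# "my-project" is a placeholder; omit `project` to let the server resolve
# it through the hierarchy described in the docstring.
summary = await canvas(nodes=nodes, edges=edges, title="Design Review",
                       folder="diagrams", project="my-project")
```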
--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_content.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the read_content MCP tool security validation.
2 |
3 | We keep these tests focused on path boundary/security checks, and rely on
4 | `tests/mcp/test_tool_resource.py` for full-stack content-type behavior.
5 | """
6 |
7 | from __future__ import annotations
8 |
9 | import pytest
10 | from mcp.server.fastmcp.exceptions import ToolError
11 |
12 | from basic_memory.mcp.tools import read_content, write_note
13 |
14 |
15 | @pytest.mark.asyncio
16 | async def test_read_content_blocks_path_traversal_unix(client, test_project):
17 | attack_paths = [
18 | "../secrets.txt",
19 | "../../etc/passwd",
20 | "../../../root/.ssh/id_rsa",
21 | "notes/../../../etc/shadow",
22 | "folder/../../outside/file.md",
23 | "../../../../etc/hosts",
24 | "../../../home/user/.env",
25 | ]
26 |
27 | for attack_path in attack_paths:
28 | result = await read_content.fn(project=test_project.name, path=attack_path)
29 | assert result["type"] == "error"
30 | assert "paths must stay within project boundaries" in result["error"]
31 | assert attack_path in result["error"]
32 |
33 |
34 | @pytest.mark.asyncio
35 | async def test_read_content_blocks_path_traversal_windows(client, test_project):
36 | attack_paths = [
37 | "..\\secrets.txt",
38 | "..\\..\\Windows\\System32\\config\\SAM",
39 | "notes\\..\\..\\..\\Windows\\System32",
40 | "\\\\server\\share\\file.txt",
41 | "..\\..\\Users\\user\\.env",
42 | "\\\\..\\..\\Windows",
43 | "..\\..\\..\\Boot.ini",
44 | ]
45 |
46 | for attack_path in attack_paths:
47 | result = await read_content.fn(project=test_project.name, path=attack_path)
48 | assert result["type"] == "error"
49 | assert "paths must stay within project boundaries" in result["error"]
50 | assert attack_path in result["error"]
51 |
52 |
53 | @pytest.mark.asyncio
54 | async def test_read_content_blocks_absolute_paths(client, test_project):
55 | attack_paths = [
56 | "/etc/passwd",
57 | "/home/user/.env",
58 | "/var/log/auth.log",
59 | "/root/.ssh/id_rsa",
60 | "C:\\Windows\\System32\\config\\SAM",
61 | "C:\\Users\\user\\.env",
62 | "D:\\secrets\\config.json",
63 | "/tmp/malicious.txt",
64 | "/usr/local/bin/evil",
65 | ]
66 |
67 | for attack_path in attack_paths:
68 | result = await read_content.fn(project=test_project.name, path=attack_path)
69 | assert result["type"] == "error"
70 | assert "paths must stay within project boundaries" in result["error"]
71 | assert attack_path in result["error"]
72 |
73 |
74 | @pytest.mark.asyncio
75 | async def test_read_content_blocks_home_directory_access(client, test_project):
76 | attack_paths = [
77 | "~/secrets.txt",
78 | "~/.env",
79 | "~/.ssh/id_rsa",
80 | "~/Documents/passwords.txt",
81 | "~\\AppData\\secrets",
82 | "~\\Desktop\\config.ini",
83 | "~/.bashrc",
84 | "~/Library/Preferences/secret.plist",
85 | ]
86 |
87 | for attack_path in attack_paths:
88 | result = await read_content.fn(project=test_project.name, path=attack_path)
89 | assert result["type"] == "error"
90 | assert "paths must stay within project boundaries" in result["error"]
91 | assert attack_path in result["error"]
92 |
93 |
94 | @pytest.mark.asyncio
95 | async def test_read_content_blocks_memory_url_attacks(client, test_project):
96 | attack_paths = [
97 | "memory://../../etc/passwd",
98 | "memory://../../../root/.ssh/id_rsa",
99 | "memory://~/.env",
100 | "memory:///etc/passwd",
101 | ]
102 |
103 | for attack_path in attack_paths:
104 | result = await read_content.fn(project=test_project.name, path=attack_path)
105 | assert result["type"] == "error"
106 | assert "paths must stay within project boundaries" in result["error"]
107 |
108 |
109 | @pytest.mark.asyncio
110 | async def test_read_content_unicode_path_attacks(client, test_project):
111 | unicode_attacks = [
112 | "notes/文档/../../../etc/passwd",
113 | "docs/café/../../.env",
114 | "files/αβγ/../../../secret.txt",
115 | ]
116 |
117 | for attack_path in unicode_attacks:
118 | result = await read_content.fn(project=test_project.name, path=attack_path)
119 | assert result["type"] == "error"
120 | assert "paths must stay within project boundaries" in result["error"]
121 |
122 |
123 | @pytest.mark.asyncio
124 | async def test_read_content_very_long_attack_path(client, test_project):
125 | long_attack = "../" * 1000 + "etc/passwd"
126 | result = await read_content.fn(project=test_project.name, path=long_attack)
127 | assert result["type"] == "error"
128 | assert "paths must stay within project boundaries" in result["error"]
129 |
130 |
131 | @pytest.mark.asyncio
132 | async def test_read_content_case_variations_attacks(client, test_project):
133 | case_attacks = [
134 | "../ETC/passwd",
135 | "../Etc/PASSWD",
136 | "..\\WINDOWS\\system32",
137 | "~/.SSH/id_rsa",
138 | ]
139 |
140 | for attack_path in case_attacks:
141 | result = await read_content.fn(project=test_project.name, path=attack_path)
142 | assert result["type"] == "error"
143 | assert "paths must stay within project boundaries" in result["error"]
144 |
145 |
146 | @pytest.mark.asyncio
147 | async def test_read_content_allows_safe_path_integration(client, test_project):
148 | await write_note.fn(
149 | project=test_project.name,
150 | title="Meeting",
151 | folder="notes",
152 | content="This is a safe note for read_content()",
153 | )
154 |
155 | result = await read_content.fn(project=test_project.name, path="notes/meeting")
156 | assert result["type"] == "text"
157 | assert "safe note" in result["text"]
158 |
159 |
160 | @pytest.mark.asyncio
161 | async def test_read_content_empty_path_does_not_trigger_security_error(client, test_project):
162 | try:
163 | result = await read_content.fn(project=test_project.name, path="")
164 | if isinstance(result, dict) and result.get("type") == "error":
165 | assert "paths must stay within project boundaries" not in result.get("error", "")
166 | except ToolError:
167 | # Acceptable: resource resolution may treat empty path as not-found.
168 | pass
169 |
```
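The tests above pin down the error message but not the mechanism. As a rough sketch of the containment check they imply — an illustration under stated assumptions, not the validation Basic Memory actually ships — resolving the candidate against the project root catches the traversal and absolute-path cases:

```python
from pathlib import Path

def is_within_project(project_root: str, user_path: str) -> bool:
    """Illustrative boundary check, not the project's real implementation."""
    # Coarse guards for inputs Path.resolve() won't neutralize on POSIX:
    # "~" is not expanded by resolve(), and backslash separators would be
    # treated as literal filename characters rather than path separators.
    if user_path.startswith("~") or "\\" in user_path:
        return False
    root = Path(project_root).resolve()
    # Joining with an absolute path discards `root`, so the resolved
    # candidate for "/etc/passwd" falls outside the project tree.
    candidate = (root / user_path).resolve()
    return candidate == root or root in candidate.parents
```

Note that an empty path resolves to the root itself and passes, consistent with the last test's expectation that "" does not trigger the boundary error.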
--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------
```python
1 | """Utility functions for formatting prompt responses.
2 |
3 | These utilities help format data from various tools into consistent,
4 | user-friendly markdown summaries.
5 | """
6 |
7 | from dataclasses import dataclass
8 | from textwrap import dedent
9 | from typing import List
10 |
11 | from basic_memory.schemas.base import TimeFrame
12 | from basic_memory.schemas.memory import (
13 | normalize_memory_url,
14 | EntitySummary,
15 | RelationSummary,
16 | ObservationSummary,
17 | )
18 |
19 |
20 | @dataclass
21 | class PromptContextItem:
22 | primary_results: List[EntitySummary]
23 | related_results: List[EntitySummary | RelationSummary | ObservationSummary]
24 |
25 |
26 | @dataclass
27 | class PromptContext:
28 | timeframe: TimeFrame
29 | topic: str
30 | results: List[PromptContextItem]
31 |
32 |
33 | def format_prompt_context(context: PromptContext) -> str:
34 | """Format continuation context into a helpful summary.
35 | Returns:
36 | Formatted continuation summary
37 | """
38 | if not context.results: # pragma: no cover
39 | return dedent(f"""
40 | # Continuing conversation on: {context.topic}
41 |
42 | This is a memory retrieval session.
43 | The supplied query did not return any information specifically on this topic.
44 |
45 | ## Opportunity to Capture New Knowledge!
46 |
47 | This is an excellent chance to start documenting this topic:
48 |
49 | ```python
50 | await write_note(
51 | title="{context.topic}",
52 | content=f'''
53 | # {context.topic}
54 |
55 | ## Overview
56 | [Summary of what we know about {context.topic}]
57 |
58 | ## Key Points
59 | [Main aspects or components of {context.topic}]
60 |
61 | ## Observations
62 | - [category] [First important observation about {context.topic}]
63 | - [category] [Second observation about {context.topic}]
64 |
65 | ## Relations
66 | - relates_to [[Related Topic]]
67 | - part_of [[Broader Context]]
68 | '''
69 | )
70 | ```
71 |
72 | ## Other Options
73 |
74 | Please use the available basic-memory tools to gather relevant context before responding.
75 | You can also:
76 | - Try a different search term
77 | - Check recent activity with `recent_activity(timeframe="1w")`
78 | """)
79 |
80 | # Start building our summary with header - add knowledge capture emphasis
81 | summary = dedent(f"""
82 | # Continuing conversation on: {context.topic}
83 |
84 | This is a memory retrieval session.
85 |
86 | Please use the available basic-memory tools to gather relevant context before responding.
87 | Start by executing one of the suggested commands below to retrieve content.
88 |
89 | Here's what I found from previous conversations:
90 |
91 | > **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
92 | """)
93 |
94 | # Track what we've added to avoid duplicates
95 | added_permalinks = set()
96 | sections = []
97 |
 98 |     # Process each context item
 99 |     for item in context.results:
100 |         for primary in item.primary_results:
101 | if primary.permalink not in added_permalinks:
102 | primary_permalink = primary.permalink
103 |
104 | added_permalinks.add(primary_permalink)
105 |
106 | # Use permalink if available, otherwise use file_path
107 | if primary_permalink:
108 | memory_url = normalize_memory_url(primary_permalink)
109 | read_command = f'read_note("{primary_permalink}")'
110 | else:
111 | memory_url = f"file://{primary.file_path}"
112 | read_command = f'read_file("{primary.file_path}")'
113 |
114 | section = dedent(f"""
115 | --- {memory_url}
116 |
117 | ## {primary.title}
118 | - **Type**: {primary.type}
119 | """)
120 |
121 | # Add creation date
122 | section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"
123 |
124 | # Add content snippet
125 | if hasattr(primary, "content") and primary.content: # pyright: ignore
126 | content = primary.content or "" # pyright: ignore # pragma: no cover
127 | if content: # pragma: no cover
128 | section += f"\n**Excerpt**:\n{content}\n" # pragma: no cover
129 |
130 | section += dedent(f"""
131 |
132 | You can read this document with: `{read_command}`
133 | """)
134 | sections.append(section)
135 |
136 |         if item.related_results:
137 | section += dedent( # pyright: ignore
138 | """
139 | ## Related Context
140 | """
141 | )
142 |
143 |             for related in item.related_results:
144 | section_content = dedent(f"""
145 | - type: **{related.type}**
146 | - title: {related.title}
147 | """)
148 | if related.permalink: # pragma: no cover
149 | section_content += (
150 | f'You can view this document with: `read_note("{related.permalink}")`'
151 | )
152 | else: # pragma: no cover
153 | section_content += (
154 | f'You can view this file with: `read_file("{related.file_path}")`'
155 | )
156 |
157 | section += section_content
158 |             sections[-1] = section  # replace the plain copy appended above so the section isn't duplicated
159 |
160 | # Add all sections
161 | summary += "\n".join(sections)
162 | return summary
163 |
```
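As a usage sketch: the formatter consumes a `PromptContext` assembled from tool results. The field values below are invented, and the exact `EntitySummary` constructor arguments are an assumption based on the attributes the code reads (`permalink`, `title`, `type`, `created_at`, `file_path`):

```python
from datetime import datetime

# Invented values; constructor kwargs assumed from the attributes
# format_prompt_context() accesses.
entity = EntitySummary(
    permalink="notes/design-review",
    title="Design Review",
    type="note",
    created_at=datetime(2025, 1, 15, 10, 30),
    file_path="notes/Design Review.md",
)

ctx = PromptContext(
    timeframe="1w",
    topic="design review",
    results=[PromptContextItem(primary_results=[entity], related_results=[])],
)
print(format_prompt_context(ctx))  # markdown summary with a read_note() hint
```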
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------
```python
1 | """List directory tool for Basic Memory MCP server."""
2 |
3 | from typing import Optional
4 |
5 | from loguru import logger
6 | from fastmcp import Context
7 |
8 | from basic_memory.mcp.async_client import get_client
9 | from basic_memory.mcp.project_context import get_active_project
10 | from basic_memory.mcp.server import mcp
11 | from basic_memory.telemetry import track_mcp_tool
12 |
13 |
14 | @mcp.tool(
15 | description="List directory contents with filtering and depth control.",
16 | )
17 | async def list_directory(
18 | dir_name: str = "/",
19 | depth: int = 1,
20 | file_name_glob: Optional[str] = None,
21 | project: Optional[str] = None,
22 | context: Context | None = None,
23 | ) -> str:
24 | """List directory contents from the knowledge base with optional filtering.
25 |
26 | This tool provides 'ls' functionality for browsing the knowledge base directory structure.
27 | It can list immediate children or recursively explore subdirectories with depth control,
28 | and supports glob pattern filtering for finding specific files.
29 |
30 | Args:
31 | dir_name: Directory path to list (default: root "/")
32 | Examples: "/", "/projects", "/research/ml"
33 | depth: Recursion depth (1-10, default: 1 for immediate children only)
34 | Higher values show subdirectory contents recursively
35 | file_name_glob: Optional glob pattern for filtering file names
36 | Examples: "*.md", "*meeting*", "project_*"
37 | project: Project name to list directory from. Optional - server will resolve using hierarchy.
38 | If unknown, use list_memory_projects() to discover available projects.
39 | context: Optional FastMCP context for performance caching.
40 |
41 | Returns:
42 | Formatted listing of directory contents with file metadata
43 |
44 | Examples:
45 | # List root directory contents
46 | list_directory()
47 |
48 | # List specific folder
49 | list_directory(dir_name="/projects")
50 |
51 | # Find all markdown files
52 | list_directory(file_name_glob="*.md")
53 |
54 | # Deep exploration of research folder
55 | list_directory(dir_name="/research", depth=3)
56 |
57 | # Find meeting notes in projects folder
58 | list_directory(dir_name="/projects", file_name_glob="*meeting*")
59 |
60 | # Explicit project specification
61 | list_directory(project="work-docs", dir_name="/projects")
62 |
63 | Raises:
64 | ToolError: If project doesn't exist or directory path is invalid
65 | """
66 | track_mcp_tool("list_directory")
67 | async with get_client() as client:
68 | active_project = await get_active_project(client, project, context)
69 |
70 | logger.debug(
 71 |             f"Listing directory '{dir_name}' in project {active_project.name} with depth={depth}, glob='{file_name_glob}'"
72 | )
73 |
74 | # Import here to avoid circular import
75 | from basic_memory.mcp.clients import DirectoryClient
76 |
77 | # Use typed DirectoryClient for API calls
78 | directory_client = DirectoryClient(client, active_project.external_id)
79 | nodes = await directory_client.list(dir_name, depth=depth, file_name_glob=file_name_glob)
80 |
81 | if not nodes:
82 | filter_desc = ""
83 | if file_name_glob:
84 | filter_desc = f" matching '{file_name_glob}'"
85 | return f"No files found in directory '{dir_name}'{filter_desc}"
86 |
87 | # Format the results
88 | output_lines = []
89 | if file_name_glob:
90 | output_lines.append(
91 | f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
92 | )
93 | else:
94 | output_lines.append(f"Contents of '{dir_name}' (depth {depth}):")
95 | output_lines.append("")
96 |
97 | # Group by type and sort
98 | directories = [n for n in nodes if n["type"] == "directory"]
99 | files = [n for n in nodes if n["type"] == "file"]
100 |
101 | # Sort by name
102 | directories.sort(key=lambda x: x["name"])
103 | files.sort(key=lambda x: x["name"])
104 |
105 | # Display directories first
106 | for node in directories:
107 | path_display = node["directory_path"]
108 | output_lines.append(f"📁 {node['name']:<30} {path_display}")
109 |
110 | # Add separator if we have both directories and files
111 | if directories and files:
112 | output_lines.append("")
113 |
114 | # Display files with metadata
115 | for node in files:
116 | path_display = node["directory_path"]
117 | title = node.get("title", "")
118 | updated = node.get("updated_at", "")
119 |
120 |         # Remove the leading slash if present; read_note expects paths without a leading slash
121 | if path_display.startswith("/"):
122 | path_display = path_display[1:]
123 |
124 | # Format date if available
125 | date_str = ""
126 | if updated:
127 | try:
128 | from datetime import datetime
129 |
130 | dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
131 | date_str = dt.strftime("%Y-%m-%d")
132 | except Exception: # pragma: no cover
133 | date_str = updated[:10] if len(updated) >= 10 else ""
134 |
135 | # Create formatted line
136 | file_line = f"📄 {node['name']:<30} {path_display}"
137 | if title and title != node["name"]:
138 | file_line += f" | {title}"
139 | if date_str:
140 | file_line += f" | {date_str}"
141 |
142 | output_lines.append(file_line)
143 |
144 | # Add summary
145 | output_lines.append("")
146 | total_count = len(directories) + len(files)
147 | summary_parts = []
148 | if directories:
149 | summary_parts.append(
150 | f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
151 | )
152 | if files:
153 | summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")
154 |
155 | output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")
156 |
157 | return "\n".join(output_lines)
158 |
```
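For reference, a hypothetical call and the output shape the formatting code above produces (names, titles, and dates are invented):

```python
result = await list_directory(dir_name="/projects", depth=1, project="work-docs")
print(result)
# Expected shape, per the formatting code above:
#
# Contents of '/projects' (depth 1):
#
# 📁 archive                        /projects/archive
#
# 📄 roadmap.md                     projects/roadmap.md | Roadmap | 2025-01-15
#
# Total: 2 items (1 directory, 1 file)
```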
--------------------------------------------------------------------------------
/tests/cli/cloud/test_cloud_api_client_and_utils.py:
--------------------------------------------------------------------------------
```python
1 | from contextlib import asynccontextmanager
2 | import json
3 |
4 | import httpx
5 | import pytest
6 |
7 | from basic_memory.cli.auth import CLIAuth
8 | from basic_memory.cli.commands.cloud.api_client import (
9 | SubscriptionRequiredError,
10 | make_api_request,
11 | )
12 | from basic_memory.cli.commands.cloud.cloud_utils import (
13 | create_cloud_project,
14 | fetch_cloud_projects,
15 | project_exists,
16 | )
17 |
18 |
19 | @pytest.mark.asyncio
20 | async def test_make_api_request_success_injects_auth_and_accept_encoding(
21 | config_home, config_manager
22 | ):
23 | # Arrange: create a token on disk so CLIAuth can authenticate without any network.
24 | auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
25 | auth.token_file.parent.mkdir(parents=True, exist_ok=True)
26 | auth.token_file.write_text(
27 | '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
28 | encoding="utf-8",
29 | )
30 |
31 | async def handler(request: httpx.Request) -> httpx.Response:
32 | assert request.headers.get("authorization") == "Bearer token-123"
33 | assert request.headers.get("accept-encoding") == "identity"
34 | return httpx.Response(200, json={"ok": True})
35 |
36 | transport = httpx.MockTransport(handler)
37 |
38 | @asynccontextmanager
39 | async def http_client_factory():
40 | async with httpx.AsyncClient(transport=transport) as client:
41 | yield client
42 |
43 | # Act
44 | resp = await make_api_request(
45 | method="GET",
46 | url="https://cloud.example.test/proxy/health",
47 | auth=auth,
48 | http_client_factory=http_client_factory,
49 | )
50 |
51 | # Assert
52 | assert resp.json()["ok"] is True
53 |
54 |
55 | @pytest.mark.asyncio
56 | async def test_make_api_request_raises_subscription_required(config_home, config_manager):
57 | auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
58 | auth.token_file.parent.mkdir(parents=True, exist_ok=True)
59 | auth.token_file.write_text(
60 | '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
61 | encoding="utf-8",
62 | )
63 |
64 | async def handler(_request: httpx.Request) -> httpx.Response:
65 | return httpx.Response(
66 | 403,
67 | json={
68 | "detail": {
69 | "error": "subscription_required",
70 | "message": "Need subscription",
71 | "subscribe_url": "https://example.test/subscribe",
72 | }
73 | },
74 | )
75 |
76 | transport = httpx.MockTransport(handler)
77 |
78 | @asynccontextmanager
79 | async def http_client_factory():
80 | async with httpx.AsyncClient(transport=transport) as client:
81 | yield client
82 |
83 | with pytest.raises(SubscriptionRequiredError) as exc:
84 | await make_api_request(
85 | method="GET",
86 | url="https://cloud.example.test/proxy/health",
87 | auth=auth,
88 | http_client_factory=http_client_factory,
89 | )
90 |
91 | assert exc.value.subscribe_url == "https://example.test/subscribe"
92 |
93 |
94 | @pytest.mark.asyncio
95 | async def test_cloud_utils_fetch_and_exists_and_create_project(
96 | config_home, config_manager, monkeypatch
97 | ):
98 | # Point config.cloud_host at our mocked base URL
99 | config = config_manager.load_config()
100 | config.cloud_host = "https://cloud.example.test"
101 | config_manager.save_config(config)
102 |
103 | auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
104 | auth.token_file.parent.mkdir(parents=True, exist_ok=True)
105 | auth.token_file.write_text(
106 | '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
107 | encoding="utf-8",
108 | )
109 |
110 | seen = {"create_payload": None}
111 |
112 | async def handler(request: httpx.Request) -> httpx.Response:
113 | if request.method == "GET" and request.url.path == "/proxy/projects/projects":
114 | return httpx.Response(
115 | 200,
116 | json={
117 | "projects": [
118 | {"id": 1, "name": "alpha", "path": "alpha", "is_default": True},
119 | {"id": 2, "name": "beta", "path": "beta", "is_default": False},
120 | ]
121 | },
122 | )
123 |
124 | if request.method == "POST" and request.url.path == "/proxy/projects/projects":
125 | # httpx.Request doesn't have .json(); parse bytes payload.
126 | seen["create_payload"] = json.loads(request.content.decode("utf-8"))
127 | return httpx.Response(
128 | 200,
129 | json={
130 | "message": "created",
131 | "status": "success",
132 | "default": False,
133 | "old_project": None,
134 | "new_project": {
135 | "name": seen["create_payload"]["name"],
136 | "path": seen["create_payload"]["path"],
137 | },
138 | },
139 | )
140 |
141 | raise AssertionError(f"Unexpected request: {request.method} {request.url}")
142 |
143 | transport = httpx.MockTransport(handler)
144 |
145 | @asynccontextmanager
146 | async def http_client_factory():
147 | async with httpx.AsyncClient(
148 | transport=transport, base_url="https://cloud.example.test"
149 | ) as client:
150 | yield client
151 |
152 | async def api_request(**kwargs):
153 | return await make_api_request(auth=auth, http_client_factory=http_client_factory, **kwargs)
154 |
155 | projects = await fetch_cloud_projects(api_request=api_request)
156 | assert [p.name for p in projects.projects] == ["alpha", "beta"]
157 |
158 | assert await project_exists("alpha", api_request=api_request) is True
159 | assert await project_exists("missing", api_request=api_request) is False
160 |
161 | created = await create_cloud_project("My Project", api_request=api_request)
162 | assert created.new_project is not None
163 | assert created.new_project["name"] == "My Project"
164 | # Path should be permalink-like (kebab)
165 | assert seen["create_payload"]["path"] == "my-project"
166 |
```
--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/importer_router.py:
--------------------------------------------------------------------------------
```python
1 | """V2 Import Router - ID-based data import operations.
2 |
3 | This router uses v2 dependencies for consistent project handling with external_id UUIDs.
4 | Import endpoints use project_id in the path for consistency with other v2 endpoints.
5 | """
6 |
7 | import json
8 | import logging
9 |
10 | from fastapi import APIRouter, Form, HTTPException, UploadFile, status, Path
11 |
12 | from basic_memory.deps import (
13 | ChatGPTImporterV2ExternalDep,
14 | ClaudeConversationsImporterV2ExternalDep,
15 | ClaudeProjectsImporterV2ExternalDep,
16 | MemoryJsonImporterV2ExternalDep,
17 | )
18 | from basic_memory.importers import Importer
19 | from basic_memory.schemas.importer import (
20 | ChatImportResult,
21 | EntityImportResult,
22 | ProjectImportResult,
23 | )
24 |
25 | logger = logging.getLogger(__name__)
26 |
27 | router = APIRouter(prefix="/import", tags=["import-v2"])
28 |
29 |
30 | @router.post("/chatgpt", response_model=ChatImportResult)
31 | async def import_chatgpt(
32 | importer: ChatGPTImporterV2ExternalDep,
33 | file: UploadFile,
34 | project_id: str = Path(..., description="Project external UUID"),
35 | folder: str = Form("conversations"),
36 | ) -> ChatImportResult:
37 | """Import conversations from ChatGPT JSON export.
38 |
39 | Args:
40 | project_id: Project external UUID from URL path
41 | file: The ChatGPT conversations.json file.
42 | folder: The folder to place the files in.
43 | importer: ChatGPT importer instance.
44 |
45 | Returns:
46 | ChatImportResult with import statistics.
47 |
48 | Raises:
49 | HTTPException: If import fails.
50 | """
51 | logger.info(f"V2 Importing ChatGPT conversations for project {project_id}")
52 | return await import_file(importer, file, folder)
53 |
54 |
55 | @router.post("/claude/conversations", response_model=ChatImportResult)
56 | async def import_claude_conversations(
57 | importer: ClaudeConversationsImporterV2ExternalDep,
58 | file: UploadFile,
59 | project_id: str = Path(..., description="Project external UUID"),
60 | folder: str = Form("conversations"),
61 | ) -> ChatImportResult:
62 | """Import conversations from Claude conversations.json export.
63 |
64 | Args:
65 | project_id: Project external UUID from URL path
66 | file: The Claude conversations.json file.
67 | folder: The folder to place the files in.
68 | importer: Claude conversations importer instance.
69 |
70 | Returns:
71 | ChatImportResult with import statistics.
72 |
73 | Raises:
74 | HTTPException: If import fails.
75 | """
76 | logger.info(f"V2 Importing Claude conversations for project {project_id}")
77 | return await import_file(importer, file, folder)
78 |
79 |
80 | @router.post("/claude/projects", response_model=ProjectImportResult)
81 | async def import_claude_projects(
82 | importer: ClaudeProjectsImporterV2ExternalDep,
83 | file: UploadFile,
84 | project_id: str = Path(..., description="Project external UUID"),
85 | folder: str = Form("projects"),
86 | ) -> ProjectImportResult:
87 | """Import projects from Claude projects.json export.
88 |
89 | Args:
90 | project_id: Project external UUID from URL path
91 | file: The Claude projects.json file.
92 | folder: The base folder to place the files in.
93 | importer: Claude projects importer instance.
94 |
95 | Returns:
96 | ProjectImportResult with import statistics.
97 |
98 | Raises:
99 | HTTPException: If import fails.
100 | """
101 | logger.info(f"V2 Importing Claude projects for project {project_id}")
102 | return await import_file(importer, file, folder)
103 |
104 |
105 | @router.post("/memory-json", response_model=EntityImportResult)
106 | async def import_memory_json(
107 | importer: MemoryJsonImporterV2ExternalDep,
108 | file: UploadFile,
109 | project_id: str = Path(..., description="Project external UUID"),
110 | folder: str = Form("conversations"),
111 | ) -> EntityImportResult:
112 | """Import entities and relations from a memory.json file.
113 |
114 | Args:
115 | project_id: Project external UUID from URL path
116 | file: The memory.json file.
117 | folder: Optional destination folder within the project.
118 | importer: Memory JSON importer instance.
119 |
120 | Returns:
121 | EntityImportResult with import statistics.
122 |
123 | Raises:
124 | HTTPException: If import fails.
125 | """
126 | logger.info(f"V2 Importing memory.json for project {project_id}")
127 | try:
128 | file_data = []
129 | file_bytes = await file.read()
130 | file_str = file_bytes.decode("utf-8")
131 |         for line in file_str.splitlines():
132 |             if line.strip():  # skip blank lines such as a trailing newline
133 |                 file_data.append(json.loads(line))
134 |
135 | result = await importer.import_data(file_data, folder)
136 | if not result.success: # pragma: no cover
137 | raise HTTPException(
138 | status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
139 | detail=result.error_message or "Import failed",
140 | )
141 | except Exception as e:
142 | logger.exception("V2 Import failed")
143 | raise HTTPException(
144 | status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
145 | detail=f"Import failed: {str(e)}",
146 | )
147 | return result
148 |
149 |
150 | async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
151 | """Helper function to import a file using an importer instance.
152 |
153 | Args:
154 | importer: The importer instance to use
155 | file: The file to import
156 | destination_folder: Destination folder for imported content
157 |
158 | Returns:
159 | Import result from the importer
160 |
161 | Raises:
162 | HTTPException: If import fails
163 | """
164 | try:
165 | # Process file
166 | json_data = json.load(file.file)
167 | result = await importer.import_data(json_data, destination_folder)
168 | if not result.success: # pragma: no cover
169 | raise HTTPException(
170 | status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
171 | detail=result.error_message or "Import failed",
172 | )
173 |
174 | return result
175 |
176 | except Exception as e:
177 | logger.exception("V2 Import failed")
178 | raise HTTPException(
179 | status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
180 | detail=f"Import failed: {str(e)}",
181 | )
182 |
```
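A sketch of exercising the ChatGPT endpoint with httpx. The full path assumes this router is mounted under `/v2/projects/{project_id}` as its docstring implies; the base URL, UUID, and file name are placeholders:

```python
import httpx

project_id = "00000000-0000-0000-0000-000000000000"  # placeholder external UUID

with httpx.Client(base_url="http://localhost:8000") as client:
    with open("conversations.json", "rb") as f:
        resp = client.post(
            f"/v2/projects/{project_id}/import/chatgpt",
            files={"file": ("conversations.json", f, "application/json")},
            data={"folder": "conversations"},  # matches the Form default
        )
    resp.raise_for_status()
    print(resp.json())  # ChatImportResult statistics
```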
--------------------------------------------------------------------------------
/test-int/cli/test_project_commands_integration.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for project CLI commands."""
2 |
3 | import tempfile
4 | from pathlib import Path
5 |
6 | from typer.testing import CliRunner
7 |
8 | from basic_memory.cli.main import app as cli_app
9 |
10 |
11 | def test_project_list(app, app_config, test_project, config_manager):
12 | """Test 'bm project list' command shows projects."""
13 | runner = CliRunner()
14 | result = runner.invoke(cli_app, ["project", "list"])
15 |
16 | if result.exit_code != 0:
17 | print(f"STDOUT: {result.stdout}")
18 | print(f"STDERR: {result.stderr}")
19 | print(f"Exception: {result.exception}")
20 | assert result.exit_code == 0
21 | assert "test-project" in result.stdout
22 | assert "[X]" in result.stdout # default marker
23 |
24 |
25 | def test_project_info(app, app_config, test_project, config_manager):
26 | """Test 'bm project info' command shows project details."""
27 | runner = CliRunner()
28 | result = runner.invoke(cli_app, ["project", "info", "test-project"])
29 |
30 | if result.exit_code != 0:
31 | print(f"STDOUT: {result.stdout}")
32 | print(f"STDERR: {result.stderr}")
33 | assert result.exit_code == 0
34 | assert "Basic Memory Project Info" in result.stdout
35 | assert "test-project" in result.stdout
36 | assert "Statistics" in result.stdout
37 |
38 |
39 | def test_project_info_json(app, app_config, test_project, config_manager):
40 | """Test 'bm project info --json' command outputs valid JSON."""
41 | import json
42 |
43 | runner = CliRunner()
44 | result = runner.invoke(cli_app, ["project", "info", "test-project", "--json"])
45 |
46 | if result.exit_code != 0:
47 | print(f"STDOUT: {result.stdout}")
48 | print(f"STDERR: {result.stderr}")
49 | assert result.exit_code == 0
50 |
51 | # Parse JSON to verify it's valid
52 | data = json.loads(result.stdout)
53 | assert data["project_name"] == "test-project"
54 | assert "statistics" in data
55 | assert "system" in data
56 |
57 |
58 | def test_project_add_and_remove(app, app_config, config_manager):
59 | """Test adding and removing a project."""
60 | runner = CliRunner()
61 |
62 | # Use a separate temporary directory to avoid nested path conflicts
63 | with tempfile.TemporaryDirectory() as temp_dir:
64 | new_project_path = Path(temp_dir) / "new-project"
65 | new_project_path.mkdir()
66 |
67 | # Add project
68 | result = runner.invoke(cli_app, ["project", "add", "new-project", str(new_project_path)])
69 |
70 | if result.exit_code != 0:
71 | print(f"STDOUT: {result.stdout}")
72 | print(f"STDERR: {result.stderr}")
73 | assert result.exit_code == 0
74 | assert (
75 | "Project 'new-project' added successfully" in result.stdout
76 | or "added" in result.stdout.lower()
77 | )
78 |
79 | # Verify it shows up in list
80 | result = runner.invoke(cli_app, ["project", "list"])
81 | assert result.exit_code == 0
82 | assert "new-project" in result.stdout
83 |
84 | # Remove project
85 | result = runner.invoke(cli_app, ["project", "remove", "new-project"])
86 | assert result.exit_code == 0
87 | assert "removed" in result.stdout.lower() or "deleted" in result.stdout.lower()
88 |
89 |
90 | def test_project_set_default(app, app_config, config_manager):
91 | """Test setting default project."""
92 | runner = CliRunner()
93 |
94 | # Use a separate temporary directory to avoid nested path conflicts
95 | with tempfile.TemporaryDirectory() as temp_dir:
96 | new_project_path = Path(temp_dir) / "another-project"
97 | new_project_path.mkdir()
98 |
99 | # Add a second project
100 | result = runner.invoke(
101 | cli_app, ["project", "add", "another-project", str(new_project_path)]
102 | )
103 | if result.exit_code != 0:
104 | print(f"STDOUT: {result.stdout}")
105 | print(f"STDERR: {result.stderr}")
106 | assert result.exit_code == 0
107 |
108 | # Set as default
109 | result = runner.invoke(cli_app, ["project", "default", "another-project"])
110 | if result.exit_code != 0:
111 | print(f"STDOUT: {result.stdout}")
112 | print(f"STDERR: {result.stderr}")
113 | assert result.exit_code == 0
114 | assert "default" in result.stdout.lower()
115 |
116 | # Verify in list
117 | result = runner.invoke(cli_app, ["project", "list"])
118 | assert result.exit_code == 0
119 | # The new project should have the [X] marker now
120 | lines = result.stdout.split("\n")
121 | for line in lines:
122 | if "another-project" in line:
123 | assert "[X]" in line
124 |
125 |
126 | def test_remove_main_project(app, app_config, config_manager):
127 | """Test that removing main project then listing projects prevents main from reappearing (issue #397)."""
128 | runner = CliRunner()
129 |
130 | # Create separate temp dirs for each project
131 | with (
132 | tempfile.TemporaryDirectory() as main_dir,
133 | tempfile.TemporaryDirectory() as new_default_dir,
134 | ):
135 | main_path = Path(main_dir)
136 | new_default_path = Path(new_default_dir)
137 |
138 | # Ensure main exists
139 | result = runner.invoke(cli_app, ["project", "list"])
140 | if "main" not in result.stdout:
141 | result = runner.invoke(cli_app, ["project", "add", "main", str(main_path)])
142 | print(result.stdout)
143 | assert result.exit_code == 0
144 |
145 | # Confirm main is present
146 | result = runner.invoke(cli_app, ["project", "list"])
147 | assert "main" in result.stdout
148 |
149 | # Add a second project
150 | result = runner.invoke(cli_app, ["project", "add", "new_default", str(new_default_path)])
151 | assert result.exit_code == 0
152 |
153 | # Set new_default as default (if needed)
154 | result = runner.invoke(cli_app, ["project", "default", "new_default"])
155 | assert result.exit_code == 0
156 |
157 | # Remove main
158 | result = runner.invoke(cli_app, ["project", "remove", "main"])
159 | assert result.exit_code == 0
160 |
161 | # Confirm only new_default exists and main does not
162 | result = runner.invoke(cli_app, ["project", "list"])
163 | assert result.exit_code == 0
164 | assert "main" not in result.stdout
165 | assert "new_default" in result.stdout
166 |
```
--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------
```python
1 | """Test proper handling of .tmp files during sync."""
2 |
3 | import asyncio
4 | from pathlib import Path
5 |
6 | import pytest
7 | from watchfiles import Change
8 |
9 |
10 | async def create_test_file(path: Path, content: str = "test content") -> None:
11 | """Create a test file with given content."""
12 | path.parent.mkdir(parents=True, exist_ok=True)
13 | path.write_text(content)
14 |
15 |
16 | @pytest.mark.asyncio
17 | async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
18 | """Test that .tmp files are correctly filtered out."""
19 | # Test filter_changes method directly
20 | tmp_path = Path(test_project.path) / "test.tmp"
21 | assert not watch_service.filter_changes(Change.added, str(tmp_path))
22 |
23 | # Test with valid file
24 | valid_path = Path(test_project.path) / "test.md"
25 | assert watch_service.filter_changes(Change.added, str(valid_path))
26 |
27 |
28 | @pytest.mark.asyncio
29 | async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
30 | """Test handling of .tmp files during sync process."""
31 | project_dir = Path(test_project.path)
32 |
33 | # Create a .tmp file - this simulates a file being written with write_file_atomic
34 | tmp_file = project_dir / "test.tmp"
35 | await create_test_file(tmp_file, "This is a temporary file")
36 |
37 | # Create the target final file
38 | final_file = project_dir / "test.md"
39 | await create_test_file(final_file, "This is the final file")
40 |
41 | # Setup changes that include both the .tmp and final file
42 | changes = {
43 | (Change.added, str(tmp_file)),
44 | (Change.added, str(final_file)),
45 | }
46 |
47 | # Handle changes
48 | await watch_service.handle_changes(test_project, changes)
49 |
50 | # Verify only the final file got an entity
51 | tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
52 | final_entity = await sync_service.entity_repository.get_by_file_path("test.md")
53 |
54 | assert tmp_entity is None, "Temp file should not have an entity"
55 | assert final_entity is not None, "Final file should have an entity"
56 |
57 |
58 | @pytest.mark.asyncio
59 | async def test_atomic_write_tmp_file_handling(
60 | watch_service, project_config, test_project, sync_service
61 | ):
62 | """Test handling of file changes during atomic write operations."""
63 | project_dir = project_config.home
64 |
65 | # This test simulates the full atomic write process:
66 | # 1. First a .tmp file is created
67 | # 2. Then the .tmp file is renamed to the final file
68 | # 3. Both events are processed by the watch service
69 |
70 | # Setup file paths
71 | tmp_path = project_dir / "document.tmp"
72 | final_path = project_dir / "document.md"
73 |
74 | # Create mockup of the atomic write process
75 | await create_test_file(tmp_path, "Content for document")
76 |
77 | # First batch of changes - .tmp file created
78 | changes1 = {(Change.added, str(tmp_path))}
79 |
80 | # Process first batch
81 | await watch_service.handle_changes(test_project, changes1)
82 |
83 | # Now "replace" the temp file with the final file
84 | tmp_path.rename(final_path)
85 |
86 | # Second batch of changes - .tmp file deleted, final file added
87 | changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}
88 |
89 | # Process second batch
90 | await watch_service.handle_changes(test_project, changes2)
91 |
92 | # Verify only the final file is in the database
93 | tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
94 | final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
95 |
96 | assert tmp_entity is None, "Temp file should not have an entity"
97 | assert final_entity is not None, "Final file should have an entity"
98 |
99 | # Check events
100 | new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
101 | assert len(new_events) == 1
102 | assert new_events[0].path == "document.md"
103 |
104 |
105 | @pytest.mark.asyncio
106 | async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
107 | """Test handling of rapid atomic writes to the same destination."""
108 | project_dir = Path(test_project.path)
109 |
110 | # This test simulates multiple rapid atomic writes to the same file:
111 | # 1. Several .tmp files are created one after another
112 | # 2. Each is then renamed to the same final file
113 | # 3. Events are batched and processed together
114 |
115 | # Setup file paths
116 | tmp1_path = project_dir / "document.1.tmp"
117 | tmp2_path = project_dir / "document.2.tmp"
118 | final_path = project_dir / "document.md"
119 |
120 | # Create multiple temp files that will be used in sequence
121 | await create_test_file(tmp1_path, "First version")
122 | await create_test_file(tmp2_path, "Second version")
123 |
124 | # Simulate the first atomic write
125 | tmp1_path.replace(final_path)
126 |
127 | # Brief pause to ensure file system registers the change
128 | await asyncio.sleep(0.1)
129 |
130 | # Read content to verify
131 | content1 = final_path.read_text(encoding="utf-8")
132 | assert content1 == "First version"
133 |
134 | # Simulate the second atomic write
135 | tmp2_path.replace(final_path)
136 |
137 | # Verify content was updated
138 | content2 = final_path.read_text(encoding="utf-8")
139 | assert content2 == "Second version"
140 |
141 | # Create a batch of changes that might arrive in mixed order
142 | changes = {
143 | (Change.added, str(tmp1_path)),
144 | (Change.deleted, str(tmp1_path)),
145 | (Change.added, str(tmp2_path)),
146 | (Change.deleted, str(tmp2_path)),
147 | (Change.added, str(final_path)),
148 | (Change.modified, str(final_path)),
149 | }
150 |
151 | # Process all changes
152 | await watch_service.handle_changes(test_project, changes)
153 |
154 | # Verify only the final file is in the database
155 | final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
156 | assert final_entity is not None
157 |
158 | # Also verify no tmp entities were created
159 | tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
160 | tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
161 | assert tmp1_entity is None
162 | assert tmp2_entity is None
163 |
```
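A minimal sketch of the atomic-write pattern these tests simulate: write to a `.tmp` sibling, then rename over the destination. This is an illustrative stand-in for the `write_file_atomic` helper the comments above refer to, not the project's actual implementation:

```python
from pathlib import Path


def write_file_atomic_sketch(path: Path, content: str) -> None:
    """Illustrative atomic write: produces the .tmp-then-rename event sequence."""
    tmp = path.with_suffix(".tmp")  # e.g. document.md -> document.tmp
    tmp.write_text(content, encoding="utf-8")
    # Path.replace overwrites the target atomically on POSIX filesystems,
    # yielding exactly the deleted-.tmp / added-final changes fed to the tests.
    tmp.replace(path)
```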
--------------------------------------------------------------------------------
/src/basic_memory/mcp/project_context.py:
--------------------------------------------------------------------------------
```python
1 | """Project context utilities for Basic Memory MCP server.
2 |
3 | Provides project lookup utilities for MCP tools.
4 | Handles project validation and context management in one place.
5 |
6 | Note: This module uses ProjectResolver for unified project resolution.
7 | The resolve_project_parameter function is a thin wrapper for backwards
8 | compatibility with existing MCP tools.
9 | """
10 |
11 | from typing import Optional, List
12 | from httpx import AsyncClient
13 | from httpx._types import (
14 | HeaderTypes,
15 | )
16 | from loguru import logger
17 | from fastmcp import Context
18 |
19 | from basic_memory.config import ConfigManager
20 | from basic_memory.project_resolver import ProjectResolver
21 | from basic_memory.schemas.project_info import ProjectItem, ProjectList
22 | from basic_memory.utils import generate_permalink
23 |
24 |
25 | async def resolve_project_parameter(
26 | project: Optional[str] = None,
27 | allow_discovery: bool = False,
28 | cloud_mode: Optional[bool] = None,
29 | default_project_mode: Optional[bool] = None,
30 | default_project: Optional[str] = None,
31 | ) -> Optional[str]:
32 | """Resolve project parameter using three-tier hierarchy.
33 |
34 | This is a thin wrapper around ProjectResolver for backwards compatibility.
35 | New code should consider using ProjectResolver directly for more detailed
36 | resolution information.
37 |
38 | if cloud_mode:
39 | project is required (unless allow_discovery=True for tools that support discovery mode)
40 | else:
41 | Resolution order:
42 | 1. Single Project Mode (--project cli arg, or BASIC_MEMORY_MCP_PROJECT env var) - highest priority
43 | 2. Explicit project parameter - medium priority
44 | 3. Default project if default_project_mode=true - lowest priority
45 |
46 | Args:
47 | project: Optional explicit project parameter
48 | allow_discovery: If True, allows returning None in cloud mode for discovery mode
49 | (used by tools like recent_activity that can operate across all projects)
50 | cloud_mode: Optional explicit cloud mode. If not provided, reads from ConfigManager.
51 | default_project_mode: Optional explicit default project mode. If not provided, reads from ConfigManager.
52 | default_project: Optional explicit default project. If not provided, reads from ConfigManager.
53 |
54 | Returns:
55 | Resolved project name or None if no resolution possible
56 | """
57 | # Load config for any values not explicitly provided
58 | if cloud_mode is None or default_project_mode is None or default_project is None:
59 | config = ConfigManager().config
60 | if cloud_mode is None:
61 | cloud_mode = config.cloud_mode
62 | if default_project_mode is None:
63 | default_project_mode = config.default_project_mode
64 | if default_project is None:
65 | default_project = config.default_project
66 |
67 | # Create resolver with configuration and resolve
68 | resolver = ProjectResolver.from_env(
69 | cloud_mode=cloud_mode,
70 | default_project_mode=default_project_mode,
71 | default_project=default_project,
72 | )
73 | result = resolver.resolve(project=project, allow_discovery=allow_discovery)
74 | return result.project
75 |
76 |
77 | async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
78 | # Deferred import to avoid circular dependency with tools
79 | from basic_memory.mcp.tools.utils import call_get
80 |
81 | response = await call_get(client, "/projects/projects", headers=headers)
82 | project_list = ProjectList.model_validate(response.json())
83 | return [project.name for project in project_list.projects]
84 |
85 |
86 | async def get_active_project(
87 | client: AsyncClient,
88 | project: Optional[str] = None,
89 | context: Optional[Context] = None,
90 | headers: HeaderTypes | None = None,
91 | ) -> ProjectItem:
92 | """Get and validate project, setting it in context if available.
93 |
94 | Args:
95 | client: HTTP client for API calls
96 | project: Optional project name (resolved using hierarchy)
97 | context: Optional FastMCP context to cache the result
98 |
99 | Returns:
100 | The validated project item
101 |
102 | Raises:
103 | ValueError: If no project can be resolved
104 | HTTPError: If project doesn't exist or is inaccessible
105 | """
106 | # Deferred import to avoid circular dependency with tools
107 | from basic_memory.mcp.tools.utils import call_get
108 |
109 | resolved_project = await resolve_project_parameter(project)
110 | if not resolved_project:
111 | project_names = await get_project_names(client, headers)
112 | raise ValueError(
113 | "No project specified. "
114 | "Either set 'default_project_mode=true' in config, or use 'project' argument.\n"
115 | f"Available projects: {project_names}"
116 | )
117 |
118 | project = resolved_project
119 |
120 | # Check if already cached in context
121 | if context:
122 | cached_project = context.get_state("active_project")
123 | if cached_project and cached_project.name == project:
124 | logger.debug(f"Using cached project from context: {project}")
125 | return cached_project
126 |
127 | # Validate project exists by calling API
128 | logger.debug(f"Validating project: {project}")
129 | permalink = generate_permalink(project)
130 | response = await call_get(client, f"/{permalink}/project/item", headers=headers)
131 | active_project = ProjectItem.model_validate(response.json())
132 |
133 | # Cache in context if available
134 | if context:
135 | context.set_state("active_project", active_project)
136 | logger.debug(f"Cached project in context: {project}")
137 |
138 | logger.debug(f"Validated project: {active_project.name}")
139 | return active_project
140 |
141 |
142 | def add_project_metadata(result: str, project_name: str) -> str:
143 | """Add project context as metadata footer for assistant session tracking.
144 |
145 | Provides clear project context to help the assistant remember which
146 | project is being used throughout the conversation session.
147 |
148 | Args:
149 | result: The tool result string
150 | project_name: The project name that was used
151 |
152 | Returns:
153 | Result with project session tracking metadata
154 | """
155 | return f"{result}\n\n[Session: Using project '{project_name}']"
156 |
```
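A minimal usage sketch of the hierarchy documented above, with all config values passed explicitly. Project names here are hypothetical, and the expected output assumes no `BASIC_MEMORY_MCP_PROJECT` override is set:

```python
import asyncio

from basic_memory.mcp.project_context import resolve_project_parameter


async def demo() -> None:
    # An explicit project parameter outranks the configured default...
    explicit = await resolve_project_parameter(
        project="research",
        cloud_mode=False,
        default_project_mode=True,
        default_project="main",
    )
    # ...while omitting it falls back to the default project.
    fallback = await resolve_project_parameter(
        cloud_mode=False,
        default_project_mode=True,
        default_project="main",
    )
    print(explicit, fallback)  # expected: research main


asyncio.run(demo())
```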
--------------------------------------------------------------------------------
/test-int/test_db_wal_mode.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for WAL mode and Windows-specific SQLite optimizations.
2 |
3 | These tests use real filesystem databases (not in-memory) to verify WAL mode
4 | and other SQLite configuration settings work correctly in production scenarios.
5 | """
6 |
7 | import pytest
8 | from sqlalchemy import text
9 |
10 |
11 | @pytest.mark.asyncio
12 | async def test_wal_mode_enabled(engine_factory, db_backend):
13 | """Test that WAL mode is enabled on filesystem database connections."""
14 | if db_backend == "postgres":
15 | pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
16 |
17 | engine, _ = engine_factory
18 |
19 | # Execute a query to verify WAL mode is enabled
20 | async with engine.connect() as conn:
21 | result = await conn.execute(text("PRAGMA journal_mode"))
22 | journal_mode = result.fetchone()[0]
23 |
24 | # WAL mode should be enabled for filesystem databases
25 | assert journal_mode.upper() == "WAL"
26 |
27 |
28 | @pytest.mark.asyncio
29 | async def test_busy_timeout_configured(engine_factory, db_backend):
30 | """Test that busy timeout is configured for database connections."""
31 | if db_backend == "postgres":
32 | pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
33 |
34 | engine, _ = engine_factory
35 |
36 | async with engine.connect() as conn:
37 | result = await conn.execute(text("PRAGMA busy_timeout"))
38 | busy_timeout = result.fetchone()[0]
39 |
40 | # Busy timeout should be 10 seconds (10000 milliseconds)
41 | assert busy_timeout == 10000
42 |
43 |
44 | @pytest.mark.asyncio
45 | async def test_synchronous_mode_configured(engine_factory, db_backend):
46 | """Test that synchronous mode is set to NORMAL for performance."""
47 | if db_backend == "postgres":
48 | pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
49 |
50 | engine, _ = engine_factory
51 |
52 | async with engine.connect() as conn:
53 | result = await conn.execute(text("PRAGMA synchronous"))
54 | synchronous = result.fetchone()[0]
55 |
56 | # Synchronous should be NORMAL (1)
57 | assert synchronous == 1
58 |
59 |
60 | @pytest.mark.asyncio
61 | async def test_cache_size_configured(engine_factory, db_backend):
62 | """Test that cache size is configured for performance."""
63 | if db_backend == "postgres":
64 | pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
65 |
66 | engine, _ = engine_factory
67 |
68 | async with engine.connect() as conn:
69 | result = await conn.execute(text("PRAGMA cache_size"))
70 | cache_size = result.fetchone()[0]
71 |
72 | # Cache size should be -64000 (64MB)
73 | assert cache_size == -64000
74 |
75 |
76 | @pytest.mark.asyncio
77 | async def test_temp_store_configured(engine_factory, db_backend):
78 | """Test that temp_store is set to MEMORY."""
79 | if db_backend == "postgres":
80 | pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")
81 |
82 | engine, _ = engine_factory
83 |
84 | async with engine.connect() as conn:
85 | result = await conn.execute(text("PRAGMA temp_store"))
86 | temp_store = result.fetchone()[0]
87 |
88 | # temp_store should be MEMORY (2)
89 | assert temp_store == 2
90 |
91 |
92 | @pytest.mark.asyncio
93 | @pytest.mark.windows
94 | @pytest.mark.skipif(
95 | __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
96 | )
97 | async def test_windows_locking_mode_when_on_windows(tmp_path, monkeypatch, config_manager):
98 | """Test that Windows-specific locking mode is set when running on Windows."""
99 | from basic_memory.db import engine_session_factory, DatabaseType
100 | from basic_memory.config import DatabaseBackend
101 |
102 | # Force SQLite backend for this SQLite-specific test
103 | config_manager.config.database_backend = DatabaseBackend.SQLITE
104 |
105 | # Set HOME environment variable
106 | monkeypatch.setenv("HOME", str(tmp_path))
107 | monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
108 |
109 | db_path = tmp_path / "test_windows.db"
110 |
111 | async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
112 | engine,
113 | _,
114 | ):
115 | async with engine.connect() as conn:
116 | result = await conn.execute(text("PRAGMA locking_mode"))
117 | locking_mode = result.fetchone()[0]
118 |
119 | # Locking mode should be NORMAL on Windows
120 | assert locking_mode.upper() == "NORMAL"
121 |
122 |
123 | @pytest.mark.asyncio
124 | @pytest.mark.windows
125 | @pytest.mark.skipif(
126 | __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
127 | )
128 | async def test_null_pool_on_windows(tmp_path, monkeypatch):
129 | """Test that NullPool is used on Windows to avoid connection pooling issues."""
130 | from basic_memory.db import engine_session_factory, DatabaseType
131 | from sqlalchemy.pool import NullPool
132 |
133 | # Set HOME environment variable
134 | monkeypatch.setenv("HOME", str(tmp_path))
135 | monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
136 |
137 | db_path = tmp_path / "test_windows_pool.db"
138 |
139 | async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (engine, _):
140 | # Engine should be using NullPool on Windows
141 | assert isinstance(engine.pool, NullPool)
142 |
143 |
144 | @pytest.mark.asyncio
145 | @pytest.mark.windows
146 | @pytest.mark.skipif(
147 | __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
148 | )
149 | async def test_memory_database_no_null_pool_on_windows(tmp_path, monkeypatch):
150 | """Test that in-memory databases do NOT use NullPool even on Windows.
151 |
152 | NullPool closes connections immediately, which destroys in-memory databases.
153 | This test ensures in-memory databases maintain connection pooling.
154 | """
155 | from basic_memory.db import engine_session_factory, DatabaseType
156 | from sqlalchemy.pool import NullPool
157 |
158 | # Set HOME environment variable
159 | monkeypatch.setenv("HOME", str(tmp_path))
160 | monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
161 |
162 | db_path = tmp_path / "test_memory.db"
163 |
164 | async with engine_session_factory(db_path, DatabaseType.MEMORY) as (engine, _):
165 | # In-memory databases should NOT use NullPool on Windows
166 | assert not isinstance(engine.pool, NullPool)
167 |
```
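The PRAGMA values these tests assert are per-connection settings; a common way to apply them is SQLAlchemy's `connect` event on the underlying sync engine. A sketch under that assumption (the project's actual setup lives in `basic_memory.db` and may differ):

```python
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///example.db")


@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragmas(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")    # asserted above as "WAL"
    cursor.execute("PRAGMA busy_timeout=10000")  # 10 seconds
    cursor.execute("PRAGMA synchronous=NORMAL")  # integer value 1
    cursor.execute("PRAGMA cache_size=-64000")   # 64MB page cache
    cursor.execute("PRAGMA temp_store=MEMORY")   # integer value 2
    cursor.close()
```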
--------------------------------------------------------------------------------
/src/basic_memory/repository/relation_repository.py:
--------------------------------------------------------------------------------
```python
1 | """Repository for managing Relation objects."""
2 |
3 | from typing import Sequence, List, Optional, Any, cast
4 |
5 | from sqlalchemy import and_, delete, select
6 | from sqlalchemy.engine import CursorResult
7 | from sqlalchemy.dialects.postgresql import insert as pg_insert
8 | from sqlalchemy.dialects.sqlite import insert as sqlite_insert
9 | from sqlalchemy.ext.asyncio import async_sessionmaker
10 | from sqlalchemy.orm import selectinload, aliased
11 | from sqlalchemy.orm.interfaces import LoaderOption
12 |
13 | from basic_memory import db
14 | from basic_memory.models import Relation, Entity
15 | from basic_memory.repository.repository import Repository
16 |
17 |
18 | class RelationRepository(Repository[Relation]):
19 | """Repository for Relation model with memory-specific operations."""
20 |
21 | def __init__(self, session_maker: async_sessionmaker, project_id: int):
22 | """Initialize with session maker and project_id filter.
23 |
24 | Args:
25 | session_maker: SQLAlchemy session maker
26 | project_id: Project ID to filter all operations by
27 | """
28 | super().__init__(session_maker, Relation, project_id=project_id)
29 |
30 | async def find_relation(
31 | self, from_permalink: str, to_permalink: str, relation_type: str
32 | ) -> Optional[Relation]:
33 | """Find a relation by its from and to path IDs."""
34 | from_entity = aliased(Entity)
35 | to_entity = aliased(Entity)
36 |
37 | query = (
38 | select(Relation)
39 | .join(from_entity, Relation.from_id == from_entity.id)
40 | .join(to_entity, Relation.to_id == to_entity.id)
41 | .where(
42 | and_(
43 | from_entity.permalink == from_permalink,
44 | to_entity.permalink == to_permalink,
45 | Relation.relation_type == relation_type,
46 | )
47 | )
48 | )
49 | return await self.find_one(query)
50 |
51 | async def find_by_entities(self, from_id: int, to_id: int) -> Sequence[Relation]:
52 | """Find all relations between two entities."""
53 | query = select(Relation).where((Relation.from_id == from_id) & (Relation.to_id == to_id))
54 | result = await self.execute_query(query)
55 | return result.scalars().all()
56 |
57 | async def find_by_type(self, relation_type: str) -> Sequence[Relation]:
58 | """Find all relations of a specific type."""
59 | query = select(Relation).filter(Relation.relation_type == relation_type)
60 | result = await self.execute_query(query)
61 | return result.scalars().all()
62 |
63 | async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
64 | """Delete outgoing relations for an entity.
65 |
66 | Only deletes relations where this entity is the source (from_id),
67 | as these are the ones owned by this entity's markdown file.
68 | """
69 | async with db.scoped_session(self.session_maker) as session:
70 | await session.execute(delete(Relation).where(Relation.from_id == entity_id))
71 |
72 | async def find_unresolved_relations(self) -> Sequence[Relation]:
73 | """Find all unresolved relations, where to_id is null."""
74 | query = select(Relation).filter(Relation.to_id.is_(None))
75 | result = await self.execute_query(query)
76 | return result.scalars().all()
77 |
78 | async def find_unresolved_relations_for_entity(self, entity_id: int) -> Sequence[Relation]:
79 | """Find unresolved relations for a specific entity.
80 |
81 | Args:
82 | entity_id: The entity whose unresolved outgoing relations to find.
83 |
84 | Returns:
85 | List of unresolved relations where this entity is the source.
86 | """
87 | query = select(Relation).filter(Relation.from_id == entity_id, Relation.to_id.is_(None))
88 | result = await self.execute_query(query)
89 | return result.scalars().all()
90 |
91 | async def add_all_ignore_duplicates(self, relations: List[Relation]) -> int:
92 | """Bulk insert relations, ignoring duplicates.
93 |
94 | Uses ON CONFLICT DO NOTHING to skip relations that would violate the
95 | unique constraint on (from_id, to_name, relation_type). This is useful
96 | for bulk operations where the same link may appear multiple times in
97 | a document.
98 |
99 | Works with both SQLite and PostgreSQL dialects.
100 |
101 | Args:
102 | relations: List of Relation objects to insert
103 |
104 | Returns:
105 | Number of relations actually inserted (excludes duplicates)
106 | """
107 | if not relations:
108 | return 0
109 |
110 | # Convert Relation objects to dicts for insert
111 | values = [
112 | {
113 | "project_id": r.project_id if r.project_id else self.project_id,
114 | "from_id": r.from_id,
115 | "to_id": r.to_id,
116 | "to_name": r.to_name,
117 | "relation_type": r.relation_type,
118 | "context": r.context,
119 | }
120 | for r in relations
121 | ]
122 |
123 | async with db.scoped_session(self.session_maker) as session:
124 | # Check dialect to use appropriate insert
125 | dialect_name = session.bind.dialect.name if session.bind else "sqlite"
126 |
127 | if dialect_name == "postgresql": # pragma: no cover
128 | # PostgreSQL: use RETURNING to count inserted rows
129 | # (rowcount is 0 for ON CONFLICT DO NOTHING)
130 | stmt = ( # pragma: no cover
131 | pg_insert(Relation)
132 | .values(values)
133 | .on_conflict_do_nothing()
134 | .returning(Relation.id)
135 | )
136 | result = await session.execute(stmt) # pragma: no cover
137 | return len(result.fetchall()) # pragma: no cover
138 | else:
139 | # SQLite: rowcount works correctly
140 | stmt = sqlite_insert(Relation).values(values)
141 | stmt = stmt.on_conflict_do_nothing()
142 | result = cast(CursorResult[Any], await session.execute(stmt))
143 | return result.rowcount if result.rowcount > 0 else 0
144 |
145 | def get_load_options(self) -> List[LoaderOption]:
146 | return [selectinload(Relation.from_entity), selectinload(Relation.to_entity)]
147 |
```
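The PostgreSQL branch in `add_all_ignore_duplicates` counts rows via `RETURNING` because `rowcount` is 0 for `ON CONFLICT DO NOTHING`. A standalone sketch of that pattern against a toy table (table and column names are hypothetical):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, UniqueConstraint
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()
links = Table(
    "links",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String, nullable=False),
    UniqueConstraint("name"),
)

stmt = (
    pg_insert(links)
    .values([{"name": "a"}, {"name": "a"}, {"name": "b"}])
    .on_conflict_do_nothing()
    .returning(links.c.id)
)
# inserted = len((await session.execute(stmt)).fetchall())
# -> 2: the duplicate "a" is skipped, and RETURNING yields only the rows
#    that were actually written, which is why the repository counts this way.
```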
--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------
```python
1 | """Integration test for build_context with underscore in memory:// URLs."""
2 |
3 | import pytest
4 | from fastmcp import Client
5 |
6 |
7 | @pytest.mark.asyncio
8 | async def test_build_context_underscore_normalization(mcp_server, app, test_project):
9 | """Test that build_context normalizes underscores in relation types."""
10 |
11 | async with Client(mcp_server) as client:
12 | # Create parent note
13 | await client.call_tool(
14 | "write_note",
15 | {
16 | "project": test_project.name,
17 | "title": "Parent Entity",
18 | "folder": "testing",
19 | "content": "# Parent Entity\n\nMain entity for testing underscore relations.",
20 | "tags": "test,parent",
21 | },
22 | )
23 |
24 | # Create child notes with different relation formats
25 | await client.call_tool(
26 | "write_note",
27 | {
28 | "project": test_project.name,
29 | "title": "Child with Underscore",
30 | "folder": "testing",
31 | "content": """# Child with Underscore
32 |
33 | - part_of [[Parent Entity]]
34 | - related_to [[Parent Entity]]
35 | """,
36 | "tags": "test,child",
37 | },
38 | )
39 |
40 | await client.call_tool(
41 | "write_note",
42 | {
43 | "project": test_project.name,
44 | "title": "Child with Hyphen",
45 | "folder": "testing",
46 | "content": """# Child with Hyphen
47 |
48 | - part-of [[Parent Entity]]
49 | - related-to [[Parent Entity]]
50 | """,
51 | "tags": "test,child",
52 | },
53 | )
54 |
55 | # Test 1: Search with underscore format should return results
56 | # Relation permalinks are: source/relation_type/target
57 | # So child-with-underscore/part-of/parent-entity
58 | result_underscore = await client.call_tool(
59 | "build_context",
60 | {
61 | "project": test_project.name,
62 | "url": "memory://testing/*/part_of/*parent*", # Using underscore
63 | },
64 | )
65 |
66 | # Parse response
67 | assert len(result_underscore.content) == 1
68 | response_text = result_underscore.content[0].text # pyright: ignore
69 | assert '"results"' in response_text
70 |
71 | # Both relations should be found since they both connect to parent-entity
72 | # The system should normalize the underscore to hyphen internally
73 | assert "part-of" in response_text.lower()
74 |
75 | # Test 2: Search with hyphen format should also return results
76 | result_hyphen = await client.call_tool(
77 | "build_context",
78 | {
79 | "project": test_project.name,
80 | "url": "memory://testing/*/part-of/*parent*", # Using hyphen
81 | },
82 | )
83 |
84 | response_text_hyphen = result_hyphen.content[0].text # pyright: ignore
85 | assert '"results"' in response_text_hyphen
86 | assert "part-of" in response_text_hyphen.lower()
87 |
88 | # Test 3: Test with related_to/related-to as well
89 | result_related = await client.call_tool(
90 | "build_context",
91 | {
92 | "project": test_project.name,
93 | "url": "memory://testing/*/related_to/*parent*", # Using underscore
94 | },
95 | )
96 |
97 | response_text_related = result_related.content[0].text # pyright: ignore
98 | assert '"results"' in response_text_related
99 | assert "related-to" in response_text_related.lower()
100 |
101 | # Test 4: Test exact path (non-wildcard) with underscore
102 | # Exact relation permalink would be child/relation/target
103 | result_exact = await client.call_tool(
104 | "build_context",
105 | {
106 | "project": test_project.name,
107 | "url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
108 | },
109 | )
110 |
111 | response_text_exact = result_exact.content[0].text # pyright: ignore
112 | assert '"results"' in response_text_exact
113 | assert "part-of" in response_text_exact.lower()
114 |
115 |
116 | @pytest.mark.asyncio
117 | async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
118 | """Test build_context with complex paths containing underscores."""
119 |
120 | async with Client(mcp_server) as client:
121 | # Create notes with underscores in titles and relations
122 | await client.call_tool(
123 | "write_note",
124 | {
125 | "project": test_project.name,
126 | "title": "workflow_manager_agent",
127 | "folder": "specs",
128 | "content": """# Workflow Manager Agent
129 |
130 | Specification for the workflow manager agent.
131 | """,
132 | "tags": "spec,workflow",
133 | },
134 | )
135 |
136 | await client.call_tool(
137 | "write_note",
138 | {
139 | "project": test_project.name,
140 | "title": "task_parser",
141 | "folder": "components",
142 | "content": """# Task Parser
143 |
144 | - part_of [[workflow_manager_agent]]
145 | - implements_for [[workflow_manager_agent]]
146 | """,
147 | "tags": "component,parser",
148 | },
149 | )
150 |
151 | # Test with underscores in all parts of the path
152 | # Relations are created as: task-parser/part-of/workflow-manager-agent
153 | # So search for */part_of/* or */part-of/* to find them
154 | test_cases = [
155 | "memory://components/*/part_of/*workflow*",
156 | "memory://components/*/part-of/*workflow*",
157 | "memory://*/task*/part_of/*",
158 | "memory://*/task*/part-of/*",
159 | ]
160 |
161 | for url in test_cases:
162 | result = await client.call_tool(
163 | "build_context", {"project": test_project.name, "url": url}
164 | )
165 |
166 | # All variations should work and find the related content
167 | assert len(result.content) == 1
168 | response = result.content[0].text # pyright: ignore
169 | assert '"results"' in response
170 | # The relation should be found showing part-of connection
171 | assert "part-of" in response.lower(), f"Failed for URL: {url}"
172 |
```
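The underscore-to-hyphen normalization these tests rely on comes from permalink generation; a quick probe of the behavior the assertions imply (expected outputs are inferred from the tests, not independently verified):

```python
from basic_memory.utils import generate_permalink

# Relation permalinks take the form source/relation-type/target, and the
# assertions above imply underscores normalize to hyphens along the way:
print(generate_permalink("part_of"))                 # expected: part-of
print(generate_permalink("workflow_manager_agent"))  # expected: workflow-manager-agent
```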
--------------------------------------------------------------------------------
/tests/mcp/test_tool_utils.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for MCP tool utilities."""
2 |
3 | import pytest
4 | from httpx import HTTPStatusError
5 | from mcp.server.fastmcp.exceptions import ToolError
6 |
7 | from basic_memory.mcp.tools.utils import (
8 | call_get,
9 | call_post,
10 | call_put,
11 | call_delete,
12 | get_error_message,
13 | )
14 |
15 |
16 | @pytest.fixture
17 | def mock_response(monkeypatch):
18 | """Create a mock response."""
19 |
20 | class MockResponse:
21 | def __init__(self, status_code=200):
22 | self.status_code = status_code
23 | self.is_success = status_code < 400
24 | self.json = lambda: {}
25 |
26 | def raise_for_status(self):
27 | if self.status_code >= 400:
28 | raise HTTPStatusError(
29 | message=f"HTTP Error {self.status_code}", request=None, response=self
30 | )
31 |
32 | return MockResponse
33 |
34 |
35 | class _Client:
36 | def __init__(self):
37 | self.calls: list[tuple[str, tuple, dict]] = []
38 | self._responses: dict[str, object] = {}
39 |
40 | def set_response(self, method: str, response):
41 | self._responses[method.lower()] = response
42 |
43 | async def get(self, *args, **kwargs):
44 | self.calls.append(("get", args, kwargs))
45 | return self._responses["get"]
46 |
47 | async def post(self, *args, **kwargs):
48 | self.calls.append(("post", args, kwargs))
49 | return self._responses["post"]
50 |
51 | async def put(self, *args, **kwargs):
52 | self.calls.append(("put", args, kwargs))
53 | return self._responses["put"]
54 |
55 | async def delete(self, *args, **kwargs):
56 | self.calls.append(("delete", args, kwargs))
57 | return self._responses["delete"]
58 |
59 |
60 | @pytest.mark.asyncio
61 | async def test_call_get_success(mock_response):
62 | """Test successful GET request."""
63 | client = _Client()
64 | client.set_response("get", mock_response())
65 |
66 | response = await call_get(client, "http://test.com")
67 | assert response.status_code == 200
68 |
69 |
70 | @pytest.mark.asyncio
71 | async def test_call_get_error(mock_response):
72 | """Test GET request with error."""
73 | client = _Client()
74 | client.set_response("get", mock_response(404))
75 |
76 | with pytest.raises(ToolError) as exc:
77 | await call_get(client, "http://test.com")
78 | assert "Resource not found" in str(exc.value)
79 |
80 |
81 | @pytest.mark.asyncio
82 | async def test_call_post_success(mock_response):
83 | """Test successful POST request."""
84 | client = _Client()
85 | response = mock_response()
86 | response.json = lambda: {"test": "data"}
87 | client.set_response("post", response)
88 |
89 | response = await call_post(client, "http://test.com", json={"test": "data"})
90 | assert response.status_code == 200
91 |
92 |
93 | @pytest.mark.asyncio
94 | async def test_call_post_error(mock_response):
95 | """Test POST request with error."""
96 | client = _Client()
97 | response = mock_response(500)
98 | response.json = lambda: {"test": "error"}
99 |
100 | client.set_response("post", response)
101 |
102 | with pytest.raises(ToolError) as exc:
103 | await call_post(client, "http://test.com", json={"test": "data"})
104 | assert "Internal server error" in str(exc.value)
105 |
106 |
107 | @pytest.mark.asyncio
108 | async def test_call_put_success(mock_response):
109 | """Test successful PUT request."""
110 | client = _Client()
111 | client.set_response("put", mock_response())
112 |
113 | response = await call_put(client, "http://test.com", json={"test": "data"})
114 | assert response.status_code == 200
115 |
116 |
117 | @pytest.mark.asyncio
118 | async def test_call_put_error(mock_response):
119 | """Test PUT request with error."""
120 | client = _Client()
121 | client.set_response("put", mock_response(400))
122 |
123 | with pytest.raises(ToolError) as exc:
124 | await call_put(client, "http://test.com", json={"test": "data"})
125 | assert "Invalid request" in str(exc.value)
126 |
127 |
128 | @pytest.mark.asyncio
129 | async def test_call_delete_success(mock_response):
130 | """Test successful DELETE request."""
131 | client = _Client()
132 | client.set_response("delete", mock_response())
133 |
134 | response = await call_delete(client, "http://test.com")
135 | assert response.status_code == 200
136 |
137 |
138 | @pytest.mark.asyncio
139 | async def test_call_delete_error(mock_response):
140 | """Test DELETE request with error."""
141 | client = _Client()
142 | client.set_response("delete", mock_response(403))
143 |
144 | with pytest.raises(ToolError) as exc:
145 | await call_delete(client, "http://test.com")
146 | assert "Access denied" in str(exc.value)
147 |
148 |
149 | @pytest.mark.asyncio
150 | async def test_call_get_with_params(mock_response):
151 | """Test GET request with query parameters."""
152 | client = _Client()
153 | client.set_response("get", mock_response())
154 |
155 | params = {"key": "value", "test": "data"}
156 | await call_get(client, "http://test.com", params=params)
157 |
158 | assert len(client.calls) == 1
159 | method, _args, kwargs = client.calls[0]
160 | assert method == "get"
161 | assert kwargs["params"] == params
162 |
163 |
164 | @pytest.mark.asyncio
165 | async def test_get_error_message():
166 | """Test the get_error_message function."""
167 |
168 | # Test 400 status code
169 | message = get_error_message(400, "http://test.com/resource", "GET")
170 | assert "Invalid request" in message
171 | assert "resource" in message
172 |
173 | # Test 404 status code
174 | message = get_error_message(404, "http://test.com/missing", "GET")
175 | assert "Resource not found" in message
176 | assert "missing" in message
177 |
178 | # Test 500 status code
179 | message = get_error_message(500, "http://test.com/server", "POST")
180 | assert "Internal server error" in message
181 | assert "server" in message
182 |
183 | # Test URL object handling
184 | from httpx import URL
185 |
186 | url = URL("http://test.com/complex/path")
187 | message = get_error_message(403, url, "DELETE")
188 | assert "Access denied" in message
189 | assert "path" in message
190 |
191 |
192 | @pytest.mark.asyncio
193 | async def test_call_post_with_json(mock_response):
194 | """Test POST request with JSON payload."""
195 | client = _Client()
196 | response = mock_response()
197 | response.json = lambda: {"test": "data"}
198 |
199 | client.set_response("post", response)
200 |
201 | json_data = {"key": "value", "nested": {"test": "data"}}
202 | await call_post(client, "http://test.com", json=json_data)
203 |
204 | assert len(client.calls) == 1
205 | method, _args, kwargs = client.calls[0]
206 | assert method == "post"
207 | assert kwargs["json"] == json_data
208 |
```
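The status-to-message mapping these tests exercise can be probed directly through `get_error_message`, which takes the status code, URL, and HTTP method (signature as imported in this file):

```python
from basic_memory.mcp.tools.utils import get_error_message

# Expected fragments per the assertions above: 400 -> "Invalid request",
# 403 -> "Access denied", 404 -> "Resource not found",
# 500 -> "Internal server error".
for status in (400, 403, 404, 500):
    print(status, get_error_message(status, "http://test.com/resource", "GET"))
```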
--------------------------------------------------------------------------------
/test-int/test_disable_permalinks_integration.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for the disable_permalinks configuration."""
2 |
3 | import pytest
4 |
5 | from basic_memory.markdown import EntityParser, MarkdownProcessor
6 | from basic_memory.repository import (
7 | EntityRepository,
8 | ObservationRepository,
9 | RelationRepository,
10 | ProjectRepository,
11 | )
12 | from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
13 | from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
14 | from basic_memory.schemas import Entity as EntitySchema
15 | from basic_memory.services import FileService
16 | from basic_memory.services.entity_service import EntityService
17 | from basic_memory.services.link_resolver import LinkResolver
18 | from basic_memory.services.search_service import SearchService
19 | from basic_memory.sync.sync_service import SyncService
20 |
21 |
22 | @pytest.mark.asyncio
23 | async def test_disable_permalinks_create_entity(tmp_path, engine_factory, app_config, test_project):
24 | """Test that entities created with disable_permalinks=True don't have permalinks."""
25 | from basic_memory.config import DatabaseBackend
26 |
27 | engine, session_maker = engine_factory
28 |
29 | # Override app config to enable disable_permalinks
30 | app_config.disable_permalinks = True
31 |
32 | # Setup repositories
33 | entity_repository = EntityRepository(session_maker, project_id=test_project.id)
34 | observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
35 | relation_repository = RelationRepository(session_maker, project_id=test_project.id)
36 |
37 | # Use database-specific search repository
38 | if app_config.database_backend == DatabaseBackend.POSTGRES:
39 | search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
40 | else:
41 | search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
42 |
43 | # Setup services
44 | entity_parser = EntityParser(tmp_path)
45 | markdown_processor = MarkdownProcessor(entity_parser)
46 | file_service = FileService(tmp_path, markdown_processor)
47 | search_service = SearchService(search_repository, entity_repository, file_service)
48 | await search_service.init_search_index()
49 | link_resolver = LinkResolver(entity_repository, search_service)
50 |
51 | entity_service = EntityService(
52 | entity_parser=entity_parser,
53 | entity_repository=entity_repository,
54 | observation_repository=observation_repository,
55 | relation_repository=relation_repository,
56 | file_service=file_service,
57 | link_resolver=link_resolver,
58 | app_config=app_config,
59 | )
60 |
61 | # Create entity via API
62 | entity_data = EntitySchema(
63 | title="Test Note",
64 | folder="test",
65 | entity_type="note",
66 | content="Test content",
67 | )
68 |
69 | created = await entity_service.create_entity(entity_data)
70 |
71 | # Verify entity has no permalink
72 | assert created.permalink is None
73 |
74 | # Verify file has no permalink in frontmatter
75 | file_path = tmp_path / "test" / "Test Note.md"
76 | assert file_path.exists()
77 | content = file_path.read_text()
78 | assert "permalink:" not in content
79 | assert "Test content" in content
80 |
81 |
82 | @pytest.mark.asyncio
83 | async def test_disable_permalinks_sync_workflow(tmp_path, engine_factory, app_config, test_project):
84 | """Test full sync workflow with disable_permalinks enabled."""
85 | from basic_memory.config import DatabaseBackend
86 |
87 | engine, session_maker = engine_factory
88 |
89 | # Override app config to enable disable_permalinks
90 | app_config.disable_permalinks = True
91 |
92 | # Create a test markdown file without frontmatter
93 | test_file = tmp_path / "test_note.md"
94 | test_file.write_text("# Test Note\nThis is test content.")
95 |
96 | # Setup repositories
97 | entity_repository = EntityRepository(session_maker, project_id=test_project.id)
98 | observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
99 | relation_repository = RelationRepository(session_maker, project_id=test_project.id)
100 |
101 | # Use database-specific search repository
102 | if app_config.database_backend == DatabaseBackend.POSTGRES:
103 | search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
104 | else:
105 | search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)
106 |
107 | project_repository = ProjectRepository(session_maker)
108 |
109 | # Setup services
110 | entity_parser = EntityParser(tmp_path)
111 | markdown_processor = MarkdownProcessor(entity_parser)
112 | file_service = FileService(tmp_path, markdown_processor)
113 | search_service = SearchService(search_repository, entity_repository, file_service)
114 | await search_service.init_search_index()
115 | link_resolver = LinkResolver(entity_repository, search_service)
116 |
117 | entity_service = EntityService(
118 | entity_parser=entity_parser,
119 | entity_repository=entity_repository,
120 | observation_repository=observation_repository,
121 | relation_repository=relation_repository,
122 | file_service=file_service,
123 | link_resolver=link_resolver,
124 | app_config=app_config,
125 | )
126 |
127 | sync_service = SyncService(
128 | app_config=app_config,
129 | entity_service=entity_service,
130 | project_repository=project_repository,
131 | entity_parser=entity_parser,
132 | entity_repository=entity_repository,
133 | relation_repository=relation_repository,
134 | search_service=search_service,
135 | file_service=file_service,
136 | )
137 |
138 | # Run sync
139 | report = await sync_service.scan(tmp_path)
140 | # Note: scan may pick up database files too, so just check our file is there
141 | assert "test_note.md" in report.new
142 |
143 | # Sync the file
144 | await sync_service.sync_file("test_note.md", new=True)
145 |
146 | # Verify file has no permalink added
147 | content = test_file.read_text()
148 | assert "permalink:" not in content
149 | assert "# Test Note" in content
150 |
151 | # Verify entity in database has no permalink
152 | entities = await entity_repository.find_all()
153 | assert len(entities) == 1
154 | assert entities[0].permalink is None
155 | # Title is extracted from filename when no frontmatter, or from frontmatter when present
156 | assert entities[0].title in ("test_note", "Test Note")
157 |
```
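For contrast with the assertions above, a note synced under the default configuration would typically gain a `permalink:` entry in its frontmatter. The shapes below are illustrative strings (the actual permalink value is derived by the system, not shown here):

```python
# disable_permalinks=True: sync leaves the file body untouched.
without_permalink = "# Test Note\nThis is test content.\n"

# Default configuration: frontmatter gains a permalink (illustrative value).
with_permalink = """\
---
permalink: test-note
---
# Test Note
This is test content.
"""
```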
--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------
```python
1 | """Test repository implementation."""
2 |
3 | from datetime import datetime, UTC
4 | import pytest
5 | from sqlalchemy import String, DateTime
6 | from sqlalchemy.orm import Mapped, mapped_column
7 |
8 | from basic_memory.models import Base
9 | from basic_memory.repository.repository import Repository
10 |
11 |
12 | class ModelTest(Base):
13 | """Test model for repository tests."""
14 |
15 | __tablename__ = "test_model"
16 |
17 | id: Mapped[str] = mapped_column(String(255), primary_key=True)
18 | name: Mapped[str] = mapped_column(String(255))
19 | description: Mapped[str | None] = mapped_column(String(255), nullable=True)
20 | created_at: Mapped[datetime] = mapped_column(
21 | DateTime, default=lambda: datetime.now(UTC).replace(tzinfo=None)
22 | )
23 | updated_at: Mapped[datetime] = mapped_column(
24 | DateTime,
25 | default=lambda: datetime.now(UTC).replace(tzinfo=None),
26 | onupdate=lambda: datetime.now(UTC).replace(tzinfo=None),
27 | )
28 |
29 |
30 | @pytest.fixture
31 | def repository(session_maker):
32 | """Create a test repository."""
33 | return Repository(session_maker, ModelTest)
34 |
35 |
36 | @pytest.mark.asyncio
37 | async def test_add(repository):
38 |     """Test adding a single entity."""
39 |     # Create a test instance
40 |     instance = ModelTest(id="test_add", name="Test Add")
41 | await repository.add(instance)
42 |
43 | # Verify we can find in db
44 | found = await repository.find_by_id("test_add")
45 | assert found is not None
46 | assert found.name == "Test Add"
47 |
48 |
49 | @pytest.mark.asyncio
50 | async def test_add_all(repository):
51 | """Test bulk creation of entities."""
52 | # Create test instances
53 | instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
54 | await repository.add_all(instances)
55 |
56 | # Verify we can find them in db
57 | found = await repository.find_by_id("test_0")
58 | assert found is not None
59 | assert found.name == "Test 0"
60 |
61 |
62 | @pytest.mark.asyncio
63 | async def test_bulk_create(repository):
64 | """Test bulk creation of entities."""
65 | # Create test instances
66 | instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
67 |
68 | # Bulk create
69 | await repository.create_all([instance.__dict__ for instance in instances])
70 |
71 | # Verify we can find them in db
72 | found = await repository.find_by_id("test_0")
73 | assert found is not None
74 | assert found.name == "Test 0"
75 |
76 |
77 | @pytest.mark.asyncio
78 | async def test_find_all(repository):
79 |     """Test finding all entities with an optional limit."""
80 | # Create test data
81 | instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
82 | await repository.create_all([instance.__dict__ for instance in instances])
83 |
84 | found = await repository.find_all(limit=3)
85 | assert len(found) == 3
86 |
87 |
88 | @pytest.mark.asyncio
89 | async def test_find_by_ids(repository):
90 | """Test finding multiple entities by IDs."""
91 | # Create test data
92 | instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
93 | await repository.create_all([instance.__dict__ for instance in instances])
94 |
95 | # Test finding subset of entities
96 | ids_to_find = ["test_0", "test_2", "test_4"]
97 | found = await repository.find_by_ids(ids_to_find)
98 | assert len(found) == 3
99 | assert sorted([e.id for e in found]) == sorted(ids_to_find)
100 |
101 | # Test finding with some non-existent IDs
102 | mixed_ids = ["test_0", "nonexistent", "test_4"]
103 | partial_found = await repository.find_by_ids(mixed_ids)
104 | assert len(partial_found) == 2
105 | assert sorted([e.id for e in partial_found]) == ["test_0", "test_4"]
106 |
107 | # Test with empty list
108 | empty_found = await repository.find_by_ids([])
109 | assert len(empty_found) == 0
110 |
111 | # Test with all non-existent IDs
112 | not_found = await repository.find_by_ids(["fake1", "fake2"])
113 | assert len(not_found) == 0
114 |
115 |
116 | @pytest.mark.asyncio
117 | async def test_delete_by_ids(repository):
118 |     """Test deleting multiple entities by IDs."""
119 | # Create test data
120 | instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
121 | await repository.create_all([instance.__dict__ for instance in instances])
122 |
123 | # Test delete subset of entities
124 | ids_to_delete = ["test_0", "test_2", "test_4"]
125 | deleted_count = await repository.delete_by_ids(ids_to_delete)
126 | assert deleted_count == 3
127 |
128 | # Test finding subset of entities
129 | ids_to_find = ["test_1", "test_3"]
130 | found = await repository.find_by_ids(ids_to_find)
131 | assert len(found) == 2
132 | assert sorted([e.id for e in found]) == sorted(ids_to_find)
133 |
134 | assert await repository.find_by_id(ids_to_delete[0]) is None
135 | assert await repository.find_by_id(ids_to_delete[1]) is None
136 | assert await repository.find_by_id(ids_to_delete[2]) is None
137 |
138 |
139 | @pytest.mark.asyncio
140 | async def test_update(repository):
141 |     """Test updating an entity with a dict of changed fields."""
142 | # Create initial test data
143 | instance = ModelTest(id="test_add", name="Test Add")
144 | await repository.add(instance)
145 |
146 | instance = ModelTest(id="test_add", name="Updated")
147 |
148 |     # Apply the update and verify the change
149 | modified = await repository.update(instance.id, {"name": "Updated"})
150 | assert modified is not None
151 | assert modified.name == "Updated"
152 |
153 |
154 | @pytest.mark.asyncio
155 | async def test_update_model(repository):
156 |     """Test updating an entity by passing a model instance."""
157 | # Create initial test data
158 | instance = ModelTest(id="test_add", name="Test Add")
159 | await repository.add(instance)
160 |
161 | instance.name = "Updated"
162 |
163 |     # Update by passing the modified model
164 | modified = await repository.update(instance.id, instance)
165 | assert modified is not None
166 | assert modified.name == "Updated"
167 |
168 |
169 | @pytest.mark.asyncio
170 | async def test_update_model_not_found(repository):
171 |     """Test that updating a nonexistent entity returns None."""
172 | # Create initial test data
173 | instance = ModelTest(id="test_add", name="Test Add")
174 | await repository.add(instance)
175 |
176 | modified = await repository.update("0", {}) # Use string ID for Postgres compatibility
177 | assert modified is None
178 |
179 |
180 | @pytest.mark.asyncio
181 | async def test_count(repository):
182 |     """Test counting entities."""
183 | # Create test instances
184 | instance = ModelTest(id="test_add", name="Test Add")
185 | await repository.add(instance)
186 |
187 | # Verify we can count in db
188 | count = await repository.count()
189 | assert count == 1
190 |
```
--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for Obsidian-compatible YAML formatting in write_note tool."""
2 |
3 | import pytest
4 |
5 | from basic_memory.mcp.tools import write_note
6 |
7 |
8 | @pytest.mark.asyncio
9 | async def test_write_note_tags_yaml_format(app, project_config, test_project):
10 | """Test that write_note creates files with proper YAML list format for tags."""
11 | # Create a note with tags using write_note
12 | result = await write_note.fn(
13 | project=test_project.name,
14 | title="YAML Format Test",
15 | folder="test",
16 | content="Testing YAML tag formatting",
17 | tags=["system", "overview", "reference"],
18 | )
19 |
20 | # Verify the note was created successfully
21 | assert "Created note" in result
22 | assert "file_path: test/YAML Format Test.md" in result
23 |
24 | # Read the file directly to check YAML formatting
25 | file_path = project_config.home / "test" / "YAML Format Test.md"
26 | content = file_path.read_text(encoding="utf-8")
27 |
28 | # Should use YAML list format
29 | assert "tags:" in content
30 | assert "- system" in content
31 | assert "- overview" in content
32 | assert "- reference" in content
33 |
34 | # Should NOT use JSON array format
35 | assert '["system"' not in content
36 | assert '"overview"' not in content
37 | assert '"reference"]' not in content
38 |
39 |
40 | @pytest.mark.asyncio
41 | async def test_write_note_stringified_json_tags(app, project_config, test_project):
42 | """Test that stringified JSON arrays are handled correctly."""
43 | # This simulates the issue where AI assistants pass tags as stringified JSON
44 | result = await write_note.fn(
45 | project=test_project.name,
46 | title="Stringified JSON Test",
47 | folder="test",
48 | content="Testing stringified JSON tag input",
49 | tags='["python", "testing", "json"]', # Stringified JSON array
50 | )
51 |
52 | # Verify the note was created successfully
53 | assert "Created note" in result
54 |
55 | # Read the file to check formatting
56 | file_path = project_config.home / "test" / "Stringified JSON Test.md"
57 | content = file_path.read_text(encoding="utf-8")
58 |
59 | # Should properly parse the JSON and format as YAML list
60 | assert "tags:" in content
61 | assert "- python" in content
62 | assert "- testing" in content
63 | assert "- json" in content
64 |
65 | # Should NOT have the original stringified format issues
66 | assert '["python"' not in content
67 | assert '"testing"' not in content
68 | assert '"json"]' not in content
69 |
70 |
71 | @pytest.mark.asyncio
72 | async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
73 | """Test that single tags are still formatted as YAML lists."""
74 | await write_note.fn(
75 | project=test_project.name,
76 | title="Single Tag Test",
77 | folder="test",
78 | content="Testing single tag formatting",
79 | tags=["solo-tag"],
80 | )
81 |
82 | file_path = project_config.home / "test" / "Single Tag Test.md"
83 | content = file_path.read_text(encoding="utf-8")
84 |
85 | # Single tag should still use list format
86 | assert "tags:" in content
87 | assert "- solo-tag" in content
88 |
89 |
90 | @pytest.mark.asyncio
91 | async def test_write_note_no_tags(app, project_config, test_project):
92 | """Test that notes without tags work normally."""
93 | await write_note.fn(
94 | project=test_project.name,
95 | title="No Tags Test",
96 | folder="test",
97 | content="Testing note without tags",
98 | tags=None,
99 | )
100 |
101 | file_path = project_config.home / "test" / "No Tags Test.md"
102 | content = file_path.read_text(encoding="utf-8")
103 |
104 | # Should not have tags field in frontmatter
105 | assert "tags:" not in content
106 | assert "title: No Tags Test" in content
107 |
108 |
109 | @pytest.mark.asyncio
110 | async def test_write_note_empty_tags_list(app, project_config, test_project):
111 | """Test that empty tag lists are handled properly."""
112 | await write_note.fn(
113 | project=test_project.name,
114 | title="Empty Tags Test",
115 | folder="test",
116 | content="Testing empty tag list",
117 | tags=[],
118 | )
119 |
120 | file_path = project_config.home / "test" / "Empty Tags Test.md"
121 | content = file_path.read_text(encoding="utf-8")
122 |
123 | # Should not add tags field to frontmatter for empty lists
124 | assert "tags:" not in content
125 |
126 |
127 | @pytest.mark.asyncio
128 | async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
129 | """Test that updating a note preserves the YAML list format."""
130 | # First, create the note
131 | await write_note.fn(
132 | project=test_project.name,
133 | title="Update Format Test",
134 | folder="test",
135 | content="Initial content",
136 | tags=["initial", "tag"],
137 | )
138 |
139 | # Then update it with new tags
140 | result = await write_note.fn(
141 | project=test_project.name,
142 | title="Update Format Test",
143 | folder="test",
144 | content="Updated content",
145 | tags=["updated", "new-tag", "format"],
146 | )
147 |
148 | # Should be an update, not a new creation
149 | assert "Updated note" in result
150 |
151 | # Check the file format
152 | file_path = project_config.home / "test" / "Update Format Test.md"
153 | content = file_path.read_text(encoding="utf-8")
154 |
155 | # Should have proper YAML formatting for updated tags
156 | assert "tags:" in content
157 | assert "- updated" in content
158 | assert "- new-tag" in content
159 | assert "- format" in content
160 |
161 | # Old tags should be gone
162 | assert "- initial" not in content
163 | assert "- tag" not in content
164 |
165 | # Content should be updated
166 | assert "Updated content" in content
167 | assert "Initial content" not in content
168 |
169 |
170 | @pytest.mark.asyncio
171 | async def test_complex_tags_yaml_format(app, project_config, test_project):
172 | """Test that complex tags with special characters format correctly."""
173 | await write_note.fn(
174 | project=test_project.name,
175 | title="Complex Tags Test",
176 | folder="test",
177 | content="Testing complex tag formats",
178 | tags=["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"],
179 | )
180 |
181 | file_path = project_config.home / "test" / "Complex Tags Test.md"
182 | content = file_path.read_text(encoding="utf-8")
183 |
184 | # All complex tags should format correctly
185 | assert "- python-3.9" in content
186 | assert "- api_integration" in content
187 | assert "- v2.0" in content
188 | assert "- nested/category" in content
189 | assert "- under_score" in content
190 |
```
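For reference, the frontmatter shape the assertions in this file imply, a YAML block list rather than a JSON inline array, embedded as Python strings to stay in the suite's language (illustrative, not captured output):

```python
expected_yaml_list_style = """\
---
title: YAML Format Test
tags:
- system
- overview
- reference
---
"""

# The rejected shape the tests guard against:
rejected_json_array_style = 'tags: ["system", "overview", "reference"]'
```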
--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | title: 'SPEC-11: Basic Memory API Performance Optimization'
3 | type: spec
4 | permalink: specs/spec-11-basic-memory-api-performance-optimization
5 | tags:
6 | - performance
7 | - api
8 | - mcp
9 | - database
10 | - cloud
11 | ---
12 |
13 | # SPEC-11: Basic Memory API Performance Optimization
14 |
15 | ## Why
16 |
17 | The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
18 | HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.
19 |
20 | **Root Cause Analysis:**
21 | - GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
22 | - Each MCP tool call triggers full database initialization + project reconciliation
23 | - `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
24 | - `reconcile_projects_with_config()` runs expensive sync operations repeatedly
25 |
26 | **Performance Impact:**
27 | - Database connection setup: ~50-100ms per request
28 | - Migration checks: ~100-500ms per request
29 | - Project reconciliation: ~200ms-2s per request
30 | - **Total overhead**: ~350ms-2.6s per MCP tool call
31 |
32 | This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.
33 |
34 | ## What
35 |
36 | This optimization affects the **core basic-memory repository** components:
37 |
38 | 1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
39 | - Cache database connections in app state during startup
40 | - Avoid repeated expensive initialization
41 |
42 | 2. **Dependency Injection** (`src/basic_memory/deps.py`)
43 | - Modify `get_engine_factory()` to use cached connections
44 | - Eliminate per-request database setup
45 |
46 | 3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
47 | - Add caching/throttling to project reconciliation
48 | - Skip expensive operations when appropriate
49 |
50 | 4. **Configuration** (`src/basic_memory/config.py`)
51 | - Add optional performance flags for cloud environments
52 |
53 | **Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.
54 |
55 | ## How (High Level)
56 |
57 | ### Phase 1: Cache Database Connections (Critical - 80% of gains)
58 |
59 | **Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
60 | **Solution**: Cache database engine/session in app state during lifespan
61 |
62 | 1. **Modify API Lifespan** (`api/app.py`):
63 | ```python
64 | @asynccontextmanager
65 | async def lifespan(app: FastAPI):
66 | app_config = ConfigManager().config
67 | await initialize_app(app_config)
68 |
69 | # Cache database connection in app state
70 | engine, session_maker = await db.get_or_create_db(app_config.database_path)
71 | app.state.engine = engine
72 | app.state.session_maker = session_maker
73 |
74 | # ... rest of startup logic
75 | ```
76 |
77 | 2. **Modify Dependency Injection** (`deps.py`):
78 | ```python
79 | async def get_engine_factory(
80 | request: Request
81 | ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
82 | """Get cached engine and session maker from app state."""
83 | return request.app.state.engine, request.app.state.session_maker
84 | ```
85 | 
86 | ### Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)
87 | 
88 | **Problem**: `reconcile_projects_with_config()` runs expensive sync operations repeatedly
89 | **Solution**: Add module-level caching with time-based throttling
90 | 
91 | 1. **Add Reconciliation Cache** (`services/initialization.py`):
92 | ```python
93 | import time
94 | 
95 | _last_reconciliation_time = 0.0
96 | 
97 | async def reconcile_projects_with_config(app_config, force=False):
98 |     global _last_reconciliation_time
99 |     # Skip if recently completed (within 60 seconds) unless forced
100 |     if not force and time.monotonic() - _last_reconciliation_time < 60:
101 |         return
102 |     _last_reconciliation_time = time.monotonic()
103 |     # ... existing logic
104 | ```
105 | 
106 | ### Phase 3: Cloud Environment Flags (Optional)
107 | 
108 | **Problem**: Stateless cloud deployments are forced through expensive initialization they do not need
109 | **Solution**: Add skip flags for cloud/stateless deployments
110 | 
111 | 1. **Add Config Flag** (`config.py`): `skip_initialization_sync: bool = Field(default=False)`
112 | 2. **Configure in Cloud** (basic-memory-cloud integration): `BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true`
113 | 
114 | ## How to Evaluate
115 | 
116 | ### Success Criteria
117 | 
118 | 1. **Performance Metrics** (Primary):
119 |    - MCP tool response time reduced by 50%+ (measure before/after)
120 |    - Database connection overhead eliminated (0ms vs 50-100ms)
121 |    - Migration check overhead eliminated (0ms vs 100-500ms)
122 |    - Project reconciliation overhead reduced by 90%+
123 | 2. **Load Testing**:
124 |    - Concurrent MCP tool calls maintain performance
125 |    - No memory leaks in cached connections
126 |    - Database connection pool behaves correctly
127 | 3. **Functional Correctness**:
128 |    - All existing API endpoints work identically
129 |    - MCP tools maintain full functionality
130 |    - CLI operations unaffected
131 |    - Database migrations still execute properly
132 | 4. **Backwards Compatibility**:
133 |    - No breaking changes to existing APIs
134 |    - Config changes are optional with safe defaults
135 |    - Non-cloud deployments work unchanged
136 | 
137 | ### Testing Strategy
138 | 
139 | **Performance Testing**:
140 | ```bash
141 | # Before optimization
142 | time basic-memory-mcp-tools write_note "test" "content" "folder"
143 | # Measure: ~1-3 seconds
144 | 
145 | # After optimization
146 | time basic-memory-mcp-tools write_note "test" "content" "folder"
147 | # Target: <500ms
148 | ```
149 | 
150 | **Load Testing**:
151 | ```bash
152 | # Multiple concurrent MCP tool calls
153 | for i in {1..10}; do
154 |   basic-memory-mcp-tools search "test" &
155 | done
156 | wait
157 | # Verify: No degradation, consistent response times
158 | ```
159 | 
160 | **Regression Testing**:
161 | ```bash
162 | # Full basic-memory test suite
163 | just test
164 | # All tests must pass
165 | 
166 | # Integration tests with cloud deployment
167 | # Verify MCP gateway → API → database flow works
168 | ```
169 | 
170 | ### Validation Checklist
171 | 
172 | - [ ] **Phase 1 Complete**: Database connections cached, dependency injection optimized
173 | - [ ] **Performance Benchmark**: 50%+ improvement in MCP tool response times
174 | - [ ] **Memory Usage**: No leaks in cached connections over 24h+ periods
175 | - [ ] **Stress Testing**: 100+ concurrent requests maintain performance
176 | - [ ] **Backwards Compatibility**: All existing functionality preserved
177 | - [ ] **Documentation**: Performance optimization documented in README
178 | - [ ] **Cloud Integration**: basic-memory-cloud sees performance benefits
179 | 
180 | ## Notes
181 | 
182 | **Implementation Priority**:
183 | - Phase 1 provides 80% of performance gains and should be implemented first
184 | - Phase 2 provides the remaining 20% and addresses edge cases
185 | - Phase 3 is optional for maximum cloud optimization
186 | 
187 | **Risk Mitigation**:
188 | - All changes are backwards compatible
189 | - Gradual rollout possible (Phase 1 → 2 → 3)
190 | - Easy rollback via configuration flags
191 | 
192 | **Cloud Integration**:
193 | - This optimization directly addresses basic-memory-cloud issue #82
194 | - Changes in core basic-memory will benefit all cloud tenants
195 | - No changes needed in basic-memory-cloud itself
```
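To make the Phase 1 pattern concrete outside the Basic Memory codebase, here is a self-contained sketch of lifespan-cached database connections with FastAPI and SQLAlchemy async. The in-memory SQLite URL is an illustrative assumption, not the project's actual configuration:

```python
# Sketch of SPEC-11 Phase 1: build the engine once in the lifespan, read it
# from app state per request. Requires fastapi, sqlalchemy[asyncio], aiosqlite.
from contextlib import asynccontextmanager

from fastapi import FastAPI, Request
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Expensive setup runs once per process instead of once per request.
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")  # illustrative URL
    app.state.engine = engine
    app.state.session_maker = async_sessionmaker(engine, class_=AsyncSession)
    yield
    await engine.dispose()


app = FastAPI(lifespan=lifespan)


async def get_engine_factory(
    request: Request,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
    """Per-request cost is now a pair of attribute lookups, no I/O."""
    return request.app.state.engine, request.app.state.session_maker
```

The dependency does no I/O at all; every expensive step happens once in the lifespan, which is what eliminates the 50-100ms per-request connection overhead cited above.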
--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | title: 'SPEC-1: Specification-Driven Development Process'
3 | type: spec
4 | permalink: specs/spec-1-specification-driven-development-process
5 | tags:
6 | - process
7 | - specification
8 | - development
9 | - meta
10 | ---
11 |
12 | # SPEC-1: Specification-Driven Development Process
13 |
14 | ## Why
15 | We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process.
16 | Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.
17 |
18 | The default approach of ad-hoc development with AI agents tends to result in:
19 | - Circular refactoring cycles
20 | - Fighting framework complexity
21 | - Lost context between sessions
22 | - Unclear requirements and scope
23 |
24 | ## What
25 | This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud.
26 | We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.
27 |
28 | **Affected Areas:**
29 | - All future component development
30 | - Architecture decisions
31 | - Agent collaboration workflows
32 | - Knowledge management and context preservation
33 |
34 | ## How (High Level)
35 |
36 | ### Specification Structure
37 |
38 | Name: Spec names should be numbered sequentially, followed by a short description, e.g. `SPEC-X - Simple Description.md`.
39 | See: [[Spec-2: Slash Commands Reference]]
40 |
41 | Every spec is a complete thought containing:
42 | - **Why**: The reasoning and problem being solved
43 | - **What**: What is affected or changed
44 | - **How**: High-level approach to implementation
45 | - **How to Evaluate**: Testing/validation procedure
46 | - Additional context as needed
47 |
48 | ### Living Specification Format
49 |
50 | Specifications are **living documents** that evolve throughout implementation:
51 |
52 | **Progress Tracking:**
53 | - **Completed items**: Use ✅ checkmark emoji for implemented features
54 | - **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
55 | - **In-progress items**: Use `- [x]` when work is actively underway
56 |
57 | **Status Philosophy:**
58 | - **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
59 | - **Use checklists within content** to show granular implementation progress
60 | - **Keep specs informative** while providing clear progress visibility
61 | - **Update continuously** as understanding and implementation evolve
62 |
63 | **Example Format:**
64 | ```markdown
65 | ### ComponentName
66 | - ✅ Basic functionality implemented
67 | - ✅ Props and events defined
68 | - [ ] Add sorting controls
69 | - [ ] Improve accessibility
70 | - [x] Currently implementing responsive design
71 | ```
72 |
73 | This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.
74 |
75 |
76 | ## Claude Code
77 |
78 | We will leverage Claude Code capabilities to make the process semi-automated.
79 |
80 | - Slash commands: define repeatable steps in the process (create spec, implement, review, etc)
81 | - Agents: define roles to carry out instructions (frontend developer, backend developer, etc)
82 | - MCP tools: enable agents to implement specs via actions (write code, test, etc)
83 |
84 | ### Workflow
85 | 1. **Create**: Write spec as complete thought in `/specs` folder
86 | 2. **Discuss**: Iterate and refine through agent collaboration
87 | 3. **Implement**: Hand spec to appropriate specialist agent
88 | 4. **Validate**: Review implementation against spec criteria
89 | 5. **Document**: Update spec with learnings and decisions
90 |
91 | ### Slash Commands
92 |
93 | Claude slash commands are used to manage the flow.
94 | These are simple instructions to help make the process uniform.
95 | They can be updated and refined as needed.
96 |
97 | - `/spec create [name]` - Create new specification
98 | - `/spec status` - Show current spec states
99 | - `/spec implement [name]` - Hand to appropriate agent
100 | - `/spec review [name]` - Validate implementation
101 |
102 | ### Agent Orchestration
103 |
104 | Agents are defined with clear roles, for instance:
105 |
106 | - **system-architect**: Creates high-level specs, ADRs, architectural decisions
107 | - **vue-developer**: Component specs, UI patterns, frontend architecture
108 | - **python-developer**: Implementation specs, technical details, backend logic
109 | 
110 | Each agent reads/updates specs through basic-memory tools.
111 |
112 | ## How to Evaluate
113 |
114 | ### Success Criteria
115 | - Specs provide clear, actionable guidance for implementation
116 | - Reduced circular refactoring and scope creep
117 | - Persistent context across development sessions
118 | - Clean separation between "what/why" and implementation details
119 | - Specs record a history of what happened and why for historical context
120 |
121 | ### Testing Procedure
122 | 1. Create a spec for an existing problematic component
123 | 2. Have an agent implement following only the spec
124 | 3. Compare result quality and development speed vs. ad-hoc approach
125 | 4. Measure context preservation across sessions
126 | 5. Evaluate spec clarity and completeness
127 |
128 | ### Metrics
129 | - Time from spec to working implementation
130 | - Number of refactoring cycles required
131 | - Agent understanding of requirements
132 | - Spec reusability for similar components
133 |
134 | ## Notes
135 | - Start simple: specs are just complete thoughts, not heavy processes
136 | - Use basic-memory's knowledge graph to link specs, decisions, components
137 | - Let the process evolve naturally based on what works
138 | - Focus on solving the actual problem: managing complexity in development
139 |
140 | ## Observations
141 |
142 | - [problem] Web development without clear goals and documentation leads to circular refactoring cycles #complexity
143 | - [solution] Specification-driven development reduces scope creep and context loss #process-improvement
144 | - [pattern] basic-memory as specification engine creates recursive development loop #meta-development
145 | - [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
146 | - [tool] Slash commands provide uniform process automation #automation
147 | - [agent-pattern] Three specialized agents handle different implementation domains #specialization
148 | - [success-metric] Time from spec to working implementation measures process efficiency #measurement
149 | - [learning] Process should evolve naturally based on what works in practice #adaptation
150 | - [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
151 | - [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement
152 |
153 | ## Relations
154 |
155 | - spec [[Spec-2: Slash Commands Reference]]
156 | - spec [[Spec-3: Agent Definitions]]
157 |
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------
```python
1 | """Recent activity prompts for Basic Memory MCP server.
2 |
3 | These prompts help users see what has changed in their knowledge base recently.
4 | """
5 |
6 | from typing import Annotated, Optional
7 |
8 | from loguru import logger
9 | from pydantic import Field
10 |
11 | from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
12 | from basic_memory.mcp.server import mcp
13 | from basic_memory.mcp.tools.recent_activity import recent_activity
14 | from basic_memory.schemas.base import TimeFrame
15 | from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
16 | from basic_memory.schemas.search import SearchItemType
17 |
18 |
19 | @mcp.prompt(
20 | name="recent_activity",
21 | description="Get recent activity from a specific project or across all projects",
22 | )
23 | async def recent_activity_prompt(
24 | timeframe: Annotated[
25 | TimeFrame,
26 | Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
27 | ] = "7d",
28 | project: Annotated[
29 | Optional[str],
30 | Field(
31 | description="Specific project to get activity from (None for discovery across all projects)"
32 | ),
33 | ] = None,
34 | ) -> str:
35 | """Get recent activity from a specific project or across all projects.
36 |
37 | This prompt helps you see what's changed recently in the knowledge base.
38 | In discovery mode (project=None), it shows activity across all projects.
39 | In project-specific mode, it shows detailed activity for one project.
40 |
41 | Args:
42 | timeframe: How far back to look for activity (e.g. '1d', '1 week')
43 | project: Specific project to get activity from (None for discovery across all projects)
44 |
45 | Returns:
46 | Formatted summary of recent activity
47 | """
48 | logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")
49 |
50 | recent = await recent_activity.fn(
51 | project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
52 | )
53 |
54 | # Extract primary results from the hierarchical structure
55 | primary_results = []
56 | related_results = []
57 |
58 | if isinstance(recent, ProjectActivitySummary):
59 | # Discovery mode - extract results from all projects
60 | for _, project_activity in recent.projects.items():
61 | if project_activity.activity.results:
62 | # Take up to 2 primary results per project
63 | for item in project_activity.activity.results[:2]:
64 | primary_results.append(item.primary_result)
65 | # Add up to 1 related result per primary item
66 | if item.related_results:
67 | related_results.extend(item.related_results[:1]) # pragma: no cover
68 |
69 | # Limit total results for readability
70 | primary_results = primary_results[:8]
71 | related_results = related_results[:6]
72 |
73 | elif isinstance(recent, GraphContext):
74 | # Project-specific mode - use existing logic
75 | if recent.results:
76 | # Take up to 5 primary results
77 | for item in recent.results[:5]:
78 | primary_results.append(item.primary_result)
79 | # Add up to 2 related results per primary item
80 | if item.related_results:
81 | related_results.extend(item.related_results[:2]) # pragma: no cover
82 |
83 | # Set topic based on mode
84 | if project:
85 | topic = f"Recent Activity in {project} ({timeframe})"
86 | else:
87 | topic = f"Recent Activity Across All Projects ({timeframe})"
88 |
89 | prompt_context = format_prompt_context(
90 | PromptContext(
91 | topic=topic,
92 | timeframe=timeframe,
93 | results=[
94 | PromptContextItem(
95 | primary_results=primary_results,
96 | related_results=related_results[:10], # Limit total related results
97 | )
98 | ],
99 | )
100 | )
101 |
102 | # Add mode-specific suggestions
103 | first_title = "Recent Topic"
104 | if primary_results and len(primary_results) > 0:
105 | first_title = primary_results[0].title
106 |
107 | if project:
108 | # Project-specific suggestions
109 | capture_suggestions = f"""
110 | ## Opportunity to Capture Activity Summary
111 |
112 | Consider creating a summary note of recent activity in {project}:
113 |
114 | ```python
115 | await write_note(
116 | "{project}",
117 | title="Activity Summary {timeframe}",
118 | content='''
119 | # Activity Summary for {project} ({timeframe})
120 |
121 | ## Overview
122 | [Summary of key changes and developments in this project over this period]
123 |
124 | ## Key Updates
125 | [List main updates and their significance within this project]
126 |
127 | ## Observations
128 | - [trend] [Observation about patterns in recent activity]
129 | - [insight] [Connection between different activities]
130 |
131 | ## Relations
132 | - summarizes [[{first_title}]]
133 | - relates_to [[{project} Overview]]
134 | ''',
135 | folder="summaries"
136 | )
137 | ```
138 |
139 | Summarizing periodic activity helps create high-level insights and connections within the project.
140 | """
141 | else:
142 | # Discovery mode suggestions
143 | project_count = len(recent.projects) if isinstance(recent, ProjectActivitySummary) else 0
144 | most_active = (
145 | getattr(recent.summary, "most_active_project", "Unknown")
146 | if isinstance(recent, ProjectActivitySummary)
147 | else "Unknown"
148 | )
149 |
150 | capture_suggestions = f"""
151 | ## Cross-Project Activity Discovery
152 |
153 | Found activity across {project_count} projects. Most active: **{most_active}**
154 |
155 | Consider creating a cross-project summary:
156 |
157 | ```python
158 | await write_note(
159 |     "{most_active if most_active != 'Unknown' else 'main'}",
160 | title="Cross-Project Activity Summary {timeframe}",
161 | content='''
162 | # Cross-Project Activity Summary ({timeframe})
163 |
164 | ## Overview
165 | Activity found across {project_count} projects, with {most_active} showing the most activity.
166 |
167 | ## Key Developments
168 | [Summarize important changes across all projects]
169 |
170 | ## Project Insights
171 | [Note patterns or connections between projects]
172 |
173 | ## Observations
174 | - [trend] [Cross-project patterns observed]
175 | - [insight] [Connections between different project activities]
176 |
177 | ## Relations
178 | - summarizes [[{first_title}]]
179 | - relates_to [[Project Portfolio Overview]]
180 | ''',
181 | folder="summaries"
182 | )
183 | ```
184 |
185 | Cross-project summaries help identify broader trends and project interconnections.
186 | """
187 |
188 | return prompt_context + capture_suggestions
189 |
```
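The discovery-mode capping above (up to two primary results per project, eight overall) is easy to sanity-check in isolation. A toy sketch, with plain lists standing in for `ActivityResult` objects:

```python
# Toy illustration of the discovery-mode capping logic: take at most 2
# primary results per project, then cap the total at 8 for readability.
projects = {"work": ["a", "b", "c"], "home": ["d"], "notes": ["e", "f"]}

primary = []
for results in projects.values():
    primary.extend(results[:2])  # at most 2 per project
primary = primary[:8]            # overall cap

assert primary == ["a", "b", "d", "e", "f"]
```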
--------------------------------------------------------------------------------
/src/basic_memory/deps/importers.py:
--------------------------------------------------------------------------------
```python
1 | """Importer dependency injection for basic-memory.
2 |
3 | This module provides importer dependencies:
4 | - ChatGPTImporter
5 | - ClaudeConversationsImporter
6 | - ClaudeProjectsImporter
7 | - MemoryJsonImporter
8 | """
9 |
10 | from typing import Annotated
11 |
12 | from fastapi import Depends
13 |
14 | from basic_memory.deps.projects import (
15 | ProjectConfigDep,
16 | ProjectConfigV2Dep,
17 | ProjectConfigV2ExternalDep,
18 | )
19 | from basic_memory.deps.services import (
20 | FileServiceDep,
21 | FileServiceV2Dep,
22 | FileServiceV2ExternalDep,
23 | MarkdownProcessorDep,
24 | MarkdownProcessorV2Dep,
25 | MarkdownProcessorV2ExternalDep,
26 | )
27 | from basic_memory.importers import (
28 | ChatGPTImporter,
29 | ClaudeConversationsImporter,
30 | ClaudeProjectsImporter,
31 | MemoryJsonImporter,
32 | )
33 |
34 |
35 | # --- ChatGPT Importer ---
36 |
37 |
38 | async def get_chatgpt_importer(
39 | project_config: ProjectConfigDep,
40 | markdown_processor: MarkdownProcessorDep,
41 | file_service: FileServiceDep,
42 | ) -> ChatGPTImporter:
43 | """Create ChatGPTImporter with dependencies."""
44 | return ChatGPTImporter(project_config.home, markdown_processor, file_service)
45 |
46 |
47 | ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]
48 |
49 |
50 | async def get_chatgpt_importer_v2( # pragma: no cover
51 | project_config: ProjectConfigV2Dep,
52 | markdown_processor: MarkdownProcessorV2Dep,
53 | file_service: FileServiceV2Dep,
54 | ) -> ChatGPTImporter:
55 | """Create ChatGPTImporter with v2 dependencies."""
56 | return ChatGPTImporter(project_config.home, markdown_processor, file_service)
57 |
58 |
59 | ChatGPTImporterV2Dep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2)]
60 |
61 |
62 | async def get_chatgpt_importer_v2_external(
63 | project_config: ProjectConfigV2ExternalDep,
64 | markdown_processor: MarkdownProcessorV2ExternalDep,
65 | file_service: FileServiceV2ExternalDep,
66 | ) -> ChatGPTImporter:
67 | """Create ChatGPTImporter with v2 external_id dependencies."""
68 | return ChatGPTImporter(project_config.home, markdown_processor, file_service)
69 |
70 |
71 | ChatGPTImporterV2ExternalDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2_external)]
72 |
73 |
74 | # --- Claude Conversations Importer ---
75 |
76 |
77 | async def get_claude_conversations_importer(
78 | project_config: ProjectConfigDep,
79 | markdown_processor: MarkdownProcessorDep,
80 | file_service: FileServiceDep,
81 | ) -> ClaudeConversationsImporter:
82 | """Create ClaudeConversationsImporter with dependencies."""
83 | return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
84 |
85 |
86 | ClaudeConversationsImporterDep = Annotated[
87 | ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
88 | ]
89 |
90 |
91 | async def get_claude_conversations_importer_v2( # pragma: no cover
92 | project_config: ProjectConfigV2Dep,
93 | markdown_processor: MarkdownProcessorV2Dep,
94 | file_service: FileServiceV2Dep,
95 | ) -> ClaudeConversationsImporter:
96 | """Create ClaudeConversationsImporter with v2 dependencies."""
97 | return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
98 |
99 |
100 | ClaudeConversationsImporterV2Dep = Annotated[
101 | ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2)
102 | ]
103 |
104 |
105 | async def get_claude_conversations_importer_v2_external(
106 | project_config: ProjectConfigV2ExternalDep,
107 | markdown_processor: MarkdownProcessorV2ExternalDep,
108 | file_service: FileServiceV2ExternalDep,
109 | ) -> ClaudeConversationsImporter:
110 | """Create ClaudeConversationsImporter with v2 external_id dependencies."""
111 | return ClaudeConversationsImporter(project_config.home, markdown_processor, file_service)
112 |
113 |
114 | ClaudeConversationsImporterV2ExternalDep = Annotated[
115 | ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2_external)
116 | ]
117 |
118 |
119 | # --- Claude Projects Importer ---
120 |
121 |
122 | async def get_claude_projects_importer(
123 | project_config: ProjectConfigDep,
124 | markdown_processor: MarkdownProcessorDep,
125 | file_service: FileServiceDep,
126 | ) -> ClaudeProjectsImporter:
127 | """Create ClaudeProjectsImporter with dependencies."""
128 | return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
129 |
130 |
131 | ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]
132 |
133 |
134 | async def get_claude_projects_importer_v2( # pragma: no cover
135 | project_config: ProjectConfigV2Dep,
136 | markdown_processor: MarkdownProcessorV2Dep,
137 | file_service: FileServiceV2Dep,
138 | ) -> ClaudeProjectsImporter:
139 | """Create ClaudeProjectsImporter with v2 dependencies."""
140 | return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
141 |
142 |
143 | ClaudeProjectsImporterV2Dep = Annotated[
144 | ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2)
145 | ]
146 |
147 |
148 | async def get_claude_projects_importer_v2_external(
149 | project_config: ProjectConfigV2ExternalDep,
150 | markdown_processor: MarkdownProcessorV2ExternalDep,
151 | file_service: FileServiceV2ExternalDep,
152 | ) -> ClaudeProjectsImporter:
153 | """Create ClaudeProjectsImporter with v2 external_id dependencies."""
154 | return ClaudeProjectsImporter(project_config.home, markdown_processor, file_service)
155 |
156 |
157 | ClaudeProjectsImporterV2ExternalDep = Annotated[
158 | ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2_external)
159 | ]
160 |
161 |
162 | # --- Memory JSON Importer ---
163 |
164 |
165 | async def get_memory_json_importer(
166 | project_config: ProjectConfigDep,
167 | markdown_processor: MarkdownProcessorDep,
168 | file_service: FileServiceDep,
169 | ) -> MemoryJsonImporter:
170 | """Create MemoryJsonImporter with dependencies."""
171 | return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
172 |
173 |
174 | MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]
175 |
176 |
177 | async def get_memory_json_importer_v2( # pragma: no cover
178 | project_config: ProjectConfigV2Dep,
179 | markdown_processor: MarkdownProcessorV2Dep,
180 | file_service: FileServiceV2Dep,
181 | ) -> MemoryJsonImporter:
182 | """Create MemoryJsonImporter with v2 dependencies."""
183 | return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
184 |
185 |
186 | MemoryJsonImporterV2Dep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer_v2)]
187 |
188 |
189 | async def get_memory_json_importer_v2_external(
190 | project_config: ProjectConfigV2ExternalDep,
191 | markdown_processor: MarkdownProcessorV2ExternalDep,
192 | file_service: FileServiceV2ExternalDep,
193 | ) -> MemoryJsonImporter:
194 | """Create MemoryJsonImporter with v2 external_id dependencies."""
195 | return MemoryJsonImporter(project_config.home, markdown_processor, file_service)
196 |
197 |
198 | MemoryJsonImporterV2ExternalDep = Annotated[
199 | MemoryJsonImporter, Depends(get_memory_json_importer_v2_external)
200 | ]
201 |
```
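As a usage sketch, a router endpoint can declare any of these `Annotated` aliases as a parameter type and FastAPI wires up the whole dependency chain. The route path, payload, and response shape below are hypothetical, not the actual importer_router API:

```python
# Hypothetical endpoint consuming ChatGPTImporterDep; path and payload
# are illustrative assumptions, only the dependency alias is real.
from fastapi import APIRouter

from basic_memory.deps.importers import ChatGPTImporterDep

router = APIRouter()


@router.post("/import/chatgpt")
async def import_chatgpt(data: dict, importer: ChatGPTImporterDep):
    # FastAPI resolves ProjectConfigDep, MarkdownProcessorDep, and
    # FileServiceDep, then constructs the importer for this request.
    result = await importer.import_data(data, "conversations")
    return {"success": result.success}
```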
--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------
```python
1 | """Claude conversations import service for Basic Memory."""
2 |
3 | import logging
4 | from datetime import datetime
5 | from typing import Any, Dict, List, Optional
6 |
7 | from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
8 | from basic_memory.importers.base import Importer
9 | from basic_memory.schemas.importer import ChatImportResult
10 | from basic_memory.importers.utils import clean_filename, format_timestamp
11 |
12 | logger = logging.getLogger(__name__)
13 |
14 |
15 | class ClaudeConversationsImporter(Importer[ChatImportResult]):
16 | """Service for importing Claude conversations."""
17 |
18 | def handle_error( # pragma: no cover
19 | self, message: str, error: Optional[Exception] = None
20 | ) -> ChatImportResult:
21 | """Return a failed ChatImportResult with an error message."""
22 | error_msg = f"{message}: {error}" if error else message
23 | return ChatImportResult(
24 | import_count={},
25 | success=False,
26 | error_message=error_msg,
27 | conversations=0,
28 | messages=0,
29 | )
30 |
31 | async def import_data(
32 | self, source_data, destination_folder: str, **kwargs: Any
33 | ) -> ChatImportResult:
34 | """Import conversations from Claude JSON export.
35 |
36 | Args:
37 |             source_data: Parsed JSON data from a Claude conversations.json export.
38 | destination_folder: Destination folder within the project.
39 | **kwargs: Additional keyword arguments.
40 |
41 | Returns:
42 | ChatImportResult containing statistics and status of the import.
43 | """
44 | try:
45 | # Ensure the destination folder exists
46 | await self.ensure_folder_exists(destination_folder)
47 |
48 | conversations = source_data
49 |
50 | # Process each conversation
51 | messages_imported = 0
52 | chats_imported = 0
53 |
54 | for chat in conversations:
55 | # Get name, providing default for unnamed conversations
56 | chat_name = chat.get("name") or f"Conversation {chat.get('uuid', 'untitled')}"
57 |
58 | # Convert to entity
59 | entity = self._format_chat_content(
60 | folder=destination_folder,
61 | name=chat_name,
62 | messages=chat["chat_messages"],
63 | created_at=chat["created_at"],
64 | modified_at=chat["updated_at"],
65 | )
66 |
67 | # Write file using relative path - FileService handles base_path
68 | file_path = f"{entity.frontmatter.metadata['permalink']}.md"
69 | await self.write_entity(entity, file_path)
70 |
71 | chats_imported += 1
72 | messages_imported += len(chat["chat_messages"])
73 |
74 | return ChatImportResult(
75 | import_count={"conversations": chats_imported, "messages": messages_imported},
76 | success=True,
77 | conversations=chats_imported,
78 | messages=messages_imported,
79 | )
80 |
81 | except Exception as e: # pragma: no cover
82 | logger.exception("Failed to import Claude conversations")
83 | return self.handle_error("Failed to import Claude conversations", e)
84 |
85 | def _format_chat_content(
86 | self,
87 | folder: str,
88 | name: str,
89 | messages: List[Dict[str, Any]],
90 | created_at: str,
91 | modified_at: str,
92 | ) -> EntityMarkdown:
93 | """Convert chat messages to Basic Memory entity format.
94 |
95 | Args:
96 | folder: Destination folder name (relative path).
97 | name: Chat name.
98 | messages: List of chat messages.
99 | created_at: Creation timestamp.
100 | modified_at: Modification timestamp.
101 |
102 | Returns:
103 | EntityMarkdown instance representing the conversation.
104 | """
105 | # Generate permalink using folder name (relative path)
106 | date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
107 | clean_title = clean_filename(name)
108 | permalink = f"{folder}/{date_prefix}-{clean_title}"
109 |
110 | # Format content
111 | content = self._format_chat_markdown(
112 | name=name,
113 | messages=messages,
114 | created_at=created_at,
115 | modified_at=modified_at,
116 | permalink=permalink,
117 | )
118 |
119 | # Create entity
120 | entity = EntityMarkdown(
121 | frontmatter=EntityFrontmatter(
122 | metadata={
123 | "type": "conversation",
124 | "title": name,
125 | "created": created_at,
126 | "modified": modified_at,
127 | "permalink": permalink,
128 | }
129 | ),
130 | content=content,
131 | )
132 |
133 | return entity
134 |
135 | def _format_chat_markdown(
136 | self,
137 | name: str,
138 | messages: List[Dict[str, Any]],
139 | created_at: str,
140 | modified_at: str,
141 | permalink: str,
142 | ) -> str:
143 | """Format chat as clean markdown.
144 |
145 | Args:
146 | name: Chat name.
147 | messages: List of chat messages.
148 | created_at: Creation timestamp.
149 | modified_at: Modification timestamp.
150 | permalink: Permalink for the entity.
151 |
152 | Returns:
153 | Formatted markdown content.
154 | """
155 |         # Start with the chat title; frontmatter is rendered separately from EntityFrontmatter
156 | lines = [
157 | f"# {name}\n",
158 | ]
159 |
160 | # Add messages
161 | for msg in messages:
162 | # Format timestamp
163 | ts = format_timestamp(msg["created_at"])
164 |
165 | # Add message header
166 | lines.append(f"### {msg['sender'].title()} ({ts})")
167 |
168 | # Handle message content
169 | content = msg.get("text", "")
170 | if msg.get("content"):
171 | # Filter out None values before joining
172 | content = " ".join(
173 | str(c.get("text", ""))
174 | for c in msg["content"]
175 | if c and c.get("text") is not None
176 | )
177 | lines.append(content)
178 |
179 | # Handle attachments
180 | attachments = msg.get("attachments", [])
181 | for attachment in attachments:
182 | if "file_name" in attachment:
183 | lines.append(f"\n**Attachment: {attachment['file_name']}**")
184 | if "extracted_content" in attachment:
185 | lines.append("```")
186 | lines.append(attachment["extracted_content"])
187 | lines.append("```")
188 |
189 | # Add spacing between messages
190 | lines.append("")
191 |
192 | return "\n".join(lines)
193 |
```
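For orientation, this is the export shape `import_data()` expects. Every key below is one the code above reads; the values are invented for illustration:

```python
# Example Claude export structure consumed by import_data(); keys mirror
# the fields accessed above, values are illustrative only.
sample_conversations = [
    {
        "uuid": "abc-123",  # used for the fallback title of unnamed chats
        "name": "Planning session",
        "created_at": "2025-01-01T10:00:00Z",
        "updated_at": "2025-01-01T11:00:00Z",
        "chat_messages": [
            {
                "sender": "human",
                "created_at": "2025-01-01T10:00:00Z",
                "text": "Hello",
                "content": [{"text": "Hello"}],
                "attachments": [],
            }
        ],
    }
]
```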
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------
```python
1 | """Status command for basic-memory CLI."""
2 |
3 | from typing import Set, Dict
4 | from typing import Annotated, Optional
5 |
6 | from mcp.server.fastmcp.exceptions import ToolError
7 | import typer
8 | from loguru import logger
9 | from rich.console import Console
10 | from rich.panel import Panel
11 | from rich.tree import Tree
12 |
13 | from basic_memory.cli.app import app
14 | from basic_memory.mcp.async_client import get_client
15 | from basic_memory.mcp.tools.utils import call_post
16 | from basic_memory.schemas import SyncReportResponse
17 | from basic_memory.mcp.project_context import get_active_project
18 |
19 | # Create rich console
20 | console = Console()
21 |
22 |
23 | def add_files_to_tree(
24 | tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
25 | ):
26 | """Add files to tree, grouped by directory."""
27 | # Group by directory
28 | by_dir = {}
29 | for path in sorted(paths):
30 | parts = path.split("/", 1)
31 | dir_name = parts[0] if len(parts) > 1 else ""
32 | file_name = parts[1] if len(parts) > 1 else parts[0]
33 | by_dir.setdefault(dir_name, []).append((file_name, path))
34 |
35 | # Add to tree
36 | for dir_name, files in sorted(by_dir.items()):
37 | if dir_name:
38 | branch = tree.add(f"[bold]{dir_name}/[/bold]")
39 | else:
40 | branch = tree
41 |
42 | for file_name, full_path in sorted(files):
43 | if checksums and full_path in checksums:
44 | checksum_short = checksums[full_path][:8]
45 | branch.add(f"[{style}]{file_name}[/{style}] ({checksum_short})")
46 | else:
47 | branch.add(f"[{style}]{file_name}[/{style}]")
48 |
49 |
50 | def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
51 | """Group changes by directory for summary view."""
52 | by_dir = {}
53 | for change_type, paths in [
54 | ("new", changes.new),
55 | ("modified", changes.modified),
56 | ("deleted", changes.deleted),
57 | ]:
58 | for path in paths:
59 | dir_name = path.split("/", 1)[0]
60 | by_dir.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
61 | by_dir[dir_name][change_type] += 1
62 |
63 | # Handle moves - count in both source and destination directories
64 | for old_path, new_path in changes.moves.items():
65 | old_dir = old_path.split("/", 1)[0]
66 | new_dir = new_path.split("/", 1)[0]
67 | by_dir.setdefault(old_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
68 | by_dir.setdefault(new_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
69 | by_dir[old_dir]["moved"] += 1
70 | if old_dir != new_dir:
71 | by_dir[new_dir]["moved"] += 1
72 |
73 | return by_dir
74 |
75 |
76 | def build_directory_summary(counts: Dict[str, int]) -> str:
77 | """Build summary string for directory changes."""
78 | parts = []
79 | if counts["new"]:
80 | parts.append(f"[green]+{counts['new']} new[/green]")
81 | if counts["modified"]:
82 | parts.append(f"[yellow]~{counts['modified']} modified[/yellow]")
83 | if counts["moved"]:
84 | parts.append(f"[blue]↔{counts['moved']} moved[/blue]")
85 | if counts["deleted"]:
86 | parts.append(f"[red]-{counts['deleted']} deleted[/red]")
87 | return " ".join(parts)
88 |
89 |
90 | def display_changes(
91 | project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
92 | ):
93 | """Display changes using Rich for better visualization."""
94 | tree = Tree(f"{project_name}: {title}")
95 |
96 | if changes.total == 0 and not changes.skipped_files:
97 | tree.add("No changes")
98 | console.print(Panel(tree, expand=False))
99 | return
100 |
101 | if verbose:
102 | # Full file listing with checksums
103 | if changes.new:
104 | new_branch = tree.add("[green]New Files[/green]")
105 | add_files_to_tree(new_branch, changes.new, "green", changes.checksums)
106 | if changes.modified:
107 | mod_branch = tree.add("[yellow]Modified[/yellow]")
108 | add_files_to_tree(mod_branch, changes.modified, "yellow", changes.checksums)
109 | if changes.moves:
110 | move_branch = tree.add("[blue]Moved[/blue]")
111 | for old_path, new_path in sorted(changes.moves.items()):
112 | move_branch.add(f"[blue]{old_path}[/blue] → [blue]{new_path}[/blue]")
113 | if changes.deleted:
114 | del_branch = tree.add("[red]Deleted[/red]")
115 | add_files_to_tree(del_branch, changes.deleted, "red")
116 | if changes.skipped_files:
117 | skip_branch = tree.add("[red]! Skipped (Circuit Breaker)[/red]")
118 | for skipped in sorted(changes.skipped_files, key=lambda x: x.path):
119 | skip_branch.add(
120 | f"[red]{skipped.path}[/red] "
121 | f"(failures: {skipped.failure_count}, reason: {skipped.reason})"
122 | )
123 | else:
124 | # Show directory summaries
125 | by_dir = group_changes_by_directory(changes)
126 | for dir_name, counts in sorted(by_dir.items()):
127 | summary = build_directory_summary(counts)
128 | if summary: # Only show directories with changes
129 | tree.add(f"[bold]{dir_name}/[/bold] {summary}")
130 |
131 | # Show skipped files summary in non-verbose mode
132 | if changes.skipped_files:
133 | skip_count = len(changes.skipped_files)
134 | tree.add(
135 | f"[red]! {skip_count} file{'s' if skip_count != 1 else ''} "
136 | f"skipped due to repeated failures[/red]"
137 | )
138 |
139 | console.print(Panel(tree, expand=False))
140 |
141 |
142 | async def run_status(project: Optional[str] = None, verbose: bool = False): # pragma: no cover
143 | """Check sync status of files vs database."""
144 |
145 | try:
146 | async with get_client() as client:
147 | project_item = await get_active_project(client, project, None)
148 | response = await call_post(client, f"{project_item.project_url}/project/status")
149 | sync_report = SyncReportResponse.model_validate(response.json())
150 |
151 | display_changes(project_item.name, "Status", sync_report, verbose)
152 |
153 | except (ValueError, ToolError) as e:
154 | console.print(f"[red]Error: {e}[/red]")
155 | raise typer.Exit(1)
156 |
157 |
158 | @app.command()
159 | def status(
160 | project: Annotated[
161 | Optional[str],
162 | typer.Option(help="The project name."),
163 | ] = None,
164 | verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
165 | ):
166 | """Show sync status between files and database."""
167 | from basic_memory.cli.commands.command_utils import run_with_cleanup
168 |
169 | try:
170 | run_with_cleanup(run_status(project, verbose)) # pragma: no cover
171 | except Exception as e:
172 | logger.error(f"Error checking status: {e}")
173 | typer.echo(f"Error checking status: {e}", err=True)
174 | raise typer.Exit(code=1) # pragma: no cover
175 |
```
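A quick check of the summary formatting helper: the expected string follows directly from the format strings in `build_directory_summary` above (the import path assumes the installed package layout shown in this file's header):

```python
# Sanity check for build_directory_summary(); zero counts are skipped and
# the remaining parts are joined with a single space.
from basic_memory.cli.commands.status import build_directory_summary

summary = build_directory_summary({"new": 2, "modified": 1, "moved": 0, "deleted": 0})
assert summary == "[green]+2 new[/green] [yellow]~1 modified[/yellow]"
```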
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/g9a0b3c4d5e6_add_external_id_to_project_and_entity.py:
--------------------------------------------------------------------------------
```python
1 | """Add external_id UUID column to project and entity tables
2 |
3 | Revision ID: g9a0b3c4d5e6
4 | Revises: f8a9b2c3d4e5
5 | Create Date: 2025-12-29 10:00:00.000000
6 |
7 | """
8 |
9 | import uuid
10 | from typing import Sequence, Union
11 |
12 | import sqlalchemy as sa
13 | from alembic import op
14 | from sqlalchemy import text
15 |
16 |
17 | def column_exists(connection, table: str, column: str) -> bool:
18 | """Check if a column exists in a table (idempotent migration support)."""
19 | if connection.dialect.name == "postgresql":
20 | result = connection.execute(
21 | text(
22 | "SELECT 1 FROM information_schema.columns "
23 | "WHERE table_name = :table AND column_name = :column"
24 | ),
25 | {"table": table, "column": column},
26 | )
27 | return result.fetchone() is not None
28 | else:
29 | # SQLite
30 | result = connection.execute(text(f"PRAGMA table_info({table})"))
31 | columns = [row[1] for row in result]
32 | return column in columns
33 |
34 |
35 | def index_exists(connection, index_name: str) -> bool:
36 | """Check if an index exists (idempotent migration support)."""
37 | if connection.dialect.name == "postgresql":
38 | result = connection.execute(
39 | text("SELECT 1 FROM pg_indexes WHERE indexname = :index_name"),
40 | {"index_name": index_name},
41 | )
42 | return result.fetchone() is not None
43 | else:
44 | # SQLite
45 | result = connection.execute(
46 | text("SELECT 1 FROM sqlite_master WHERE type='index' AND name = :index_name"),
47 | {"index_name": index_name},
48 | )
49 | return result.fetchone() is not None
50 |
51 |
52 | # revision identifiers, used by Alembic.
53 | revision: str = "g9a0b3c4d5e6"
54 | down_revision: Union[str, None] = "f8a9b2c3d4e5"
55 | branch_labels: Union[str, Sequence[str], None] = None
56 | depends_on: Union[str, Sequence[str], None] = None
57 |
58 |
59 | def upgrade() -> None:
60 | """Add external_id UUID column to project and entity tables.
61 |
62 | This migration:
63 | 1. Adds external_id column to project table
64 | 2. Adds external_id column to entity table
65 | 3. Generates UUIDs for existing rows
66 | 4. Creates unique indexes on both columns
67 | """
68 | connection = op.get_bind()
69 | dialect = connection.dialect.name
70 |
71 | # -------------------------------------------------------------------------
72 | # Add external_id to project table
73 | # -------------------------------------------------------------------------
74 |
75 | if not column_exists(connection, "project", "external_id"):
76 | # Step 1: Add external_id column as nullable first
77 | op.add_column("project", sa.Column("external_id", sa.String(), nullable=True))
78 |
79 | # Step 2: Generate UUIDs for existing rows
80 | if dialect == "postgresql":
81 | # Postgres has gen_random_uuid() function
82 | op.execute("""
83 | UPDATE project
84 | SET external_id = gen_random_uuid()::text
85 | WHERE external_id IS NULL
86 | """)
87 | else:
88 | # SQLite: need to generate UUIDs in Python
89 | result = connection.execute(text("SELECT id FROM project WHERE external_id IS NULL"))
90 | for row in result:
91 | new_uuid = str(uuid.uuid4())
92 | connection.execute(
93 | text("UPDATE project SET external_id = :uuid WHERE id = :id"),
94 | {"uuid": new_uuid, "id": row[0]},
95 | )
96 |
97 | # Step 3: Make external_id NOT NULL
98 | if dialect == "postgresql":
99 | op.alter_column("project", "external_id", nullable=False)
100 | else:
101 | # SQLite requires batch operations for ALTER COLUMN
102 | with op.batch_alter_table("project") as batch_op:
103 | batch_op.alter_column("external_id", nullable=False)
104 |
105 | # Step 4: Create unique index on project.external_id (idempotent)
106 | if not index_exists(connection, "ix_project_external_id"):
107 | op.create_index("ix_project_external_id", "project", ["external_id"], unique=True)
108 |
109 | # -------------------------------------------------------------------------
110 | # Add external_id to entity table
111 | # -------------------------------------------------------------------------
112 |
113 | if not column_exists(connection, "entity", "external_id"):
114 | # Step 1: Add external_id column as nullable first
115 | op.add_column("entity", sa.Column("external_id", sa.String(), nullable=True))
116 |
117 | # Step 2: Generate UUIDs for existing rows
118 | if dialect == "postgresql":
119 | # Postgres has gen_random_uuid() function
120 | op.execute("""
121 | UPDATE entity
122 | SET external_id = gen_random_uuid()::text
123 | WHERE external_id IS NULL
124 | """)
125 | else:
126 | # SQLite: need to generate UUIDs in Python
127 | result = connection.execute(text("SELECT id FROM entity WHERE external_id IS NULL"))
128 | for row in result:
129 | new_uuid = str(uuid.uuid4())
130 | connection.execute(
131 | text("UPDATE entity SET external_id = :uuid WHERE id = :id"),
132 | {"uuid": new_uuid, "id": row[0]},
133 | )
134 |
135 | # Step 3: Make external_id NOT NULL
136 | if dialect == "postgresql":
137 | op.alter_column("entity", "external_id", nullable=False)
138 | else:
139 | # SQLite requires batch operations for ALTER COLUMN
140 | with op.batch_alter_table("entity") as batch_op:
141 | batch_op.alter_column("external_id", nullable=False)
142 |
143 | # Step 4: Create unique index on entity.external_id (idempotent)
144 | if not index_exists(connection, "ix_entity_external_id"):
145 | op.create_index("ix_entity_external_id", "entity", ["external_id"], unique=True)
146 |
147 |
148 | def downgrade() -> None:
149 | """Remove external_id columns from project and entity tables."""
150 | connection = op.get_bind()
151 | dialect = connection.dialect.name
152 |
153 | # Drop from entity table
154 | if index_exists(connection, "ix_entity_external_id"):
155 | op.drop_index("ix_entity_external_id", table_name="entity")
156 |
157 | if column_exists(connection, "entity", "external_id"):
158 | if dialect == "postgresql":
159 | op.drop_column("entity", "external_id")
160 | else:
161 | with op.batch_alter_table("entity") as batch_op:
162 | batch_op.drop_column("external_id")
163 |
164 | # Drop from project table
165 | if index_exists(connection, "ix_project_external_id"):
166 | op.drop_index("ix_project_external_id", table_name="project")
167 |
168 | if column_exists(connection, "project", "external_id"):
169 | if dialect == "postgresql":
170 | op.drop_column("project", "external_id")
171 | else:
172 | with op.batch_alter_table("project") as batch_op:
173 | batch_op.drop_column("external_id")
174 |
```
--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for search router."""
2 |
3 | from datetime import datetime, timezone
4 |
5 | import pytest
6 | import pytest_asyncio
7 | from sqlalchemy import text
8 |
9 | from basic_memory import db
10 | from basic_memory.schemas import Entity as EntitySchema
11 | from basic_memory.schemas.search import SearchItemType, SearchResponse
12 |
13 |
14 | @pytest_asyncio.fixture
15 | async def indexed_entity(full_entity, search_service):
16 | """Create an entity and index it."""
17 | await search_service.index_entity(full_entity)
18 | return full_entity
19 |
20 |
21 | @pytest.mark.asyncio
22 | async def test_search_basic(client, indexed_entity, project_url):
23 | """Test basic text search."""
24 | response = await client.post(f"{project_url}/search/", json={"text": "search"})
25 | assert response.status_code == 200
26 | search_results = SearchResponse.model_validate(response.json())
27 | assert len(search_results.results) == 3
28 |
29 | found = False
30 | for r in search_results.results:
31 | if r.type == SearchItemType.ENTITY.value:
32 | assert r.permalink == indexed_entity.permalink
33 | found = True
34 |
35 | assert found, "Expected to find indexed entity in results"
36 |
37 |
38 | @pytest.mark.asyncio
39 | async def test_search_basic_pagination(client, indexed_entity, project_url):
40 |     """Test pagination of search results."""
41 | response = await client.post(
42 | f"{project_url}/search/?page=3&page_size=1", json={"text": "search"}
43 | )
44 | assert response.status_code == 200
45 | search_results = SearchResponse.model_validate(response.json())
46 | assert len(search_results.results) == 1
47 |
48 | assert search_results.current_page == 3
49 | assert search_results.page_size == 1
50 |
51 |
52 | @pytest.mark.asyncio
53 | async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
54 | """Test search with type filter."""
55 | # Should find with correct type
56 | response = await client.post(
57 | f"{project_url}/search/",
58 | json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
59 | )
60 | assert response.status_code == 200
61 | search_results = SearchResponse.model_validate(response.json())
62 | assert len(search_results.results) > 0
63 |
64 | # Should find with relation type
65 | response = await client.post(
66 | f"{project_url}/search/",
67 | json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
68 | )
69 | assert response.status_code == 200
70 | search_results = SearchResponse.model_validate(response.json())
71 | assert len(search_results.results) == 2
72 |
73 |
74 | @pytest.mark.asyncio
75 | async def test_search_with_type_filter(client, indexed_entity, project_url):
76 | """Test search with entity type filter."""
77 | # Should find with correct entity type
78 | response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["test"]})
79 | assert response.status_code == 200
80 | search_results = SearchResponse.model_validate(response.json())
81 | assert len(search_results.results) == 1
82 |
83 | # Should not find with wrong entity type
84 | response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["note"]})
85 | assert response.status_code == 200
86 | search_results = SearchResponse.model_validate(response.json())
87 | assert len(search_results.results) == 0
88 |
89 |
90 | @pytest.mark.asyncio
91 | async def test_search_with_date_filter(client, indexed_entity, project_url):
92 | """Test search with date filter."""
93 | # Should find with past date
94 | past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
95 | response = await client.post(
96 | f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
97 | )
98 | assert response.status_code == 200
99 |     search_results = SearchResponse.model_validate(response.json())
100 |     assert len(search_results.results) > 0
101 | # Should not find with future date
102 | future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
103 | response = await client.post(
104 | f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
105 | )
106 | assert response.status_code == 200
107 | search_results = SearchResponse.model_validate(response.json())
108 | assert len(search_results.results) == 0
109 |
110 |
111 | @pytest.mark.asyncio
112 | async def test_search_empty(search_service, client, project_url):
113 | """Test search with no matches."""
114 | response = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
115 | assert response.status_code == 200
116 | search_result = SearchResponse.model_validate(response.json())
117 | assert len(search_result.results) == 0
118 |
119 |
120 | @pytest.mark.asyncio
121 | async def test_reindex(
122 | client, search_service, entity_service, session_maker, project_url, app_config
123 | ):
124 | """Test reindex endpoint."""
125 | # Skip for Postgres - needs investigation of database connection isolation
126 | from basic_memory.config import DatabaseBackend
127 |
128 | if app_config.database_backend == DatabaseBackend.POSTGRES:
129 | pytest.skip("Not yet supported for Postgres - database connection isolation issue")
130 |
131 | # Create test entity and document
132 | await entity_service.create_entity(
133 | EntitySchema(
134 | title="TestEntity1",
135 | folder="test",
136 | entity_type="test",
137 | ),
138 | )
139 |
140 | # Clear search index
141 | async with db.scoped_session(session_maker) as session:
142 | await session.execute(text("DELETE FROM search_index"))
143 | await session.commit()
144 |
145 | # Verify nothing is searchable
146 | response = await client.post(f"{project_url}/search/", json={"text": "test"})
147 | search_results = SearchResponse.model_validate(response.json())
148 | assert len(search_results.results) == 0
149 |
150 | # Trigger reindex
151 | reindex_response = await client.post(f"{project_url}/search/reindex")
152 | assert reindex_response.status_code == 200
153 | assert reindex_response.json()["status"] == "ok"
154 |
155 | # Verify content is searchable again
156 | search_response = await client.post(f"{project_url}/search/", json={"text": "test"})
157 | search_results = SearchResponse.model_validate(search_response.json())
158 | assert len(search_results.results) == 1
159 |
160 |
161 | @pytest.mark.asyncio
162 | async def test_multiple_filters(client, indexed_entity, project_url):
163 | """Test search with multiple filters combined."""
164 | response = await client.post(
165 | f"{project_url}/search/",
166 | json={
167 | "text": "test",
168 | "entity_types": [SearchItemType.ENTITY.value],
169 | "types": ["test"],
170 | "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
171 | },
172 | )
173 | assert response.status_code == 200
174 | search_result = SearchResponse.model_validate(response.json())
175 | assert len(search_result.results) == 1
176 | result = search_result.results[0]
177 | assert result.permalink == indexed_entity.permalink
178 | assert result.type == SearchItemType.ENTITY.value
179 | assert result.metadata["entity_type"] == "test"
180 |
```
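Taken together, the filters these tests exercise compose into a single request payload. Collected here for reference, a sketch whose field names come straight from the requests above:

```python
# Combined search payload shape exercised by test_multiple_filters;
# every field name appears in the requests above.
from datetime import datetime, timezone

from basic_memory.schemas.search import SearchItemType

payload = {
    "text": "test",
    "entity_types": [SearchItemType.ENTITY.value],
    "types": ["test"],
    "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
}
```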