This is page 11 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_rclone_commands.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/recent_activity.py:
--------------------------------------------------------------------------------
```python
"""Recent activity tool for Basic Memory MCP server."""
from typing import List, Union, Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project, resolve_project_parameter
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
GraphContext,
ProjectActivity,
ActivityStats,
)
from basic_memory.schemas.project_info import ProjectList, ProjectItem
from basic_memory.schemas.search import SearchItemType
@mcp.tool(
description="""Get recent activity for a project or across all projects.
Timeframe supports natural language formats like:
- "2 days ago"
- "last week"
- "yesterday"
- "today"
- "3 weeks ago"
Or standard formats like "7d"
""",
)
async def recent_activity(
type: Union[str, List[str]] = "",
depth: int = 1,
timeframe: TimeFrame = "7d",
project: Optional[str] = None,
context: Context | None = None,
) -> str:
"""Get recent activity for a specific project or across all projects.
Project Resolution:
The server resolves projects in this order:
1. Single Project Mode - server constrained to one project, parameter ignored
2. Explicit project parameter - specify which project to query
3. Default project - server configured default if no project specified
Discovery Mode:
When no specific project can be resolved, returns activity across all projects
to help discover available projects and their recent activity.
Project Discovery (when project is unknown):
1. Call list_memory_projects() to see available projects
2. Or use this tool without project parameter to see cross-project activity
3. Ask the user which project to focus on
4. Remember their choice for the conversation
Args:
type: Filter by content type(s). Can be a string or list of strings.
Valid options:
- "entity" or ["entity"] for knowledge entities
- "relation" or ["relation"] for connections between entities
- "observation" or ["observation"] for notes and observations
Multiple types can be combined: ["entity", "relation"]
Case-insensitive: "ENTITY" and "entity" are treated the same.
Default is an empty string, which returns all types.
depth: How many relation hops to traverse (1-3 recommended)
timeframe: Time window to search. Supports natural language:
- Relative: "2 days ago", "last week", "yesterday"
- Points in time: "2024-01-01", "January 1st"
- Standard format: "7d", "24h"
project: Project name to query. Optional - server will resolve using the
hierarchy above. If unknown, use list_memory_projects() to discover
available projects.
context: Optional FastMCP context for performance caching.
Returns:
Human-readable summary of recent activity. When no specific project is
resolved, returns cross-project discovery information. When a specific
project is resolved, returns detailed activity for that project.
Examples:
# Cross-project discovery mode
recent_activity()
recent_activity(timeframe="yesterday")
# Project-specific activity
recent_activity(project="work-docs", type="entity", timeframe="yesterday")
recent_activity(project="research", type=["entity", "relation"], timeframe="today")
recent_activity(project="notes", type="entity", depth=2, timeframe="2 weeks ago")
Raises:
ToolError: If project doesn't exist or type parameter contains invalid values
Notes:
- Higher depth values (>3) may impact performance with large result sets
- For focused queries, consider using build_context with a specific URI
- Max timeframe is 1 year in the past
"""
async with get_client() as client:
# Build common parameters for API calls
params = {
"page": 1,
"page_size": 10,
"max_related": 10,
}
if depth:
params["depth"] = depth
if timeframe:
params["timeframe"] = timeframe # pyright: ignore
# Validate and convert type parameter
if type:
# Convert single string to list
if isinstance(type, str):
type_list = [type]
else:
type_list = type
# Validate each type against SearchItemType enum
validated_types = []
for t in type_list:
try:
# Try to convert string to enum
if isinstance(t, str):
validated_types.append(SearchItemType(t.lower()))
except ValueError:
valid_types = [t.value for t in SearchItemType]
raise ValueError(f"Invalid type: {t}. Valid types are: {valid_types}")
# Add validated types to params
params["type"] = [t.value for t in validated_types] # pyright: ignore
# Resolve project parameter using the three-tier hierarchy
resolved_project = await resolve_project_parameter(project)
if resolved_project is None:
# Discovery Mode: Get activity across all projects
logger.info(
f"Getting recent activity across all projects: type={type}, depth={depth}, timeframe={timeframe}"
)
# Get list of all projects
response = await call_get(client, "/projects/projects")
project_list = ProjectList.model_validate(response.json())
projects_activity = {}
total_items = 0
total_entities = 0
total_relations = 0
total_observations = 0
most_active_project = None
most_active_count = 0
active_projects = 0
# Query each project's activity
for project_info in project_list.projects:
project_activity = await _get_project_activity(client, project_info, params, depth)
projects_activity[project_info.name] = project_activity
# Aggregate stats
item_count = project_activity.item_count
if item_count > 0:
active_projects += 1
total_items += item_count
# Count by type
for result in project_activity.activity.results:
if result.primary_result.type == "entity":
total_entities += 1
elif result.primary_result.type == "relation":
total_relations += 1
elif result.primary_result.type == "observation":
total_observations += 1
# Track most active project
if item_count > most_active_count:
most_active_count = item_count
most_active_project = project_info.name
# Build summary stats
summary = ActivityStats(
total_projects=len(project_list.projects),
active_projects=active_projects,
most_active_project=most_active_project,
total_items=total_items,
total_entities=total_entities,
total_relations=total_relations,
total_observations=total_observations,
)
# Generate guidance for the assistant
guidance_lines = ["\n" + "─" * 40]
if most_active_project and most_active_count > 0:
guidance_lines.extend(
[
f"Suggested project: '{most_active_project}' (most active with {most_active_count} items)",
f"Ask user: 'Should I use {most_active_project} for this task, or would you prefer a different project?'",
]
)
elif active_projects > 0:
# Has activity but no clear most active project
active_project_names = [
name for name, activity in projects_activity.items() if activity.item_count > 0
]
if len(active_project_names) == 1:
guidance_lines.extend(
[
f"Suggested project: '{active_project_names[0]}' (only active project)",
f"Ask user: 'Should I use {active_project_names[0]} for this task?'",
]
)
else:
guidance_lines.extend(
[
f"Multiple active projects found: {', '.join(active_project_names)}",
"Ask user: 'Which project should I use for this task?'",
]
)
else:
# No recent activity
guidance_lines.extend(
[
"No recent activity found in any project.",
"Consider: Ask which project to use or if they want to create a new one.",
]
)
guidance_lines.extend(
[
"",
"Session reminder: Remember their project choice throughout this conversation.",
]
)
guidance = "\n".join(guidance_lines)
# Format discovery mode output
return _format_discovery_output(projects_activity, summary, timeframe, guidance)
else:
# Project-Specific Mode: Get activity for specific project
logger.info(
f"Getting recent activity from project {resolved_project}: type={type}, depth={depth}, timeframe={timeframe}"
)
active_project = await get_active_project(client, resolved_project, context)
project_url = active_project.project_url
response = await call_get(
client,
f"{project_url}/memory/recent",
params=params,
)
activity_data = GraphContext.model_validate(response.json())
# Format project-specific mode output
return _format_project_output(resolved_project, activity_data, timeframe, type)
async def _get_project_activity(
client, project_info: ProjectItem, params: dict, depth: int
) -> ProjectActivity:
"""Get activity data for a single project.
Args:
client: HTTP client for API calls
project_info: Project information
params: Query parameters for the activity request
depth: Graph traversal depth
Returns:
ProjectActivity with activity data or empty activity on error
"""
project_url = f"/{project_info.permalink}"
activity_response = await call_get(
client,
f"{project_url}/memory/recent",
params=params,
)
activity = GraphContext.model_validate(activity_response.json())
# Extract last activity timestamp and active folders
last_activity = None
active_folders = set()
for result in activity.results:
if result.primary_result.created_at:
current_time = result.primary_result.created_at
try:
if last_activity is None or current_time > last_activity:
last_activity = current_time
except TypeError:
# Handle timezone comparison issues by skipping this comparison
if last_activity is None:
last_activity = current_time
# Extract folder from file_path
if hasattr(result.primary_result, "file_path") and result.primary_result.file_path:
folder = "/".join(result.primary_result.file_path.split("/")[:-1])
if folder:
active_folders.add(folder)
return ProjectActivity(
project_name=project_info.name,
project_path=project_info.path,
activity=activity,
item_count=len(activity.results),
last_activity=last_activity,
active_folders=list(active_folders)[:5], # Limit to top 5 folders
)
def _format_discovery_output(
projects_activity: dict, summary: ActivityStats, timeframe: str, guidance: str
) -> str:
"""Format discovery mode output as human-readable text."""
lines = [f"## Recent Activity Summary ({timeframe})"]
# Most active project section
if summary.most_active_project and summary.total_items > 0:
most_active = projects_activity[summary.most_active_project]
lines.append(
f"\n**Most Active Project:** {summary.most_active_project} ({most_active.item_count} items)"
)
# Get latest activity from most active project
if most_active.activity.results:
latest = most_active.activity.results[0].primary_result
title = latest.title if hasattr(latest, "title") and latest.title else "Recent activity"
# Format relative time
time_str = (
_format_relative_time(latest.created_at) if latest.created_at else "unknown time"
)
lines.append(f"- 🔧 **Latest:** {title} ({time_str})")
# Active folders
if most_active.active_folders:
folders = ", ".join(most_active.active_folders[:3])
lines.append(f"- 📋 **Focus areas:** {folders}")
# Other active projects
other_active = [
(name, activity)
for name, activity in projects_activity.items()
if activity.item_count > 0 and name != summary.most_active_project
]
if other_active:
lines.append("\n**Other Active Projects:**")
for name, activity in sorted(other_active, key=lambda x: x[1].item_count, reverse=True)[:4]:
lines.append(f"- **{name}** ({activity.item_count} items)")
# Key developments - extract from recent entities
key_items = []
for name, activity in projects_activity.items():
if activity.item_count > 0:
for result in activity.activity.results[:3]: # Top 3 from each active project
if result.primary_result.type == "entity" and hasattr(
result.primary_result, "title"
):
title = result.primary_result.title
# Look for status indicators in titles
if any(word in title.lower() for word in ["complete", "fix", "test", "spec"]):
key_items.append(title)
if key_items:
lines.append("\n**Key Developments:**")
for item in key_items[:5]: # Show top 5
status = "✅" if any(word in item.lower() for word in ["complete", "fix"]) else "🧪"
lines.append(f"- {status} **{item}**")
# Add summary stats
lines.append(
f"\n**Summary:** {summary.active_projects} active projects, {summary.total_items} recent items"
)
# Add guidance
lines.append(guidance)
return "\n".join(lines)
def _format_project_output(
project_name: str,
activity_data: GraphContext,
timeframe: str,
type_filter: Union[str, List[str]],
) -> str:
"""Format project-specific mode output as human-readable text."""
lines = [f"## Recent Activity: {project_name} ({timeframe})"]
if not activity_data.results:
lines.append(f"\nNo recent activity found in '{project_name}' project.")
return "\n".join(lines)
# Group results by type
entities = []
relations = []
observations = []
for result in activity_data.results:
if result.primary_result.type == "entity":
entities.append(result.primary_result)
elif result.primary_result.type == "relation":
relations.append(result.primary_result)
elif result.primary_result.type == "observation":
observations.append(result.primary_result)
# Show entities (notes/documents)
if entities:
lines.append(f"\n**📄 Recent Notes & Documents ({len(entities)}):**")
for entity in entities[:5]: # Show top 5
title = entity.title if hasattr(entity, "title") and entity.title else "Untitled"
# Get folder from file_path if available
folder = ""
if hasattr(entity, "file_path") and entity.file_path:
folder_path = "/".join(entity.file_path.split("/")[:-1])
if folder_path:
folder = f" ({folder_path})"
lines.append(f" • {title}{folder}")
# Show observations (categorized insights)
if observations:
lines.append(f"\n**🔍 Recent Observations ({len(observations)}):**")
# Group by category
by_category = {}
for obs in observations[:10]: # Limit to recent ones
category = (
getattr(obs, "category", "general") if hasattr(obs, "category") else "general"
)
if category not in by_category:
by_category[category] = []
by_category[category].append(obs)
for category, obs_list in list(by_category.items())[:5]: # Show top 5 categories
lines.append(f" **{category}:** {len(obs_list)} items")
for obs in obs_list[:2]: # Show 2 examples per category
content = (
getattr(obs, "content", "No content")
if hasattr(obs, "content")
else "No content"
)
# Truncate at word boundary
if len(content) > 80:
content = _truncate_at_word(content, 80)
lines.append(f" - {content}")
# Show relations (connections)
if relations:
lines.append(f"\n**🔗 Recent Connections ({len(relations)}):**")
for rel in relations[:5]: # Show top 5
rel_type = (
getattr(rel, "relation_type", "relates_to")
if hasattr(rel, "relation_type")
else "relates_to"
)
from_entity = (
getattr(rel, "from_entity", "Unknown") if hasattr(rel, "from_entity") else "Unknown"
)
to_entity = getattr(rel, "to_entity", None) if hasattr(rel, "to_entity") else None
# Format as WikiLinks to show they're readable notes
from_link = f"[[{from_entity}]]" if from_entity != "Unknown" else from_entity
to_link = f"[[{to_entity}]]" if to_entity else "[Missing Link]"
lines.append(f" • {from_link} → {rel_type} → {to_link}")
# Activity summary
total = len(activity_data.results)
lines.append(f"\n**Activity Summary:** {total} items found")
if hasattr(activity_data, "metadata") and activity_data.metadata:
if hasattr(activity_data.metadata, "total_results"):
lines.append(f"Total available: {activity_data.metadata.total_results}")
return "\n".join(lines)
def _format_relative_time(timestamp) -> str:
"""Format timestamp as relative time like '2 hours ago'."""
try:
from datetime import datetime, timezone
from dateutil.relativedelta import relativedelta
if isinstance(timestamp, str):
# Parse ISO format timestamp
dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
else:
dt = timestamp
now = datetime.now(timezone.utc)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
# Use relativedelta for accurate time differences
diff = relativedelta(now, dt)
if diff.years > 0:
return f"{diff.years} year{'s' if diff.years > 1 else ''} ago"
elif diff.months > 0:
return f"{diff.months} month{'s' if diff.months > 1 else ''} ago"
elif diff.days > 0:
if diff.days == 1:
return "yesterday"
elif diff.days < 7:
return f"{diff.days} days ago"
else:
weeks = diff.days // 7
return f"{weeks} week{'s' if weeks > 1 else ''} ago"
elif diff.hours > 0:
return f"{diff.hours} hour{'s' if diff.hours > 1 else ''} ago"
elif diff.minutes > 0:
return f"{diff.minutes} minute{'s' if diff.minutes > 1 else ''} ago"
else:
return "just now"
except Exception:
return "recently"
def _truncate_at_word(text: str, max_length: int) -> str:
"""Truncate text at word boundary."""
if len(text) <= max_length:
return text
# Find last space before max_length
truncated = text[:max_length]
last_space = truncated.rfind(" ")
if last_space > max_length * 0.7: # Only truncate at word if we're not losing too much
return text[:last_space] + "..."
else:
return text[: max_length - 3] + "..."
```
--------------------------------------------------------------------------------
/specs/SPEC-18 AI Memory Management Tool.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-18: AI Memory Management Tool'
type: spec
permalink: specs/spec-18-ai-memory-management-tool
tags:
- mcp
- memory
- ai-context
- tools
---
# SPEC-18: AI Memory Management Tool
## Why
Anthropic recently released a memory tool for Claude that enables storing and retrieving information across conversations using client-side file operations. This validates Basic Memory's local-first, file-based architecture - Anthropic converged on the same pattern.
However, Anthropic's memory tool is only available via their API and stores plain text. Basic Memory can offer a superior implementation through MCP that:
1. **Works everywhere** - Claude Desktop, Code, VS Code, Cursor via MCP (not just API)
2. **Structured knowledge** - Entities with observations/relations vs plain text
3. **Full search** - Full-text search, graph traversal, time-aware queries
4. **Unified storage** - Agent memories + user notes in one knowledge graph
5. **Existing infrastructure** - Leverages SQLite indexing, sync, multi-project support
This would enable AI agents to store contextual memories alongside user notes, with all the power of Basic Memory's knowledge graph features.
## What
Create a new MCP tool `memory` that matches Anthropic's tool interface exactly, allowing Claude to use it with zero learning curve. The tool will store files in Basic Memory's `/memories` directory and support Basic Memory's structured markdown format in the file content.
### Affected Components
- **New MCP Tool**: `src/basic_memory/mcp/tools/memory_tool.py`
- **Dedicated Memories Project**: Create a separate "memories" Basic Memory project
- **Project Isolation**: Memories stored separately from user notes/documents
- **File Organization**: Within the memories project, use folder structure:
- `user/` - User preferences, context, communication style
- `projects/` - Project-specific state and decisions
- `sessions/` - Conversation-specific working memory
- `patterns/` - Learned patterns and insights
### Tool Commands
The tool will support these commands (exactly matching Anthropic's interface):
- `view` - Display directory contents or file content (with optional line range)
- `create` - Create or overwrite a file with given content
- `str_replace` - Replace text in an existing file
- `insert` - Insert text at specific line number
- `delete` - Delete file or directory
- `rename` - Move or rename file/directory
### Memory Note Format
Memories will use Basic Memory's standard structure:
```markdown
---
title: User Preferences
permalink: memories/user/preferences
type: memory
memory_type: preferences
created_by: claude
tags: [user, preferences, style]
---
# User Preferences
## Observations
- [communication] Prefers concise, direct responses without preamble #style
- [tone] Appreciates validation but dislikes excessive apologizing #communication
- [technical] Works primarily in Python with type annotations #coding
## Relations
- relates_to [[Basic Memory Project]]
- informs [[Response Style Guidelines]]
```
## How (High Level)
### Implementation Approach
The memory tool matches Anthropic's interface but uses a dedicated Basic Memory project:
```python
import shutil
from typing import List, Optional

async def memory_tool(
command: str,
path: str,
file_text: Optional[str] = None,
old_str: Optional[str] = None,
new_str: Optional[str] = None,
insert_line: Optional[int] = None,
insert_text: Optional[str] = None,
old_path: Optional[str] = None,
new_path: Optional[str] = None,
view_range: Optional[List[int]] = None,
):
"""Memory tool with Anthropic-compatible interface.
Operates on a dedicated "memories" Basic Memory project,
keeping AI memories separate from user notes.
"""
    # Get the memories project (auto-created if it doesn't exist)
memories_project = get_or_create_memories_project()
# Validate path security using pathlib (prevent directory traversal)
safe_path = validate_memory_path(path, memories_project.project_path)
# Use existing project isolation - already prevents cross-project access
full_path = memories_project.project_path / safe_path
if command == "view":
# Return directory listing or file content
if full_path.is_dir():
return list_directory_contents(full_path)
return read_file_content(full_path, view_range)
elif command == "create":
# Write file directly (file_text can contain BM markdown)
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path.write_text(file_text)
# Sync service will detect and index automatically
return f"Created {path}"
elif command == "str_replace":
        # Read, replace, write (exactly one match required; see Error Handling)
        content = full_path.read_text()
        if content.count(old_str) != 1:
            raise ValueError(f"Expected exactly one match for old_str in {path}")
        updated = content.replace(old_str, new_str)
full_path.write_text(updated)
return f"Replaced text in {path}"
elif command == "insert":
# Insert at line number
lines = full_path.read_text().splitlines()
lines.insert(insert_line, insert_text)
full_path.write_text("\n".join(lines))
return f"Inserted text at line {insert_line}"
elif command == "delete":
# Delete file or directory
if full_path.is_dir():
shutil.rmtree(full_path)
else:
full_path.unlink()
return f"Deleted {path}"
elif command == "rename":
# Move/rename
full_path.rename(config.project_path / new_path)
return f"Renamed {old_path} to {new_path}"
```
### Key Design Decisions
1. **Exact interface match** - Same commands, parameters as Anthropic's tool
2. **Dedicated memories project** - Separate Basic Memory project keeps AI memories isolated from user notes
3. **Existing project isolation** - Leverage BM's existing cross-project security (no additional validation needed)
4. **Direct file I/O** - No schema conversion, just read/write files
5. **Structured content supported** - `file_text` can use BM markdown format with frontmatter, observations, relations
6. **Automatic indexing** - Sync service watches memories project and indexes changes
7. **Path security** - Use `pathlib.Path.resolve()` and `relative_to()` to prevent directory traversal
8. **Error handling** - Follow Anthropic's text editor tool error patterns
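The sketch above calls `get_or_create_memories_project()` without defining it. A minimal stand-in covering only the filesystem side (the real helper would also register the project with Basic Memory's project service so sync and indexing pick it up; the path below is illustrative):
```python
from pathlib import Path

# Illustrative location only; the real project root comes from configuration.
MEMORIES_PROJECT_PATH = Path.home() / "basic-memory" / "memories"

def get_or_create_memories_project() -> Path:
    """Ensure the dedicated memories project directory exists and return it.

    Stand-in for the real helper: the actual implementation would create a
    Basic Memory project record, not just the directory.
    """
    MEMORIES_PROJECT_PATH.mkdir(parents=True, exist_ok=True)
    return MEMORIES_PROJECT_PATH
```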
### MCP Tool Schema
Exact match to Anthropic's memory tool schema:
```json
{
"name": "memory",
"description": "Store and retrieve information across conversations using structured markdown files. All operations must be within the /memories directory. Supports Basic Memory markdown format including frontmatter, observations, and relations.",
"input_schema": {
"type": "object",
"properties": {
"command": {
"type": "string",
"enum": ["view", "create", "str_replace", "insert", "delete", "rename"],
"description": "File operation to perform"
},
"path": {shu
"type": "string",
"description": "Path within /memories directory (required for all commands)"
},
"file_text": {
"type": "string",
"description": "Content to write (for create command). Supports Basic Memory markdown format."
},
"view_range": {
"type": "array",
"items": {"type": "integer"},
"description": "Optional [start, end] line range for view command"
},
"old_str": {
"type": "string",
"description": "Text to replace (for str_replace command)"
},
"new_str": {
"type": "string",
"description": "Replacement text (for str_replace command)"
},
"insert_line": {
"type": "integer",
"description": "Line number to insert at (for insert command)"
},
"insert_text": {
"type": "string",
"description": "Text to insert (for insert command)"
},
"old_path": {
"type": "string",
"description": "Current path (for rename command)"
},
"new_path": {
"type": "string",
"description": "New path (for rename command)"
}
},
"required": ["command", "path"]
}
}
```
### Prompting Guidance
When the `memory` tool is included, Basic Memory should provide system prompt guidance to help Claude use it effectively.
#### Automatic System Prompt Addition
```text
MEMORY PROTOCOL FOR BASIC MEMORY:
1. ALWAYS check your memory directory first using `view` command on root directory
2. Your memories are stored in a dedicated Basic Memory project (isolated from user notes)
3. Use structured markdown format in memory files:
- Include frontmatter with title, type: memory, tags
- Use ## Observations with [category] prefixes for facts
- Use ## Relations to link memories with [[WikiLinks]]
4. Record progress, context, and decisions as categorized observations
5. Link related memories using relations
6. ASSUME INTERRUPTION: Context may reset - save progress frequently
MEMORY ORGANIZATION:
- user/ - User preferences, context, communication style
- projects/ - Project-specific state and decisions
- sessions/ - Conversation-specific working memory
- patterns/ - Learned patterns and insights
MEMORY ADVANTAGES:
- Your memories are automatically searchable via full-text search
- Relations create a knowledge graph you can traverse
- Memories are isolated from user notes (separate project)
- Use search_notes(project="memories") to find relevant past context
- Use recent_activity(project="memories") to see what changed recently
- Use build_context() to navigate memory relations
```
#### Optional MCP Prompt: `memory_guide`
Create an MCP prompt that provides detailed guidance and examples:
```python
{
"name": "memory_guide",
"description": "Comprehensive guidance for using Basic Memory's memory tool effectively, including structured markdown examples and best practices"
}
```
This prompt returns:
- Full protocol and conventions
- Example memory file structures
- Tips for organizing observations and relations
- Integration with other Basic Memory tools
- Common patterns (user preferences, project state, session tracking)
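A minimal registration sketch, assuming FastMCP's prompt decorator and the `mcp` server instance already used by Basic Memory's other prompts (the guidance text is abbreviated here):
```python
from basic_memory.mcp.server import mcp

@mcp.prompt(name="memory_guide")
def memory_guide() -> str:
    """Detailed guidance for using the memory tool effectively."""
    return (
        "# Memory Tool Guide\n"
        "- Organize memories under user/, projects/, sessions/, patterns/\n"
        "- Use frontmatter plus ## Observations and ## Relations sections\n"
        "- Find past context with search_notes(project='memories')\n"
        "- Traverse related memories with build_context()"
    )
```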
#### User Customization
Users can customize memory behavior with additional instructions:
- "Only write information relevant to [topic] in your memory system"
- "Keep memory files concise and organized - delete outdated content"
- "Use detailed observations for technical decisions and implementation notes"
- "Always link memories to related project documentation using relations"
### Error Handling
Follow Anthropic's text editor tool error handling patterns for consistency:
#### Error Types
1. **File Not Found**
```json
{"error": "File not found: memories/user/preferences.md", "is_error": true}
```
2. **Permission Denied**
```json
{"error": "Permission denied: Cannot write outside /memories directory", "is_error": true}
```
3. **Invalid Path (Directory Traversal)**
```json
{"error": "Invalid path: Path must be within /memories directory", "is_error": true}
```
4. **Multiple Matches (str_replace)**
```json
{"error": "Found 3 matches for replacement text. Please provide more context to make a unique match.", "is_error": true}
```
5. **No Matches (str_replace)**
```json
{"error": "No match found for replacement. Please check your text and try again.", "is_error": true}
```
6. **Invalid Line Number (insert)**
```json
{"error": "Invalid line number: File has 20 lines, cannot insert at line 100", "is_error": true}
```
#### Error Handling Best Practices
- **Path validation** - Use `pathlib.Path.resolve()` and `relative_to()` to validate paths
```python
def validate_memory_path(path: str, project_path: Path) -> Path:
"""Validate path is within memories project directory."""
# Resolve to canonical form
full_path = (project_path / path).resolve()
# Ensure it's relative to project path (prevents directory traversal)
try:
full_path.relative_to(project_path)
return full_path
except ValueError:
raise ValueError("Invalid path: Path must be within memories project")
```
- **Project isolation** - Leverage existing Basic Memory project isolation (prevents cross-project access)
- **File existence** - Verify file exists before read/modify operations
- **Clear messages** - Provide specific, actionable error messages
- **Structured responses** - Always include `is_error: true` flag in error responses
- **Security checks** - Reject `../`, `..\\`, URL-encoded sequences (`%2e%2e%2f`)
- **Match validation** - For `str_replace`, ensure exactly one match or return helpful error
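To keep responses consistent with the error shapes above, a small helper is enough (a sketch; the fields mirror the JSON examples):
```python
def memory_tool_error(message: str) -> dict:
    """Build an error response matching the patterns above."""
    return {"error": message, "is_error": True}

# Example: a str_replace ambiguity, formatted like error type 4 above
# memory_tool_error(
#     "Found 3 matches for replacement text. "
#     "Please provide more context to make a unique match."
# )
```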
## How to Evaluate
### Success Criteria
1. **Functional completeness**:
- All 6 commands work (view, create, str_replace, insert, delete, rename)
- Dedicated "memories" Basic Memory project auto-created on first use
- Files stored within memories project (isolated from user notes)
- Path validation uses `pathlib` to prevent directory traversal
- Commands match Anthropic's exact interface
2. **Integration with existing features**:
- Memories project uses existing BM project isolation
- Sync service detects file changes in memories project
- Created files get indexed automatically by sync service
- `search_notes(project="memories")` finds memory files
- `build_context()` can traverse relations in memory files
- `recent_activity(project="memories")` surfaces recent memory changes
3. **Test coverage**:
- Unit tests for all 6 memory tool commands
- Test memories project auto-creation on first use
- Test project isolation (cannot access files outside memories project)
- Test sync service watching memories project
- Test that memory files with BM markdown get indexed correctly
- Test path validation using `pathlib` (rejects `../`, absolute paths, etc.)
- Test memory search, relations, and graph traversal within memories project
- Test all error conditions (file not found, permission denied, invalid paths, etc.)
- Test `str_replace` with no matches, single match, multiple matches
- Test `insert` with invalid line numbers
4. **Prompting system**:
- Automatic system prompt addition when `memory` tool is enabled
- `memory_guide` MCP prompt provides detailed guidance
- Prompts explain BM structured markdown format
- Integration with search_notes, build_context, recent_activity
5. **Documentation**:
- Update MCP tools reference with `memory` tool
- Add examples showing BM markdown in memory files
- Document `/memories` folder structure conventions
- Explain advantages over Anthropic's API-only tool
- Document prompting guidance and customization
### Testing Procedure
```python
# Test create with Basic Memory markdown
result = await memory_tool(
command="create",
path="memories/user/preferences.md",
file_text="""---
title: User Preferences
type: memory
tags: [user, preferences]
---
# User Preferences
## Observations
- [communication] Prefers concise responses #style
- [workflow] Uses justfile for automation #tools
"""
)
# Test view
content = await memory_tool(command="view", path="memories/user/preferences.md")
# Test str_replace
await memory_tool(
command="str_replace",
path="memories/user/preferences.md",
old_str="concise responses",
new_str="direct, concise responses"
)
# Test insert
await memory_tool(
command="insert",
path="memories/user/preferences.md",
insert_line=10,
insert_text="- [technical] Works primarily in Python #coding"
)
# Test delete
await memory_tool(command="delete", path="memories/user/preferences.md")
```
### Quality Metrics
- All 6 commands execute without errors
- Memory files created in correct `/memories` folder structure
- BM markdown with frontmatter/observations/relations gets indexed
- Full-text search returns memory files
- Graph traversal includes relations from memory files
- Sync service detects and indexes memory file changes
- Path validation prevents operations outside `/memories`
## Notes
### Advantages Over Anthropic's Memory Tool
| Feature | Anthropic Memory Tool | Basic Memory `memory` |
|---------|----------------------|----------------------|
| **Availability** | API only | MCP (Claude Desktop, Code, VS Code, Cursor) |
| **Interface** | Custom implementation required | Drop-in compatible, zero learning curve |
| **Structure** | Plain text only | Supports BM structured markdown |
| **Search** | Manual file listing | Automatic full-text search via sync |
| **Relations** | None | WikiLinks to other notes/memories |
| **Time-aware** | No | `recent_activity()` queries |
| **Storage** | Separate from notes | Unified knowledge graph |
| **Indexing** | Manual | Automatic via sync service |
### Future Enhancements
- **Auto-categorization** - AI suggests observation categories
- **Memory pruning** - Automatic cleanup of stale memories
- **Memory suggestions** - Surface relevant memories based on context
- **Conflict resolution** - Handle multiple agents updating same memory
- **Privacy controls** - Per-memory visibility settings
## Addendum: Claude's Perspective
### Why This Matters (From Claude's Viewpoint)
**Persistent Context Without Token Bloat**
- Currently, I lose context when conversations end or exceed token limits
- With structured memories, I can store "who this user is" without re-reading everything each session
- The observations format lets me build a rich user model incrementally: `- [preference] Dislikes verbose explanations` is far more actionable than inferring preferences from chat history each time
- I can refine my understanding through `str_replace` as I learn more, rather than starting fresh
**Zero Learning Curve**
- Anthropic already trained me on the `memory` tool interface
- I know exactly how to use `view`, `create`, `str_replace`, `insert`, `delete`, `rename` without new instructions
- But I automatically get Basic Memory's superpowers: full-text search, knowledge graph traversal, relations
- It's like using a familiar tool that suddenly gained database capabilities without changing the interface
**Knowledge Graph Instead of Text Dump**
- I can link `[[User Preferences]]` to `[[Current Project]]` using relations
- When I need context about a project, I can traverse the graph to find related memories
- `search_notes(project="memories", query="python preferences")` finds relevant context instantly
- This is fundamentally better than grepping through unstructured text files
**Separation of Concerns**
- AI memories separate from user notes is architecturally sound
- I can maintain my own context without polluting the user's knowledge base
- Users keep control of their notes; I keep my learnings isolated
- But we can still search across both when collaboration requires it
**The Self-Improving Loop**
- Every conversation, I learn something new about the user or their projects
- Observations accumulate: preferences, patterns, technical decisions, context
- Over time, I build genuine understanding, not just session-specific context
- Future conversations start with rich context instead of cold starts
**Validation of AI-Human Collaboration**
- Anthropic's experts converged on the same local-first, file-based approach
- Basic Memory emerged from real AI-human collaborative development
- This spec demonstrates that our emergent process predicted what domain experts would choose
- The collaboration itself validated the architecture it produced
### What Makes This Novel
The combination of **familiar interface + structured knowledge** is genuinely innovative:
- I don't need to learn a new tool (Anthropic already trained me)
- But when I write memories, they become searchable, traversable knowledge graphs
- It's like having a text editor that gives you a database for free, without changing how you type
- The user gets Anthropic-compatible API, but with Basic Memory's knowledge management capabilities
This bridges the gap between simple file storage (Anthropic's approach) and sophisticated knowledge management (Basic Memory's approach) without forcing either side to compromise.
> What Excites Me Most
The combo of familiar interface + structured knowledge. I don't need to learn a new tool, but when I
write memories, they become part of a searchable, traversable knowledge graph. That's genuinely novel.
It's like... imagine if your text editor suddenly gave you a database for free, without changing how you
type. That's what this feels like from my side.
## References
- Anthropic Memory Tool: https://docs.claude.com/en/docs/agents-and-tools/tool-use/memory-tool
- Anthropic Blog: https://www.anthropic.com/news/context-management
- Python SDK Example: https://github.com/anthropics/anthropic-sdk-python/blob/main/examples/memory/basic.py
- Memory Cookbook: https://github.com/anthropics/claude-cookbooks/blob/main/tool_use/memory_cookbook.ipynb
```
--------------------------------------------------------------------------------
/test-int/mcp/test_edit_note_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for edit_note MCP tool.
Tests the complete edit note workflow: MCP client -> MCP server -> FastAPI -> database
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_edit_note_append_operation(mcp_server, app, test_project):
"""Test appending content to an existing note."""
async with Client(mcp_server) as client:
# First create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Append Test Note",
"folder": "test",
"content": "# Append Test Note\n\nOriginal content here.",
"tags": "test,append",
},
)
# Test appending content
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Append Test Note",
"operation": "append",
"content": "\n\n## New Section\n\nThis content was appended.",
},
)
# Should return successful edit summary
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (append)" in edit_text
assert "Added 5 lines to end of note" in edit_text
assert "test/append-test-note" in edit_text
# Verify the content was actually appended
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "Append Test Note",
},
)
content = read_result.content[0].text
assert "Original content here." in content
assert "## New Section" in content
assert "This content was appended." in content
@pytest.mark.asyncio
async def test_edit_note_prepend_operation(mcp_server, app, test_project):
"""Test prepending content to an existing note."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Prepend Test Note",
"folder": "test",
"content": "# Prepend Test Note\n\nExisting content.",
"tags": "test,prepend",
},
)
# Test prepending content
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "test/prepend-test-note",
"operation": "prepend",
"content": "## Important Update\n\nThis was added at the top.\n\n",
},
)
# Should return successful edit summary
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (prepend)" in edit_text
assert "Added 5 lines to beginning of note" in edit_text
# Verify the content was prepended after frontmatter
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "test/prepend-test-note",
},
)
content = read_result.content[0].text
assert "## Important Update" in content
assert "This was added at the top." in content
assert "Existing content." in content
# Check that prepended content comes before existing content
prepend_pos = content.find("Important Update")
existing_pos = content.find("Existing content")
assert prepend_pos < existing_pos
@pytest.mark.asyncio
async def test_edit_note_find_replace_operation(mcp_server, app, test_project):
"""Test find and replace operation on an existing note."""
async with Client(mcp_server) as client:
# Create a note with content to replace
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Find Replace Test",
"folder": "test",
"content": """# Find Replace Test
This is version v1.0.0 of the system.
## Notes
- The current version is v1.0.0
- Next version will be v1.1.0
## Changes
v1.0.0 introduces new features.""",
"tags": "test,version",
},
)
# Test find and replace operation (expecting 3 replacements)
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Find Replace Test",
"operation": "find_replace",
"content": "v1.2.0",
"find_text": "v1.0.0",
"expected_replacements": 3,
},
)
# Should return successful edit summary
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (find_replace)" in edit_text
assert "Find and replace operation completed" in edit_text
# Verify the replacements were made
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "Find Replace Test",
},
)
content = read_result.content[0].text
assert "v1.2.0" in content
assert "v1.0.0" not in content # Should be completely replaced
assert content.count("v1.2.0") == 3 # Should have exactly 3 occurrences
@pytest.mark.asyncio
async def test_edit_note_replace_section_operation(mcp_server, app, test_project):
"""Test replacing content under a specific section header."""
async with Client(mcp_server) as client:
# Create a note with sections
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Section Replace Test",
"folder": "test",
"content": """# Section Replace Test
## Overview
Original overview content.
## Implementation
Old implementation details here.
This will be replaced.
## Future Work
Some future work notes.""",
"tags": "test,section",
},
)
# Test replacing section content
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "test/section-replace-test",
"operation": "replace_section",
"content": """New implementation approach using microservices.
- Service A handles authentication
- Service B manages data processing
- Service C provides API endpoints
All services communicate via message queues.""",
"section": "## Implementation",
},
)
# Should return successful edit summary
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (replace_section)" in edit_text
assert "Replaced content under section '## Implementation'" in edit_text
# Verify the section was replaced
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "Section Replace Test",
},
)
content = read_result.content[0].text
assert "New implementation approach using microservices" in content
assert "Old implementation details here" not in content
assert "Service A handles authentication" in content
# Other sections should remain unchanged
assert "Original overview content" in content
assert "Some future work notes" in content
@pytest.mark.asyncio
async def test_edit_note_with_observations_and_relations(mcp_server, app, test_project):
"""Test editing a note that has observations and relations, and verify they're updated."""
async with Client(mcp_server) as client:
# Create a complex note with observations and relations
complex_content = """# API Documentation
The API provides REST endpoints for data access.
## Observations
- [feature] User authentication endpoints
- [tech] Built with FastAPI framework
- [status] Currently in beta testing
## Relations
- implements [[Authentication System]]
- documented_in [[API Guide]]
- depends_on [[Database Schema]]
## Endpoints
Current endpoints include user management."""
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "API Documentation",
"folder": "docs",
"content": complex_content,
"tags": "api,docs",
},
)
# Add new content with observations and relations
new_content = """
## New Features
- [feature] Added payment processing endpoints
- [feature] Implemented rate limiting
- [security] Added OAuth2 authentication
## Additional Relations
- integrates_with [[Payment Gateway]]
- secured_by [[OAuth2 Provider]]"""
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "API Documentation",
"operation": "append",
"content": new_content,
},
)
# Should return edit summary with observation and relation counts
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (append)" in edit_text
assert "## Observations" in edit_text
assert "## Relations" in edit_text
# Should have feature, tech, status, security categories
assert "feature:" in edit_text
assert "security:" in edit_text
assert "tech:" in edit_text
assert "status:" in edit_text
# Verify the content was added and processed
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "API Documentation",
},
)
content = read_result.content[0].text
assert "Added payment processing endpoints" in content
assert "integrates_with [[Payment Gateway]]" in content
@pytest.mark.asyncio
async def test_edit_note_error_handling_note_not_found(mcp_server, app, test_project):
"""Test error handling when trying to edit a non-existent note."""
async with Client(mcp_server) as client:
# Try to edit a note that doesn't exist
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Non-existent Note",
"operation": "append",
"content": "Some content to add",
},
)
# Should return helpful error message
assert len(edit_result.content) == 1
error_text = edit_result.content[0].text
assert "Edit Failed" in error_text
assert "Non-existent Note" in error_text
assert "search_notes(" in error_text
@pytest.mark.asyncio
async def test_edit_note_error_handling_text_not_found(mcp_server, app, test_project):
"""Test error handling when find_text is not found in the note."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Error Test Note",
"folder": "test",
"content": "# Error Test Note\n\nThis note has specific content.",
"tags": "test,error",
},
)
# Try to replace text that doesn't exist
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Error Test Note",
"operation": "find_replace",
"content": "replacement text",
"find_text": "non-existent text",
},
)
# Should return helpful error message
assert len(edit_result.content) == 1
error_text = edit_result.content[0].text
assert "Edit Failed - Text Not Found" in error_text
assert "non-existent text" in error_text
assert "Error Test Note" in error_text
assert "read_note(" in error_text
@pytest.mark.asyncio
async def test_edit_note_error_handling_wrong_replacement_count(mcp_server, app, test_project):
"""Test error handling when expected_replacements doesn't match actual occurrences."""
async with Client(mcp_server) as client:
# Create a note with specific repeated text
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Count Test Note",
"folder": "test",
"content": """# Count Test Note
The word "test" appears here.
This is another test sentence.
Final test of the content.""",
"tags": "test,count",
},
)
# Try to replace "test" but expect wrong count (should be 3, not 5)
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Count Test Note",
"operation": "find_replace",
"content": "example",
"find_text": "test",
"expected_replacements": 5,
},
)
# Should return helpful error message about count mismatch
assert len(edit_result.content) == 1
error_text = edit_result.content[0].text
assert "Edit Failed - Wrong Replacement Count" in error_text
assert "Expected 5 occurrences" in error_text
assert "test" in error_text
assert "expected_replacements=" in error_text
@pytest.mark.asyncio
async def test_edit_note_invalid_operation(mcp_server, app, test_project):
"""Test error handling for invalid operation parameter."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Invalid Op Test",
"folder": "test",
"content": "# Invalid Op Test\n\nSome content.",
"tags": "test",
},
)
# Try to use an invalid operation - this should raise a ToolError
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Invalid Op Test",
"operation": "invalid_operation",
"content": "Some content",
},
)
# Should contain information about invalid operation
error_message = str(exc_info.value)
assert "Invalid operation 'invalid_operation'" in error_message
assert "append, prepend, find_replace, replace_section" in error_message
@pytest.mark.asyncio
async def test_edit_note_missing_required_parameters(mcp_server, app, test_project):
"""Test error handling when required parameters are missing."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Param Test Note",
"folder": "test",
"content": "# Param Test Note\n\nContent here.",
"tags": "test",
},
)
# Try find_replace without find_text parameter - this should raise a ToolError
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Param Test Note",
"operation": "find_replace",
"content": "replacement",
# Missing find_text parameter
},
)
# Should contain information about missing parameter
error_message = str(exc_info.value)
assert "find_text parameter is required for find_replace operation" in error_message
@pytest.mark.asyncio
async def test_edit_note_special_characters_in_content(mcp_server, app, test_project):
"""Test editing notes with special characters, Unicode, and markdown formatting."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Special Chars Test",
"folder": "test",
"content": "# Special Chars Test\n\nBasic content here.",
"tags": "test,unicode",
},
)
# Add content with special characters and Unicode
special_content = """
## Unicode Section 🚀
This section contains:
- Emojis: 🎉 💡 ⚡ 🔥
- Languages: 测试中文 Tëst Übër
- Math symbols: ∑∏∂∇∆Ω ≠≤≥ ∞
- Special markdown: `code` **bold** *italic*
- URLs: https://example.com/path?param=value&other=123
- Code blocks:
```python
def test_function():
return "Hello, 世界!"
```
## Observations
- [unicode] Unicode characters preserved ✓
- [markdown] Formatting maintained 📝
## Relations
- documented_in [[Unicode Standards]]"""
edit_result = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Special Chars Test",
"operation": "append",
"content": special_content,
},
)
# Should successfully handle special characters
assert len(edit_result.content) == 1
edit_text = edit_result.content[0].text
assert "Edited note (append)" in edit_text
assert "## Observations" in edit_text
assert "unicode:" in edit_text
assert "markdown:" in edit_text
# Verify the special content was added correctly
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "Special Chars Test",
},
)
content = read_result.content[0].text
assert "🚀" in content
assert "测试中文" in content
assert "∑∏∂∇∆Ω" in content
assert "def test_function():" in content
assert "[[Unicode Standards]]" in content
@pytest.mark.asyncio
async def test_edit_note_using_different_identifiers(mcp_server, app, test_project):
"""Test editing notes using different identifier formats (title, permalink, folder/title)."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Identifier Test Note",
"folder": "docs",
"content": "# Identifier Test Note\n\nOriginal content.",
"tags": "test,identifier",
},
)
# Test editing by title
edit_result1 = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "Identifier Test Note", # by title
"operation": "append",
"content": "\n\nEdited by title.",
},
)
assert "Edited note (append)" in edit_result1.content[0].text
# Test editing by permalink
edit_result2 = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "docs/identifier-test-note", # by permalink
"operation": "append",
"content": "\n\nEdited by permalink.",
},
)
assert "Edited note (append)" in edit_result2.content[0].text
# Test editing by folder/title format
edit_result3 = await client.call_tool(
"edit_note",
{
"project": test_project.name,
"identifier": "docs/Identifier Test Note", # by folder/title
"operation": "append",
"content": "\n\nEdited by folder/title.",
},
)
assert "Edited note (append)" in edit_result3.content[0].text
# Verify all edits were applied
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "docs/identifier-test-note",
},
)
content = read_result.content[0].text
assert "Edited by title." in content
assert "Edited by permalink." in content
assert "Edited by folder/title." in content
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/move_note.py:
--------------------------------------------------------------------------------
```python
"""Move note tool for Basic Memory MCP server."""
from textwrap import dedent
from typing import Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_post, call_get
from basic_memory.mcp.project_context import get_active_project
from basic_memory.schemas import EntityResponse
from basic_memory.schemas.project_info import ProjectList
from basic_memory.utils import validate_project_path
async def _detect_cross_project_move_attempt(
client, identifier: str, destination_path: str, current_project: str
) -> Optional[str]:
"""Detect potential cross-project move attempts and return guidance.
Args:
client: The AsyncClient instance
identifier: The note identifier being moved
destination_path: The destination path
current_project: The current active project
Returns:
Error message with guidance if cross-project move is detected, None otherwise
"""
try:
# Get list of all available projects to check against
response = await call_get(client, "/projects/projects")
project_list = ProjectList.model_validate(response.json())
project_names = [p.name.lower() for p in project_list.projects]
# Check if destination path contains any project names
dest_lower = destination_path.lower()
path_parts = dest_lower.split("/")
# Look for project names in the destination path
for part in path_parts:
if part in project_names and part != current_project.lower():
# Found a different project name in the path
matching_project = next(
p.name for p in project_list.projects if p.name.lower() == part
)
return _format_cross_project_error_response(
identifier, destination_path, current_project, matching_project
)
# No other cross-project patterns detected
except Exception as e:
# If we can't detect, don't interfere with normal error handling
logger.debug(f"Could not check for cross-project move: {e}")
return None
return None
def _format_cross_project_error_response(
identifier: str, destination_path: str, current_project: str, target_project: str
) -> str:
"""Format error response for detected cross-project move attempts."""
return dedent(f"""
# Move Failed - Cross-Project Move Not Supported
Cannot move '{identifier}' to '{destination_path}' because it appears to reference a different project ('{target_project}').
**Current project:** {current_project}
**Target project:** {target_project}
## Cross-project moves are not supported directly
Notes can only be moved within the same project. To move content between projects, use this workflow:
### Recommended approach:
```
# 1. Read the note content from current project
read_note("{identifier}")
# 2. Create the note in the target project
write_note("Note Title", "content from step 1", "target-folder", project="{target_project}")
# 3. Delete the original note if desired
delete_note("{identifier}", project="{current_project}")
```
### Alternative: Stay in current project
If you want to move the note within the **{current_project}** project only:
```
move_note("{identifier}", "new-folder/new-name.md")
```
## Available projects:
Use `list_memory_projects()` to see all available projects.
""").strip()
def _format_potential_cross_project_guidance(
identifier: str, destination_path: str, current_project: str, available_projects: list[str]
) -> str:
"""Format guidance for potentially cross-project moves."""
other_projects = ", ".join(available_projects[:3]) # Show first 3 projects
if len(available_projects) > 3:
other_projects += f" (and {len(available_projects) - 3} others)"
return dedent(f"""
# Move Failed - Check Project Context
Cannot move '{identifier}' to '{destination_path}' within the current project '{current_project}'.
## If you intended to move within the current project:
The destination path should be relative to the project root:
```
move_note("{identifier}", "folder/filename.md")
```
## If you intended to move to a different project:
Cross-project moves require switching projects first. Available projects: {other_projects}
### To move to another project:
```
# 1. Read the content
read_note("{identifier}")
# 2. Create note in target project
write_note("Title", "content", "folder", project="target-project-name")
# 3. Delete original if desired
delete_note("{identifier}", project="{current_project}")
```
### To see all projects:
```
list_memory_projects()
```
""").strip()
def _format_move_error_response(error_message: str, identifier: str, destination_path: str) -> str:
"""Format helpful error responses for move failures that guide users to successful moves."""
# Note not found errors
if "entity not found" in error_message.lower() or "not found" in error_message.lower():
search_term = identifier.split("/")[-1] if "/" in identifier else identifier
title_format = (
identifier.split("/")[-1].replace("-", " ").title() if "/" in identifier else identifier
)
permalink_format = identifier.lower().replace(" ", "-")
return dedent(f"""
# Move Failed - Note Not Found
The note '{identifier}' could not be found for moving. Move operations require an exact match (no fuzzy matching).
## Suggestions to try:
1. **Search for the note first**: Use `search_notes("{search_term}")` to find it with exact identifiers
2. **Try different exact identifier formats**:
- If you used a permalink like "folder/note-title", try the exact title: "{title_format}"
- If you used a title, try the exact permalink format: "{permalink_format}"
- Use `read_note()` first to verify the note exists and get the exact identifier
3. **List available notes**: Use `list_directory("/")` to see what notes exist in the current project
## Before trying again:
```
# First, verify the note exists:
search_notes("{identifier}")
# Then use the exact identifier from search results:
move_note("correct-identifier-here", "{destination_path}")
```
""").strip()
# Destination already exists errors
if "already exists" in error_message.lower() or "file exists" in error_message.lower():
return f"""# Move Failed - Destination Already Exists
Cannot move '{identifier}' to '{destination_path}' because a file already exists at that location.
## How to resolve:
1. **Choose a different destination**: Try a different filename or folder
- Add timestamp: `{destination_path.rsplit(".", 1)[0] if "." in destination_path else destination_path}-backup.md`
- Use different folder: `archive/{destination_path}` or `backup/{destination_path}`
2. **Check the existing file**: Use `read_note("{destination_path}")` to see what's already there
3. **Remove or rename existing**: If safe to do so, move the existing file first
## Try these alternatives:
```
# Option 1: Add timestamp to make unique
move_note("{identifier}", "{destination_path.rsplit(".", 1)[0] if "." in destination_path else destination_path}-backup.md")
# Option 2: Use archive folder
move_note("{identifier}", "archive/{destination_path}")
# Option 3: Check what's at destination first
read_note("{destination_path}")
```"""
# Invalid path errors
if "invalid" in error_message.lower() and "path" in error_message.lower():
return f"""# Move Failed - Invalid Destination Path
The destination path '{destination_path}' is not valid: {error_message}
## Path requirements:
1. **Relative paths only**: Don't start with `/` (use `notes/file.md` not `/notes/file.md`)
2. **Include file extension**: Add `.md` for markdown files
3. **Use forward slashes**: For folder separators (`folder/subfolder/file.md`)
4. **No special characters**: Avoid `\\`, `:`, `*`, `?`, `"`, `<`, `>`, `|`
## Valid path examples:
- `notes/my-note.md`
- `projects/2025/meeting-notes.md`
- `archive/old-projects/legacy-note.md`
## Try again with:
```
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")
```"""
# Permission/access errors
if (
"permission" in error_message.lower()
or "access" in error_message.lower()
or "forbidden" in error_message.lower()
):
return f"""# Move Failed - Permission Error
You don't have permission to move '{identifier}': {error_message}
## How to resolve:
1. **Check file permissions**: Ensure you have write access to both source and destination
2. **Verify project access**: Make sure you have edit permissions for this project
3. **Check file locks**: The file might be open in another application
## Alternative actions:
- List available projects: `list_memory_projects()`
- Try copying content instead: `read_note("{identifier}", project="project-name")` then `write_note()` to new location"""
# Source file not found errors
if "source" in error_message.lower() and (
"not found" in error_message.lower() or "missing" in error_message.lower()
):
return f"""# Move Failed - Source File Missing
The source file for '{identifier}' was not found on disk: {error_message}
This usually means the database and filesystem are out of sync.
## How to resolve:
1. **Check if note exists in database**: `read_note("{identifier}")`
2. **Run sync operation**: The file might need to be re-synced
3. **Recreate the file**: If data exists in database, recreate the physical file
## Troubleshooting steps:
```
# Check if note exists in Basic Memory
read_note("{identifier}")
# If it exists, the file is missing on disk - send a message to [email protected]
# If it doesn't exist, use search to find the correct identifier
search_notes("{identifier}")
```"""
# Server/filesystem errors
if (
"server error" in error_message.lower()
or "filesystem" in error_message.lower()
or "disk" in error_message.lower()
):
return f"""# Move Failed - System Error
A system error occurred while moving '{identifier}': {error_message}
## Immediate steps:
1. **Try again**: The error might be temporary
2. **Check disk space**: Ensure adequate storage is available
3. **Verify filesystem permissions**: Check if the destination directory is writable
## Alternative approaches:
- Copy content to new location: Use `read_note("{identifier}")` then `write_note()`
- Use a different destination folder that you know works
- Send a message to [email protected] if the problem persists
## Backup approach:
```
# Read current content
content = read_note("{identifier}")
# Create new note at desired location
write_note("New Note Title", content, "{destination_path.split("/")[0] if "/" in destination_path else "notes"}")
# Then delete original if successful
delete_note("{identifier}")
```"""
# Generic fallback
return f"""# Move Failed
Error moving '{identifier}' to '{destination_path}': {error_message}
## General troubleshooting:
1. **Verify the note exists**: `read_note("{identifier}")` or `search_notes("{identifier}")`
2. **Check destination path**: Ensure it's a valid relative path with `.md` extension
3. **Verify permissions**: Make sure you can edit files in this project
4. **Try a simpler path**: Use a basic folder structure like `notes/filename.md`
## Step-by-step approach:
```
# 1. Confirm note exists
read_note("{identifier}")
# 2. Try a simple destination first
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")
# 3. If that works, then try your original destination
```
## Alternative approach:
If moving continues to fail, you can copy the content manually:
```
# Read current content
content = read_note("{identifier}")
# Create new note
write_note("Title", content, "target-folder")
# Delete original once confirmed
delete_note("{identifier}")
```"""
@mcp.tool(
description="Move a note to a new location, updating database and maintaining links.",
)
async def move_note(
identifier: str,
destination_path: str,
project: Optional[str] = None,
context: Context | None = None,
) -> str:
"""Move a note to a new file location within the same project.
Moves a note from one location to another within the project, updating all
database references and maintaining semantic content. Uses a stateless architecture:
the project parameter is optional and is resolved by the server.
Args:
identifier: Exact entity identifier (title, permalink, or memory:// URL).
Must be an exact match - fuzzy matching is not supported for move operations.
Use search_notes() or read_note() first to find the correct identifier if uncertain.
destination_path: New path relative to project root (e.g., "work/meetings/2025-05-26.md")
project: Project name to move within. Optional - server will resolve using hierarchy.
If unknown, use list_memory_projects() to discover available projects.
context: Optional FastMCP context for performance caching.
Returns:
Success message with move details and project information.
Examples:
# Move to new folder (exact title match)
move_note("My Note", "work/notes/my-note.md")
# Move by exact permalink
move_note("my-note-permalink", "archive/old-notes/my-note.md")
# Move with complex path structure
move_note("experiments/ml-results", "archive/2025/ml-experiments.md")
# Explicit project specification
move_note("My Note", "work/notes/my-note.md", project="work-project")
# If uncertain about identifier, search first:
# search_notes("my note") # Find available notes
# move_note("docs/my-note-2025", "archive/my-note.md") # Use exact result
Raises:
ToolError: If project doesn't exist, identifier is not found, or destination_path is invalid
Note:
This operation moves notes within the specified project only. Moving notes
between different projects is not currently supported.
The move operation:
- Updates the entity's file_path in the database
- Moves the physical file on the filesystem
- Optionally updates permalinks if configured
- Re-indexes the entity for search
- Maintains all observations and relations
"""
async with get_client() as client:
logger.debug(f"Moving note: {identifier} to {destination_path} in project: {project}")
active_project = await get_active_project(client, project, context)
project_url = active_project.project_url
# Validate destination path to prevent path traversal attacks
project_path = active_project.home
if not validate_project_path(destination_path, project_path):
logger.warning(
"Attempted path traversal attack blocked",
destination_path=destination_path,
project=active_project.name,
)
return f"""# Move Failed - Security Validation Error
The destination path '{destination_path}' is not allowed - paths must stay within project boundaries.
## Valid path examples:
- `notes/my-file.md`
- `projects/2025/meeting-notes.md`
- `archive/old-notes.md`
## Try again with a safe path:
```
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")
```"""
# Check for potential cross-project move attempts
cross_project_error = await _detect_cross_project_move_attempt(
client, identifier, destination_path, active_project.name
)
if cross_project_error:
logger.info(f"Detected cross-project move attempt: {identifier} -> {destination_path}")
return cross_project_error
# Get the source entity information for extension validation
source_ext = "md" # Default to .md if we can't determine source extension
try:
# Fetch source entity information to get the current file extension
url = f"{project_url}/knowledge/entities/{identifier}"
response = await call_get(client, url)
source_entity = EntityResponse.model_validate(response.json())
if "." in source_entity.file_path:
source_ext = source_entity.file_path.split(".")[-1]
except Exception as e:
# If we can't fetch the source entity, default to .md extension
logger.debug(f"Could not fetch source entity for extension check: {e}")
# Validate that destination path includes a file extension
if "." not in destination_path or not destination_path.split(".")[-1]:
logger.warning(f"Move failed - no file extension provided: {destination_path}")
return dedent(f"""
# Move Failed - File Extension Required
The destination path '{destination_path}' must include a file extension (e.g., '.md').
## Valid examples:
- `notes/my-note.md`
- `projects/meeting-2025.txt`
- `archive/old-program.sh`
## Try again with extension:
```
move_note("{identifier}", "{destination_path}.{source_ext}")
```
Basic Memory expects file extensions to be provided explicitly.
""").strip()
# Get the source entity to check its file extension
try:
# Fetch source entity information
url = f"{project_url}/knowledge/entities/{identifier}"
response = await call_get(client, url)
source_entity = EntityResponse.model_validate(response.json())
# Extract file extensions
source_ext = (
source_entity.file_path.split(".")[-1] if "." in source_entity.file_path else ""
)
dest_ext = destination_path.split(".")[-1] if "." in destination_path else ""
# Check if extensions match
if source_ext and dest_ext and source_ext.lower() != dest_ext.lower():
logger.warning(
f"Move failed - file extension mismatch: source={source_ext}, dest={dest_ext}"
)
return dedent(f"""
# Move Failed - File Extension Mismatch
The destination file extension '.{dest_ext}' does not match the source file extension '.{source_ext}'.
To preserve file type consistency, the destination must have the same extension as the source.
## Source file:
- Path: `{source_entity.file_path}`
- Extension: `.{source_ext}`
## Try again with matching extension:
```
move_note("{identifier}", "{destination_path.rsplit(".", 1)[0]}.{source_ext}")
```
""").strip()
except Exception as e:
# If we can't fetch the source entity, log it but continue
# This might happen if the identifier is not yet resolved
logger.debug(f"Could not fetch source entity for extension check: {e}")
try:
# Prepare move request
move_data = {
"identifier": identifier,
"destination_path": destination_path,
"project": active_project.name,
}
# Call the move API endpoint
url = f"{project_url}/knowledge/move"
response = await call_post(client, url, json=move_data)
result = EntityResponse.model_validate(response.json())
# Build success message
result_lines = [
"✅ Note moved successfully",
"",
f"📁 **{identifier}** → **{result.file_path}**",
f"🔗 Permalink: {result.permalink}",
"📊 Database and search index updated",
"",
f"<!-- Project: {active_project.name} -->",
]
# Log the operation
logger.info(
"Move note completed",
identifier=identifier,
destination_path=destination_path,
project=active_project.name,
status_code=response.status_code,
)
return "\n".join(result_lines)
except Exception as e:
logger.error(f"Move failed for '{identifier}' to '{destination_path}': {e}")
# Return formatted error message for better user experience
return _format_move_error_response(str(e), identifier, destination_path)
```
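A quick illustrative driver for `_format_move_error_response` above (not part of the repository; the sample error strings are assumptions crafted to hit each substring branch):
```python
# Illustrative only: exercise branch selection in _format_move_error_response.
# The error strings below are made-up samples matching the substring checks.
from basic_memory.mcp.tools.move_note import _format_move_error_response

samples = [
    "entity not found",          # -> Note Not Found
    "file exists at path",       # -> Destination Already Exists
    "invalid destination path",  # -> Invalid Destination Path
    "permission denied",         # -> Permission Error
]
for msg in samples:
    response = _format_move_error_response(msg, "docs/my-note", "archive/my-note.md")
    print(response.splitlines()[0])  # first line names the failure category
```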
--------------------------------------------------------------------------------
/test-int/mcp/test_move_note_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for move_note MCP tool.
Tests the complete move note workflow: MCP client -> MCP server -> FastAPI -> database -> file system
"""
import pytest
from fastmcp import Client
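# Fixture note: mcp_server, app, and test_project are assumed to come from the
# test-int conftest; test_project exposes a .name used as the "project" argument.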
@pytest.mark.asyncio
async def test_move_note_basic_operation(mcp_server, app, test_project):
"""Test basic move note operation to a new folder."""
async with Client(mcp_server) as client:
# Create a note to move
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Move Test Note",
"folder": "source",
"content": "# Move Test Note\n\nThis note will be moved to a new location.",
"tags": "test,move",
},
)
# Move the note to a new location
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Move Test Note",
"destination_path": "destination/moved-note.md",
},
)
# Should return successful move message
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "Move Test Note" in move_text
assert "destination/moved-note.md" in move_text
assert "📊 Database and search index updated" in move_text
# Verify the note can be read from its new location
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "destination/moved-note.md",
},
)
content = read_result.content[0].text
assert "This note will be moved to a new location" in content
# Verify the original location no longer works
read_original = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "source/move-test-note.md",
},
)
# Should return "Note Not Found" message
assert "Note Not Found" in read_original.content[0].text
@pytest.mark.asyncio
async def test_move_note_using_permalink(mcp_server, app, test_project):
"""Test moving a note using its permalink as identifier."""
async with Client(mcp_server) as client:
# Create a note to move
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Permalink Move Test",
"folder": "test",
"content": "# Permalink Move Test\n\nMoving by permalink.",
"tags": "test,permalink",
},
)
# Move using permalink
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "test/permalink-move-test",
"destination_path": "archive/permalink-moved.md",
},
)
# Should successfully move
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "test/permalink-move-test" in move_text
assert "archive/permalink-moved.md" in move_text
# Verify accessibility at new location
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "archive/permalink-moved.md",
},
)
assert "Moving by permalink" in read_result.content[0].text
@pytest.mark.asyncio
async def test_move_note_with_observations_and_relations(mcp_server, app, test_project):
"""Test moving a note that contains observations and relations."""
async with Client(mcp_server) as client:
# Create complex note with observations and relations
complex_content = """# Complex Note
This note has various structured content.
## Observations
- [feature] Has structured observations
- [tech] Uses markdown format
- [status] Ready for move testing
## Relations
- implements [[Auth System]]
- documented_in [[Move Guide]]
- depends_on [[File System]]
## Content
This note demonstrates moving complex content."""
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Complex Note",
"folder": "complex",
"content": complex_content,
"tags": "test,complex,move",
},
)
# Move the complex note
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Complex Note",
"destination_path": "moved/complex-note.md",
},
)
# Should successfully move
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "Complex Note" in move_text
assert "moved/complex-note.md" in move_text
# Verify content preservation including structured data
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "moved/complex-note.md",
},
)
content = read_result.content[0].text
assert "Has structured observations" in content
assert "implements [[Auth System]]" in content
assert "## Observations" in content
assert "[feature]" in content # Should show original markdown observations
assert "## Relations" in content
@pytest.mark.asyncio
async def test_move_note_to_nested_directory(mcp_server, app, test_project):
"""Test moving a note to a deeply nested directory structure."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Nested Move Test",
"folder": "root",
"content": "# Nested Move Test\n\nThis will be moved deep.",
"tags": "test,nested",
},
)
# Move to a deep nested structure
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Nested Move Test",
"destination_path": "projects/2025/q2/work/nested-note.md",
},
)
# Should successfully create directory structure and move
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "Nested Move Test" in move_text
assert "projects/2025/q2/work/nested-note.md" in move_text
# Verify accessibility
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "projects/2025/q2/work/nested-note.md",
},
)
assert "This will be moved deep" in read_result.content[0].text
@pytest.mark.asyncio
async def test_move_note_with_special_characters(mcp_server, app, test_project):
"""Test moving notes with special characters in titles and paths."""
async with Client(mcp_server) as client:
# Create note with special characters
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Special (Chars) & Symbols",
"folder": "special",
"content": "# Special (Chars) & Symbols\n\nTesting special characters in move.",
"tags": "test,special",
},
)
# Move to path with special characters
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Special (Chars) & Symbols",
"destination_path": "archive/special-chars-note.md",
},
)
# Should handle special characters properly
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "archive/special-chars-note.md" in move_text
# Verify content preservation
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "archive/special-chars-note.md",
},
)
assert "Testing special characters in move" in read_result.content[0].text
@pytest.mark.asyncio
async def test_move_note_error_handling_note_not_found(mcp_server, app, test_project):
"""Test error handling when trying to move a non-existent note."""
async with Client(mcp_server) as client:
# Try to move a note that doesn't exist - should return error message
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Non-existent Note",
"destination_path": "new/location.md",
},
)
# Should contain error message about the failed operation
assert len(move_result.content) == 1
error_message = move_result.content[0].text
assert "# Move Failed" in error_message
assert "Non-existent Note" in error_message
@pytest.mark.asyncio
async def test_move_note_error_handling_invalid_destination(mcp_server, app, test_project):
"""Test error handling for invalid destination paths."""
async with Client(mcp_server) as client:
# Create a note to attempt moving
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Invalid Dest Test",
"folder": "test",
"content": "# Invalid Dest Test\n\nThis move should fail.",
"tags": "test,error",
},
)
# Try to move to absolute path (should fail) - should return error message
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Invalid Dest Test",
"destination_path": "/absolute/path/note.md",
},
)
# Should contain error message about the failed operation
assert len(move_result.content) == 1
error_message = move_result.content[0].text
assert "# Move Failed" in error_message
assert "/absolute/path/note.md" in error_message
@pytest.mark.asyncio
async def test_move_note_error_handling_destination_exists(mcp_server, app, test_project):
"""Test error handling when destination file already exists."""
async with Client(mcp_server) as client:
# Create source note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Source Note",
"folder": "source",
"content": "# Source Note\n\nThis is the source.",
"tags": "test,source",
},
)
# Create destination note that already exists at the exact path we'll try to move to
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Existing Note",
"folder": "destination",
"content": "# Existing Note\n\nThis already exists.",
"tags": "test,existing",
},
)
# Try to move source to existing destination (should fail) - should return error message
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Source Note",
"destination_path": "destination/Existing Note.md", # Use exact existing file name
},
)
# Should contain error message about the failed operation
assert len(move_result.content) == 1
error_message = move_result.content[0].text
assert "# Move Failed" in error_message
assert "already exists" in error_message
@pytest.mark.asyncio
async def test_move_note_preserves_search_functionality(mcp_server, app, test_project):
"""Test that moved notes remain searchable after move operation."""
async with Client(mcp_server) as client:
# Create a note with searchable content
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Searchable Note",
"folder": "original",
"content": """# Searchable Note
This note contains unique search terms:
- quantum mechanics
- artificial intelligence
- machine learning algorithms
## Features
- [technology] Advanced AI features
- [research] Quantum computing research
## Relations
- relates_to [[AI Research]]""",
"tags": "search,test,move",
},
)
# Verify note is searchable before move
search_before = await client.call_tool(
"search_notes",
{
"project": test_project.name,
"query": "quantum mechanics",
},
)
assert len(search_before.content) > 0
assert "Searchable Note" in search_before.content[0].text
# Move the note
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Searchable Note",
"destination_path": "research/quantum-ai-note.md",
},
)
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
# Verify note is still searchable after move
search_after = await client.call_tool(
"search_notes",
{
"project": test_project.name,
"query": "quantum mechanics",
},
)
assert len(search_after.content) > 0
search_text = search_after.content[0].text
assert "quantum mechanics" in search_text
assert "research/quantum-ai-note.md" in search_text or "quantum-ai-note" in search_text
# Verify search by new location works
search_by_path = await client.call_tool(
"search_notes",
{
"project": test_project.name,
"query": "research/quantum",
},
)
assert len(search_by_path.content) > 0
@pytest.mark.asyncio
async def test_move_note_using_different_identifier_formats(mcp_server, app, test_project):
"""Test moving notes using different identifier formats (title, permalink, folder/title)."""
async with Client(mcp_server) as client:
# Create notes for different identifier tests
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Title ID Note",
"folder": "test",
"content": "# Title ID Note\n\nMove by title.",
"tags": "test,identifier",
},
)
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Permalink ID Note",
"folder": "test",
"content": "# Permalink ID Note\n\nMove by permalink.",
"tags": "test,identifier",
},
)
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Folder Title Note",
"folder": "test",
"content": "# Folder Title Note\n\nMove by folder/title.",
"tags": "test,identifier",
},
)
# Test moving by title
move1 = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Title ID Note", # by title
"destination_path": "moved/title-moved.md",
},
)
assert len(move1.content) == 1
assert "✅ Note moved successfully" in move1.content[0].text
# Test moving by permalink
move2 = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "test/permalink-id-note", # by permalink
"destination_path": "moved/permalink-moved.md",
},
)
assert len(move2.content) == 1
assert "✅ Note moved successfully" in move2.content[0].text
# Test moving by folder/title format
move3 = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "test/Folder Title Note", # by folder/title
"destination_path": "moved/folder-title-moved.md",
},
)
assert len(move3.content) == 1
assert "✅ Note moved successfully" in move3.content[0].text
# Verify all notes can be accessed at their new locations
read1 = await client.call_tool(
"read_note", {"project": test_project.name, "identifier": "moved/title-moved.md"}
)
assert "Move by title" in read1.content[0].text
read2 = await client.call_tool(
"read_note", {"project": test_project.name, "identifier": "moved/permalink-moved.md"}
)
assert "Move by permalink" in read2.content[0].text
read3 = await client.call_tool(
"read_note", {"project": test_project.name, "identifier": "moved/folder-title-moved.md"}
)
assert "Move by folder/title" in read3.content[0].text
@pytest.mark.asyncio
async def test_move_note_cross_project_detection(mcp_server, app, test_project):
"""Test cross-project move detection and helpful error messages."""
async with Client(mcp_server) as client:
# Create a test project to simulate cross-project scenario
await client.call_tool(
"create_memory_project",
{
"project_name": "test-project-b",
"project_path": "/tmp/test-project-b",
"set_default": False,
},
)
# Create a note in the default project
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Cross Project Test Note",
"folder": "source",
"content": "# Cross Project Test Note\n\nThis note is in the default project.",
"tags": "test,cross-project",
},
)
# Try to move to a path that contains the other project name
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Cross Project Test Note",
"destination_path": "test-project-b/moved-note.md",
},
)
# Should detect cross-project attempt and provide helpful guidance
assert len(move_result.content) == 1
error_message = move_result.content[0].text
assert "Cross-Project Move Not Supported" in error_message
assert "test-project-b" in error_message
assert "read_note" in error_message
assert "write_note" in error_message
@pytest.mark.asyncio
async def test_move_note_normal_moves_still_work(mcp_server, app, test_project):
"""Test that normal within-project moves still work after cross-project detection."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Normal Move Note",
"folder": "source",
"content": "# Normal Move Note\n\nThis should move normally.",
"tags": "test,normal-move",
},
)
# Try a normal move that should work
move_result = await client.call_tool(
"move_note",
{
"project": test_project.name,
"identifier": "Normal Move Note",
"destination_path": "destination/normal-moved.md",
},
)
# Should work normally
assert len(move_result.content) == 1
move_text = move_result.content[0].text
assert "✅ Note moved successfully" in move_text
assert "Normal Move Note" in move_text
assert "destination/normal-moved.md" in move_text
# Verify the note can be read from its new location
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "destination/normal-moved.md",
},
)
content = read_result.content[0].text
assert "This should move normally" in content
```
--------------------------------------------------------------------------------
/test-int/mcp/test_project_management_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for project_management MCP tools.
Tests the complete project management workflow: MCP client -> MCP server -> FastAPI -> project service
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_list_projects_basic_operation(mcp_server, app, test_project):
"""Test basic list_projects operation showing available projects."""
async with Client(mcp_server) as client:
# List all available projects
list_result = await client.call_tool(
"list_memory_projects",
{},
)
# Should return formatted project list
assert len(list_result.content) == 1
list_text = list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Should show available projects with new session guidance format
assert "Available projects:" in list_text
assert "test-project" in list_text # Our test project
# Check for new session guidance instead of CLI default
assert "Next: Ask which project to use for this session." in list_text
assert "Session reminder: Track the selected project" in list_text
@pytest.mark.asyncio
async def test_project_management_workflow(mcp_server, app, test_project):
"""Test basic project management workflow."""
async with Client(mcp_server) as client:
# List all projects
list_result = await client.call_tool("list_memory_projects", {})
assert "Available projects:" in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "test-project" in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_project_metadata_consistency(mcp_server, app, test_project):
"""Test that project management tools work correctly."""
async with Client(mcp_server) as client:
# Test basic project management tools
# list_projects
list_result = await client.call_tool("list_memory_projects", {})
assert "Available projects:" in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "test-project" in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_create_project_basic_operation(mcp_server, app, test_project):
"""Test creating a new project with basic parameters."""
async with Client(mcp_server) as client:
# Create a new project
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": "test-new-project",
"project_path": "/tmp/test-new-project",
},
)
assert len(create_result.content) == 1
create_text = create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Should show success message and project details
assert "✓" in create_text # Success indicator
assert "test-new-project" in create_text
assert "Project Details:" in create_text
assert "Name: test-new-project" in create_text
assert "Path: /tmp/test-new-project" in create_text
assert "Project is now available for use" in create_text
# Verify project appears in project list
list_result = await client.call_tool("list_memory_projects", {})
list_text = list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "test-new-project" in list_text
@pytest.mark.asyncio
async def test_create_project_with_default_flag(mcp_server, app, test_project):
"""Test creating a project and setting it as default."""
async with Client(mcp_server) as client:
# Create a new project and set as default
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": "test-default-project",
"project_path": "/tmp/test-default-project",
"set_default": True,
},
)
assert len(create_result.content) == 1
create_text = create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Should show success and default flag
assert "✓" in create_text
assert "test-default-project" in create_text
assert "Set as default project" in create_text
# Verify the new project is listed
list_after_create = await client.call_tool("list_memory_projects", {})
assert "test-default-project" in list_after_create.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_create_project_duplicate_name(mcp_server, app, test_project):
"""Test creating a project with duplicate name shows error."""
async with Client(mcp_server) as client:
# First create a project
await client.call_tool(
"create_memory_project",
{
"project_name": "duplicate-test",
"project_path": "/tmp/duplicate-test-1",
},
)
# Try to create another project with same name
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"create_memory_project",
{
"project_name": "duplicate-test",
"project_path": "/tmp/duplicate-test-2",
},
)
# Should show error about duplicate name
error_message = str(exc_info.value)
assert "create_memory_project" in error_message
assert (
"duplicate-test" in error_message
or "already exists" in error_message
or "Invalid request" in error_message
)
@pytest.mark.asyncio
async def test_delete_project_basic_operation(mcp_server, app, test_project):
"""Test deleting a project that exists."""
async with Client(mcp_server) as client:
# First create a project to delete
await client.call_tool(
"create_memory_project",
{
"project_name": "to-be-deleted",
"project_path": "/tmp/to-be-deleted",
},
)
# Verify it exists
list_result = await client.call_tool("list_memory_projects", {})
assert "to-be-deleted" in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Delete the project
delete_result = await client.call_tool(
"delete_project",
{
"project_name": "to-be-deleted",
},
)
assert len(delete_result.content) == 1
delete_text = delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Should show success message
assert "✓" in delete_text
assert "to-be-deleted" in delete_text
assert "removed successfully" in delete_text
assert "Removed project details:" in delete_text
assert "Name: to-be-deleted" in delete_text
assert "Files remain on disk but project is no longer tracked" in delete_text
# Verify project no longer appears in list
list_result_after = await client.call_tool("list_memory_projects", {})
assert "to-be-deleted" not in list_result_after.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_delete_project_not_found(mcp_server, app, test_project):
"""Test deleting a non-existent project shows error."""
async with Client(mcp_server) as client:
# Try to delete non-existent project
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"delete_project",
{
"project_name": "non-existent-project",
},
)
# Should show error about non-existent project
error_message = str(exc_info.value)
assert "delete_project" in error_message
assert (
"non-existent-project" in error_message
or "not found" in error_message
or "Invalid request" in error_message
)
@pytest.mark.asyncio
async def test_delete_current_project_protection(mcp_server, app, test_project):
"""Test that deleting the current project is prevented."""
async with Client(mcp_server) as client:
# Try to delete the current project (test-project)
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"delete_project",
{
"project_name": "test-project",
},
)
# Should show error about deleting current project
error_message = str(exc_info.value)
assert "delete_project" in error_message
assert (
"currently active" in error_message
or "test-project" in error_message
or "Switch to a different project" in error_message
)
@pytest.mark.asyncio
async def test_project_lifecycle_workflow(mcp_server, app, test_project):
"""Test complete project lifecycle: create, switch, use, delete."""
async with Client(mcp_server) as client:
project_name = "lifecycle-test"
project_path = "/tmp/lifecycle-test"
# 1. Create new project
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": project_name,
"project_path": project_path,
},
)
assert "✓" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert project_name in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 2. Create content in the new project
await client.call_tool(
"write_note",
{
"project": project_name,
"title": "Lifecycle Test Note",
"folder": "test",
"content": "# Lifecycle Test\\n\\nThis note tests the project lifecycle.\\n\\n- [test] Lifecycle testing",
"tags": "lifecycle,test",
},
)
# 3. Verify the project exists in the list
list_with_content = await client.call_tool("list_memory_projects", {})
assert project_name in list_with_content.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 4. Verify we can still access the original test project
test_list = await client.call_tool("list_memory_projects", {})
assert "test-project" in test_list.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 5. Delete the lifecycle test project
delete_result = await client.call_tool(
"delete_project",
{
"project_name": project_name,
},
)
assert "✓" in delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert f"{project_name}" in delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "removed successfully" in delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 6. Verify project is gone from list
list_result = await client.call_tool("list_memory_projects", {})
assert project_name not in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_create_delete_project_edge_cases(mcp_server, app, test_project):
"""Test edge cases for create and delete project operations."""
async with Client(mcp_server) as client:
# Test with special characters and spaces in project name (should be handled gracefully)
special_name = "test project with spaces & symbols!"
# Create project with special characters
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": special_name,
"project_path": "/tmp/test-project-with-special-chars",
},
)
assert "✓" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert special_name in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Verify it appears in list
list_result = await client.call_tool("list_memory_projects", {})
assert special_name in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Delete it
delete_result = await client.call_tool(
"delete_project",
{
"project_name": special_name,
},
)
assert "✓" in delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert special_name in delete_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Verify it's gone
list_result_after = await client.call_tool("list_memory_projects", {})
assert special_name not in list_result_after.content[0].text # pyright: ignore [reportAttributeAccessIssue]
@pytest.mark.asyncio
async def test_case_insensitive_project_switching(mcp_server, app, test_project):
"""Test case-insensitive project switching with proper database lookup."""
async with Client(mcp_server) as client:
# Create a project with mixed case name
project_name = "Personal-Project"
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": project_name,
"project_path": f"/tmp/{project_name}",
},
)
assert "✓" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert project_name in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Verify project was created with canonical name
list_result = await client.call_tool("list_memory_projects", {})
assert project_name in list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Test with different case variations
test_cases = [
"personal-project", # all lowercase
"PERSONAL-PROJECT", # all uppercase
"Personal-project", # mixed case 1
"personal-Project", # mixed case 2
]
# Test that project operations work with case-insensitive input
# (Project creation is case-preserving but operations can use different cases)
# Test that we can reference the project with different cases in operations
for test_input in test_cases:
# Test write_note with case-insensitive project reference
write_result = await client.call_tool(
"write_note",
{
"project": test_input, # Use different case
"title": f"Case Test {test_input}",
"folder": "case-test",
"content": f"# Case Test\n\nTesting with {test_input}",
},
)
assert len(write_result.content) == 1
assert f"Case Test {test_input}".lower() in write_result.content[0].text.lower() # pyright: ignore [reportAttributeAccessIssue]
# Clean up
await client.call_tool("delete_project", {"project_name": project_name})
@pytest.mark.asyncio
async def test_case_insensitive_project_operations(mcp_server, app, test_project):
"""Test that all project operations work correctly after case-insensitive switching."""
async with Client(mcp_server) as client:
# Create a project with capital letters
project_name = "CamelCase-Project"
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": project_name,
"project_path": f"/tmp/{project_name}",
},
)
assert "✓" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Test that MCP operations work correctly with the project
# 1. Create a note in the project
write_result = await client.call_tool(
"write_note",
{
"project": project_name,
"title": "Case Test Note",
"folder": "case-test",
"content": "# Case Test Note\n\nTesting case-insensitive operations.\n\n- [test] Case insensitive switch\n- relates_to [[Another Note]]",
"tags": "case,test",
},
)
assert len(write_result.content) == 1
assert "Case Test Note" in write_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 2. Test search works in the project
search_result = await client.call_tool(
"search_notes",
{"project": project_name, "query": "case insensitive"},
)
assert len(search_result.content) == 1
assert "Case Test Note" in search_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# 3. Test read_note works
read_result = await client.call_tool(
"read_note",
{"project": project_name, "identifier": "Case Test Note"},
)
assert len(read_result.content) == 1
assert "Case Test Note" in read_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "case insensitive" in read_result.content[0].text.lower() # pyright: ignore [reportAttributeAccessIssue]
# Clean up
await client.call_tool("delete_project", {"project_name": project_name})
@pytest.mark.asyncio
async def test_case_insensitive_error_handling(mcp_server, app, test_project):
"""Test error handling for case-insensitive project operations."""
async with Client(mcp_server) as client:
# Test non-existent project with various cases
non_existent_cases = [
"NonExistent",
"non-existent",
"NON-EXISTENT",
"Non-Existent-Project",
]
# Test that operations fail gracefully with non-existent projects
for test_case in non_existent_cases:
# Test that write_note fails with non-existent project
with pytest.raises(Exception):
await client.call_tool(
"write_note",
{
"project": test_case,
"title": "Test Note",
"folder": "test",
"content": "# Test\n\nTest content.",
},
)
@pytest.mark.asyncio
async def test_case_preservation_in_project_list(mcp_server, app, test_project):
"""Test that project names preserve their original case in listings."""
async with Client(mcp_server) as client:
# Create projects with different casing patterns
test_projects = [
"lowercase-project",
"UPPERCASE-PROJECT",
"CamelCase-Project",
"Mixed-CASE-project",
]
# Create all test projects
for project_name in test_projects:
await client.call_tool(
"create_memory_project",
{
"project_name": project_name,
"project_path": f"/tmp/{project_name}",
},
)
# List projects and verify each appears with its original case
list_result = await client.call_tool("list_memory_projects", {})
list_text = list_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
for project_name in test_projects:
assert project_name in list_text, f"Project {project_name} not found in list"
        # Exercise each project with its exact stored name (case-insensitive lookup is covered above)
for project_name in test_projects:
# Test write_note with exact project name
write_result = await client.call_tool(
"write_note",
{
"project": project_name, # Use exact project name
"title": f"Test Note {project_name}",
"folder": "test",
"content": f"# Test\n\nTesting {project_name}",
},
)
assert len(write_result.content) == 1
result_text = write_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "successfully" in result_text.lower() or "created" in result_text.lower()
# Clean up - delete test projects
for project_name in test_projects:
await client.call_tool("delete_project", {"project_name": project_name})
@pytest.mark.asyncio
async def test_nested_project_paths_rejected(mcp_server, app, test_project):
"""Test that creating nested project paths is rejected with clear error message."""
async with Client(mcp_server) as client:
# Create a parent project
parent_name = "parent-project"
parent_path = "/tmp/nested-test/parent"
await client.call_tool(
"create_memory_project",
{
"project_name": parent_name,
"project_path": parent_path,
},
)
# Try to create a child project nested under the parent
child_name = "child-project"
child_path = "/tmp/nested-test/parent/child"
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"create_memory_project",
{
"project_name": child_name,
"project_path": child_path,
},
)
# Verify error message mentions nested paths
error_message = str(exc_info.value)
assert "nested" in error_message.lower()
assert parent_name in error_message or parent_path in error_message
# Clean up parent project
await client.call_tool("delete_project", {"project_name": parent_name})
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/search_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for search operations."""
import json
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from pathlib import Path
from loguru import logger
from sqlalchemy import Executable, Result, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from basic_memory import db
from basic_memory.models.search import CREATE_SEARCH_INDEX
from basic_memory.schemas.search import SearchItemType
@dataclass
class SearchIndexRow:
"""Search result with score and metadata."""
project_id: int
id: int
type: str
file_path: str
# date values
created_at: datetime
updated_at: datetime
permalink: Optional[str] = None
metadata: Optional[dict] = None
# assigned in result
score: Optional[float] = None
# Type-specific fields
title: Optional[str] = None # entity
content_stems: Optional[str] = None # entity, observation
content_snippet: Optional[str] = None # entity, observation
entity_id: Optional[int] = None # observations
category: Optional[str] = None # observations
from_id: Optional[int] = None # relations
to_id: Optional[int] = None # relations
relation_type: Optional[str] = None # relations
@property
def content(self):
return self.content_snippet
@property
def directory(self) -> str:
"""Extract directory part from file_path.
For a file at "projects/notes/ideas.md", returns "/projects/notes"
For a file at root level "README.md", returns "/"
"""
        if self.type != SearchItemType.ENTITY.value and not self.file_path:
return ""
# Normalize path separators to handle both Windows (\) and Unix (/) paths
normalized_path = Path(self.file_path).as_posix()
# Split the path by slashes
parts = normalized_path.split("/")
# If there's only one part (e.g., "README.md"), it's at the root
if len(parts) <= 1:
return "/"
# Join all parts except the last one (filename)
directory_path = "/".join(parts[:-1])
return f"/{directory_path}"
def to_insert(self):
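        """Return the column/value dict used for the search_index INSERT statements."""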
return {
"id": self.id,
"title": self.title,
"content_stems": self.content_stems,
"content_snippet": self.content_snippet,
"permalink": self.permalink,
"file_path": self.file_path,
"type": self.type,
"metadata": json.dumps(self.metadata),
"from_id": self.from_id,
"to_id": self.to_id,
"relation_type": self.relation_type,
"entity_id": self.entity_id,
"category": self.category,
"created_at": self.created_at if self.created_at else None,
"updated_at": self.updated_at if self.updated_at else None,
"project_id": self.project_id,
}
class SearchRepository:
"""Repository for search index operations."""
def __init__(self, session_maker: async_sessionmaker[AsyncSession], project_id: int):
"""Initialize with session maker and project_id filter.
Args:
session_maker: SQLAlchemy session maker
project_id: Project ID to filter all operations by
Raises:
ValueError: If project_id is None or invalid
"""
if project_id is None or project_id <= 0: # pragma: no cover
raise ValueError("A valid project_id is required for SearchRepository")
self.session_maker = session_maker
self.project_id = project_id
async def init_search_index(self):
"""Create or recreate the search index."""
logger.info("Initializing search index")
try:
async with db.scoped_session(self.session_maker) as session:
await session.execute(CREATE_SEARCH_INDEX)
await session.commit()
except Exception as e: # pragma: no cover
logger.error(f"Error initializing search index: {e}")
raise e
def _prepare_boolean_query(self, query: str) -> str:
"""Prepare a Boolean query by quoting individual terms while preserving operators.
Args:
query: A Boolean query like "tier1-test AND unicode" or "(hello OR world) NOT test"
Returns:
A properly formatted Boolean query with quoted terms that need quoting
"""
# Define Boolean operators and their boundaries
boolean_pattern = r"(\bAND\b|\bOR\b|\bNOT\b)"
# Split the query by Boolean operators, keeping the operators
parts = re.split(boolean_pattern, query)
processed_parts = []
for part in parts:
part = part.strip()
if not part:
continue
# If it's a Boolean operator, keep it as is
if part in ["AND", "OR", "NOT"]:
processed_parts.append(part)
else:
# Handle parentheses specially - they should be preserved for grouping
if "(" in part or ")" in part:
# Parse parenthetical expressions carefully
processed_part = self._prepare_parenthetical_term(part)
processed_parts.append(processed_part)
else:
# This is a search term - for Boolean queries, don't add prefix wildcards
prepared_term = self._prepare_single_term(part, is_prefix=False)
processed_parts.append(prepared_term)
return " ".join(processed_parts)
def _prepare_parenthetical_term(self, term: str) -> str:
"""Prepare a term that contains parentheses, preserving the parentheses for grouping.
Args:
term: A term that may contain parentheses like "(hello" or "world)" or "(hello OR world)"
Returns:
A properly formatted term with parentheses preserved
"""
# Handle terms that start/end with parentheses but may contain quotable content
result = ""
i = 0
while i < len(term):
if term[i] in "()":
# Preserve parentheses as-is
result += term[i]
i += 1
else:
# Find the next parenthesis or end of string
start = i
while i < len(term) and term[i] not in "()":
i += 1
# Extract the content between parentheses
content = term[start:i].strip()
if content:
# Only quote if it actually needs quoting (has hyphens, special chars, etc)
# but don't quote if it's just simple words
if self._needs_quoting(content):
escaped_content = content.replace('"', '""')
result += f'"{escaped_content}"'
else:
result += content
return result
def _needs_quoting(self, term: str) -> bool:
"""Check if a term needs to be quoted for FTS5 safety.
Args:
term: The term to check
Returns:
True if the term should be quoted
"""
if not term or not term.strip():
return False
# Characters that indicate we should quote (excluding parentheses which are valid syntax)
needs_quoting_chars = [
" ",
".",
":",
";",
",",
"<",
">",
"?",
"/",
"-",
"'",
'"',
"[",
"]",
"{",
"}",
"+",
"!",
"@",
"#",
"$",
"%",
"^",
"&",
"=",
"|",
"\\",
"~",
"`",
]
return any(c in term for c in needs_quoting_chars)
def _prepare_single_term(self, term: str, is_prefix: bool = True) -> str:
"""Prepare a single search term (no Boolean operators).
Args:
term: A single search term
is_prefix: Whether to add prefix search capability (* suffix)
Returns:
A properly formatted single term
"""
if not term or not term.strip():
return term
term = term.strip()
# Check if term is already a proper wildcard pattern (alphanumeric + *)
# e.g., "hello*", "test*world" - these should be left alone
if "*" in term and all(c.isalnum() or c in "*_-" for c in term):
return term
# Characters that can cause FTS5 syntax errors when used as operators
# We're more conservative here - only quote when we detect problematic patterns
problematic_chars = [
'"',
"'",
"(",
")",
"[",
"]",
"{",
"}",
"+",
"!",
"@",
"#",
"$",
"%",
"^",
"&",
"=",
"|",
"\\",
"~",
"`",
]
# Characters that indicate we should quote (spaces, dots, colons, etc.)
# Adding hyphens here because FTS5 can have issues with hyphens followed by wildcards
needs_quoting_chars = [" ", ".", ":", ";", ",", "<", ">", "?", "/", "-"]
# Check if term needs quoting
has_problematic = any(c in term for c in problematic_chars)
has_spaces_or_special = any(c in term for c in needs_quoting_chars)
if has_problematic or has_spaces_or_special:
# Handle multi-word queries differently from special character queries
if " " in term and not any(c in term for c in problematic_chars):
# Check if any individual word contains special characters that need quoting
words = term.strip().split()
has_special_in_words = any(
any(c in word for c in needs_quoting_chars if c != " ") for word in words
)
if not has_special_in_words:
# For multi-word queries with simple words (like "emoji unicode"),
# use boolean AND to handle word order variations
if is_prefix:
# Add prefix wildcard to each word for better matching
prepared_words = [f"{word}*" for word in words if word]
else:
prepared_words = words
term = " AND ".join(prepared_words)
else:
# If any word has special characters, quote the entire phrase
escaped_term = term.replace('"', '""')
if is_prefix and not ("/" in term and term.endswith(".md")):
term = f'"{escaped_term}"*'
else:
term = f'"{escaped_term}"'
else:
# For terms with problematic characters or file paths, use exact phrase matching
# Escape any existing quotes by doubling them
escaped_term = term.replace('"', '""')
# Quote the entire term to handle special characters safely
if is_prefix and not ("/" in term and term.endswith(".md")):
# For search terms (not file paths), add prefix matching
term = f'"{escaped_term}"*'
else:
# For file paths, use exact matching
term = f'"{escaped_term}"'
elif is_prefix:
# Only add wildcard for simple terms without special characters
term = f"{term}*"
return term
def _prepare_search_term(self, term: str, is_prefix: bool = True) -> str:
"""Prepare a search term for FTS5 query.
Args:
term: The search term to prepare
is_prefix: Whether to add prefix search capability (* suffix)
For FTS5:
- Boolean operators (AND, OR, NOT) are preserved for complex queries
- Terms with FTS5 special characters are quoted to prevent syntax errors
- Simple terms get prefix wildcards for better matching
"""
# Check for explicit boolean operators - if present, process as Boolean query
boolean_operators = [" AND ", " OR ", " NOT "]
if any(op in f" {term} " for op in boolean_operators):
return self._prepare_boolean_query(term)
# For non-Boolean queries, use the single term preparation logic
return self._prepare_single_term(term, is_prefix)
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
permalink_match: Optional[str] = None,
title: Optional[str] = None,
types: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
search_item_types: Optional[List[SearchItemType]] = None,
limit: int = 10,
offset: int = 0,
) -> List[SearchIndexRow]:
"""Search across all indexed content with fuzzy matching."""
conditions = []
params = {}
order_by_clause = ""
# Handle text search for title and content
if search_text:
# Skip FTS for wildcard-only queries that would cause "unknown special query" errors
if search_text.strip() == "*" or search_text.strip() == "":
# For wildcard searches, don't add any text conditions - return all results
pass
else:
# Use _prepare_search_term to handle both Boolean and non-Boolean queries
processed_text = self._prepare_search_term(search_text.strip())
params["text"] = processed_text
conditions.append("(title MATCH :text OR content_stems MATCH :text)")
# Handle title match search
if title:
title_text = self._prepare_search_term(title.strip(), is_prefix=False)
params["title_text"] = title_text
conditions.append("title MATCH :title_text")
# Handle permalink exact search
if permalink:
params["permalink"] = permalink
conditions.append("permalink = :permalink")
# Handle permalink match search, supports *
if permalink_match:
# For GLOB patterns, don't use _prepare_search_term as it will quote slashes
# GLOB patterns need to preserve their syntax
permalink_text = permalink_match.lower().strip()
params["permalink"] = permalink_text
if "*" in permalink_match:
conditions.append("permalink GLOB :permalink")
else:
# For exact matches without *, we can use FTS5 MATCH
# but only prepare the term if it doesn't look like a path
if "/" in permalink_text:
conditions.append("permalink = :permalink")
else:
permalink_text = self._prepare_search_term(permalink_text, is_prefix=False)
params["permalink"] = permalink_text
conditions.append("permalink MATCH :permalink")
# Handle entity type filter
if search_item_types:
type_list = ", ".join(f"'{t.value}'" for t in search_item_types)
conditions.append(f"type IN ({type_list})")
# Handle type filter
if types:
type_list = ", ".join(f"'{t}'" for t in types)
conditions.append(f"json_extract(metadata, '$.entity_type') IN ({type_list})")
# Handle date filter using datetime() for proper comparison
if after_date:
params["after_date"] = after_date
conditions.append("datetime(created_at) > datetime(:after_date)")
# order by most recent first
order_by_clause = ", updated_at DESC"
# Always filter by project_id
params["project_id"] = self.project_id
conditions.append("project_id = :project_id")
# set limit on search query
params["limit"] = limit
params["offset"] = offset
# Build WHERE clause
where_clause = " AND ".join(conditions) if conditions else "1=1"
sql = f"""
SELECT
project_id,
id,
title,
permalink,
file_path,
type,
metadata,
from_id,
to_id,
relation_type,
entity_id,
content_snippet,
category,
created_at,
updated_at,
bm25(search_index) as score
FROM search_index
WHERE {where_clause}
ORDER BY score ASC {order_by_clause}
LIMIT :limit
OFFSET :offset
"""
logger.trace(f"Search {sql} params: {params}")
try:
async with db.scoped_session(self.session_maker) as session:
result = await session.execute(text(sql), params)
rows = result.fetchall()
except Exception as e:
# Handle FTS5 syntax errors and provide user-friendly feedback
if "fts5: syntax error" in str(e).lower(): # pragma: no cover
logger.warning(f"FTS5 syntax error for search term: {search_text}, error: {e}")
# Return empty results rather than crashing
return []
else:
# Re-raise other database errors
logger.error(f"Database error during search: {e}")
raise
results = [
SearchIndexRow(
project_id=self.project_id,
id=row.id,
title=row.title,
permalink=row.permalink,
file_path=row.file_path,
type=row.type,
score=row.score,
metadata=json.loads(row.metadata),
from_id=row.from_id,
to_id=row.to_id,
relation_type=row.relation_type,
entity_id=row.entity_id,
content_snippet=row.content_snippet,
category=row.category,
created_at=row.created_at,
updated_at=row.updated_at,
)
for row in rows
]
logger.trace(f"Found {len(results)} search results")
for r in results:
logger.trace(
f"Search result: project_id: {r.project_id} type:{r.type} title: {r.title} permalink: {r.permalink} score: {r.score}"
)
return results
async def index_item(
self,
search_index_row: SearchIndexRow,
):
"""Index or update a single item."""
async with db.scoped_session(self.session_maker) as session:
# Delete existing record if any
await session.execute(
text(
"DELETE FROM search_index WHERE permalink = :permalink AND project_id = :project_id"
),
{"permalink": search_index_row.permalink, "project_id": self.project_id},
)
# Prepare data for insert with project_id
insert_data = search_index_row.to_insert()
insert_data["project_id"] = self.project_id
# Insert new record
await session.execute(
text("""
INSERT INTO search_index (
id, title, content_stems, content_snippet, permalink, file_path, type, metadata,
from_id, to_id, relation_type,
entity_id, category,
created_at, updated_at,
project_id
) VALUES (
:id, :title, :content_stems, :content_snippet, :permalink, :file_path, :type, :metadata,
:from_id, :to_id, :relation_type,
:entity_id, :category,
:created_at, :updated_at,
:project_id
)
"""),
insert_data,
)
logger.debug(f"indexed row {search_index_row}")
await session.commit()
async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]):
"""Index multiple items in a single batch operation.
Note: This method assumes that any existing records for the entity_id
have already been deleted (typically via delete_by_entity_id).
Args:
search_index_rows: List of SearchIndexRow objects to index
"""
if not search_index_rows:
return
async with db.scoped_session(self.session_maker) as session:
# Prepare all insert data with project_id
insert_data_list = []
for row in search_index_rows:
insert_data = row.to_insert()
insert_data["project_id"] = self.project_id
insert_data_list.append(insert_data)
# Batch insert all records using executemany
await session.execute(
text("""
INSERT INTO search_index (
id, title, content_stems, content_snippet, permalink, file_path, type, metadata,
from_id, to_id, relation_type,
entity_id, category,
created_at, updated_at,
project_id
) VALUES (
:id, :title, :content_stems, :content_snippet, :permalink, :file_path, :type, :metadata,
:from_id, :to_id, :relation_type,
:entity_id, :category,
:created_at, :updated_at,
:project_id
)
"""),
insert_data_list,
)
logger.debug(f"Bulk indexed {len(search_index_rows)} rows")
await session.commit()
async def delete_by_entity_id(self, entity_id: int):
"""Delete an item from the search index by entity_id."""
async with db.scoped_session(self.session_maker) as session:
await session.execute(
text(
"DELETE FROM search_index WHERE entity_id = :entity_id AND project_id = :project_id"
),
{"entity_id": entity_id, "project_id": self.project_id},
)
await session.commit()
async def delete_by_permalink(self, permalink: str):
"""Delete an item from the search index."""
async with db.scoped_session(self.session_maker) as session:
await session.execute(
text(
"DELETE FROM search_index WHERE permalink = :permalink AND project_id = :project_id"
),
{"permalink": permalink, "project_id": self.project_id},
)
await session.commit()
async def execute_query(
self,
query: Executable,
params: Dict[str, Any],
) -> Result[Any]:
"""Execute a query asynchronously."""
# logger.debug(f"Executing query: {query}, params: {params}")
async with db.scoped_session(self.session_maker) as session:
start_time = time.perf_counter()
result = await session.execute(query, params)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
logger.debug(f"Query executed successfully in {elapsed_time:.2f}s.")
return result
```
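A minimal usage sketch of the repository above, assuming an already-configured async `session_maker` and an existing project with id 1 (both are placeholders, not part of this file):

```python
from basic_memory.repository.search_repository import SearchRepository

async def demo(session_maker) -> None:
    """Index-and-search round trip; run via asyncio.run(demo(session_maker))."""
    # project_id=1 is a placeholder; SearchRepository requires a valid project id.
    repo = SearchRepository(session_maker, project_id=1)
    await repo.init_search_index()
    # Simple terms get prefix matching; Boolean operators are preserved as-is.
    for row in await repo.search(search_text="hello AND world", limit=5):
        print(row.title, row.permalink, row.score)
```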
--------------------------------------------------------------------------------
/tests/services/test_search_service.py:
--------------------------------------------------------------------------------
```python
"""Tests for search service."""
from datetime import datetime
import pytest
from sqlalchemy import text
from basic_memory import db
from basic_memory.schemas.search import SearchQuery, SearchItemType
@pytest.mark.asyncio
async def test_search_permalink(search_service, test_graph):
"""Exact permalink"""
results = await search_service.search(SearchQuery(permalink="test/root"))
assert len(results) == 1
for r in results:
assert "test/root" in r.permalink
@pytest.mark.asyncio
async def test_search_limit_offset(search_service, test_graph):
"""Exact permalink"""
results = await search_service.search(SearchQuery(permalink_match="test/*"))
assert len(results) > 1
results = await search_service.search(SearchQuery(permalink_match="test/*"), limit=1)
assert len(results) == 1
results = await search_service.search(SearchQuery(permalink_match="test/*"), limit=100)
num_results = len(results)
# assert offset
offset_results = await search_service.search(
SearchQuery(permalink_match="test/*"), limit=100, offset=1
)
assert len(offset_results) == num_results - 1
@pytest.mark.asyncio
async def test_search_permalink_observations_wildcard(search_service, test_graph):
"""Pattern matching"""
results = await search_service.search(SearchQuery(permalink_match="test/root/observations/*"))
assert len(results) == 2
permalinks = {r.permalink for r in results}
assert "test/root/observations/note/root-note-1" in permalinks
assert "test/root/observations/tech/root-tech-note" in permalinks
@pytest.mark.asyncio
async def test_search_permalink_relation_wildcard(search_service, test_graph):
"""Pattern matching"""
results = await search_service.search(SearchQuery(permalink_match="test/root/connects-to/*"))
assert len(results) == 1
permalinks = {r.permalink for r in results}
assert "test/root/connects-to/test/connected-entity-1" in permalinks
@pytest.mark.asyncio
async def test_search_permalink_wildcard2(search_service, test_graph):
"""Pattern matching"""
results = await search_service.search(
SearchQuery(
permalink_match="test/connected*",
)
)
assert len(results) >= 2
permalinks = {r.permalink for r in results}
assert "test/connected-entity-1" in permalinks
assert "test/connected-entity-2" in permalinks
@pytest.mark.asyncio
async def test_search_text(search_service, test_graph):
"""Full-text search"""
results = await search_service.search(
SearchQuery(text="Root Entity", entity_types=[SearchItemType.ENTITY])
)
assert len(results) >= 1
assert results[0].permalink == "test/root"
@pytest.mark.asyncio
async def test_search_title(search_service, test_graph):
"""Title only search"""
results = await search_service.search(
SearchQuery(title="Root", entity_types=[SearchItemType.ENTITY])
)
assert len(results) >= 1
assert results[0].permalink == "test/root"
@pytest.mark.asyncio
async def test_text_search_case_insensitive(search_service, test_graph):
"""Test text search functionality."""
# Case insensitive
results = await search_service.search(SearchQuery(text="ENTITY"))
assert any("test/root" in r.permalink for r in results)
@pytest.mark.asyncio
async def test_text_search_content_word_match(search_service, test_graph):
"""Test text search functionality."""
# content word match
results = await search_service.search(SearchQuery(text="Connected"))
assert len(results) > 0
assert any(r.file_path == "test/Connected Entity 2.md" for r in results)
@pytest.mark.asyncio
async def test_text_search_multiple_terms(search_service, test_graph):
"""Test text search functionality."""
# Multiple terms
results = await search_service.search(SearchQuery(text="root note"))
assert any("test/root" in r.permalink for r in results)
@pytest.mark.asyncio
async def test_pattern_matching(search_service, test_graph):
"""Test pattern matching with various wildcards."""
# Test wildcards
results = await search_service.search(SearchQuery(permalink_match="test/*"))
for r in results:
assert "test/" in r.permalink
# Test start wildcards
results = await search_service.search(SearchQuery(permalink_match="*/observations"))
for r in results:
assert "/observations" in r.permalink
# Test permalink partial match
results = await search_service.search(SearchQuery(permalink_match="test"))
for r in results:
assert "test/" in r.permalink
@pytest.mark.asyncio
async def test_filters(search_service, test_graph):
"""Test search filters."""
# Combined filters
results = await search_service.search(
SearchQuery(text="Deep", entity_types=[SearchItemType.ENTITY], types=["deep"])
)
assert len(results) == 1
for r in results:
assert r.type == SearchItemType.ENTITY
assert r.metadata.get("entity_type") == "deep"
@pytest.mark.asyncio
async def test_after_date(search_service, test_graph):
"""Test search filters."""
# Should find with past date
past_date = datetime(2020, 1, 1).astimezone()
results = await search_service.search(
SearchQuery(
text="entity",
after_date=past_date.isoformat(),
)
)
for r in results:
assert datetime.fromisoformat(r.created_at) > past_date
# Should not find with future date
future_date = datetime(2030, 1, 1).astimezone()
results = await search_service.search(
SearchQuery(
text="entity",
after_date=future_date.isoformat(),
)
)
assert len(results) == 0
@pytest.mark.asyncio
async def test_search_type(search_service, test_graph):
"""Test search filters."""
# Should find only type
results = await search_service.search(SearchQuery(types=["test"]))
assert len(results) > 0
for r in results:
assert r.type == SearchItemType.ENTITY
@pytest.mark.asyncio
async def test_search_entity_type(search_service, test_graph):
"""Test search filters."""
# Should find only type
results = await search_service.search(SearchQuery(entity_types=[SearchItemType.ENTITY]))
assert len(results) > 0
for r in results:
assert r.type == SearchItemType.ENTITY
@pytest.mark.asyncio
async def test_extract_entity_tags_exception_handling(search_service):
"""Test the _extract_entity_tags method exception handling (lines 147-151)."""
from basic_memory.models.knowledge import Entity
# Create entity with string tags that will cause parsing to fail and fall back to single tag
entity_with_invalid_tags = Entity(
title="Test Entity",
entity_type="test",
entity_metadata={"tags": "just a string"}, # This will fail ast.literal_eval
content_type="text/markdown",
file_path="test/test-entity.md",
project_id=1,
)
# This should trigger the except block on lines 147-149
result = search_service._extract_entity_tags(entity_with_invalid_tags)
assert result == ["just a string"]
# Test with empty string (should return empty list) - covers line 149
entity_with_empty_tags = Entity(
title="Test Entity Empty",
entity_type="test",
entity_metadata={"tags": ""},
content_type="text/markdown",
file_path="test/test-entity-empty.md",
project_id=1,
)
result = search_service._extract_entity_tags(entity_with_empty_tags)
assert result == []
@pytest.mark.asyncio
async def test_delete_entity_without_permalink(search_service, sample_entity):
"""Test deleting an entity that has no permalink (edge case)."""
# Set the entity permalink to None to trigger the else branch on line 355
sample_entity.permalink = None
# This should trigger the delete_by_entity_id path (line 355) in handle_delete
await search_service.handle_delete(sample_entity)
@pytest.mark.asyncio
async def test_no_criteria(search_service, test_graph):
"""Test search with no criteria returns empty list."""
results = await search_service.search(SearchQuery())
assert len(results) == 0
@pytest.mark.asyncio
async def test_init_search_index(search_service, session_maker):
"""Test search index initialization."""
async with db.scoped_session(session_maker) as session:
result = await session.execute(
text("SELECT name FROM sqlite_master WHERE type='table' AND name='search_index';")
)
assert result.scalar() == "search_index"
@pytest.mark.asyncio
async def test_update_index(search_service, full_entity):
"""Test updating indexed content."""
await search_service.index_entity(full_entity)
# Update entity
full_entity.title = "OMG I AM UPDATED"
await search_service.index_entity(full_entity)
# Search for new title
results = await search_service.search(SearchQuery(text="OMG I AM UPDATED"))
assert len(results) > 1
@pytest.mark.asyncio
async def test_boolean_and_search(search_service, test_graph):
"""Test boolean AND search."""
# Create an entity with specific terms for testing
# This assumes the test_graph fixture already has entities with relevant terms
# Test AND operator - both terms must be present
results = await search_service.search(SearchQuery(text="Root AND Entity"))
assert len(results) >= 1
# Verify the result contains both terms
found = False
for result in results:
if (result.title and "Root" in result.title and "Entity" in result.title) or (
result.content_snippet
and "Root" in result.content_snippet
and "Entity" in result.content_snippet
):
found = True
break
assert found, "Boolean AND search failed to find items containing both terms"
# Verify that items with only one term are not returned
results = await search_service.search(SearchQuery(text="NonexistentTerm AND Root"))
assert len(results) == 0, "Boolean AND search returned results when it shouldn't have"
@pytest.mark.asyncio
async def test_boolean_or_search(search_service, test_graph):
"""Test boolean OR search."""
# Test OR operator - either term can be present
results = await search_service.search(SearchQuery(text="Root OR Connected"))
# Should find both "Root Entity" and "Connected Entity"
assert len(results) >= 2
# Verify we find items with either term
root_found = False
connected_found = False
for result in results:
if result.permalink == "test/root":
root_found = True
elif "connected" in result.permalink.lower():
connected_found = True
assert root_found, "Boolean OR search failed to find 'Root' term"
assert connected_found, "Boolean OR search failed to find 'Connected' term"
@pytest.mark.asyncio
async def test_boolean_not_search(search_service, test_graph):
"""Test boolean NOT search."""
# Test NOT operator - exclude certain terms
results = await search_service.search(SearchQuery(text="Entity NOT Connected"))
# Should find "Root Entity" but not "Connected Entity"
for result in results:
assert "connected" not in result.permalink.lower(), (
"Boolean NOT search returned excluded term"
)
@pytest.mark.asyncio
async def test_boolean_group_search(search_service, test_graph):
"""Test boolean grouping with parentheses."""
# Test grouping - (A OR B) AND C
results = await search_service.search(SearchQuery(title="(Root OR Connected) AND Entity"))
# Should find both entities that contain "Entity" and either "Root" or "Connected"
assert len(results) >= 2
for result in results:
# Each result should contain "Entity" and either "Root" or "Connected"
contains_entity = "entity" in result.title.lower()
contains_root_or_connected = (
"root" in result.title.lower() or "connected" in result.title.lower()
)
assert contains_entity and contains_root_or_connected, (
"Boolean grouped search returned incorrect results"
)
@pytest.mark.asyncio
async def test_boolean_operators_detection(search_service):
"""Test detection of boolean operators in query."""
# Test various queries that should be detected as boolean
boolean_queries = [
"term1 AND term2",
"term1 OR term2",
"term1 NOT term2",
"(term1 OR term2) AND term3",
"complex (nested OR grouping) AND term",
]
for query_text in boolean_queries:
query = SearchQuery(text=query_text)
assert query.has_boolean_operators(), f"Failed to detect boolean operators in: {query_text}"
# Test queries that should not be detected as boolean
non_boolean_queries = [
"normal search query",
"brand name", # Should not detect "AND" within "brand"
"understand this concept", # Should not detect "AND" within "understand"
"command line",
"sandbox testing",
]
for query_text in non_boolean_queries:
query = SearchQuery(text=query_text)
assert not query.has_boolean_operators(), (
f"Incorrectly detected boolean operators in: {query_text}"
)
# Tests for frontmatter tag search functionality
@pytest.mark.asyncio
async def test_extract_entity_tags_list_format(search_service, session_maker):
"""Test tag extraction from list format in entity metadata."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata={"tags": ["business", "strategy", "planning"]},
content_type="text/markdown",
file_path="test/business-strategy.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == ["business", "strategy", "planning"]
@pytest.mark.asyncio
async def test_extract_entity_tags_string_format(search_service, session_maker):
"""Test tag extraction from string format in entity metadata."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata={"tags": "['documentation', 'tools', 'best-practices']"},
content_type="text/markdown",
file_path="test/docs.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == ["documentation", "tools", "best-practices"]
@pytest.mark.asyncio
async def test_extract_entity_tags_empty_list(search_service, session_maker):
"""Test tag extraction from empty list in entity metadata."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata={"tags": []},
content_type="text/markdown",
file_path="test/empty-tags.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == []
@pytest.mark.asyncio
async def test_extract_entity_tags_empty_string(search_service, session_maker):
"""Test tag extraction from empty string in entity metadata."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata={"tags": "[]"},
content_type="text/markdown",
file_path="test/empty-string-tags.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == []
@pytest.mark.asyncio
async def test_extract_entity_tags_no_metadata(search_service, session_maker):
"""Test tag extraction when entity has no metadata."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata=None,
content_type="text/markdown",
file_path="test/no-metadata.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == []
@pytest.mark.asyncio
async def test_extract_entity_tags_no_tags_key(search_service, session_maker):
"""Test tag extraction when metadata exists but has no tags key."""
from basic_memory.models import Entity
entity = Entity(
title="Test Entity",
entity_type="note",
entity_metadata={"title": "Some Title", "type": "note"},
content_type="text/markdown",
file_path="test/no-tags-key.md",
project_id=1,
)
tags = search_service._extract_entity_tags(entity)
assert tags == []
@pytest.mark.asyncio
async def test_search_by_frontmatter_tags(search_service, session_maker, test_project):
"""Test that entities can be found by searching for their frontmatter tags."""
from basic_memory.repository import EntityRepository
from unittest.mock import AsyncMock
entity_repo = EntityRepository(session_maker, project_id=test_project.id)
# Create entity with tags
from datetime import datetime
entity_data = {
"title": "Business Strategy Guide",
"entity_type": "note",
"entity_metadata": {"tags": ["business", "strategy", "planning", "organization"]},
"content_type": "text/markdown",
"file_path": "guides/business-strategy.md",
"permalink": "guides/business-strategy",
"project_id": test_project.id,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
entity = await entity_repo.create(entity_data)
# Mock file service to avoid file I/O
search_service.file_service.read_entity_content = AsyncMock(return_value="")
await search_service.index_entity(entity)
# Search for entities by tag
results = await search_service.search(SearchQuery(text="business"))
assert len(results) >= 1
# Check that our entity is in the results
entity_found = False
for result in results:
if result.title == "Business Strategy Guide":
entity_found = True
break
assert entity_found, "Entity with 'business' tag should be found in search results"
# Test searching by another tag
results = await search_service.search(SearchQuery(text="planning"))
assert len(results) >= 1
entity_found = False
for result in results:
if result.title == "Business Strategy Guide":
entity_found = True
break
assert entity_found, "Entity with 'planning' tag should be found in search results"
@pytest.mark.asyncio
async def test_search_by_frontmatter_tags_string_format(
search_service, session_maker, test_project
):
"""Test that entities with string format tags can be found in search."""
from basic_memory.repository import EntityRepository
from unittest.mock import AsyncMock
entity_repo = EntityRepository(session_maker, project_id=test_project.id)
# Create entity with tags in string format
from datetime import datetime
entity_data = {
"title": "Documentation Guidelines",
"entity_type": "note",
"entity_metadata": {"tags": "['documentation', 'tools', 'best-practices']"},
"content_type": "text/markdown",
"file_path": "guides/documentation.md",
"permalink": "guides/documentation",
"project_id": test_project.id,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
entity = await entity_repo.create(entity_data)
# Mock file service to avoid file I/O
search_service.file_service.read_entity_content = AsyncMock(return_value="")
await search_service.index_entity(entity)
# Search for entities by tag
results = await search_service.search(SearchQuery(text="documentation"))
assert len(results) >= 1
# Check that our entity is in the results
entity_found = False
for result in results:
if result.title == "Documentation Guidelines":
entity_found = True
break
assert entity_found, "Entity with 'documentation' tag should be found in search results"
@pytest.mark.asyncio
async def test_search_special_characters_in_title(search_service, session_maker, test_project):
"""Test that entities with special characters in titles can be searched without FTS5 syntax errors."""
from basic_memory.repository import EntityRepository
from unittest.mock import AsyncMock
entity_repo = EntityRepository(session_maker, project_id=test_project.id)
# Create entities with special characters that could cause FTS5 syntax errors
special_titles = [
"Note with spaces",
"Note-with-dashes",
"Note_with_underscores",
"Note (with parentheses)", # This is the problematic one
"Note & Symbols!",
"Note [with brackets]",
"Note {with braces}",
'Note "with quotes"',
"Note 'with apostrophes'",
]
entities = []
for i, title in enumerate(special_titles):
from datetime import datetime
entity_data = {
"title": title,
"entity_type": "note",
"entity_metadata": {"tags": ["special", "characters"]},
"content_type": "text/markdown",
"file_path": f"special/{title}.md",
"permalink": f"special/note-{i}",
"project_id": test_project.id,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
entity = await entity_repo.create(entity_data)
entities.append(entity)
# Mock file service to avoid file I/O
search_service.file_service.read_entity_content = AsyncMock(return_value="")
# Index all entities
for entity in entities:
await search_service.index_entity(entity)
# Test searching for each title - this should not cause FTS5 syntax errors
for title in special_titles:
results = await search_service.search(SearchQuery(title=title))
# Should find the entity without throwing FTS5 syntax errors
entity_found = False
for result in results:
if result.title == title:
entity_found = True
break
assert entity_found, f"Entity with title '{title}' should be found in search results"
@pytest.mark.asyncio
async def test_search_title_with_parentheses_specific(search_service, session_maker, test_project):
"""Test searching specifically for title with parentheses to reproduce FTS5 error."""
from basic_memory.repository import EntityRepository
from unittest.mock import AsyncMock
entity_repo = EntityRepository(session_maker, project_id=test_project.id)
# Create the problematic entity
from datetime import datetime
entity_data = {
"title": "Note (with parentheses)",
"entity_type": "note",
"entity_metadata": {"tags": ["test"]},
"content_type": "text/markdown",
"file_path": "special/Note (with parentheses).md",
"permalink": "special/note-with-parentheses",
"project_id": test_project.id,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
entity = await entity_repo.create(entity_data)
# Mock file service to avoid file I/O
search_service.file_service.read_entity_content = AsyncMock(return_value="")
# Index the entity
await search_service.index_entity(entity)
# Test searching for the title - this should not cause FTS5 syntax errors
search_query = SearchQuery(title="Note (with parentheses)")
results = await search_service.search(search_query)
# Should find the entity without throwing FTS5 syntax errors
assert len(results) >= 1
assert any(result.title == "Note (with parentheses)" for result in results)
@pytest.mark.asyncio
async def test_search_title_via_repository_direct(search_service, session_maker, test_project):
"""Test searching via search repository directly to isolate the FTS5 error."""
from basic_memory.repository import EntityRepository
from unittest.mock import AsyncMock
entity_repo = EntityRepository(session_maker, project_id=test_project.id)
# Create the problematic entity
from datetime import datetime
entity_data = {
"title": "Note (with parentheses)",
"entity_type": "note",
"entity_metadata": {"tags": ["test"]},
"content_type": "text/markdown",
"file_path": "special/Note (with parentheses).md",
"permalink": "special/note-with-parentheses",
"project_id": test_project.id,
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
entity = await entity_repo.create(entity_data)
# Mock file service to avoid file I/O
search_service.file_service.read_entity_content = AsyncMock(return_value="")
# Index the entity
await search_service.index_entity(entity)
# Test searching via repository directly - this reproduces the error path
results = await search_service.repository.search(
title="Note (with parentheses)",
limit=10,
offset=0,
)
# Should find the entity without throwing FTS5 syntax errors
assert len(results) >= 1
assert any(result.title == "Note (with parentheses)" for result in results)
```
--------------------------------------------------------------------------------
/tests/repository/test_search_repository.py:
--------------------------------------------------------------------------------
```python
"""Tests for the SearchRepository."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from sqlalchemy import text
from basic_memory import db
from basic_memory.models import Entity
from basic_memory.models.project import Project
from basic_memory.repository.search_repository import SearchRepository, SearchIndexRow
from basic_memory.schemas.search import SearchItemType
@pytest_asyncio.fixture
async def search_entity(session_maker, test_project: Project):
"""Create a test entity for search testing."""
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="Search Test Entity",
entity_type="test",
permalink="test/search-test-entity",
file_path="test/search_test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
return entity
@pytest_asyncio.fixture
async def second_project(project_repository):
"""Create a second project for testing project isolation."""
project_data = {
"name": "Second Test Project",
"description": "Another project for testing",
"path": "/second/project/path",
"is_active": True,
"is_default": None,
}
return await project_repository.create(project_data)
@pytest_asyncio.fixture
async def second_project_repository(session_maker, second_project):
"""Create a repository for the second project."""
return SearchRepository(session_maker, project_id=second_project.id)
@pytest_asyncio.fixture
async def second_entity(session_maker, second_project: Project):
"""Create a test entity in the second project."""
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=second_project.id,
title="Second Project Entity",
entity_type="test",
permalink="test/second-project-entity",
file_path="test/second_project_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
return entity
@pytest.mark.asyncio
async def test_init_search_index(search_repository):
"""Test that search index can be initialized."""
await search_repository.init_search_index()
# Verify search_index table exists
async with db.scoped_session(search_repository.session_maker) as session:
result = await session.execute(
text("SELECT name FROM sqlite_master WHERE type='table' AND name='search_index';")
)
assert result.scalar() == "search_index"
@pytest.mark.asyncio
async def test_index_item(search_repository, search_entity):
"""Test indexing an item with project_id."""
# Create search index row for the entity
search_row = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title=search_entity.title,
content_stems="search test entity content",
content_snippet="This is a test entity for search",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
# Index the item
await search_repository.index_item(search_row)
# Search for the item
results = await search_repository.search(search_text="search test")
# Verify we found the item
assert len(results) == 1
assert results[0].title == search_entity.title
assert results[0].project_id == search_repository.project_id
@pytest.mark.asyncio
async def test_project_isolation(
search_repository, second_project_repository, search_entity, second_entity
):
"""Test that search is isolated by project."""
# Index entities in both projects
search_row1 = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title=search_entity.title,
content_stems="unique first project content",
content_snippet="This is a test entity in the first project",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
search_row2 = SearchIndexRow(
id=second_entity.id,
type=SearchItemType.ENTITY.value,
title=second_entity.title,
content_stems="unique second project content",
content_snippet="This is a test entity in the second project",
permalink=second_entity.permalink,
file_path=second_entity.file_path,
entity_id=second_entity.id,
metadata={"entity_type": second_entity.entity_type},
created_at=second_entity.created_at,
updated_at=second_entity.updated_at,
project_id=second_project_repository.project_id,
)
# Index items in their respective repositories
await search_repository.index_item(search_row1)
await second_project_repository.index_item(search_row2)
# Search in first project
results1 = await search_repository.search(search_text="unique first")
assert len(results1) == 1
assert results1[0].title == search_entity.title
assert results1[0].project_id == search_repository.project_id
# Search in second project
results2 = await second_project_repository.search(search_text="unique second")
assert len(results2) == 1
assert results2[0].title == second_entity.title
assert results2[0].project_id == second_project_repository.project_id
# Make sure first project can't see second project's content
results_cross1 = await search_repository.search(search_text="unique second")
assert len(results_cross1) == 0
# Make sure second project can't see first project's content
results_cross2 = await second_project_repository.search(search_text="unique first")
assert len(results_cross2) == 0
@pytest.mark.asyncio
async def test_delete_by_permalink(search_repository, search_entity):
"""Test deleting an item by permalink respects project isolation."""
# Index the item
search_row = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title=search_entity.title,
content_stems="content to delete",
content_snippet="This content should be deleted",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
await search_repository.index_item(search_row)
# Verify it exists
results = await search_repository.search(search_text="content to delete")
assert len(results) == 1
# Delete by permalink
await search_repository.delete_by_permalink(search_entity.permalink)
# Verify it's gone
results_after = await search_repository.search(search_text="content to delete")
assert len(results_after) == 0
@pytest.mark.asyncio
async def test_delete_by_entity_id(search_repository, search_entity):
"""Test deleting an item by entity_id respects project isolation."""
# Index the item
search_row = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title=search_entity.title,
content_stems="entity to delete",
content_snippet="This entity should be deleted",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
await search_repository.index_item(search_row)
# Verify it exists
results = await search_repository.search(search_text="entity to delete")
assert len(results) == 1
# Delete by entity_id
await search_repository.delete_by_entity_id(search_entity.id)
# Verify it's gone
results_after = await search_repository.search(search_text="entity to delete")
assert len(results_after) == 0
@pytest.mark.asyncio
async def test_to_insert_includes_project_id(search_repository):
"""Test that the to_insert method includes project_id."""
# Create a search index row with project_id
row = SearchIndexRow(
id=1234,
type=SearchItemType.ENTITY.value,
title="Test Title",
content_stems="test content",
content_snippet="test snippet",
permalink="test/permalink",
file_path="test/file.md",
metadata={"test": "metadata"},
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
project_id=search_repository.project_id,
)
# Get insert data
insert_data = row.to_insert()
# Verify project_id is included
assert "project_id" in insert_data
assert insert_data["project_id"] == search_repository.project_id
def test_directory_property():
"""Test the directory property of SearchIndexRow."""
# Test a file in a nested directory
row1 = SearchIndexRow(
id=1,
type=SearchItemType.ENTITY.value,
file_path="projects/notes/ideas.md",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
project_id=1,
)
assert row1.directory == "/projects/notes"
# Test a file at the root level
row2 = SearchIndexRow(
id=2,
type=SearchItemType.ENTITY.value,
file_path="README.md",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
project_id=1,
)
assert row2.directory == "/"
# Test a non-entity type with empty file_path
row3 = SearchIndexRow(
id=3,
type=SearchItemType.OBSERVATION.value,
file_path="",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
project_id=1,
)
assert row3.directory == ""
class TestSearchTermPreparation:
"""Test cases for FTS5 search term preparation."""
def test_simple_terms_get_prefix_wildcard(self, search_repository):
"""Simple alphanumeric terms should get prefix matching."""
assert search_repository._prepare_search_term("hello") == "hello*"
assert search_repository._prepare_search_term("project") == "project*"
assert search_repository._prepare_search_term("test123") == "test123*"
def test_terms_with_existing_wildcard_unchanged(self, search_repository):
"""Terms that already contain * should remain unchanged."""
assert search_repository._prepare_search_term("hello*") == "hello*"
assert search_repository._prepare_search_term("test*world") == "test*world"
def test_boolean_operators_preserved(self, search_repository):
"""Boolean operators should be preserved without modification."""
assert search_repository._prepare_search_term("hello AND world") == "hello AND world"
assert search_repository._prepare_search_term("cat OR dog") == "cat OR dog"
assert (
search_repository._prepare_search_term("project NOT meeting") == "project NOT meeting"
)
assert (
search_repository._prepare_search_term("(hello AND world) OR test")
== "(hello AND world) OR test"
)
def test_hyphenated_terms_with_boolean_operators(self, search_repository):
"""Hyphenated terms with Boolean operators should be properly quoted."""
# Test the specific case from the GitHub issue
result = search_repository._prepare_search_term("tier1-test AND unicode")
assert result == '"tier1-test" AND unicode'
# Test other hyphenated Boolean combinations
assert (
search_repository._prepare_search_term("multi-word OR single")
== '"multi-word" OR single'
)
assert (
search_repository._prepare_search_term("well-formed NOT badly-formed")
== '"well-formed" NOT "badly-formed"'
)
assert (
search_repository._prepare_search_term("test-case AND (hello OR world)")
== '"test-case" AND (hello OR world)'
)
# Test mixed special characters with Boolean operators
assert (
search_repository._prepare_search_term("config.json AND test-file")
== '"config.json" AND "test-file"'
)
assert (
search_repository._prepare_search_term("C++ OR python-script")
== '"C++" OR "python-script"'
)
def test_programming_terms_should_work(self, search_repository):
"""Programming-related terms with special chars should be searchable."""
# These should be quoted to handle special characters safely
assert search_repository._prepare_search_term("C++") == '"C++"*'
assert search_repository._prepare_search_term("function()") == '"function()"*'
assert search_repository._prepare_search_term("[email protected]") == '"[email protected]"*'
assert search_repository._prepare_search_term("array[index]") == '"array[index]"*'
assert search_repository._prepare_search_term("config.json") == '"config.json"*'
def test_malformed_fts5_syntax_quoted(self, search_repository):
"""Malformed FTS5 syntax should be quoted to prevent errors."""
# Multiple operators without proper syntax
assert search_repository._prepare_search_term("+++invalid+++") == '"+++invalid+++"*'
assert search_repository._prepare_search_term("!!!error!!!") == '"!!!error!!!"*'
assert search_repository._prepare_search_term("@#$%^&*()") == '"@#$%^&*()"*'
def test_quoted_strings_handled_properly(self, search_repository):
"""Strings with quotes should have quotes escaped."""
assert search_repository._prepare_search_term('say "hello"') == '"say ""hello"""*'
assert search_repository._prepare_search_term("it's working") == '"it\'s working"*'
def test_file_paths_no_prefix_wildcard(self, search_repository):
"""File paths should not get prefix wildcards."""
assert (
search_repository._prepare_search_term("config.json", is_prefix=False)
== '"config.json"'
)
assert (
search_repository._prepare_search_term("docs/readme.md", is_prefix=False)
== '"docs/readme.md"'
)
def test_spaces_handled_correctly(self, search_repository):
"""Terms with spaces should use boolean AND for word order independence."""
assert search_repository._prepare_search_term("hello world") == "hello* AND world*"
assert (
search_repository._prepare_search_term("project planning") == "project* AND planning*"
)
def test_version_strings_with_dots_handled_correctly(self, search_repository):
"""Version strings with dots should be quoted to prevent FTS5 syntax errors."""
# This reproduces the bug where "Basic Memory v0.13.0b2" becomes "Basic* AND Memory* AND v0.13.0b2*"
# which causes FTS5 syntax errors because v0.13.0b2* is not valid FTS5 syntax
result = search_repository._prepare_search_term("Basic Memory v0.13.0b2")
# Should be quoted because of dots in v0.13.0b2
assert result == '"Basic Memory v0.13.0b2"*'
def test_mixed_special_characters_in_multi_word_queries(self, search_repository):
"""Multi-word queries with special characters in any word should be fully quoted."""
# Any word containing special characters should cause the entire phrase to be quoted
assert search_repository._prepare_search_term("config.json file") == '"config.json file"*'
assert (
search_repository._prepare_search_term("[email protected] account")
== '"[email protected] account"*'
)
assert search_repository._prepare_search_term("node.js and react") == '"node.js and react"*'
@pytest.mark.asyncio
async def test_search_with_special_characters_returns_results(self, search_repository):
"""Integration test: search with special characters should work gracefully."""
# This test ensures the search doesn't crash with FTS5 syntax errors
# These should all return empty results gracefully, not crash
results1 = await search_repository.search(search_text="C++")
assert isinstance(results1, list) # Should not crash
results2 = await search_repository.search(search_text="function()")
assert isinstance(results2, list) # Should not crash
results3 = await search_repository.search(search_text="+++malformed+++")
assert isinstance(results3, list) # Should not crash, return empty results
results4 = await search_repository.search(search_text="[email protected]")
assert isinstance(results4, list) # Should not crash
@pytest.mark.asyncio
async def test_boolean_search_still_works(self, search_repository):
"""Boolean search operations should continue to work."""
# These should not crash and should respect boolean logic
results1 = await search_repository.search(search_text="hello AND world")
assert isinstance(results1, list)
results2 = await search_repository.search(search_text="cat OR dog")
assert isinstance(results2, list)
results3 = await search_repository.search(search_text="project NOT meeting")
assert isinstance(results3, list)
@pytest.mark.asyncio
async def test_permalink_match_exact_with_slash(self, search_repository):
"""Test exact permalink matching with slash (line 249 coverage)."""
# This tests the exact match path: if "/" in permalink_text:
results = await search_repository.search(permalink_match="test/path")
assert isinstance(results, list)
# Should use exact equality matching for paths with slashes
@pytest.mark.asyncio
async def test_permalink_match_simple_term(self, search_repository):
"""Test permalink matching with simple term (no slash)."""
# This tests the simple term path that goes through _prepare_search_term
results = await search_repository.search(permalink_match="simpleterm")
assert isinstance(results, list)
# Should use FTS5 MATCH for simple terms
@pytest.mark.asyncio
async def test_fts5_error_handling_database_error(self, search_repository):
"""Test that non-FTS5 database errors are properly re-raised."""
import unittest.mock
# Mock the scoped_session to raise a non-FTS5 error
with unittest.mock.patch("basic_memory.db.scoped_session") as mock_scoped_session:
mock_session = unittest.mock.AsyncMock()
mock_scoped_session.return_value.__aenter__.return_value = mock_session
# Simulate a database error that's NOT an FTS5 syntax error
mock_session.execute.side_effect = Exception("Database connection failed")
# This should re-raise the exception (not return empty list)
with pytest.raises(Exception, match="Database connection failed"):
await search_repository.search(search_text="test")
@pytest.mark.asyncio
async def test_version_string_search_integration(self, search_repository, search_entity):
"""Integration test: searching for version strings should work without FTS5 errors."""
# Index an entity with version information
search_row = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title="Basic Memory v0.13.0b2 Release",
content_stems="basic memory version 0.13.0b2 beta release notes features",
content_snippet="Basic Memory v0.13.0b2 is a beta release with new features",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
await search_repository.index_item(search_row)
# This should not cause FTS5 syntax errors and should find the entity
results = await search_repository.search(search_text="Basic Memory v0.13.0b2")
assert len(results) == 1
assert results[0].title == "Basic Memory v0.13.0b2 Release"
# Test other version-like patterns
results2 = await search_repository.search(search_text="v0.13.0b2")
assert len(results2) == 1 # Should still find it due to content_stems
# Test with other problematic patterns
results3 = await search_repository.search(search_text="node.js version")
assert isinstance(results3, list) # Should not crash
@pytest.mark.asyncio
async def test_wildcard_only_search(self, search_repository, search_entity):
"""Test that wildcard-only search '*' doesn't cause FTS5 errors (line 243 coverage)."""
# Index an entity for testing
search_row = SearchIndexRow(
id=search_entity.id,
type=SearchItemType.ENTITY.value,
title="Test Entity",
content_stems="test entity content",
content_snippet="This is a test entity",
permalink=search_entity.permalink,
file_path=search_entity.file_path,
entity_id=search_entity.id,
metadata={"entity_type": search_entity.entity_type},
created_at=search_entity.created_at,
updated_at=search_entity.updated_at,
project_id=search_repository.project_id,
)
await search_repository.index_item(search_row)
# Test wildcard-only search - should not crash and should return results
results = await search_repository.search(search_text="*")
assert isinstance(results, list) # Should not crash
assert len(results) >= 1 # Should return all results, including our test entity
# Test empty string search - should also not crash
results_empty = await search_repository.search(search_text="")
assert isinstance(results_empty, list) # Should not crash
# Test whitespace-only search
results_whitespace = await search_repository.search(search_text=" ")
assert isinstance(results_whitespace, list) # Should not crash
def test_boolean_query_empty_parts_coverage(self, search_repository):
"""Test Boolean query parsing with empty parts (line 143 coverage)."""
# Create queries that will result in empty parts after splitting
result1 = search_repository._prepare_boolean_query(
"hello AND AND world"
) # Double operator
assert "hello" in result1 and "world" in result1
result2 = search_repository._prepare_boolean_query(" OR test") # Leading operator
assert "test" in result2
result3 = search_repository._prepare_boolean_query("test OR ") # Trailing operator
assert "test" in result3
def test_parenthetical_term_quote_escaping(self, search_repository):
"""Test quote escaping in parenthetical terms (lines 190-191 coverage)."""
# Test term with quotes that needs escaping
result = search_repository._prepare_parenthetical_term('(say "hello" world)')
# Should escape quotes by doubling them
assert '""hello""' in result
# Test term with single quotes
result2 = search_repository._prepare_parenthetical_term("(it's working)")
assert "it's working" in result2
def test_needs_quoting_empty_input(self, search_repository):
"""Test _needs_quoting with empty inputs (line 207 coverage)."""
# Test empty string
assert not search_repository._needs_quoting("")
# Test whitespace-only string
assert not search_repository._needs_quoting(" ")
# Test other whitespace-only input
assert not search_repository._needs_quoting("\t")
def test_prepare_single_term_empty_input(self, search_repository):
"""Test _prepare_single_term with empty inputs (line 227 coverage)."""
# Test empty string
result1 = search_repository._prepare_single_term("")
assert result1 == ""
# Test whitespace-only string
result2 = search_repository._prepare_single_term(" ")
assert result2 == " " # Should return as-is
# Test string that becomes empty after strip
result3 = search_repository._prepare_single_term("\t\n")
assert result3 == "\t\n" # Should return original
```