This is page 8 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_rclone_commands.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/test-int/mcp/test_single_project_mcp_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for single project mode MCP functionality.
Tests the --project constraint feature that restricts MCP server to a single project,
covering project override behavior, project management tool restrictions, and
content tool functionality in constrained mode.
"""
import os
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_project_constraint_override_content_tools(mcp_server, app, test_project):
    """Verify write_note targets the constrained project even when another is requested."""
    # Constrain the MCP server to a single project via the environment.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # The explicit (different) project argument should be silently overridden.
            tool_result = await client.call_tool(
                "write_note",
                {
                    "project": "some-other-project",  # Should be ignored
                    "title": "Constraint Test Note",
                    "folder": "test",
                    "content": "# Constraint Test\n\nThis should go to the constrained project.",
                    "tags": "constraint,test",
                },
            )
            assert len(tool_result.content) == 1
            payload = tool_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # The note must land in the constrained project, not the requested one.
            assert f"project: {test_project.name}" in payload
            assert "# Created note" in payload
            assert "file_path: test/Constraint Test Note.md" in payload
            assert f"[Session: Using project '{test_project.name}']" in payload
    finally:
        # Always restore the environment so other tests run unconstrained.
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_project_constraint_read_note_override(mcp_server, app, test_project):
    """Verify read_note ignores a mismatched project argument in constrained mode."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # Seed a note in the constrained project.
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": "Read Test Note",
                    "folder": "test",
                    "content": "# Read Test\n\nContent for reading test.",
                },
            )
            # Reading from the "wrong" project must be redirected to the constraint.
            read_result = await client.call_tool(
                "read_note",
                {
                    "project": "wrong-project",  # Should be ignored
                    "identifier": "Read Test Note",
                },
            )
            assert len(read_result.content) == 1
            note_text = read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # Finding the note proves the constrained project was used.
            # (read_note returns raw note content, not a summary with project info.)
            assert "# Read Test" in note_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_project_constraint_search_notes_override(mcp_server, app, test_project):
    """Verify search_notes ignores a mismatched project argument in constrained mode."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # Seed a note with distinctive, searchable content.
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": "Searchable Note",
                    "folder": "test",
                    "content": "# Searchable\n\nThis content has unique searchable terms.",
                },
            )
            # Searching "in" another project must be redirected to the constraint.
            search_result = await client.call_tool(
                "search_notes",
                {
                    "project": "different-project",  # Should be ignored
                    "query": "searchable terms",
                },
            )
            assert len(search_result.content) == 1
            results_text = search_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # A hit in the constrained project proves the override worked.
            assert "Searchable Note" in results_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_list_projects_constrained_mode(mcp_server, app, test_project):
    """list_memory_projects should report the single-project constraint."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            listing = await client.call_tool("list_memory_projects", {})
            assert len(listing.content) == 1
            listing_text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # The tool should announce that it is locked to one project.
            assert "MCP server is constrained to a single project" in listing_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_create_project_disabled_in_constrained_mode(mcp_server, app, test_project):
    """create_memory_project must refuse to run while the server is constrained."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            outcome = await client.call_tool(
                "create_memory_project",
                {"project_name": "new-project", "project_path": "/tmp/new-project"},
            )
            assert len(outcome.content) == 1
            error_text = outcome.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # The tool should explain the restriction and point at the CLI instead.
            assert "# Error" in error_text
            assert "Project creation disabled" in error_text
            assert f"constrained to project '{test_project.name}'" in error_text
            assert "Use the CLI to create projects:" in error_text
            assert 'basic-memory project add "new-project" "/tmp/new-project"' in error_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_delete_project_disabled_in_constrained_mode(mcp_server, app, test_project):
    """delete_project must refuse to run while the server is constrained."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            outcome = await client.call_tool("delete_project", {"project_name": "some-project"})
            assert len(outcome.content) == 1
            error_text = outcome.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # The tool should explain the restriction and point at the CLI instead.
            assert "# Error" in error_text
            assert "Project deletion disabled" in error_text
            assert f"constrained to project '{test_project.name}'" in error_text
            assert "Use the CLI to delete projects:" in error_text
            assert 'basic-memory project remove "some-project"' in error_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_normal_mode_without_constraint(mcp_server, app, test_project):
    """Without the constraint env var, tools honor the explicit project argument."""
    # Make sure no constraint leaks in from a previous test.
    os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
    async with Client(mcp_server) as client:
        # write_note works with an explicitly supplied project.
        write_result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Normal Mode Note",
                "folder": "test",
                "content": "# Normal Mode\n\nThis should work normally.",
            },
        )
        assert len(write_result.content) == 1
        write_text = write_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert f"project: {test_project.name}" in write_text
        assert "# Created note" in write_text

        # Project listing should not mention any constraint.
        listing = await client.call_tool("list_memory_projects", {})
        listing_text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "MCP server is constrained to a single project" not in listing_text
@pytest.mark.asyncio
async def test_constraint_with_multiple_content_tools(mcp_server, app, test_project):
    """The constraint must override the project argument across several tools."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # write_note: wrong project is redirected to the constrained one.
            written = await client.call_tool(
                "write_note",
                {
                    "project": "wrong-project",
                    "title": "Multi Tool Test",
                    "folder": "test",
                    "content": "# Multi Tool Test\n\n- [note] Testing multiple tools",
                },
            )
            assert f"project: {test_project.name}" in written.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # read_note: finding the note proves the constrained project was used.
            fetched = await client.call_tool(
                "read_note", {"project": "another-wrong-project", "identifier": "Multi Tool Test"}
            )
            assert "# Multi Tool Test" in fetched.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # search_notes: results come from the constrained project as well.
            found = await client.call_tool(
                "search_notes", {"project": "yet-another-wrong-project", "query": "multiple tools"}
            )
            assert "Multi Tool Test" in found.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_constraint_environment_cleanup(mcp_server, app, test_project):
    """Test that removing the constraint mid-session restores normal behavior.

    Sets BASIC_MEMORY_MCP_PROJECT, confirms list_memory_projects reports the
    constraint, deletes the variable, and confirms the constraint message is gone.
    """
    # Set constraint
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # Verify constraint is active
            constrained_result = await client.call_tool("list_memory_projects", {})
            assert (
                "MCP server is constrained to a single project"
                in constrained_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            )
            # Remove constraint (the behavior under test)
            del os.environ["BASIC_MEMORY_MCP_PROJECT"]
            # Verify normal behavior is restored
            normal_result = await client.call_tool("list_memory_projects", {})
            normal_text = normal_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert "constrained to a single project" not in normal_text
    finally:
        # Fix: without this finally, a failed assertion before the `del` left
        # BASIC_MEMORY_MCP_PROJECT set and leaked the constraint into every
        # subsequent test (the sibling tests all guard against exactly this).
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_constraint_with_invalid_project_override(mcp_server, app, test_project):
    """Invalid override names are all ignored in favor of the constrained project."""
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name
    try:
        async with Client(mcp_server) as client:
            # A grab bag of names that could never be real projects.
            bogus_names = [
                "non-existent-project",
                "",
                "project-with-special-chars!@#",
                "a" * 100,  # Very long name
            ]
            for index, bogus in enumerate(bogus_names):
                outcome = await client.call_tool(
                    "write_note",
                    {
                        "project": bogus,
                        "title": f"Test Invalid {index} {bogus[:5]}",
                        "folder": "test",
                        "content": f"Testing with invalid project: {bogus}",
                    },
                )
                outcome_text = outcome.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                # Still written into the constrained project...
                assert f"project: {test_project.name}" in outcome_text
                # ...and the write itself succeeds.
                assert "# Created note" in outcome_text or "# Updated note" in outcome_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
```
--------------------------------------------------------------------------------
/tests/services/test_link_resolver.py:
--------------------------------------------------------------------------------
```python
"""Tests for link resolution service."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from basic_memory.schemas.base import Entity as EntitySchema
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.models.knowledge import Entity as EntityModel
@pytest_asyncio.fixture
async def test_entities(entity_service, file_service):
    """Create the full set of entities used by the link-resolver tests.

    Resulting layout (including entries the original docstring omitted):

    ├── components
    │   ├── Auth Service.md
    │   └── Core Service.md
    ├── components2
    │   └── Core Service.md       (duplicate title, for ambiguity tests)
    ├── config
    │   └── Service Config.md
    ├── specs
    │   ├── Core Features.md
    │   └── subspec
    │       ├── Sub Features 1.md
    │       └── Sub Features 2.md
    └── Image.png                 (non-markdown file entity)
    """
    # create_or_update_entity returns (entity, created) — only the entity is kept.
    e1, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Core Service",
            entity_type="component",
            folder="components",
            project=entity_service.repository.project_id,
        )
    )
    e2, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Service Config",
            entity_type="config",
            folder="config",
            project=entity_service.repository.project_id,
        )
    )
    e3, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Auth Service",
            entity_type="component",
            folder="components",
            project=entity_service.repository.project_id,
        )
    )
    e4, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Core Features",
            entity_type="specs",
            folder="specs",
            project=entity_service.repository.project_id,
        )
    )
    # Two entities in a nested folder, for folder/title resolution tests.
    e5, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Sub Features 1",
            entity_type="specs",
            folder="specs/subspec",
            project=entity_service.repository.project_id,
        )
    )
    e6, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Sub Features 2",
            entity_type="specs",
            folder="specs/subspec",
            project=entity_service.repository.project_id,
        )
    )
    # Non-markdown entity, added directly via the repository (no markdown file).
    e7 = await entity_service.repository.add(
        EntityModel(
            title="Image.png",
            entity_type="file",
            content_type="image/png",
            file_path="Image.png",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            project_id=entity_service.repository.project_id,
        )
    )
    # Duplicate of e1's title in a different folder, for duplicate-title tests.
    e8 = await entity_service.create_entity(
        EntitySchema(
            title="Core Service",
            entity_type="component",
            folder="components2",
            project=entity_service.repository.project_id,
        )
    )
    return [e1, e2, e3, e4, e5, e6, e7, e8]
@pytest_asyncio.fixture
async def link_resolver(entity_repository, search_service, test_entities):
    """Build a LinkResolver after indexing every fixture entity for search."""
    for item in test_entities:
        await search_service.index_entity(item)
    return LinkResolver(entity_repository, search_service)
@pytest.mark.asyncio
async def test_exact_permalink_match(link_resolver, test_entities):
    """A query equal to an entity permalink resolves to that entity."""
    resolved = await link_resolver.resolve_link("components/core-service")
    assert resolved.permalink == "components/core-service"
@pytest.mark.asyncio
async def test_exact_title_match(link_resolver, test_entities):
    """A query equal to an entity title resolves to that entity's permalink."""
    resolved = await link_resolver.resolve_link("Core Service")
    assert resolved.permalink == "components/core-service"
@pytest.mark.asyncio
async def test_duplicate_title_match(link_resolver, test_entities):
    """Test resolving a title shared by two entities.

    The fixture creates "Core Service" in both `components` and `components2`
    (entity e8), so this title is ambiguous. The resolver should still return a
    deterministic result — the `components` entity — rather than fail.
    The original docstring was a copy-paste of test_exact_title_match and never
    mentioned the duplicate; the assertion now also guards against None first.
    """
    entity = await link_resolver.resolve_link("Core Service")
    assert entity is not None
    assert entity.permalink == "components/core-service"
@pytest.mark.asyncio
async def test_fuzzy_title_partial_match(link_resolver):
    """A partial title should fuzzy-resolve to the closest matching entity."""
    match = await link_resolver.resolve_link("Auth Serv")
    assert match is not None, "Did not find partial match"
    assert match.permalink == "components/auth-service"
@pytest.mark.asyncio
async def test_fuzzy_title_exact_match(link_resolver):
    """Resolve a bare permalink-style slug ("auth-service") to its entity."""
    # NOTE(review): the original comment said "partial match" (copy-paste);
    # this slug resolves via search, not an exact match — strict mode rejects
    # it (see test_fuzzy_matching_blocked_in_strict_mode below).
    result = await link_resolver.resolve_link("auth-service")
    assert result.permalink == "components/auth-service"
@pytest.mark.asyncio
async def test_link_text_normalization(link_resolver):
    """_normalize_link_text strips wiki brackets/whitespace and splits aliases."""
    # (raw input, expected text, expected alias)
    cases = [
        ("[[Core Service]]", "Core Service", None),  # basic normalization
        ("[[Core Service|Main Service]]", "Core Service", "Main Service"),  # alias
        (" [[ Core Service | Main Service ]] ", "Core Service", "Main Service"),  # extra whitespace
    ]
    for raw, expected_text, expected_alias in cases:
        text, alias = link_resolver._normalize_link_text(raw)
        assert text == expected_text
        assert alias == expected_alias
@pytest.mark.asyncio
async def test_resolve_none(link_resolver):
    """A reference that matches no entity resolves to None."""
    missing = await link_resolver.resolve_link("New Feature")
    assert missing is None
@pytest.mark.asyncio
async def test_resolve_file(link_resolver):
    """Resolve a non-markdown file entity by its file name.

    (The original docstring said "non-existent entity" — a copy-paste error;
    Image.png is a real file entity created by the test_entities fixture.)
    """
    resolved = await link_resolver.resolve_link("Image.png")
    assert resolved is not None
    assert resolved.entity_type == "file"
    assert resolved.title == "Image.png"
@pytest.mark.asyncio
async def test_folder_title_pattern_with_md_extension(link_resolver, test_entities):
    """Resolve 'folder/title' inputs by retrying them as 'folder/title.md' paths.

    Exercises the resolve_link step (added in step 4) that appends a .md
    extension when a slash-containing query matches no permalink directly.
    """
    # Each (query, permalink, file_path) triple exercises the .md fallback,
    # including a nested folder structure.
    expectations = [
        ("components/Core Service", "components/core-service", "components/Core Service.md"),
        ("config/Service Config", "config/service-config", "config/Service Config.md"),
        ("specs/subspec/Sub Features 1", "specs/subspec/sub-features-1", "specs/subspec/Sub Features 1.md"),
    ]
    for query, permalink, file_path in expectations:
        resolved = await link_resolver.resolve_link(query)
        assert resolved is not None
        assert resolved.permalink == permalink
        assert resolved.file_path == file_path

    # A query that already ends in .md is resolved as-is (no double extension).
    resolved = await link_resolver.resolve_link("components/Core Service.md")
    assert resolved is not None
    assert resolved.permalink == "components/core-service"

    # Single words (no slash) never get the .md retry.
    assert await link_resolver.resolve_link("NonExistent") is None

    # Exact permalink matches are untouched by the fallback.
    resolved = await link_resolver.resolve_link("components/core-service")
    assert resolved is not None
    assert resolved.permalink == "components/core-service"
# Tests for strict mode parameter combinations
@pytest.mark.asyncio
async def test_strict_mode_parameter_combinations(link_resolver, test_entities):
    """Exercise every (use_search, strict) combination against three query types."""
    exact_match = "Auth Service"  # unique title: resolvable in every mode
    fuzzy_match = "Auth Serv"  # partial title: needs fuzzy search enabled
    non_existent = "Does Not Exist"  # never resolvable

    # (use_search, strict, fuzzy_should_resolve)
    combinations = [
        (True, False, True),  # Case 1: default behavior, fuzzy matching allowed
        (True, True, False),  # Case 2: strict blocks fuzzy even with search on
        (False, False, False),  # Case 3: no search means no fuzzy matching
        (False, True, False),  # Case 4: redundant, behaves the same as case 3
    ]
    for use_search, strict, fuzzy_ok in combinations:
        # The exact title always resolves.
        result = await link_resolver.resolve_link(exact_match, use_search=use_search, strict=strict)
        assert result is not None
        assert result.permalink == "components/auth-service"

        # The partial title resolves only when fuzzy matching is available.
        result = await link_resolver.resolve_link(fuzzy_match, use_search=use_search, strict=strict)
        if fuzzy_ok:
            assert result is not None
            assert result.permalink == "components/auth-service"
        else:
            assert result is None

        # A non-existent reference never resolves.
        result = await link_resolver.resolve_link(non_existent, use_search=use_search, strict=strict)
        assert result is None
@pytest.mark.asyncio
async def test_exact_match_types_in_strict_mode(link_resolver, test_entities):
    """Every flavor of exact match must still resolve when strict=True."""
    # All of these should land on components/core-service.
    core_service_queries = [
        "components/core-service",  # 1. exact permalink
        "Core Service",  # 2. exact title
        "components/Core Service.md",  # 3. exact file path
        "components/Core Service",  # 4. folder/title with .md extension added
    ]
    for query in core_service_queries:
        resolved = await link_resolver.resolve_link(query, strict=True)
        assert resolved is not None
        assert resolved.permalink == "components/core-service"

    # 5. Non-markdown file (Image.png) resolves in strict mode too.
    resolved = await link_resolver.resolve_link("Image.png", strict=True)
    assert resolved is not None
    assert resolved.title == "Image.png"
@pytest.mark.asyncio
async def test_fuzzy_matching_blocked_in_strict_mode(link_resolver, test_entities):
    """Strict mode must reject every lookup that would require fuzzy matching."""
    # Each of these would resolve through fuzzy search in normal mode.
    for fuzzy_query in (
        "Auth Serv",  # partial title
        "auth-service",  # lowercase permalink variation
        "Core",  # single word from title
        "Service",  # common word
        "Serv",  # partial word
    ):
        outcome = await link_resolver.resolve_link(fuzzy_query, strict=True)
        assert outcome is None, f"Query '{fuzzy_query}' should return None in strict mode"
@pytest.mark.asyncio
async def test_link_normalization_with_strict_mode(link_resolver, test_entities):
    """Bracket stripping, alias handling, and whitespace trimming still apply in strict mode."""
    normalization_cases = (
        ("[[Core Service]]", "components/core-service"),
        ("[[Core Service|Main]]", "components/core-service"),  # alias ignored
        (" [[ Core Service ]] ", "components/core-service"),  # extra whitespace
    )
    for raw_link, permalink in normalization_cases:
        entity = await link_resolver.resolve_link(raw_link, strict=True)
        assert entity is not None, f"Query '{raw_link}' should find entity in strict mode"
        assert entity.permalink == permalink
@pytest.mark.asyncio
async def test_duplicate_title_handling_in_strict_mode(link_resolver, test_entities):
    """Duplicate titles resolve to the first exact match in strict mode."""
    # "Core Service" exists twice in the fixture data (components/ and
    # components2/); strict mode keeps the normal-mode tie-break and
    # returns the first exact title match.
    match = await link_resolver.resolve_link("Core Service", strict=True)
    assert match is not None
    assert match.permalink == "components/core-service"
```
--------------------------------------------------------------------------------
/tests/api/test_directory_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for the directory router API endpoints."""
from unittest.mock import patch
import pytest
from basic_memory.schemas.directory import DirectoryNode
@pytest.mark.asyncio
async def test_get_directory_tree_endpoint(test_graph, client, project_url):
    """Test the get_directory_tree endpoint returns correctly structured data."""
    response = await client.get(f"{project_url}/directory/tree")
    assert response.status_code == 200

    payload = response.json()
    # Every directory node carries these four fields.
    for field in ("name", "directory_path", "children", "type"):
        assert field in payload
    # Root node invariants: children is a list, name is non-empty,
    # directory_path is a string.
    assert isinstance(payload["children"], list)
    assert payload["name"]
    assert isinstance(payload["directory_path"], str)
@pytest.mark.asyncio
async def test_get_directory_tree_structure(test_graph, client, project_url):
    """Test the structure of the directory tree returned by the endpoint."""
    response = await client.get(f"{project_url}/directory/tree")
    assert response.status_code == 200
    tree = response.json()

    # Walk the entire tree iteratively, validating every node's shape.
    pending = [tree]
    while pending:
        node = pending.pop()
        assert "name" in node
        assert "directory_path" in node
        assert "children" in node
        assert "type" in node
        assert isinstance(node["children"], list)
        pending.extend(node["children"])
@pytest.mark.asyncio
async def test_get_directory_tree_mocked(client, project_url):
    """Test the get_directory_tree endpoint with a mocked service.

    Patches DirectoryService.get_directory_tree so the endpoint's JSON
    serialization can be asserted against a known DirectoryNode tree
    without depending on database fixtures.
    """
    # Create a mock directory tree: root -> folder1 -> subfolder, root -> folder2
    mock_tree = DirectoryNode(
        name="root",
        directory_path="/test",
        type="directory",
        children=[
            DirectoryNode(
                name="folder1",
                directory_path="/test/folder1",
                type="directory",
                children=[
                    DirectoryNode(
                        name="subfolder",
                        directory_path="/test/folder1/subfolder",
                        type="directory",
                        children=[],
                    )
                ],
            ),
            DirectoryNode(
                name="folder2", directory_path="/test/folder2", type="directory", children=[]
            ),
        ],
    )
    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.get_directory_tree",
        return_value=mock_tree,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/tree")
        # Verify response
        assert response.status_code == 200
        data = response.json()
        # Check structure matches our mock
        assert data["name"] == "root"
        assert data["directory_path"] == "/test"
        assert data["type"] == "directory"
        assert len(data["children"]) == 2
        # Check first child
        folder1 = data["children"][0]
        assert folder1["name"] == "folder1"
        assert folder1["directory_path"] == "/test/folder1"
        assert folder1["type"] == "directory"
        assert len(folder1["children"]) == 1
        # Check subfolder (nested one level deeper)
        subfolder = folder1["children"][0]
        assert subfolder["name"] == "subfolder"
        assert subfolder["directory_path"] == "/test/folder1/subfolder"
        assert subfolder["type"] == "directory"
        assert subfolder["children"] == []
        # Check second child (empty directory)
        folder2 = data["children"][1]
        assert folder2["name"] == "folder2"
        assert folder2["directory_path"] == "/test/folder2"
        assert folder2["type"] == "directory"
        assert folder2["children"] == []
@pytest.mark.asyncio
async def test_list_directory_endpoint_default(test_graph, client, project_url):
    """Test the list_directory endpoint with default parameters."""
    response = await client.get(f"{project_url}/directory/list")
    assert response.status_code == 200

    listing = response.json()
    assert isinstance(listing, list)
    # The test_graph fixture creates exactly one top-level "test" directory.
    assert len(listing) == 1
    assert listing[0]["name"] == "test"
    assert listing[0]["type"] == "directory"
@pytest.mark.asyncio
async def test_list_directory_endpoint_specific_path(test_graph, client, project_url):
    """Test the list_directory endpoint with specific directory path."""
    response = await client.get(f"{project_url}/directory/list?dir_name=/test")
    assert response.status_code == 200

    entries = response.json()
    assert isinstance(entries, list)
    assert len(entries) == 5
    # test_graph has no nested folders, so every entry is a markdown file.
    assert all(entry["type"] == "file" for entry in entries)
    assert all(entry["name"].endswith(".md") for entry in entries)
@pytest.mark.asyncio
async def test_list_directory_endpoint_with_glob(test_graph, client, project_url):
    """Test the list_directory endpoint with glob filtering."""
    response = await client.get(
        f"{project_url}/directory/list?dir_name=/test&file_name_glob=*Connected*"
    )
    assert response.status_code == 200

    matches = response.json()
    assert isinstance(matches, list)
    assert len(matches) == 2
    # Only the two "Connected Entity" notes satisfy the glob pattern.
    assert {entry["name"] for entry in matches} == {
        "Connected Entity 1.md",
        "Connected Entity 2.md",
    }
@pytest.mark.asyncio
async def test_list_directory_endpoint_with_depth(test_graph, client, project_url):
    """Test the list_directory endpoint with depth control."""
    # depth=1 (the default) only reaches the root's immediate child: "test".
    shallow = await client.get(f"{project_url}/directory/list?dir_name=/&depth=1")
    assert shallow.status_code == 200
    assert len(shallow.json()) == 1

    # depth=2 also reaches the five files inside the test directory.
    deep = await client.get(f"{project_url}/directory/list?dir_name=/&depth=2")
    assert deep.status_code == 200
    assert len(deep.json()) == 6  # test directory + 5 files
@pytest.mark.asyncio
async def test_list_directory_endpoint_nonexistent_path(test_graph, client, project_url):
    """Test the list_directory endpoint with nonexistent directory."""
    response = await client.get(f"{project_url}/directory/list?dir_name=/nonexistent")
    # An unknown directory is not an error; it simply lists nothing.
    assert response.status_code == 200
    listing = response.json()
    assert isinstance(listing, list)
    assert listing == []
@pytest.mark.asyncio
async def test_list_directory_endpoint_validation_errors(client, project_url):
    """Test the list_directory endpoint with invalid parameters."""
    # depth is constrained to 1..10; both out-of-range values must be rejected
    # by request validation before reaching the service.
    for bad_depth in (0, 11):
        response = await client.get(f"{project_url}/directory/list?depth={bad_depth}")
        assert response.status_code == 422  # Validation error
@pytest.mark.asyncio
async def test_list_directory_endpoint_mocked(client, project_url):
    """Test the list_directory endpoint with mocked service.

    Patches DirectoryService.list_directory so serialization of both
    directory nodes (bare) and file nodes (with metadata) can be checked
    without database fixtures.
    """
    # Create mock directory nodes: one bare directory, one file with metadata
    mock_nodes = [
        DirectoryNode(
            name="folder1",
            directory_path="/folder1",
            type="directory",
        ),
        DirectoryNode(
            name="file1.md",
            directory_path="/file1.md",
            file_path="file1.md",
            type="file",
            title="File 1",
            permalink="file-1",
        ),
    ]
    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.list_directory",
        return_value=mock_nodes,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/list?dir_name=/test")
        # Verify response
        assert response.status_code == 200
        data = response.json()
        # Check structure matches our mock
        assert isinstance(data, list)
        assert len(data) == 2
        # Check directory node (no file metadata expected)
        folder = next(item for item in data if item["type"] == "directory")
        assert folder["name"] == "folder1"
        assert folder["directory_path"] == "/folder1"
        # Check file node (carries file_path/title/permalink metadata)
        file_item = next(item for item in data if item["type"] == "file")
        assert file_item["name"] == "file1.md"
        assert file_item["directory_path"] == "/file1.md"
        assert file_item["file_path"] == "file1.md"
        assert file_item["title"] == "File 1"
        assert file_item["permalink"] == "file-1"
@pytest.mark.asyncio
async def test_get_directory_structure_endpoint(test_graph, client, project_url):
    """Test the get_directory_structure endpoint returns folders only.

    Unlike /directory/tree, /directory/structure omits file nodes and all
    file-specific metadata (entity_id, content_type, title, permalink).
    """
    # Call the endpoint
    response = await client.get(f"{project_url}/directory/structure")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check that the response is a valid directory tree
    assert "name" in data
    assert "directory_path" in data
    assert "children" in data
    assert "type" in data
    assert data["type"] == "directory"
    # Root should be present
    assert data["name"] == "Root"
    assert data["directory_path"] == "/"
    # Should have the test directory
    assert len(data["children"]) == 1
    test_dir = data["children"][0]
    assert test_dir["name"] == "test"
    assert test_dir["type"] == "directory"
    assert test_dir["directory_path"] == "/test"
    # Should NOT have any files (test_graph has files but no subdirectories)
    assert len(test_dir["children"]) == 0
    # Verify no file metadata is present in directory nodes
    assert test_dir.get("entity_id") is None
    assert test_dir.get("content_type") is None
    assert test_dir.get("title") is None
    assert test_dir.get("permalink") is None
@pytest.mark.asyncio
async def test_get_directory_structure_empty(client, project_url):
    """Test the get_directory_structure endpoint with empty database."""
    response = await client.get(f"{project_url}/directory/structure")
    assert response.status_code == 200

    root = response.json()
    # With no content indexed, only the bare root node comes back.
    assert root["name"] == "Root"
    assert root["directory_path"] == "/"
    assert root["type"] == "directory"
    assert root["children"] == []
@pytest.mark.asyncio
async def test_get_directory_structure_mocked(client, project_url):
    """Test the get_directory_structure endpoint with mocked service.

    Patches DirectoryService.get_directory_structure with a folders-only
    tree (Root -> docs -> {guides, api}, Root -> specs) and asserts the
    endpoint serializes it faithfully.
    """
    # Create a mock directory structure (folders only, no files)
    mock_structure = DirectoryNode(
        name="Root",
        directory_path="/",
        type="directory",
        children=[
            DirectoryNode(
                name="docs",
                directory_path="/docs",
                type="directory",
                children=[
                    DirectoryNode(
                        name="guides",
                        directory_path="/docs/guides",
                        type="directory",
                        children=[],
                    ),
                    DirectoryNode(
                        name="api",
                        directory_path="/docs/api",
                        type="directory",
                        children=[],
                    ),
                ],
            ),
            DirectoryNode(name="specs", directory_path="/specs", type="directory", children=[]),
        ],
    )
    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.get_directory_structure",
        return_value=mock_structure,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/structure")
        # Verify response
        assert response.status_code == 200
        data = response.json()
        # Check structure matches our mock (folders only)
        assert data["name"] == "Root"
        assert data["directory_path"] == "/"
        assert data["type"] == "directory"
        assert len(data["children"]) == 2
        # Check docs directory
        docs = data["children"][0]
        assert docs["name"] == "docs"
        assert docs["directory_path"] == "/docs"
        assert docs["type"] == "directory"
        assert len(docs["children"]) == 2
        # Check subdirectories of docs
        guides = docs["children"][0]
        assert guides["name"] == "guides"
        assert guides["directory_path"] == "/docs/guides"
        assert guides["type"] == "directory"
        assert guides["children"] == []
        api = docs["children"][1]
        assert api["name"] == "api"
        assert api["directory_path"] == "/docs/api"
        assert api["type"] == "directory"
        assert api["children"] == []
        # Check specs directory
        specs = data["children"][1]
        assert specs["name"] == "specs"
        assert specs["directory_path"] == "/specs"
        assert specs["type"] == "directory"
        assert specs["children"] == []
```
--------------------------------------------------------------------------------
/test-int/mcp/test_default_project_mode_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for default project mode functionality.
Tests the default_project_mode configuration that allows tools to automatically
use the default_project when no project parameter is specified, covering
parameter resolution hierarchy and mode-specific behavior.
"""
import os
from pathlib import Path
import pytest
from fastmcp import Client
from unittest.mock import patch
from basic_memory.config import ConfigManager, BasicMemoryConfig
@pytest.mark.asyncio
async def test_default_project_mode_enabled_write_note(mcp_server, app, test_project):
    """Test that write_note uses default project when default_project_mode=true and no project specified."""
    # Mock config with default_project_mode enabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )
    # Patch the class attribute so every ConfigManager instance sees this config.
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note without project parameter
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Default Mode Test",
                    "folder": "test",
                    "content": "# Default Mode Test\n\nThis should use the default project automatically.",
                    "tags": "default,mode,test",
                },
            )
            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # Should use the default project (resolved automatically, no explicit param)
            assert f"project: {test_project.name}" in response_text
            assert "# Created note" in response_text
            assert "file_path: test/Default Mode Test.md" in response_text
            assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_default_project_mode_explicit_override(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test that explicit project parameter overrides default_project_mode."""
    # Create a second project for testing override
    engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)
    other_project = await project_repository.create(
        {
            "name": "other-project",
            "description": "Second project for testing",
            "path": str(config_home / "other-project"),
            "is_active": True,
            "is_default": False,
        }
    )
    # Mock config with default_project_mode enabled pointing to test_project
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path, other_project.name: other_project.path},
    )
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note with explicit project parameter (should override default)
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Override Test",
                    "folder": "test",
                    "content": "# Override Test\n\nThis should go to the explicitly specified project.",
                    "project": other_project.name,  # Explicit override
                },
            )
            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # Should use the explicitly specified project, not default
            assert f"project: {other_project.name}" in response_text
            assert "# Created note" in response_text
            assert f"[Session: Using project '{other_project.name}']" in response_text
@pytest.mark.asyncio
async def test_default_project_mode_disabled_requires_project(mcp_server, app, test_project):
    """Test that tools require project parameter when default_project_mode=false."""
    # Mock config with default_project_mode disabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=False,  # Disabled
        projects={test_project.name: test_project.path},
    )
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note without project parameter - should fail
            # (the MCP client surfaces the tool error as an exception)
            with pytest.raises(Exception) as exc_info:
                await client.call_tool(
                    "write_note",
                    {
                        "title": "Should Fail",
                        "folder": "test",
                        "content": "# Should Fail\n\nThis should fail because no project specified.",
                    },
                )
            # Should get an error about missing project; accept either phrasing
            # of the server-side message.
            error_message = str(exc_info.value)
            assert (
                "No project specified" in error_message
                or "project parameter" in error_message.lower()
            )
@pytest.mark.asyncio
async def test_cli_constraint_overrides_default_project_mode(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test that CLI --project constraint overrides default_project_mode.

    BASIC_MEMORY_MCP_PROJECT (set by the CLI's --project flag) is the
    highest-priority tier in project resolution, so it must win even when
    default_project_mode points at a different project.
    """
    # Create a different project for CLI constraint
    engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)
    other_project = await project_repository.create(
        {
            "name": "cli-project",
            "description": "Project for CLI constraint testing",
            "path": str(config_home / "cli-project"),
            "is_active": True,
            "is_default": False,
        }
    )
    # Mock config with default_project_mode enabled pointing to test_project
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path, other_project.name: other_project.path},
    )
    # Set the CLI project constraint (highest priority) only after all other
    # setup has succeeded, immediately before entering try/finally, so the
    # env var cannot leak into other tests if setup above raises.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = other_project.name
    try:
        with patch.object(ConfigManager, "config", mock_config):
            async with Client(mcp_server) as client:
                # Call write_note without project parameter
                result = await client.call_tool(
                    "write_note",
                    {
                        "title": "CLI Constraint Test",
                        "folder": "test",
                        "content": "# CLI Constraint Test\n\nThis should use CLI constrained project.",
                    },
                )
                assert len(result.content) == 1
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                # Should use CLI constrained project, not default project
                assert f"project: {other_project.name}" in response_text
                assert "# Created note" in response_text
                assert f"[Session: Using project '{other_project.name}']" in response_text
    finally:
        # Clean up environment variable (no-op if already absent)
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)
@pytest.mark.asyncio
async def test_default_project_mode_read_note(mcp_server, app, test_project):
    """Test that read_note works with default_project_mode.

    Writes a note and reads it back, both without a project parameter, to
    confirm reads also resolve to the default project.
    """
    # Mock config with default_project_mode enabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # First create a note
            await client.call_tool(
                "write_note",
                {
                    "title": "Read Test Note",
                    "folder": "test",
                    "content": "# Read Test Note\n\nThis note will be read back.",
                },
            )
            # Now read it back without specifying project
            result = await client.call_tool(
                "read_note",
                {
                    "identifier": "Read Test Note",
                },
            )
            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # Should successfully read the note
            assert "# Read Test Note" in response_text
            assert "This note will be read back." in response_text
@pytest.mark.asyncio
async def test_default_project_mode_edit_note(mcp_server, app, test_project):
    """Test that edit_note works with default_project_mode.

    Writes a note and appends to it, both without a project parameter, to
    confirm edits also resolve to the default project.
    """
    # Mock config with default_project_mode enabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # First create a note
            await client.call_tool(
                "write_note",
                {
                    "title": "Edit Test Note",
                    "folder": "test",
                    "content": "# Edit Test Note\n\nOriginal content.",
                },
            )
            # Now edit it without specifying project
            result = await client.call_tool(
                "edit_note",
                {
                    "identifier": "Edit Test Note",
                    "operation": "append",
                    "content": "\n\n## Added Content\n\nThis was added via edit_note.",
                },
            )
            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            # Should successfully edit the note
            assert "# Edited note" in response_text
            assert "operation: Added" in response_text
@pytest.mark.asyncio
async def test_project_resolution_hierarchy(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test the complete three-tier project resolution hierarchy.

    Priority order (highest first):
      1. CLI constraint via BASIC_MEMORY_MCP_PROJECT env var
      2. Explicit "project" tool argument
      3. default_project from config (when default_project_mode=true)
    """
    # Create projects for testing
    engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)
    default_project = test_project
    cli_project = await project_repository.create(
        {
            "name": "cli-hierarchy-project",
            "description": "Project for CLI hierarchy testing",
            "path": str(config_home / "cli-hierarchy-project"),
            "is_active": True,
            "is_default": False,
        }
    )
    explicit_project = await project_repository.create(
        {
            "name": "explicit-hierarchy-project",
            "description": "Project for explicit hierarchy testing",
            "path": str(config_home / "explicit-hierarchy-project"),
            "is_active": True,
            "is_default": False,
        }
    )
    # Mock config with default_project_mode enabled; paths normalized to POSIX form
    mock_config = BasicMemoryConfig(
        default_project=default_project.name,
        default_project_mode=True,
        projects={
            default_project.name: Path(default_project.path).as_posix(),
            cli_project.name: Path(cli_project.path).as_posix(),
            explicit_project.name: Path(explicit_project.path).as_posix(),
        },
    )
    # Test 1: CLI constraint (highest priority) beats even an explicit argument
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = cli_project.name
    try:
        with patch.object(ConfigManager, "config", mock_config):
            async with Client(mcp_server) as client:
                result = await client.call_tool(
                    "write_note",
                    {
                        "title": "CLI Priority Test",
                        "folder": "test",
                        "content": "# CLI Priority Test",
                        "project": explicit_project.name,  # Should be ignored
                    },
                )
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                assert f"project: {cli_project.name}" in response_text
    finally:
        del os.environ["BASIC_MEMORY_MCP_PROJECT"]
    # Test 2: Explicit project (medium priority) wins once the env var is gone
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Explicit Priority Test",
                    "folder": "test",
                    "content": "# Explicit Priority Test",
                    "project": explicit_project.name,
                },
            )
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert f"project: {explicit_project.name}" in response_text
    # Test 3: Default project (lowest priority) applies when nothing else is given
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Default Priority Test",
                    "folder": "test",
                    "content": "# Default Priority Test",
                    # No project specified
                },
            )
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert f"project: {default_project.name}" in response_text
```
--------------------------------------------------------------------------------
/specs/SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability'
type: tasklist
permalink: specs/spec-9-follow-ups-conflict-sync-and-observability
related: specs/spec-9-multi-project-bisync
status: revised
revision_date: 2025-10-03
---
# SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability
**REVISED 2025-10-03:** Simplified to leverage rclone built-ins instead of custom conflict handling.
**Context:** SPEC-9 delivered multi-project bidirectional sync and a unified CLI. This follow-up focuses on **observability and safety** using rclone's built-in capabilities rather than reinventing conflict handling.
**Design Philosophy: "Be Dumb Like Git"**
- Let rclone bisync handle conflict detection (it already does this)
- Make conflicts visible and recoverable, don't prevent them
- Cloud is always the winner on conflict (cloud-primary model)
- Users who want version history can just use Git locally in their sync directory
**What Changed from Original Version:**
- **Replaced:** Custom `.bmmeta` sidecars → Use rclone's `.bisync/` state tracking
- **Replaced:** Custom conflict detection → Use rclone bisync 3-way merge
- **Replaced:** Tombstone files → rclone delete tracking handles this
- **Replaced:** Distributed lease → Local process lock only (document multi-device warning)
- **Replaced:** S3 versioning service → Users just use Git locally if they want history
- **Deferred:** SPEC-14 Git integration → Postponed to teams/multi-user features
## ✅ Now
- [ ] **Local process lock**: Prevent concurrent bisync runs on same device (`~/.basic-memory/sync.lock`)
- [ ] **Structured sync reports**: Parse rclone bisync output into JSON reports (creates/updates/deletes/conflicts, bytes, duration); `bm sync --report`
- [ ] **Multi-device warning**: Document that users should not run `--watch` on multiple devices simultaneously
- [ ] **Version control guidance**: Document pattern for users to use Git locally in their sync directory if they want version history
- [ ] **Docs polish**: cloud-mode toggle, mount↔bisync directory isolation, conflict semantics, quick start, migration guide, short demo clip/GIF
## 🔜 Next
- [ ] **Observability commands**: `bm conflicts list`, `bm sync history` to view sync reports and conflicts
- [ ] **Conflict resolution UI**: `bm conflicts resolve <file>` to interactively pick winner from conflict files
- [ ] **Selective sync**: allow include/exclude by project; per-project profile (safe/balanced/fast)
## 🧭 Later
- [ ] **Near real-time sync**: File watcher → targeted `rclone copy` for individual files (keep bisync as backstop)
- [ ] **Sharing / scoped tokens**: cross-tenant/project access
- [ ] **Bandwidth controls & backpressure**: policy for large repos
- [ ] **Client-side encryption (optional)**: with clear trade-offs
## 📏 Acceptance criteria (for "Now" items)
- [ ] Local process lock prevents concurrent bisync runs on same device
- [ ] rclone bisync conflict files visible and documented (`file.conflict1.md`, `file.conflict2.md`)
- [ ] `bm sync --report` generates parsable JSON with sync statistics
- [ ] Documentation clearly warns about multi-device `--watch` mode
- [ ] Documentation shows users how to use Git locally for version history
## What We're NOT Building (Deferred to rclone)
- ❌ Custom `.bmmeta` sidecars (rclone tracks state in `.bisync/` workdir)
- ❌ Custom conflict detection (rclone bisync already does 3-way merge detection)
- ❌ Tombstone files (S3 versioning + rclone delete tracking handles this)
- ❌ Distributed lease (low probability issue, rclone detects state divergence)
- ❌ Rename/move tracking (rclone has size+modtime heuristics built-in)
## Implementation Summary
**Current State (SPEC-9):**
- ✅ rclone bisync with 3 profiles (safe/balanced/fast)
- ✅ `--max-delete` safety limits (10/25/50 files)
- ✅ `--conflict-resolve=newer` for auto-resolution
- ✅ Watch mode: `bm sync --watch` (60s intervals)
- ✅ Integrity checking: `bm cloud check`
- ✅ Mount vs bisync directory isolation
**What's Needed (This Spec):**
1. **Process lock** - Simple file-based lock in `~/.basic-memory/sync.lock`
2. **Sync reports** - Parse rclone output, save to `~/.basic-memory/sync-history/`
3. **Documentation** - Multi-device warnings, conflict resolution workflow, Git usage pattern
**User Model:**
- Cloud is always the winner on conflict (cloud-primary)
- rclone creates `.conflict` files for divergent edits
- Users who want version history just use Git in their local sync directory
- Users warned: don't run `--watch` on multiple devices
## Decision Rationale & Trade-offs
### Why Trust rclone Instead of Custom Conflict Handling?
**rclone bisync already provides:**
- 3-way merge detection (compares local, remote, and last-known state)
- File state tracking in `.bisync/` workdir (hashes, modtimes)
- Automatic conflict file creation: `file.conflict1.md`, `file.conflict2.md`
- Rename detection via size+modtime heuristics
- Delete tracking (prevents resurrection of deleted files)
- Battle-tested with extensive edge case handling
**What we'd have to build with custom approach:**
- Per-file metadata tracking (`.bmmeta` sidecars)
- 3-way diff algorithm
- Conflict detection logic
- Tombstone files for deletes
- Rename/move detection
- Testing for all edge cases
**Decision:** Use what rclone already does well. Don't reinvent the wheel.
### Why Let Users Use Git Locally Instead of Building Versioning?
**The simplest solution: Just use Git**
Users who want version history can literally just use Git in their sync directory:
```bash
cd ~/basic-memory-cloud-sync/
git init
git add .
git commit -m "backup"
# Push to their own GitHub if they want
git remote add origin [email protected]:user/my-knowledge.git
git push
```
**Why this is perfect:**
- ✅ We build nothing
- ✅ Users who want Git... just use Git
- ✅ Users who don't care... don't need to
- ✅ rclone bisync already handles sync conflicts
- ✅ Users own their data, they can version it however they want (Git, Time Machine, etc.)
**What we'd have to build for S3 versioning:**
- API to enable versioning on Tigris buckets
- **Problem**: Tigris doesn't support S3 bucket versioning
- Restore commands: `bm cloud restore --version-id`
- Version listing: `bm cloud versions <path>`
- Lifecycle policies for version retention
- Documentation and user education
**What we'd have to build for SPEC-14 Git integration:**
- Committer service (daemon watching `/app/data/`)
- Puller service (webhook handler for GitHub pushes)
- Git LFS for large files
- Loop prevention between Git ↔ bisync ↔ local
- Merge conflict handling at TWO layers (rclone + Git)
- Webhook infrastructure and monitoring
**Decision:** Don't build version control. Document the pattern. "The easiest problem to solve is the one you avoid."
**When to revisit:** Teams/multi-user features where server-side version control becomes necessary for collaboration.
### Why No Distributed Lease?
**Low probability issue:**
- Requires user to manually run `bm sync` on multiple devices at exact same time
- Most users run `--watch` on one primary device
- rclone bisync detects state divergence and fails safely
**Safety nets in place:**
- Local process lock prevents concurrent runs on same device
- rclone bisync aborts if bucket state changed during sync
- Optional local Git history (user-managed, per this spec's revision) recovers from any overwrites — note Tigris does not support S3 bucket versioning
- Documentation warns against multi-device `--watch`
**Failure mode:**
```bash
# Device A and B sync simultaneously
Device A: bm sync → succeeds
Device B: bm sync → "Error: path has changed, run --resync"
# User fixes with resync
Device B: bm sync --resync → establishes new baseline
```
**Decision:** Document the issue, add local lock, defer distributed coordination until users report actual problems.
### Cloud-Primary Conflict Model
**User mental model:**
- Cloud is the source of truth (like Dropbox/iCloud)
- Local is working copy
- On conflict: cloud wins, local edits → `.conflict` file
- User manually picks winner
**Why this works:**
- Simpler than bidirectional merge (no automatic resolution risk)
- Matches user expectations from Dropbox
- Optional local Git history (user-managed) provides a safety net for overwrites, since Tigris does not support S3 bucket versioning
- Clear recovery path: restore from local Git history if needed
**Example workflow:**
```bash
# Edit file on Device A and Device B while offline
# Both devices come online and sync
Device A: bm sync
# → Pushes to cloud first, becomes canonical version
Device B: bm sync
# → Detects conflict
# → Cloud version: work/notes.md
# → Local version: work/notes.md.conflict1
# → User manually merges or picks winner
# Restore if needed
bm cloud restore work/notes.md --version-id abc123
```
## Implementation Details
### 1. Local Process Lock
```python
# ~/.basic-memory/sync.lock
import os
import psutil
from pathlib import Path
class SyncLock:
    """Single-device mutual exclusion for sync runs via a PID lock file.

    Prevents two concurrent sync processes on the same machine. A lock left
    behind by a dead process (stale PID, or a corrupted lock file) is
    detected and removed automatically.
    """

    def __init__(self):
        # Lock file lives alongside other basic-memory state.
        self.lock_file = Path.home() / '.basic-memory' / 'sync.lock'

    def acquire(self):
        """Take the lock, or raise BisyncError if another sync is running."""
        if self.lock_file.exists():
            try:
                pid = int(self.lock_file.read_text())
            except ValueError:
                # Corrupted or empty lock file: treat as stale.
                pid = None
            if pid is not None and psutil.pid_exists(pid):
                raise BisyncError(
                    f"Sync already running (PID {pid}). "
                    f"Wait for completion or kill stale process."
                )
            # Stale lock, remove it
            self.lock_file.unlink()
        # Ensure ~/.basic-memory exists before writing (first run may not have it).
        self.lock_file.parent.mkdir(parents=True, exist_ok=True)
        self.lock_file.write_text(str(os.getpid()))

    def release(self):
        """Drop the lock file if present."""
        if self.lock_file.exists():
            self.lock_file.unlink()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()
# Usage: the context manager guarantees the lock is released even if the
# sync raises.
with SyncLock():
    run_rclone_bisync()
```
### 3. Sync Report Parsing
```python
# Parse rclone bisync output
import json
from datetime import datetime
from pathlib import Path
def parse_sync_report(rclone_output: str, duration: float, exit_code: int) -> dict:
    """Parse rclone bisync output into structured report.

    rclone bisync emits one line per action, e.g.:
        "Synching Path1 /local/path with Path2 remote:bucket"
        "- Path1 File was copied to Path2"
        "Bisync successful"
    Counters are incremented per matching line; conflict and error lines are
    collected verbatim (stripped).
    """
    # Ordered marker table: first match wins, mirroring the classification order.
    counters = (
        ('was copied to', 'files_created'),
        ('was updated in', 'files_updated'),
        ('was deleted from', 'files_deleted'),
    )
    report = {
        "timestamp": datetime.now().isoformat(),
        "duration_seconds": duration,
        "exit_code": exit_code,
        "success": exit_code == 0,
        "files_created": 0,
        "files_updated": 0,
        "files_deleted": 0,
        "conflicts": [],
        "errors": [],
    }
    for line in rclone_output.split('\n'):
        for needle, key in counters:
            if needle in line:
                report[key] += 1
                break
        else:
            if '.conflict' in line:
                report['conflicts'].append(line.strip())
            elif 'ERROR' in line:
                report['errors'].append(line.strip())
    return report
def save_sync_report(report: dict):
    """Save sync report to history.

    Reports are archived as one JSON file per run under
    ~/.basic-memory/sync-history/, named by wall-clock timestamp.
    """
    history_dir = Path.home() / '.basic-memory' / 'sync-history'
    history_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    target = history_dir / f'{stamp}.json'
    target.write_text(json.dumps(report, indent=2))
# Usage in run_bisync(): time the rclone invocation, persist the parsed
# report, and surface conflicts to the user immediately.
start_time = time.time()
result = subprocess.run(bisync_cmd, capture_output=True, text=True)
duration = time.time() - start_time
report = parse_sync_report(result.stdout, duration, result.returncode)
save_sync_report(report)
if report['conflicts']:
    # Point the user at the follow-up command rather than dumping details here.
    console.print(f"[yellow]⚠ {len(report['conflicts'])} conflict(s) detected[/yellow]")
    console.print("[dim]Run 'bm conflicts list' to view[/dim]")
```
### 4. User Commands
```bash
# View sync history
bm sync history
# → Lists recent syncs from ~/.basic-memory/sync-history/*.json
# → Shows: timestamp, duration, files changed, conflicts, errors
# View current conflicts
bm conflicts list
# → Scans sync directory for *.conflict* files
# → Shows: file path, conflict versions, timestamps
# Restore from S3 version
bm cloud restore work/notes.md --version-id abc123
# → Uses aws s3api get-object with version-id
# → Downloads to original path
bm cloud restore work/notes.md --timestamp "2025-10-03 14:30"
# → Lists versions, finds closest to timestamp
# → Downloads that version
# List file versions
bm cloud versions work/notes.md
# → Uses aws s3api list-object-versions
# → Shows: version-id, timestamp, size, author
# Interactive conflict resolution
bm conflicts resolve work/notes.md
# → Shows both versions side-by-side
# → Prompts: Keep local, keep cloud, merge manually, restore from S3 version
# → Cleans up .conflict files after resolution
```
## Success Metrics & Monitoring
**Phase 1 (v1) - Basic Safety:**
- [ ] Conflict detection rate < 5% of syncs (measure in telemetry)
- [ ] User can resolve conflicts within 5 minutes (UX testing)
- [ ] Documentation prevents 90% of multi-device issues
**Phase 2 (v2) - Observability:**
- [ ] 80% of users check `bm sync history` when troubleshooting
- [ ] Average time to restore from S3 version < 2 minutes
- [ ] Conflict resolution success rate > 95%
**What to measure:**
```python
# Telemetry in sync reports
{
"conflict_rate": conflicts / total_syncs,
"multi_device_collisions": count_state_divergence_errors,
"version_restores": count_restore_operations,
"avg_sync_duration": sum(durations) / count,
"max_delete_trips": count_max_delete_aborts
}
```
**When to add distributed lease:**
- Multi-device collision rate > 5% of syncs
- User complaints about state divergence errors
- Evidence that local lock isn't sufficient
**When to revisit Git (SPEC-14):**
- Teams feature launches (multi-user collaboration)
- Users request commit messages / audit trail
- PR-based review workflow becomes valuable
## Links
- SPEC-9: `specs/spec-9-multi-project-bisync`
- SPEC-14: `specs/spec-14-cloud-git-versioning` (deferred in favor of S3 versioning)
- rclone bisync docs: https://rclone.org/bisync/
- Tigris S3 versioning: https://www.tigrisdata.com/docs/buckets/versioning/
---
**Owner:** <assign> | **Review cadence:** weekly in standup | **Last updated:** 2025-10-03
```
--------------------------------------------------------------------------------
/tests/api/test_resource_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for resource router endpoints."""
import json
from datetime import datetime, timezone
from pathlib import Path
import pytest
from basic_memory.schemas import EntityResponse
from basic_memory.utils import normalize_newlines
@pytest.mark.asyncio
async def test_get_resource_content(client, project_config, entity_repository, project_url):
    """Test getting content by permalink."""
    # Materialize a markdown file under the project home.
    markdown = "# Test Content\n\nThis is a test file."
    note_path = Path(project_config.home) / "test" / "test.md"
    note_path.parent.mkdir(parents=True, exist_ok=True)
    note_path.write_text(markdown)

    # Register a DB entity that references the file on disk.
    now = datetime.now(timezone.utc)
    created = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": now,
            "updated_at": now,
        }
    )

    # Fetch the raw content through the resource endpoint.
    response = await client.get(f"{project_url}/resource/{created.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert response.text == normalize_newlines(markdown)
@pytest.mark.asyncio
async def test_get_resource_pagination(client, project_config, entity_repository, project_url):
    """Test getting content by permalink with pagination."""
    # Put a markdown document on disk inside the project home.
    markdown = "# Test Content\n\nThis is a test file."
    note_path = Path(project_config.home) / "test" / "test.md"
    note_path.parent.mkdir(parents=True, exist_ok=True)
    note_path.write_text(markdown)

    # Register the entity row that points at the file.
    now = datetime.now(timezone.utc)
    created = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": now,
            "updated_at": now,
        }
    )

    # A single entity fits on page 1, so pagination must not change the body.
    response = await client.get(
        f"{project_url}/resource/{created.permalink}", params={"page": 1, "page_size": 1}
    )
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert response.text == normalize_newlines(markdown)
@pytest.mark.asyncio
async def test_get_resource_by_title(client, project_config, entity_repository, project_url):
    """Test getting content by entity title instead of permalink."""
    # Create the backing markdown file.
    markdown = "# Test Content\n\nThis is a test file."
    note_path = Path(project_config.home) / "test" / "test.md"
    note_path.parent.mkdir(parents=True, exist_ok=True)
    note_path.write_text(markdown)

    # Register the entity row for that file.
    now = datetime.now(timezone.utc)
    created = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": now,
            "updated_at": now,
        }
    )

    # The resource endpoint should also resolve the entity by its title.
    response = await client.get(f"{project_url}/resource/{created.title}")
    assert response.status_code == 200
@pytest.mark.asyncio
async def test_get_resource_missing_entity(client, project_url):
    """Test 404 when entity doesn't exist."""
    # A permalink that was never created must produce a 404 with a clear message.
    response = await client.get(f"{project_url}/resource/does/not/exist")
    body = response.json()
    assert response.status_code == 404
    assert "Resource not found" in body["detail"]
@pytest.mark.asyncio
async def test_get_resource_missing_file(client, project_config, entity_repository, project_url):
    """Test 404 when file doesn't exist."""
    # Entity row exists in the DB, but no file backs it on disk.
    now = datetime.now(timezone.utc)
    orphan = await entity_repository.create(
        {
            "title": "Missing File",
            "entity_type": "test",
            "permalink": "test/missing",
            "file_path": "test/missing.md",
            "content_type": "text/markdown",
            "created_at": now,
            "updated_at": now,
        }
    )

    response = await client.get(f"{project_url}/resource/{orphan.permalink}")
    body = response.json()
    assert response.status_code == 404
    assert "File not found" in body["detail"]
@pytest.mark.asyncio
async def test_get_resource_observation(client, project_config, entity_repository, project_url):
    """Test getting content by observation permalink.

    An observation's permalink should resolve to the full markdown document
    of the entity that owns the observation (generated frontmatter + body).
    """
    # Create entity with a single `- [note]` observation in its body.
    content = "# Test Content\n\n- [note] an observation."
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity = EntityResponse(**entity_response)
    assert len(entity.observations) == 1
    observation = entity.observations[0]
    # Test getting the content via the observation
    response = await client.get(f"{project_url}/resource/{observation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    # Served document contains generated frontmatter followed by the original body.
    assert (
        normalize_newlines(
            """
---
title: Test Entity
type: test
permalink: test/test-entity
---
# Test Content
- [note] an observation.
""".strip()
        )
        in response.text
    )
@pytest.mark.asyncio
async def test_get_resource_entities(client, project_config, entity_repository, project_url):
    """Test getting content by permalink match.

    A wildcard permalink (`test/*`) returns all matching entities concatenated,
    each prefixed by a `--- memory://...` delimiter carrying its updated_at
    timestamp and checksum prefix.
    """
    # Create entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)
    # Second entity links back to the first via a [[wiki-link]] relation.
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)
    assert len(entity2.relations) == 1
    # Test getting the content via the relation
    response = await client.get(f"{project_url}/resource/test/*")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}
# Test Content
--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}
# Related Content
- links to [[Test Entity]]
""".strip()
        )
        in response.text
    )
@pytest.mark.asyncio
async def test_get_resource_entities_pagination(
    client, project_config, entity_repository, project_url
):
    """Test getting content by permalink match with pagination.

    With page_size=1, page 2 of a wildcard match should return only the
    second entity, rendered as a standalone document (frontmatter + body)
    rather than the concatenated multi-entity format.
    """
    # Create entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)
    assert entity1
    # Second entity links to the first, giving it one relation.
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)
    assert len(entity2.relations) == 1
    # Test getting second result
    response = await client.get(
        f"{project_url}/resource/test/*", params={"page": 2, "page_size": 1}
    )
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            """
---
title: Related Entity
type: test
permalink: test/related-entity
---
# Related Content
- links to [[Test Entity]]
""".strip()
        )
        in response.text
    )
@pytest.mark.asyncio
async def test_get_resource_relation(client, project_config, entity_repository, project_url):
    """Test getting content by relation permalink.

    A relation's permalink should resolve to both endpoints of the relation,
    concatenated with `--- memory://...` delimiter headers.
    """
    # Create entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)
    # Second entity links to the first, creating the relation under test.
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)
    assert len(entity2.relations) == 1
    relation = entity2.relations[0]
    # Test getting the content via the relation
    response = await client.get(f"{project_url}/resource/{relation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}
# Test Content
--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}
# Related Content
- links to [[Test Entity]]
""".strip()
        )
        in response.text
    )
@pytest.mark.asyncio
async def test_put_resource_new_file(
    client, project_config, entity_repository, search_repository, project_url
):
    """Test creating a new file via PUT.

    Verifies the full pipeline: file written to disk, entity row created
    with canvas type detection, and the entity indexed for search.
    """
    # Test data: a minimal Obsidian-style canvas document.
    file_path = "visualizations/test.canvas"
    canvas_data = {
        "nodes": [
            {
                "id": "node1",
                "type": "text",
                "text": "Test node content",
                "x": 100,
                "y": 200,
                "width": 400,
                "height": 300,
            }
        ],
        "edges": [],
    }
    # Make sure the file doesn't exist yet
    full_path = Path(project_config.home) / file_path
    if full_path.exists():
        full_path.unlink()
    # Execute PUT request
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(canvas_data, indent=2)
    )
    # Verify response: 201 signals creation (not update).
    assert response.status_code == 201
    response_data = response.json()
    assert response_data["file_path"] == file_path
    assert "checksum" in response_data
    assert "size" in response_data
    # Verify file was created
    full_path = Path(project_config.home) / file_path
    assert full_path.exists()
    # Verify file content round-trips as the same JSON document.
    file_content = full_path.read_text(encoding="utf-8")
    assert json.loads(file_content) == canvas_data
    # Verify entity was created in DB with canvas type detected from extension.
    entity = await entity_repository.get_by_file_path(file_path)
    assert entity is not None
    assert entity.entity_type == "canvas"
    assert entity.content_type == "application/json"
    # Verify entity was indexed for search
    search_results = await search_repository.search(title="test.canvas")
    assert len(search_results) > 0
@pytest.mark.asyncio
async def test_put_resource_update_existing(client, project_config, entity_repository, project_url):
    """Test updating an existing file via PUT.

    A PUT to a path that already has a file and entity should return 200
    (not 201), overwrite the file, and update the existing entity row in
    place (same id, new checksum).
    """
    # Create an initial file and entity
    file_path = "visualizations/update-test.canvas"
    full_path = Path(project_config.home) / file_path
    full_path.parent.mkdir(parents=True, exist_ok=True)
    initial_data = {
        "nodes": [
            {
                "id": "initial",
                "type": "text",
                "text": "Initial content",
                "x": 0,
                "y": 0,
                "width": 200,
                "height": 100,
            }
        ],
        "edges": [],
    }
    full_path.write_text(json.dumps(initial_data))
    # Create the initial entity with a placeholder checksum so the update is observable.
    initial_entity = await entity_repository.create(
        {
            "title": "update-test.canvas",
            "entity_type": "canvas",
            "file_path": file_path,
            "content_type": "application/json",
            "checksum": "initial123",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )
    # New data for update
    updated_data = {
        "nodes": [
            {
                "id": "updated",
                "type": "text",
                "text": "Updated content",
                "x": 100,
                "y": 100,
                "width": 300,
                "height": 200,
            }
        ],
        "edges": [],
    }
    # Execute PUT request to update
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(updated_data, indent=2)
    )
    # Verify response: 200 signals an update of an existing resource.
    assert response.status_code == 200
    # Verify file was updated
    updated_content = full_path.read_text(encoding="utf-8")
    assert json.loads(updated_content) == updated_data
    # Verify entity was updated
    updated_entity = await entity_repository.get_by_file_path(file_path)
    assert updated_entity.id == initial_entity.id  # Same entity, updated
    assert updated_entity.checksum != initial_entity.checksum  # Checksum changed
```
--------------------------------------------------------------------------------
/tests/mcp/test_permalink_collision_file_overwrite.py:
--------------------------------------------------------------------------------
```python
"""Tests for permalink collision file overwrite bug discovered in live testing.
This test reproduces a critical data loss bug where creating notes with
titles that normalize to different permalinks but resolve to the same
file location causes silent file overwrites without warning.
Related to GitHub Issue #139 but tests a different aspect - not database
UNIQUE constraints, but actual file overwrite behavior.
Example scenario from live testing:
1. Create "Node A" → file: edge-cases/Node A.md, permalink: edge-cases/node-a
2. Create "Node C" → file: edge-cases/Node C.md, permalink: edge-cases/node-c
3. BUG: Node C creation overwrites edge-cases/Node A.md file content
4. Result: File "Node A.md" exists but contains "Node C" content
"""
import pytest
from pathlib import Path
from textwrap import dedent
from basic_memory.mcp.tools import write_note, read_note
from basic_memory.sync.sync_service import SyncService
from basic_memory.config import ProjectConfig
from basic_memory.services import EntityService
async def force_full_scan(sync_service: SyncService) -> None:
    """Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
    # No project bound to the repository — nothing to reset.
    project_id = sync_service.entity_repository.project_id
    if project_id is None:
        return
    project = await sync_service.project_repository.find_by_id(project_id)
    if project is None:
        return
    # Clearing both watermark fields makes the next sync re-scan everything.
    await sync_service.project_repository.update(
        project.id,
        {"last_scan_timestamp": None, "last_file_count": None},
    )
@pytest.mark.asyncio
async def test_permalink_collision_should_not_overwrite_different_file(app, test_project):
    """Test that creating notes with different titles doesn't overwrite existing files.

    This test reproduces the critical bug discovered in Phase 4 of live testing where:
    - Creating "Node A" worked fine
    - Creating "Node C" silently overwrote Node A.md's content
    - No warning or error was shown to the user
    - Original Node A content was permanently lost

    Expected behavior:
    - Each note with a different title should create/update its own file
    - No silent overwrites should occur
    - Files should maintain their distinct content

    Current behavior (BUG):
    - Second note creation sometimes overwrites first note's file
    - File "Node A.md" contains "Node C" content after creating Node C
    - Data loss occurs without user warning
    """
    # Step 1: Create first note "Node A"
    result_a = await write_note.fn(
        project=test_project.name,
        title="Node A",
        folder="edge-cases",
        content="# Node A\n\nOriginal content for Node A\n\n## Relations\n- links_to [[Node B]]",
    )
    assert "# Created note" in result_a
    assert "file_path: edge-cases/Node A.md" in result_a
    assert "permalink: edge-cases/node-a" in result_a
    # Verify Node A content via read
    content_a = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a
    assert "Original content for Node A" in content_a
    # Step 2: Create second note "Node B" (should be independent)
    result_b = await write_note.fn(
        project=test_project.name,
        title="Node B",
        folder="edge-cases",
        content="# Node B\n\nContent for Node B",
    )
    assert "# Created note" in result_b
    assert "file_path: edge-cases/Node B.md" in result_b
    assert "permalink: edge-cases/node-b" in result_b
    # Step 3: Create third note "Node C" (this is where the bug occurs)
    result_c = await write_note.fn(
        project=test_project.name,
        title="Node C",
        folder="edge-cases",
        content="# Node C\n\nContent for Node C\n\n## Relations\n- links_to [[Node A]]",
    )
    assert "# Created note" in result_c
    assert "file_path: edge-cases/Node C.md" in result_c
    assert "permalink: edge-cases/node-c" in result_c
    # CRITICAL CHECK: Verify Node A still has its original content
    # This is where the bug manifests - Node A.md gets overwritten with Node C content
    content_a_after = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a_after, "Node A title should still be 'Node A'"
    assert "Original content for Node A" in content_a_after, (
        "Node A file should NOT be overwritten by Node C creation"
    )
    assert "Content for Node C" not in content_a_after, "Node A should NOT contain Node C's content"
    # Verify Node C has its own content
    content_c = await read_note.fn("edge-cases/node-c", project=test_project.name)
    assert "Node C" in content_c
    assert "Content for Node C" in content_c
    assert "Original content for Node A" not in content_c, (
        "Node C should not contain Node A's content"
    )
    # Verify files physically exist with correct content (DB checks alone are
    # not enough: the bug corrupts the file while the DB rows look correct).
    project_path = Path(test_project.path)
    node_a_file = project_path / "edge-cases" / "Node A.md"
    node_c_file = project_path / "edge-cases" / "Node C.md"
    assert node_a_file.exists(), "Node A.md file should exist"
    assert node_c_file.exists(), "Node C.md file should exist"
    # Read actual file contents to verify no overwrite occurred
    node_a_file_content = node_a_file.read_text()
    node_c_file_content = node_c_file.read_text()
    assert "Node A" in node_a_file_content, "Physical file Node A.md should contain Node A title"
    assert "Original content for Node A" in node_a_file_content, (
        "Physical file Node A.md should contain original Node A content"
    )
    assert "Content for Node C" not in node_a_file_content, (
        "Physical file Node A.md should NOT contain Node C content"
    )
    assert "Node C" in node_c_file_content, "Physical file Node C.md should contain Node C title"
    assert "Content for Node C" in node_c_file_content, (
        "Physical file Node C.md should contain Node C content"
    )
@pytest.mark.asyncio
async def test_notes_with_similar_titles_maintain_separate_files(app, test_project):
    """Test that notes with similar titles that normalize differently don't collide.

    Tests additional edge cases around permalink normalization to ensure
    we don't have collision issues with various title patterns.
    """
    # Create notes with titles that could potentially cause issues
    titles_and_folders = [
        ("My Note", "test"),
        ("My-Note", "test"),  # Different title, similar permalink
        ("My_Note", "test"),  # Underscore vs hyphen
        ("my note", "test"),  # Case variation
    ]
    created_permalinks = []
    for title, folder in titles_and_folders:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder=folder,
            content=f"# {title}\n\nUnique content for {title}",
        )
        permalink = None
        # Extract permalink from result
        for line in result.split("\n"):
            if line.startswith("permalink:"):
                permalink = line.split(":", 1)[1].strip()
                created_permalinks.append((title, permalink))
                break
        # Fail fast with a clear message if the tool output had no permalink
        # line; otherwise read_note.fn(None) below produces a confusing,
        # unrelated failure.
        assert permalink is not None, (
            f"No 'permalink:' line found in write_note output for title '{title}'"
        )
        # Verify each note can be read back with its own content
        content = await read_note.fn(permalink, project=test_project.name)
        assert f"Unique content for {title}" in content, (
            f"Note with title '{title}' should maintain its unique content"
        )
    # Verify all created permalinks are tracked
    assert len(created_permalinks) == len(titles_and_folders), (
        "All notes should be created successfully"
    )
@pytest.mark.asyncio
async def test_sequential_note_creation_preserves_all_files(app, test_project):
    """Test that rapid sequential note creation doesn't cause file overwrites.

    This test creates multiple notes in sequence to ensure that file
    creation/update logic doesn't have race conditions or state issues
    that could cause overwrites.
    """
    # (title, full markdown content) pairs; titles are single lowercase-able
    # words so the permalink is predictable below.
    notes_data = [
        ("Alpha", "# Alpha\n\nAlpha content"),
        ("Beta", "# Beta\n\nBeta content"),
        ("Gamma", "# Gamma\n\nGamma content"),
        ("Delta", "# Delta\n\nDelta content"),
        ("Epsilon", "# Epsilon\n\nEpsilon content"),
    ]
    # Create all notes
    for title, content in notes_data:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder="sequence-test",
            content=content,
        )
        assert "# Created note" in result or "# Updated note" in result
    # Verify all notes still exist with correct content
    for title, expected_content in notes_data:
        # Normalize title to permalink format
        permalink = f"sequence-test/{title.lower()}"
        content = await read_note.fn(permalink, project=test_project.name)
        assert title in content, f"Note '{title}' should still have its title"
        # The body is the text after the first blank line of the source note.
        assert expected_content.split("\n\n")[1] in content, (
            f"Note '{title}' should still have its original content"
        )
    # Verify physical files exist
    project_path = Path(test_project.path)
    sequence_dir = project_path / "sequence-test"
    for title, _ in notes_data:
        file_path = sequence_dir / f"{title}.md"
        assert file_path.exists(), f"File for '{title}' should exist"
        file_content = file_path.read_text()
        assert title in file_content, f"Physical file for '{title}' should contain correct title"
@pytest.mark.asyncio
async def test_sync_permalink_collision_file_overwrite_bug(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that reproduces the permalink collision file overwrite bug via sync.

    This test directly creates files and runs sync to reproduce the exact bug
    discovered in live testing where Node C overwrote Node A.md.

    The bug occurs when:
    1. File "Node A.md" exists with permalink "edge-cases/node-a"
    2. File "Node C.md" is created with permalink "edge-cases/node-c"
    3. During sync, somehow Node C content overwrites Node A.md
    4. Result: File "Node A.md" contains Node C content (data loss!)
    """
    project_dir = project_config.home
    edge_cases_dir = project_dir / "edge-cases"
    edge_cases_dir.mkdir(parents=True, exist_ok=True)
    # Step 1: Create Node A file
    node_a_content = dedent("""
        ---
        title: Node A
        type: note
        tags:
        - circular-test
        ---
        # Node A
        Original content for Node A
        ## Relations
        - links_to [[Node B]]
        - references [[Node C]]
    """).strip()
    node_a_file = edge_cases_dir / "Node A.md"
    node_a_file.write_text(node_a_content)
    # Sync to create Node A in database
    await sync_service.sync(project_dir)
    # Verify Node A is in database
    node_a = await entity_service.get_by_permalink("edge-cases/node-a")
    assert node_a is not None
    assert node_a.title == "Node A"
    # Verify Node A file has correct content
    assert node_a_file.exists()
    node_a_file_content = node_a_file.read_text()
    assert "title: Node A" in node_a_file_content
    assert "Original content for Node A" in node_a_file_content
    # Step 2: Create Node B file (completes the A->B->C->A reference cycle).
    node_b_content = dedent("""
        ---
        title: Node B
        type: note
        tags:
        - circular-test
        ---
        # Node B
        Content for Node B
        ## Relations
        - links_to [[Node C]]
        - part_of [[Node A]]
    """).strip()
    node_b_file = edge_cases_dir / "Node B.md"
    node_b_file.write_text(node_b_content)
    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)
    # Sync to create Node B
    await sync_service.sync(project_dir)
    # Step 3: Create Node C file (this is where the bug might occur)
    node_c_content = dedent("""
        ---
        title: Node C
        type: note
        tags:
        - circular-test
        ---
        # Node C
        Content for Node C
        ## Relations
        - links_to [[Node A]]
        - references [[Node B]]
    """).strip()
    node_c_file = edge_cases_dir / "Node C.md"
    node_c_file.write_text(node_c_content)
    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)
    # Sync to create Node C - THIS IS WHERE THE BUG OCCURS
    await sync_service.sync(project_dir)
    # CRITICAL VERIFICATION: Check if Node A file was overwritten
    assert node_a_file.exists(), "Node A.md file should still exist"
    # Read Node A file content to check for overwrite bug
    node_a_after_sync = node_a_file.read_text()
    # The bug: Node A.md contains Node C content instead of Node A content
    assert "title: Node A" in node_a_after_sync, (
        "Node A.md file should still have title: Node A in frontmatter"
    )
    assert "Node A" in node_a_after_sync, "Node A.md file should still contain 'Node A' title"
    assert "Original content for Node A" in node_a_after_sync, (
        f"Node A.md file should NOT be overwritten! Content: {node_a_after_sync[:200]}"
    )
    assert "Content for Node C" not in node_a_after_sync, (
        f"Node A.md should NOT contain Node C content! Content: {node_a_after_sync[:200]}"
    )
    # Verify Node C file exists with correct content
    assert node_c_file.exists(), "Node C.md file should exist"
    node_c_after_sync = node_c_file.read_text()
    assert "Node C" in node_c_after_sync
    assert "Content for Node C" in node_c_after_sync
    # Verify database has both entities correctly
    node_a_db = await entity_service.get_by_permalink("edge-cases/node-a")
    node_c_db = await entity_service.get_by_permalink("edge-cases/node-c")
    assert node_a_db is not None, "Node A should exist in database"
    assert node_a_db.title == "Node A", "Node A database entry should have correct title"
    assert node_c_db is not None, "Node C should exist in database"
    assert node_c_db.title == "Node C", "Node C database entry should have correct title"
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/project_router.py:
--------------------------------------------------------------------------------
```python
"""Router for project management."""
import os
from fastapi import APIRouter, HTTPException, Path, Body, BackgroundTasks, Response, Query
from typing import Optional
from loguru import logger
from basic_memory.deps import (
ProjectConfigDep,
ProjectServiceDep,
ProjectPathDep,
SyncServiceDep,
)
from basic_memory.schemas import ProjectInfoResponse, SyncReportResponse
from basic_memory.schemas.project_info import (
ProjectList,
ProjectItem,
ProjectInfoRequest,
ProjectStatusResponse,
)
from basic_memory.utils import normalize_project_path
# Router for resources in a specific project
# The ProjectPathDep is used in the path as a prefix, so the request path is like /{project}/project/info
project_router = APIRouter(prefix="/project", tags=["project"])
# Router for managing project resources
project_resource_router = APIRouter(prefix="/projects", tags=["project_management"])
@project_router.get("/info", response_model=ProjectInfoResponse)
async def get_project_info(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectInfoResponse:
    """Get comprehensive information about the specified Basic Memory project."""
    # Delegate straight to the service layer, which assembles the full report.
    info = await project_service.get_project_info(project)
    return info
@project_router.get("/item", response_model=ProjectItem)
async def get_project(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectItem:
    """Get basic info about the specified Basic Memory project.

    Raises:
        HTTPException: 404 if no project with the given name exists.
    """
    found_project = await project_service.get_project(project)
    if not found_project:
        raise HTTPException(
            status_code=404, detail=f"Project: '{project}' does not exist"
        )  # pragma: no cover
    # Path is normalized for consistent display/comparison across platforms.
    return ProjectItem(
        name=found_project.name,
        path=normalize_project_path(found_project.path),
        is_default=found_project.is_default or False,
    )
# Update a project
@project_router.patch("/{name}", response_model=ProjectStatusResponse)
async def update_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to update"),
    path: Optional[str] = Body(None, description="New absolute path for the project"),
    is_active: Optional[bool] = Body(None, description="Status of the project (active/inactive)"),
) -> ProjectStatusResponse:
    """Update a project's information in configuration and database.

    Args:
        name: The name of the project to update
        path: Optional new absolute path for the project
        is_active: Optional status update for the project

    Returns:
        Response confirming the project was updated

    Raises:
        HTTPException: 400 if the path is not absolute, or if the service
            rejects the update with a ValueError.
    """
    try:
        # Validate that path is absolute if provided
        if path and not os.path.isabs(path):
            raise HTTPException(status_code=400, detail="Path must be absolute")
        # Get original project info for the response
        # NOTE(review): falls back to "" for an unknown project name —
        # presumably the service call below raises ValueError in that case; confirm.
        old_project_info = ProjectItem(
            name=name,
            path=project_service.projects.get(name, ""),
        )
        # A new path takes precedence over a status change; only one of the
        # two updates is applied per request.
        if path:
            await project_service.move_project(name, path)
        elif is_active is not None:
            await project_service.update_project(name, is_active=is_active)
        # Get updated project info
        updated_path = path if path else project_service.projects.get(name, "")
        return ProjectStatusResponse(
            message=f"Project '{name}' updated successfully",
            status="success",
            default=(name == project_service.default_project),
            old_project=old_project_info,
            new_project=ProjectItem(name=name, path=updated_path),
        )
    except ValueError as e:
        # Service-level validation failures surface as 400s to the client.
        raise HTTPException(status_code=400, detail=str(e))
# Sync project filesystem
@project_router.post("/sync")
async def sync_project(
    background_tasks: BackgroundTasks,
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
    force_full: bool = Query(
        False, description="Force full scan, bypassing watermark optimization"
    ),
):
    """Force project filesystem sync to database.
    Scans the project directory and updates the database with any new or modified files.
    Args:
        background_tasks: FastAPI background tasks
        sync_service: Sync service for this project
        project_config: Project configuration
        force_full: If True, force a full scan even if watermark exists
    Returns:
        Response confirming sync was initiated
    """
    # The sync runs after the response is sent; this endpoint only schedules it.
    background_tasks.add_task(
        sync_service.sync, project_config.home, project_config.name, force_full=force_full
    )
    logger.info(
        f"Filesystem sync initiated for project: {project_config.name} (force_full={force_full})"
    )
    return {
        "status": "sync_started",
        "message": f"Filesystem sync initiated for project '{project_config.name}'",
    }
@project_router.post("/status", response_model=SyncReportResponse)
async def project_sync_status(
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
) -> SyncReportResponse:
    """Scan directory for changes compared to database state.
    Unlike /sync, this runs synchronously and does not modify the database.
    Args:
        sync_service: Sync service for this project
        project_config: Project configuration
    Returns:
        Scan report with details on files that need syncing
    """
    logger.info(f"Scanning filesystem for project: {project_config.name}")
    sync_report = await sync_service.scan(project_config.home)
    return SyncReportResponse.from_sync_report(sync_report)
# List all available projects
@project_resource_router.get("/projects", response_model=ProjectList)
async def list_projects(
    project_service: ProjectServiceDep,
) -> ProjectList:
    """List all configured projects.

    Returns:
        A list of all projects with metadata, plus the default project name.
    """
    known_projects = await project_service.list_projects()
    items: list[ProjectItem] = []
    for record in known_projects:
        items.append(
            ProjectItem(
                name=record.name,
                # Hide the cloud mount prefix from clients.
                path=normalize_project_path(record.path),
                is_default=record.is_default or False,
            )
        )
    return ProjectList(
        projects=items,
        default_project=project_service.default_project,
    )
# Add a new project
@project_resource_router.post("/projects", response_model=ProjectStatusResponse, status_code=201)
async def add_project(
    response: Response,
    project_data: ProjectInfoRequest,
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Add a new project to configuration and database.
    Idempotent: re-adding an existing project with the same path returns 200
    instead of 201; the same name with a different path is a 400 error.
    Args:
        project_data: The project name and path, with option to set as default
    Returns:
        Response confirming the project was added
    Raises:
        HTTPException: 400 on name/path conflict or service-level validation failure.
    """
    # Check if project already exists before attempting to add
    existing_project = await project_service.get_project(project_data.name)
    if existing_project:
        # Project exists - check if paths match for true idempotency
        # Normalize paths for comparison (resolve symlinks, etc.)
        # NOTE: this local import shadows fastapi's `Path` helper, but only
        # inside this function body.
        from pathlib import Path
        # NOTE(review): relative paths resolve against the process CWD here —
        # confirm inputs are always absolute.
        requested_path = Path(project_data.path).resolve()
        existing_path = Path(existing_project.path).resolve()
        if requested_path == existing_path:
            # Same name, same path - return 200 OK (idempotent)
            response.status_code = 200
            return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
                message=f"Project '{project_data.name}' already exists",
                status="success",
                default=existing_project.is_default or False,
                new_project=ProjectItem(
                    name=existing_project.name,
                    path=existing_project.path,
                    is_default=existing_project.is_default or False,
                ),
            )
        else:
            # Same name, different path - this is an error
            raise HTTPException(
                status_code=400,
                detail=f"Project '{project_data.name}' already exists with different path. Existing: {existing_project.path}, Requested: {project_data.path}",
            )
    try:  # pragma: no cover
        # The service layer now handles cloud mode validation and path sanitization
        await project_service.add_project(
            project_data.name, project_data.path, set_default=project_data.set_default
        )
        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message=f"Project '{project_data.name}' added successfully",
            status="success",
            default=project_data.set_default,
            new_project=ProjectItem(
                name=project_data.name, path=project_data.path, is_default=project_data.set_default
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
# Remove a project
@project_resource_router.delete("/{name}", response_model=ProjectStatusResponse)
async def remove_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to remove"),
    delete_notes: bool = Query(
        False, description="If True, delete project directory from filesystem"
    ),
) -> ProjectStatusResponse:
    """Remove a project from configuration and database.
    Args:
        name: The name of the project to remove
        delete_notes: If True, delete the project directory from the filesystem
    Returns:
        Response confirming the project was removed
    Raises:
        HTTPException: 404 if the project is unknown; 400 if it is the default
            project or the service rejects the removal.
    """
    try:
        old_project = await project_service.get_project(name)
        if not old_project:  # pragma: no cover
            # HTTPException is not a ValueError, so it escapes the except below.
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover
        # Check if trying to delete the default project
        if name == project_service.default_project:
            available_projects = await project_service.list_projects()
            other_projects = [p.name for p in available_projects if p.name != name]
            detail = f"Cannot delete default project '{name}'. "
            # Tailor the hint: either suggest switching defaults or explain
            # that this is the last remaining project.
            if other_projects:
                detail += (
                    f"Set another project as default first. Available: {', '.join(other_projects)}"
                )
            else:
                detail += "This is the only project in your configuration."
            raise HTTPException(status_code=400, detail=detail)
        await project_service.remove_project(name, delete_notes=delete_notes)
        return ProjectStatusResponse(
            message=f"Project '{name}' removed successfully",
            status="success",
            default=False,
            old_project=ProjectItem(name=old_project.name, path=old_project.path),
            new_project=None,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
# Set a project as default
@project_resource_router.put("/{name}/default", response_model=ProjectStatusResponse)
async def set_default_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to set as default"),
) -> ProjectStatusResponse:
    """Set a project as the default project.
    Args:
        name: The name of the project to set as default
    Returns:
        Response confirming the project was set as default
    Raises:
        HTTPException: 404 when either the current default or the target
            project cannot be found; 400 on service-level validation failure.
    """
    try:
        # Get the old default project (for the old_project field of the response)
        default_name = project_service.default_project
        default_project = await project_service.get_project(default_name)
        if not default_project:  # pragma: no cover
            raise HTTPException(  # pragma: no cover
                status_code=404, detail=f"Default Project: '{default_name}' does not exist"
            )
        # get the new project — validate it exists before switching the default
        new_default_project = await project_service.get_project(name)
        if not new_default_project:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover
        await project_service.set_default_project(name)
        return ProjectStatusResponse(
            message=f"Project '{name}' set as default successfully",
            status="success",
            default=True,
            old_project=ProjectItem(name=default_name, path=default_project.path),
            new_project=ProjectItem(
                name=name,
                path=new_default_project.path,
                is_default=True,
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
# Get the default project
@project_resource_router.get("/default", response_model=ProjectItem)
async def get_default_project(
    project_service: ProjectServiceDep,
) -> ProjectItem:
    """Return the configured default project.

    Raises:
        HTTPException: 404 when the configured default name has no matching project.
    """
    default_name = project_service.default_project
    record = await project_service.get_project(default_name)
    if not record:  # pragma: no cover
        raise HTTPException(  # pragma: no cover
            status_code=404, detail=f"Default Project: '{default_name}' does not exist"
        )
    return ProjectItem(name=record.name, path=record.path, is_default=True)
# Synchronize projects between config and database
@project_resource_router.post("/config/sync", response_model=ProjectStatusResponse)
async def synchronize_projects(
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Synchronize projects between configuration file and database.
    Ensures that all projects in the configuration file exist in the database
    and vice versa.
    Returns:
        Response confirming synchronization was completed
    Raises:
        HTTPException: 400 when the service reports a validation failure.
    """
    try:  # pragma: no cover
        await project_service.synchronize_projects()
        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message="Projects synchronized successfully between configuration and database",
            status="success",
            default=False,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
```
--------------------------------------------------------------------------------
/test-int/mcp/test_delete_note_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for delete_note MCP tool.
Tests the complete delete note workflow: MCP client -> MCP server -> FastAPI -> database
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_delete_note_by_title(mcp_server, app, test_project):
    """Test deleting a note by its title.

    Full round-trip: create -> read (exists) -> delete -> read (not found).
    """
    async with Client(mcp_server) as client:
        # First create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Note to Delete",
                "folder": "test",
                "content": "# Note to Delete\n\nThis note will be deleted.",
                "tags": "test,delete",
            },
        )
        # Verify the note exists by reading it
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )
        assert len(read_result.content) == 1
        assert "Note to Delete" in read_result.content[0].text
        # Delete the note by title
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )
        # Should return True for successful deletion
        # (delete_note's boolean result is rendered as a text content item)
        assert len(delete_result.content) == 1
        assert delete_result.content[0].type == "text"
        assert "true" in delete_result.content[0].text.lower()
        # Verify the note no longer exists
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )
        # Should return helpful "Note Not Found" message instead of the actual note
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Note to Delete" in result_text
@pytest.mark.asyncio
async def test_delete_note_by_permalink(mcp_server, app, test_project):
    """Test deleting a note by its permalink.

    The permalink is derived from folder + slugified title ("tests/permalink-delete-test").
    """
    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Permalink Delete Test",
                "folder": "tests",
                "content": "# Permalink Delete Test\n\nTesting deletion by permalink.",
                "tags": "test,permalink",
            },
        )
        # Delete the note by permalink
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "tests/permalink-delete-test",
            },
        )
        # Should return True for successful deletion
        assert len(delete_result.content) == 1
        assert "true" in delete_result.content[0].text.lower()
        # Verify the note no longer exists by searching
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "Permalink Delete Test",
            },
        )
        # Should have no results (accept both compact and pretty JSON spellings)
        assert (
            '"results": []' in search_result.content[0].text
            or '"results":[]' in search_result.content[0].text
        )
@pytest.mark.asyncio
async def test_delete_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test deleting a note that has observations and relations.

    Ensures deletion also removes the note's structured knowledge
    (observation/relation sections), not just the markdown body.
    """
    async with Client(mcp_server) as client:
        # Create a complex note with observations and relations
        complex_content = """# Project Management System
This is a comprehensive project management system.
## Observations
- [feature] Task tracking functionality
- [feature] User authentication system
- [tech] Built with Python and Flask
- [status] Currently in development
## Relations
- depends_on [[Database Schema]]
- implements [[User Stories]]
- part_of [[Main Application]]
The system handles multiple projects and users."""
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Project Management System",
                "folder": "projects",
                "content": complex_content,
                "tags": "project,management,system",
            },
        )
        # Verify the note exists and has content
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )
        assert len(read_result.content) == 1
        result_text = read_result.content[0].text
        assert "Task tracking functionality" in result_text
        assert "depends_on" in result_text
        # Delete the complex note
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "projects/project-management-system",
            },
        )
        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()
        # Verify the note and all its components are deleted
        read_after_delete_2 = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )
        # Should return "Note Not Found" message
        assert len(read_after_delete_2.content) == 1
        result_text = read_after_delete_2.content[0].text
        assert "Note Not Found" in result_text
        assert "Project Management System" in result_text
@pytest.mark.asyncio
async def test_delete_note_special_characters_in_title(mcp_server, app, test_project):
    """Test deleting notes with special characters in the title.

    Covers spaces, dashes, underscores, parentheses, and punctuation that
    exercise the permalink/identifier resolution paths.
    """
    async with Client(mcp_server) as client:
        # Create notes with special characters
        special_titles = [
            "Note with spaces",
            "Note-with-dashes",
            "Note_with_underscores",
            "Note (with parentheses)",
            "Note & Symbols!",
        ]
        # Create all the notes
        for title in special_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "special",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "special,characters",
                },
            )
        # Delete each note by title
        for title in special_titles:
            delete_result = await client.call_tool(
                "delete_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )
            # Should return True for successful deletion
            assert "true" in delete_result.content[0].text.lower(), (
                f"Failed to delete note: {title}"
            )
            # Verify the note is deleted
            read_after_delete = await client.call_tool(
                "read_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )
            # Should return "Note Not Found" message
            assert len(read_after_delete.content) == 1
            result_text = read_after_delete.content[0].text
            assert "Note Not Found" in result_text
            assert title in result_text
@pytest.mark.asyncio
async def test_delete_nonexistent_note(mcp_server, app, test_project):
    """Test attempting to delete a note that doesn't exist.

    delete_note reports failure via a "false" payload rather than raising.
    """
    async with Client(mcp_server) as client:
        # Try to delete a note that doesn't exist
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Nonexistent Note",
            },
        )
        # Should return False for unsuccessful deletion
        assert len(delete_result.content) == 1
        assert "false" in delete_result.content[0].text.lower()
@pytest.mark.asyncio
async def test_delete_note_by_file_path(mcp_server, app, test_project):
    """Test deleting a note using its file path.

    The raw markdown file path ("docs/File Path Delete.md") is accepted as a
    third identifier form alongside title and permalink.
    """
    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "File Path Delete",
                "folder": "docs",
                "content": "# File Path Delete\n\nTesting deletion by file path.",
                "tags": "test,filepath",
            },
        )
        # Try to delete using the file path (should work as an identifier)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "docs/File Path Delete.md",
            },
        )
        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()
        # Verify deletion
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "File Path Delete",
            },
        )
        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "File Path Delete" in result_text
@pytest.mark.asyncio
async def test_delete_note_case_insensitive(mcp_server, app, test_project):
    """Test that note deletion is case insensitive for titles."""
    async with Client(mcp_server) as client:
        # Create a note with mixed case
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "CamelCase Note Title",
                "folder": "test",
                "content": "# CamelCase Note Title\n\nTesting case sensitivity.",
                "tags": "test,case",
            },
        )
        # Try to delete with different case (all-lowercase identifier)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "camelcase note title",
            },
        )
        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()
@pytest.mark.asyncio
async def test_delete_multiple_notes_sequentially(mcp_server, app, test_project):
    """Test deleting multiple notes in sequence.

    Verifies each individual deletion succeeds and that the search index ends
    up empty for the shared "batch" tag afterwards.
    """
    async with Client(mcp_server) as client:
        # Create multiple notes
        note_titles = [
            "First Note",
            "Second Note",
            "Third Note",
            "Fourth Note",
            "Fifth Note",
        ]
        for title in note_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "batch",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "batch,test",
                },
            )
        # Delete all notes sequentially
        for title in note_titles:
            delete_result = await client.call_tool(
                "delete_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )
            # Each deletion should be successful
            assert "true" in delete_result.content[0].text.lower(), f"Failed to delete {title}"
        # Verify all notes are deleted by searching
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "batch",
            },
        )
        # Should have no results (accept both compact and pretty JSON spellings)
        assert (
            '"results": []' in search_result.content[0].text
            or '"results":[]' in search_result.content[0].text
        )
@pytest.mark.asyncio
async def test_delete_note_with_unicode_content(mcp_server, app, test_project):
    """Test deleting notes with Unicode content.

    Exercises emoji, CJK, accented Latin, and math symbols through the full
    create/delete/read-back cycle.
    """
    async with Client(mcp_server) as client:
        # Create a note with Unicode content
        unicode_content = """# Unicode Test Note 🚀
This note contains various Unicode characters:
- Emojis: 🎉 🔥 ⚡ 💡
- Languages: 测试中文 Tëst Übër
- Symbols: ♠♣♥♦ ←→↑↓ ∞≠≤≥
- Math: ∑∏∂∇∆Ω
## Observations
- [test] Unicode characters preserved ✓
- [note] Emoji support working 🎯
## Relations
- supports [[Unicode Standards]]
- tested_with [[Various Languages]]"""
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Test Note",
                "folder": "unicode",
                "content": unicode_content,
                "tags": "unicode,test,emoji",
            },
        )
        # Delete the Unicode note
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )
        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()
        # Verify deletion
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )
        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Unicode Test Note" in result_text
```
--------------------------------------------------------------------------------
/src/basic_memory/utils.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for basic-memory."""
import os
import logging
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Protocol, Union, runtime_checkable, List
from loguru import logger
from unidecode import unidecode
def normalize_project_path(path: str) -> str:
    """Strip the cloud mount-point prefix ("app/data/") from a project path.

    Cloud deployments mount the S3 bucket at /app/data; removing that prefix
    keeps stored project paths aligned with the bucket layout and avoids
    leaking deployment details. Windows drive-letter paths pass through
    unchanged.

    Args:
        path: Project path, e.g. "/app/data/my-project" or "C:\\Users\\project".

    Returns:
        The path with any "app/data/" prefix removed and a leading slash
        guaranteed (Unix form), or the original string for Windows paths.

    Examples:
        >>> normalize_project_path("/app/data/my-project")
        '/my-project'
        >>> normalize_project_path("/my-project")
        '/my-project'
        >>> normalize_project_path("app/data/my-project")
        '/my-project'
        >>> normalize_project_path("C:\\\\Users\\\\project")
        'C:\\\\Users\\\\project'
    """
    # Windows absolute paths carry a drive letter + colon; leave those alone.
    if len(path) >= 2 and path[1] == ":":
        return path

    # Drop leading slashes so absolute and relative Unix inputs look alike.
    stripped = path.lstrip("/")
    if stripped.startswith("app/data/"):
        stripped = stripped.removeprefix("app/data/")

    # Re-anchor as a Unix absolute path.
    return stripped if stripped.startswith("/") else "/" + stripped
@runtime_checkable
class PathLike(Protocol):
    """Protocol for objects that can be used as paths (duck-typed via __str__)."""
    def __str__(self) -> str: ...
# In type annotations, use Union[Path, str] instead of FilePath for now
# This preserves compatibility with existing code while we migrate
FilePath = Union[Path, str]
# Quiet the OpenTelemetry metrics instrument logger (the source of the
# "Queue is full" warning) by raising its threshold to ERROR.
logging.getLogger("opentelemetry.sdk.metrics._internal.instrument").setLevel(logging.ERROR)
def generate_permalink(file_path: Union[Path, str, PathLike], split_extension: bool = True) -> str:
    """Generate a stable permalink from a file path.
    Args:
        file_path: Original file path (str, Path, or PathLike)
        split_extension: When True (default) the file extension is dropped from
            the permalink; when False it is re-appended to the result.
    Returns:
        Normalized permalink that matches validation rules. Converts spaces and underscores
        to hyphens for consistency. Preserves non-ASCII characters like Chinese.
    Examples:
        >>> generate_permalink("docs/My Feature.md")
        'docs/my-feature'
        >>> generate_permalink("specs/API (v2).md")
        'specs/api-v2'
        >>> generate_permalink("design/unified_model_refactor.md")
        'design/unified-model-refactor'
        >>> generate_permalink("中文/测试文档.md")
        '中文/测试文档'
    """
    # Convert Path to string if needed; as_posix() gives forward slashes on all OSes
    path_str = Path(str(file_path)).as_posix()
    # Remove extension (re-appended at the end when split_extension is False)
    (base, extension) = os.path.splitext(path_str)
    # Check if we have CJK characters that should be preserved
    # CJK ranges: \u4e00-\u9fff (CJK Unified Ideographs), \u3000-\u303f (CJK symbols),
    # \u3400-\u4dbf (CJK Extension A), \uff00-\uffef (Fullwidth forms)
    has_cjk_chars = any(
        "\u4e00" <= char <= "\u9fff"
        or "\u3000" <= char <= "\u303f"
        or "\u3400" <= char <= "\u4dbf"
        or "\uff00" <= char <= "\uffef"
        for char in base
    )
    if has_cjk_chars:
        # For text with CJK characters, selectively transliterate only Latin accented chars
        result = ""
        for char in base:
            if (
                "\u4e00" <= char <= "\u9fff"
                or "\u3000" <= char <= "\u303f"
                or "\u3400" <= char <= "\u4dbf"
            ):
                # Preserve CJK ideographs and symbols
                result += char
            elif "\uff00" <= char <= "\uffef":
                # Remove Chinese fullwidth punctuation entirely (like ,!?)
                continue
            else:
                # Transliterate Latin accented characters to ASCII
                result += unidecode(char)
        # Insert hyphens between CJK and Latin character transitions
        # Match: CJK followed by Latin letter/digit, or Latin letter/digit followed by CJK
        result = re.sub(
            r"([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])([a-zA-Z0-9])", r"\1-\2", result
        )
        result = re.sub(
            r"([a-zA-Z0-9])([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])", r"\1-\2", result
        )
        # Insert dash between camelCase
        result = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", result)
        # Convert ASCII letters to lowercase, preserve CJK
        lower_text = "".join(c.lower() if c.isascii() and c.isalpha() else c for c in result)
        # Replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")
        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")
        # Replace unsafe chars with hyphens, but preserve CJK characters
        clean_text = re.sub(
            r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_no_apostrophes
        )
    else:
        # Original ASCII-only processing for backward compatibility
        # Transliterate unicode to ascii
        ascii_text = unidecode(base)
        # Insert dash between camelCase
        ascii_text = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", ascii_text)
        # Convert to lowercase
        lower_text = ascii_text.lower()
        # replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")
        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")
        # Replace remaining invalid chars with hyphens
        clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_no_apostrophes)
    # Collapse multiple hyphens
    clean_text = re.sub(r"-+", "-", clean_text)
    # Clean each path segment (no leading/trailing hyphens per segment)
    segments = clean_text.split("/")
    clean_segments = [s.strip("-") for s in segments]
    return_val = "/".join(clean_segments)
    # Append file extension back, if necessary
    if not split_extension and extension:
        return_val += extension
    return return_val
def setup_logging(
    env: str,
    home_dir: Path,
    log_file: Optional[str] = None,
    log_level: str = "INFO",
    console: bool = True,
) -> None:  # pragma: no cover
    """
    Configure logging for the application.
    Args:
        env: The environment name (dev, test, prod)
        home_dir: The root directory for the application
        log_file: The name of the log file to write to
        log_level: The logging level to use
        console: Whether to log to the console
    """
    # Remove default handler and any existing handlers
    logger.remove()
    # Add file handler if we are not running tests and a log file is specified
    if log_file and env != "test":
        # Setup file logger; enqueue=True routes writes through a queue so
        # file logging does not block the caller.
        log_path = home_dir / log_file
        logger.add(
            str(log_path),
            level=log_level,
            rotation="10 MB",
            retention="10 days",
            backtrace=True,
            diagnose=True,
            enqueue=True,
            colorize=False,
        )
    # Add console logger if requested or in test mode
    if env == "test" or console:
        logger.add(sys.stderr, level=log_level, backtrace=True, diagnose=True, colorize=True)
    logger.info(f"ENV: '{env}' Log level: '{log_level}' Logging to {log_file}")
    # Bind environment context for structured logging (works in both local and cloud)
    tenant_id = os.getenv("BASIC_MEMORY_TENANT_ID", "local")
    fly_app_name = os.getenv("FLY_APP_NAME", "local")
    fly_machine_id = os.getenv("FLY_MACHINE_ID", "local")
    fly_region = os.getenv("FLY_REGION", "local")
    logger.configure(
        extra={
            "tenant_id": tenant_id,
            "fly_app_name": fly_app_name,
            "fly_machine_id": fly_machine_id,
            "fly_region": fly_region,
        }
    )
    # Reduce noise from third-party libraries
    noisy_loggers = {
        # HTTP client logs
        "httpx": logging.WARNING,
        # File watching logs
        "watchfiles.main": logging.WARNING,
    }
    # Set log levels for noisy loggers (stdlib logging, separate from loguru)
    for logger_name, level in noisy_loggers.items():
        logging.getLogger(logger_name).setLevel(level)
def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
    """Normalize tag input of any supported shape into a list of strings.

    Accepts a list of tags, a comma-separated string, a JSON-array string
    (a common artifact from AI assistants), or None.

    Args:
        tags: Tags in any of the supported input shapes.

    Returns:
        A list of cleaned tag strings, or an empty list when there are none.

    Note:
        Leading '#' characters are stripped so repeated processing never
        accumulates hash prefixes.
    """
    if tags is None:
        return []

    def _clean(raw: str) -> str:
        # Whitespace first, then any leading '#' markers.
        return raw.strip().lstrip("#")

    if isinstance(tags, list):
        return [_clean(tag) for tag in tags if tag and tag.strip()]

    if isinstance(tags, str):
        import json

        trimmed = tags.strip()
        # A bracketed string is probably a JSON array serialized as text.
        if trimmed.startswith("[") and trimmed.endswith("]"):
            try:
                decoded = json.loads(tags)
            except json.JSONDecodeError:
                # Not valid JSON; fall through to comma-separated parsing.
                pass
            else:
                if isinstance(decoded, list):
                    return parse_tags(decoded)
        return [_clean(tag) for tag in tags.split(",") if tag and tag.strip()]

    # Unknown type: coerce to string and retry.
    try:  # pragma: no cover
        return parse_tags(str(tags))
    except (ValueError, TypeError):  # pragma: no cover
        logger.warning(f"Couldn't parse tags from input of type {type(tags)}: {tags}")
        return []
def normalize_newlines(multiline: str) -> str:
    """Rewrite every newline variant in *multiline* as the platform newline.

    Args:
        multiline: Text containing any mixture of \\r\\n, \\r, or \\n.

    Returns:
        The same text using only ``os.linesep``.
    """
    # \r\n? covers both CRLF and lone CR; the alternation adds bare LF.
    newline_pattern = re.compile(r"\r\n?|\n")
    return newline_pattern.sub(os.linesep, multiline)
def normalize_file_path_for_comparison(file_path: str) -> str:
    """Produce a canonical form of *file_path* for conflict detection.

    Lowercases the path, applies Unicode NFD normalization, unifies path
    separators to forward slashes, and collapses repeated slashes so
    near-identical paths compare equal.

    Args:
        file_path: The file path to canonicalize.

    Returns:
        The normalized path string (for comparison only, not filesystem use).
    """
    import unicodedata

    # Lowercase first, then NFD so case-folded accents normalize consistently.
    canonical = unicodedata.normalize("NFD", file_path.lower())
    canonical = canonical.replace("\\", "/")
    return re.sub(r"/+", "/", canonical)
def detect_potential_file_conflicts(file_path: str, existing_paths: List[str]) -> List[str]:
    """Find existing paths that could collide with *file_path*.

    Two collision classes are checked for every candidate: normalized-path
    equality (case, Unicode form, separators) and identical generated
    permalinks.

    Args:
        file_path: The path being introduced.
        existing_paths: Paths already present to check against.

    Returns:
        The subset of *existing_paths* that may conflict with *file_path*.
    """
    target_normalized = normalize_file_path_for_comparison(file_path)
    target_permalink = generate_permalink(file_path)

    conflicts: List[str] = []
    for candidate in existing_paths:
        # An exact byte-for-byte duplicate is not a "conflict".
        if candidate == file_path:
            continue
        if normalize_file_path_for_comparison(candidate) == target_normalized:
            conflicts.append(candidate)
        elif generate_permalink(candidate) == target_permalink:
            conflicts.append(candidate)
    return conflicts
def valid_project_path_value(path: str) -> bool:
    """Ensure a project-relative path value is safe to use.

    Rejects path traversal, home expansion, and absolute paths while
    allowing empty values (which resolve to the project root).

    Args:
        path: Candidate project-relative path.

    Returns:
        True if the value is acceptable, False otherwise.
    """
    # Allow empty strings as they resolve to the project root
    if not path:
        return True
    # Reject traversal / home-expansion patterns. The ".." check also covers
    # Windows-style "\.." sequences, so no separate check is needed.
    if ".." in path or "~" in path:
        return False
    # Reject Windows rooted/UNC paths such as "\dir" or "\\server\share".
    if path.startswith("\\"):
        return False
    # Block absolute paths (Unix-style starting with / or Windows-style with drive letters)
    if path.startswith("/") or (len(path) >= 2 and path[1] == ":"):
        return False
    # Block control characters; tab is tolerated because callers strip
    # surrounding whitespace. Whitespace-only values are skipped entirely,
    # since they strip down to empty. (Space is ord 32, never a control.)
    if path.strip() and any(ord(c) < 32 and c != "\t" for c in path):
        return False
    return True
def validate_project_path(path: str, project_path: Path) -> bool:
    """Ensure path is valid and stays within project boundaries."""
    # Cheap lexical screening first (traversal patterns, absolute paths, ...).
    if not valid_project_path_value(path):
        return False
    # Then resolve against the project root and confirm containment.
    try:
        candidate = (project_path / path).resolve()
        return candidate.is_relative_to(project_path.resolve())
    except (ValueError, OSError):
        return False
def ensure_timezone_aware(dt: datetime) -> datetime:
    """Ensure a datetime is timezone-aware using system timezone.

    Naive datetimes are interpreted as local time and converted to an
    aware value; aware datetimes are returned unchanged.

    Args:
        dt: The datetime to ensure is timezone-aware

    Returns:
        A timezone-aware datetime
    """
    # Aware values pass straight through; only naive ones are localized.
    return dt if dt.tzinfo is not None else dt.astimezone()
```
--------------------------------------------------------------------------------
/test-int/mcp/test_chatgpt_tools_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for ChatGPT-compatible MCP tools.
Tests the complete flow of search and fetch tools designed for ChatGPT integration,
ensuring they properly wrap Basic Memory's MCP tools and return OpenAI-compatible
MCP content array format.
"""
import json
import pytest
from fastmcp import Client
def extract_mcp_json_content(mcp_result):
    """
    Helper to extract JSON content from MCP CallToolResult.

    FastMCP auto-serializes our List[Dict[str, Any]] return values, so we need to:
    1. Take the first content item from the CallToolResult
    2. Parse its text field (our serialized MCP content array)
    3. Parse the "text" entry of the first array element into actual JSON
    """
    outer_text = mcp_result.content[0].text
    content_array = json.loads(outer_text)
    return json.loads(content_array[0]["text"])
@pytest.mark.asyncio
async def test_chatgpt_search_basic(mcp_server, app, test_project):
    """Test basic ChatGPT search functionality with MCP content array format."""
    async with Client(mcp_server) as client:
        # Create test notes for searching: two ML-related, one unrelated
        # (used below to verify relevance filtering).
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Machine Learning Fundamentals",
                "folder": "ai",
                "content": (
                    "# Machine Learning Fundamentals\n\nIntroduction to ML concepts and algorithms."
                ),
                "tags": "ml,ai,fundamentals",
            },
        )
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Deep Learning with PyTorch",
                "folder": "ai",
                "content": (
                    "# Deep Learning with PyTorch\n\n"
                    "Building neural networks using PyTorch framework."
                ),
                "tags": "pytorch,deep-learning,ai",
            },
        )
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Data Visualization Guide",
                "folder": "data",
                "content": (
                    "# Data Visualization Guide\n\nCreating charts and graphs for data analysis."
                ),
                "tags": "visualization,data,charts",
            },
        )
        # Test ChatGPT search tool (only "query" is supplied; defaults apply)
        search_result = await client.call_tool(
            "search",
            {
                "query": "Machine Learning",
            },
        )
        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json
        assert len(results_json["results"]) > 0
        # Check result structure: each hit must expose id/title/url
        first_result = results_json["results"][0]
        assert "id" in first_result
        assert "title" in first_result
        assert "url" in first_result
        # Verify correct content found (and unrelated note excluded)
        titles = [r["title"] for r in results_json["results"]]
        assert "Machine Learning Fundamentals" in titles
        assert "Data Visualization Guide" not in titles


@pytest.mark.asyncio
async def test_chatgpt_search_empty_results(mcp_server, app, test_project):
    """Test ChatGPT search with no matching results."""
    async with Client(mcp_server) as client:
        # Search for non-existent content
        search_result = await client.call_tool(
            "search",
            {
                "query": "NonExistentTopic12345",
            },
        )
        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        # The empty-result payload still carries the structure and echoes the query
        assert "results" in results_json
        assert len(results_json["results"]) == 0
        assert results_json["query"] == "NonExistentTopic12345"


@pytest.mark.asyncio
async def test_chatgpt_search_with_boolean_operators(mcp_server, app, test_project):
    """Test ChatGPT search with boolean operators."""
    async with Client(mcp_server) as client:
        # Create test notes
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Python Web Frameworks",
                "folder": "dev",
                "content": (
                    "# Python Web Frameworks\n\nComparing Django and Flask for web development."
                ),
                "tags": "python,web,frameworks",
            },
        )
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "JavaScript Frameworks",
                "folder": "dev",
                "content": "# JavaScript Frameworks\n\nReact, Vue, and Angular comparison.",
                "tags": "javascript,web,frameworks",
            },
        )
        # Test with AND operator: both terms must match, so only the
        # Python note should be returned.
        search_result = await client.call_tool(
            "search",
            {
                "query": "Python AND frameworks",
            },
        )
        results_json = extract_mcp_json_content(search_result)
        titles = [r["title"] for r in results_json["results"]]
        assert "Python Web Frameworks" in titles
        assert "JavaScript Frameworks" not in titles
@pytest.mark.asyncio
async def test_chatgpt_fetch_document(mcp_server, app, test_project):
    """Test ChatGPT fetch tool for retrieving full document content."""
    async with Client(mcp_server) as client:
        # Create a test note (includes a nested code fence to verify that
        # markdown content round-trips through fetch unchanged)
        note_content = """# Advanced Python Techniques

## Overview
This document covers advanced Python programming techniques.

## Topics Covered
- Decorators
- Context Managers
- Metaclasses
- Async/Await patterns

## Code Examples
```python
def my_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
```
"""
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Advanced Python Techniques",
                "folder": "programming",
                "content": note_content,
                "tags": "python,advanced,programming",
            },
        )
        # Fetch the document using its title
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "Advanced Python Techniques",
            },
        )
        # Extract JSON content from MCP result and verify the OpenAI-style
        # document shape (id/title/text/url/metadata)
        document_json = extract_mcp_json_content(fetch_result)
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json
        assert "url" in document_json
        assert "metadata" in document_json
        # Verify content
        assert document_json["title"] == "Advanced Python Techniques"
        assert "Decorators" in document_json["text"]
        assert "Context Managers" in document_json["text"]
        assert "def my_decorator" in document_json["text"]


@pytest.mark.asyncio
async def test_chatgpt_fetch_by_permalink(mcp_server, app, test_project):
    """Test ChatGPT fetch using permalink identifier."""
    async with Client(mcp_server) as client:
        # Create a note with known content
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Test Document",
                "folder": "test",
                "content": "# Test Document\n\nThis is test content for permalink fetching.",
                "tags": "test",
            },
        )
        # First search to get the permalink (search result "id" doubles as
        # the fetch identifier)
        search_result = await client.call_tool(
            "search",
            {
                "query": "Test Document",
            },
        )
        results_json = extract_mcp_json_content(search_result)
        assert len(results_json["results"]) > 0
        permalink = results_json["results"][0]["id"]
        # Fetch using the permalink
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": permalink,
            },
        )
        # Verify the fetched document
        document_json = extract_mcp_json_content(fetch_result)
        assert document_json["id"] == permalink
        assert "Test Document" in document_json["title"]
        assert "test content for permalink fetching" in document_json["text"]


@pytest.mark.asyncio
async def test_chatgpt_fetch_nonexistent_document(mcp_server, app, test_project):
    """Test ChatGPT fetch with non-existent document ID."""
    async with Client(mcp_server) as client:
        # Try to fetch a non-existent document
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "NonExistentDocument12345",
            },
        )
        # Extract JSON content from MCP result
        document_json = extract_mcp_json_content(fetch_result)
        # Should have document structure even for errors (ChatGPT expects a
        # document-shaped payload, not an exception)
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json
        # Check for error indication
        assert document_json["id"] == "NonExistentDocument12345"
        assert "Not Found" in document_json["text"] or "not found" in document_json["text"]
@pytest.mark.asyncio
async def test_chatgpt_fetch_with_empty_title(mcp_server, app, test_project):
    """Test ChatGPT fetch handles documents with empty or missing titles."""
    async with Client(mcp_server) as client:
        # Create a note without a title in the content (no "# ..." header)
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "untitled-note",
                "folder": "misc",
                "content": "This is content without a markdown header.\n\nJust plain text.",
                "tags": "misc",
            },
        )
        # Fetch the document
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "untitled-note",
            },
        )
        # Parse JSON response
        document_json = extract_mcp_json_content(fetch_result)
        # Should have a title even if content doesn't have one
        assert "title" in document_json
        assert document_json["title"] != ""
        assert document_json["title"] is not None
        assert "content without a markdown header" in document_json["text"]


@pytest.mark.asyncio
async def test_chatgpt_search_pagination_default(mcp_server, app, test_project):
    """Test that ChatGPT search uses reasonable pagination defaults."""
    async with Client(mcp_server) as client:
        # Create more than 10 notes to test pagination
        for i in range(15):
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": f"Test Note {i}",
                    "folder": "bulk",
                    "content": f"# Test Note {i}\n\nThis is test content number {i}.",
                    "tags": "test,bulk",
                },
            )
        # Search should return max 10 results by default
        search_result = await client.call_tool(
            "search",
            {
                "query": "Test Note",
            },
        )
        results_json = extract_mcp_json_content(search_result)
        # Should have at most 10 results (the default page_size)
        assert len(results_json["results"]) <= 10
        assert results_json["total_count"] <= 10


@pytest.mark.asyncio
async def test_chatgpt_tools_error_handling(mcp_server, app, test_project):
    """Test error handling in ChatGPT tools returns proper MCP format."""
    async with Client(mcp_server) as client:
        # Test search with invalid query (if validation exists)
        # Using empty query to potentially trigger an error
        search_result = await client.call_tool(
            "search",
            {
                "query": "",  # Empty query might cause an error
            },
        )
        # Should still return MCP content array format: a one-element list
        # with a single text item
        assert hasattr(search_result, "content")
        content_list = search_result.content
        assert isinstance(content_list, list)
        assert len(content_list) == 1
        assert content_list[0].type == "text"
        # Should be valid JSON even on error
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json  # Should have results key even if empty
@pytest.mark.asyncio
async def test_chatgpt_integration_workflow(mcp_server, app, test_project):
    """Test complete workflow: search then fetch, as ChatGPT would use it."""
    async with Client(mcp_server) as client:
        # Step 1: Create multiple documents (two API-related, one not, so
        # the search below has both hits and a non-hit)
        docs = [
            {
                "title": "API Design Best Practices",
                "content": (
                    "# API Design Best Practices\n\nRESTful API design principles and patterns."
                ),
                "tags": "api,rest,design",
            },
            {
                "title": "GraphQL vs REST",
                "content": "# GraphQL vs REST\n\nComparing GraphQL and REST API architectures.",
                "tags": "api,graphql,rest",
            },
            {
                "title": "Database Design Patterns",
                "content": (
                    "# Database Design Patterns\n\n"
                    "Common database design patterns and anti-patterns."
                ),
                "tags": "database,design,patterns",
            },
        ]
        for doc in docs:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": doc["title"],
                    "folder": "architecture",
                    "content": doc["content"],
                    "tags": doc["tags"],
                },
            )
        # Step 2: Search for API-related content (as ChatGPT would)
        search_result = await client.call_tool(
            "search",
            {
                "query": "API",
            },
        )
        results_json = extract_mcp_json_content(search_result)
        assert len(results_json["results"]) >= 2
        # Step 3: Fetch one of the search results (as ChatGPT would)
        first_result_id = results_json["results"][0]["id"]
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": first_result_id,
            },
        )
        document_json = extract_mcp_json_content(fetch_result)
        # Verify the fetched document matches search result
        assert document_json["id"] == first_result_id
        assert "API" in document_json["text"] or "api" in document_json["text"].lower()
        # Verify document has expected structure
        assert document_json["metadata"]["format"] == "markdown"
```
--------------------------------------------------------------------------------
/tests/cli/test_cli_tools.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Basic Memory CLI tools.
These tests use real MCP tools with the test environment instead of mocks.
"""
# Import for testing
import io
from datetime import datetime, timedelta
import json
from textwrap import dedent
from typing import AsyncGenerator
from unittest.mock import patch
import pytest_asyncio
from typer.testing import CliRunner
from basic_memory.cli.commands.tool import tool_app
from basic_memory.schemas.base import Entity as EntitySchema
runner = CliRunner()
@pytest_asyncio.fixture
async def setup_test_note(entity_service, search_service) -> AsyncGenerator[dict, None]:
    """Create a test note for CLI tests.

    Yields a dict with the created note's title, permalink, and raw content
    so individual tests can assert against them.
    """
    note_content = dedent("""
        # Test Note

        This is a test note for CLI commands.

        ## Observations
        - [tech] Test observation #test
        - [note] Another observation

        ## Relations
        - connects_to [[Another Note]]
        """)
    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content=note_content,
        )
    )
    # Index the entity for search (search-based tests rely on this)
    await search_service.index_entity(entity)
    yield {
        "title": entity.title,
        "permalink": entity.permalink,
        "content": note_content,
    }
def test_write_note(cli_env, project_config, test_project):
    """Test write_note command with basic arguments."""
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "CLI Test Note",
            "--content",
            "This is a CLI test note",
            "--folder",
            "test",
            "--project",
            test_project.name,
        ],
    )
    assert result.exit_code == 0
    # Check for expected success message
    assert "CLI Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout


def test_write_note_with_project_arg(cli_env, project_config, test_project):
    """Test write_note command with --project placed before the other options."""
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--project",
            test_project.name,
            "--title",
            "CLI Test Note",
            "--content",
            "This is a CLI test note",
            "--folder",
            "test",
        ],
    )
    assert result.exit_code == 0
    # Check for expected success message
    assert "CLI Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout


def test_write_note_with_tags(cli_env, project_config):
    """Test write_note command with tags."""
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Tagged CLI Test Note",
            "--content",
            "This is a test note with tags",
            "--folder",
            "test",
            "--tags",
            "tag1",
            "--tags",
            "tag2",
        ],
    )
    assert result.exit_code == 0
    # Check for expected success message (tags may be rendered joined or
    # individually depending on output formatting)
    assert "Tagged CLI Test Note" in result.stdout
    assert "tag1, tag2" in result.stdout or "tag1" in result.stdout and "tag2" in result.stdout


def test_write_note_from_stdin(cli_env, project_config, monkeypatch):
    """Test write_note command reading from stdin.

    This test requires minimal mocking of stdin to simulate piped input.
    """
    test_content = "This is content from stdin for testing"
    # Mock stdin using monkeypatch, which works better with typer's CliRunner
    monkeypatch.setattr("sys.stdin", io.StringIO(test_content))
    monkeypatch.setattr("sys.stdin.isatty", lambda: False)  # Simulate piped input
    # Use runner.invoke with input parameter as a fallback
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Stdin Test Note",
            "--folder",
            "test",
        ],
        input=test_content,  # Provide input as a fallback
    )
    assert result.exit_code == 0
    # Check for expected success message
    assert "Stdin Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout
def test_write_note_content_param_priority(cli_env, project_config):
    """Test that content parameter has priority over stdin."""
    stdin_content = "This content from stdin should NOT be used"
    param_content = "This explicit content parameter should be used"
    # Mock stdin but provide explicit content parameter
    with (
        patch("sys.stdin", io.StringIO(stdin_content)),
        patch("sys.stdin.isatty", return_value=False),
    ):  # Simulate piped input
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "Priority Test Note",
                "--content",
                param_content,
                "--folder",
                "test",
            ],
        )
        assert result.exit_code == 0
        # Check the note was created with the content from parameter, not stdin
        # We can't directly check file contents in this test approach
        # but we can verify the command succeeded
        assert "Priority Test Note" in result.stdout
        assert "Created" in result.stdout or "Updated" in result.stdout


def test_write_note_no_content(cli_env, project_config):
    """Test error handling when no content is provided."""
    # Mock stdin to appear as a terminal, not a pipe
    with patch("sys.stdin.isatty", return_value=True):
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "No Content Note",
                "--folder",
                "test",
            ],
        )
        # Should exit with an error
        assert result.exit_code == 1
        # assert "No content provided" in result.stderr


def test_read_note(cli_env, setup_test_note):
    """Test read_note command."""
    permalink = setup_test_note["permalink"]
    result = runner.invoke(
        tool_app,
        ["read-note", permalink],
    )
    assert result.exit_code == 0
    # Should contain the note content and structure
    assert "Test Note" in result.stdout
    assert "This is a test note for CLI commands" in result.stdout
    assert "## Observations" in result.stdout
    assert "Test observation" in result.stdout
    assert "## Relations" in result.stdout
    assert "connects_to [[Another Note]]" in result.stdout
    # Note: We found that square brackets like [tech] are being stripped in CLI output,
    # so we're not asserting their presence
def test_search_basic(cli_env, setup_test_note, test_project):
    """Test basic search command."""
    result = runner.invoke(
        tool_app,
        ["search-notes", "test observation", "--project", test_project.name],
    )
    assert result.exit_code == 0
    # Result should be JSON containing our test note
    search_result = json.loads(result.stdout)
    assert len(search_result["results"]) > 0
    # At least one result should match our test note or observation
    # (observations get their own permalinks containing both words)
    found = False
    for item in search_result["results"]:
        if "test" in item["permalink"].lower() and "observation" in item["permalink"].lower():
            found = True
            break
    assert found, "Search did not find the test observation"


def test_search_permalink(cli_env, setup_test_note):
    """Test search with permalink flag."""
    permalink = setup_test_note["permalink"]
    result = runner.invoke(
        tool_app,
        ["search-notes", permalink, "--permalink"],
    )
    assert result.exit_code == 0
    # Result should be JSON containing our test note
    search_result = json.loads(result.stdout)
    assert len(search_result["results"]) > 0
    # Should find a result with an exactly matching permalink
    found = False
    for item in search_result["results"]:
        if item["permalink"] == permalink:
            found = True
            break
    assert found, "Search did not find the note by permalink"
def test_build_context(cli_env, setup_test_note):
    """Test build_context command."""
    permalink = setup_test_note["permalink"]
    result = runner.invoke(
        tool_app,
        ["build-context", f"memory://{permalink}"],
    )
    assert result.exit_code == 0
    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)
    assert "results" in context_result
    assert len(context_result["results"]) > 0
    # Primary results should include our test note
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break
    assert found, "Context did not include the test note"


def test_build_context_with_options(cli_env, setup_test_note):
    """Test build_context command with all options."""
    permalink = setup_test_note["permalink"]
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",
            "--timeframe",
            "1d",
            "--page",
            "1",
            "--page-size",
            "5",
            "--max-related",
            "20",
        ],
    )
    assert result.exit_code == 0
    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)
    # Check that metadata reflects our options
    assert context_result["metadata"]["depth"] == 2
    timeframe = datetime.fromisoformat(context_result["metadata"]["timeframe"])
    assert datetime.now().astimezone() - timeframe <= timedelta(
        days=2
    )  # Compare timezone-aware datetimes
    # Results should include our test note
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break
    assert found, "Context did not include the test note"


def test_build_context_string_depth_parameter(cli_env, setup_test_note):
    """Test build_context command handles string depth parameter correctly."""
    permalink = setup_test_note["permalink"]
    # Test valid string depth parameter - Typer should convert it to int
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",  # This is always a string from CLI
        ],
    )
    assert result.exit_code == 0
    # Result should be JSON containing our test note with correct depth
    context_result = json.loads(result.stdout)
    assert context_result["metadata"]["depth"] == 2
    # Test invalid string depth parameter - should fail with Typer validation error
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "invalid",
        ],
    )
    assert result.exit_code == 2  # Typer exits with code 2 for parameter validation errors
    # Typer should show a usage error for invalid integer
    assert (
        "invalid" in result.stderr
        and "is not a valid" in result.stderr
        and "integer" in result.stderr
    )
# The get-entity CLI command was removed when tools were refactored
# into separate files with improved error handling
def test_recent_activity(cli_env, setup_test_note, test_project):
    """Test recent_activity command with defaults."""
    result = runner.invoke(
        tool_app,
        ["recent-activity"],
    )
    assert result.exit_code == 0
    # Result should be human-readable string containing recent activity
    output = result.stdout
    assert "Recent Activity Summary" in output
    assert "Most Active Project:" in output or "Other Active Projects:" in output
    # Our test note should be referenced in the output
    assert setup_test_note["permalink"] in output or setup_test_note["title"] in output


def test_recent_activity_with_options(cli_env, setup_test_note, test_project):
    """Test recent_activity command with options."""
    result = runner.invoke(
        tool_app,
        [
            "recent-activity",
            "--type",
            "entity",
            "--depth",
            "2",
            "--timeframe",
            "7d",
        ],
    )
    assert result.exit_code == 0
    # Result should be human-readable string containing recent activity
    output = result.stdout
    assert "Recent Activity Summary" in output
    assert "Most Active Project:" in output or "Other Active Projects:" in output
    # Should include information about entities since we requested entity type
    assert setup_test_note["permalink"] in output or setup_test_note["title"] in output


def test_continue_conversation(cli_env, setup_test_note):
    """Test continue_conversation command."""
    permalink = setup_test_note["permalink"]
    # Run the CLI command
    result = runner.invoke(
        tool_app,
        ["continue-conversation", "--topic", "Test Note"],
    )
    assert result.exit_code == 0
    # Check result contains expected content: the session preamble plus a
    # pointer back to the note via read_note
    assert "Continuing conversation on: Test Note" in result.stdout
    assert "This is a memory retrieval session" in result.stdout
    assert "read_note" in result.stdout
    assert permalink in result.stdout


def test_continue_conversation_no_results(cli_env):
    """Test continue_conversation command with no results."""
    # Run the CLI command with a nonexistent topic
    result = runner.invoke(
        tool_app,
        ["continue-conversation", "--topic", "NonexistentTopic"],
    )
    assert result.exit_code == 0
    # Check result contains expected content for no results
    assert "Continuing conversation on: NonexistentTopic" in result.stdout
    assert "The supplied query did not return any information" in result.stdout
@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_functionality(mock_initialize_database, app_config, monkeypatch):
    """Test the database initialization functionality."""
    from basic_memory.services.initialization import ensure_initialization

    # Call the function
    ensure_initialization(app_config)
    # The underlying asyncio.run should call our mocked function
    mock_initialize_database.assert_called_once()


@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_handles_errors(mock_initialize_database, app_config, monkeypatch):
    """Test that initialization handles errors gracefully."""
    from basic_memory.services.initialization import ensure_initialization

    # Configure mock to raise an exception
    mock_initialize_database.side_effect = Exception("Test error")
    # Call the function - should not raise exception
    ensure_initialization(app_config)
    # We're just making sure it doesn't crash by calling it
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""Common test fixtures."""
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import AsyncGenerator
import os
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
from basic_memory import db
from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
from basic_memory.db import DatabaseType
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.models import Base
from basic_memory.models.knowledge import Entity
from basic_memory.models.project import Project
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.schemas.base import Entity as EntitySchema
from basic_memory.services import (
EntityService,
ProjectService,
)
from basic_memory.services.directory_service import DirectoryService
from basic_memory.services.file_service import FileService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
from basic_memory.sync.watch_service import WatchService
@pytest.fixture
def anyio_backend():
    """Run anyio-based tests on the asyncio backend."""
    return "asyncio"


@pytest.fixture
def project_root() -> Path:
    """Repository root (two levels above this conftest)."""
    return Path(__file__).parent.parent


@pytest.fixture
def config_home(tmp_path, monkeypatch) -> Path:
    """Redirect the user's home (and BASIC_MEMORY_HOME) into tmp_path."""
    # Patch HOME environment variable for the duration of the test
    monkeypatch.setenv("HOME", str(tmp_path))
    # On Windows, also set USERPROFILE
    if os.name == "nt":
        monkeypatch.setenv("USERPROFILE", str(tmp_path))
    # Set BASIC_MEMORY_HOME to the test directory
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
    return tmp_path
@pytest.fixture(scope="function", autouse=True)
def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
    """Create test app configuration."""
    # Create a basic config without depending on test_project to avoid circular dependency
    projects = {"test-project": str(config_home)}
    app_config = BasicMemoryConfig(
        env="test",
        projects=projects,
        default_project="test-project",
        update_permalinks_on_move=True,
    )
    return app_config


@pytest.fixture(autouse=True)
def config_manager(
    app_config: BasicMemoryConfig, project_config: ProjectConfig, config_home: Path, monkeypatch
) -> ConfigManager:
    """ConfigManager pointed at the test home, with the config persisted to disk."""
    # Invalidate config cache to ensure clean state for each test
    from basic_memory import config as config_module

    config_module._CONFIG_CACHE = None
    # Create a new ConfigManager that uses the test home directory
    config_manager = ConfigManager()
    # Update its paths to use the test directory
    config_manager.config_dir = config_home / ".basic-memory"
    config_manager.config_file = config_manager.config_dir / "config.json"
    config_manager.config_dir.mkdir(parents=True, exist_ok=True)
    # Ensure the config file is written to disk
    config_manager.save_config(app_config)
    return config_manager


@pytest.fixture(scope="function", autouse=True)
def project_config(test_project):
    """Create test project configuration."""
    project_config = ProjectConfig(
        name=test_project.name,
        home=Path(test_project.path),
    )
    return project_config


@dataclass
class TestConfig:
    # Bundle of all configuration fixtures, for tests that need several at once.
    config_home: Path
    project_config: ProjectConfig
    app_config: BasicMemoryConfig
    config_manager: ConfigManager


@pytest.fixture
def test_config(config_home, project_config, app_config, config_manager) -> TestConfig:
    """All test configuration fixtures"""
    return TestConfig(config_home, project_config, app_config, config_manager)
@pytest_asyncio.fixture(scope="function")
async def engine_factory(
    app_config,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
    """Create an engine and session factory using an in-memory SQLite database."""
    async with db.engine_session_factory(
        db_path=app_config.database_path, db_type=DatabaseType.MEMORY
    ) as (engine, session_maker):
        # Create all tables for the DB the engine is connected to
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        yield engine, session_maker


@pytest_asyncio.fixture
async def session_maker(engine_factory) -> async_sessionmaker[AsyncSession]:
    """Get session maker for tests."""
    _, session_maker = engine_factory
    return session_maker
## Repositories
@pytest_asyncio.fixture(scope="function")
async def entity_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> EntityRepository:
    """Create an EntityRepository instance with project context."""
    return EntityRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def observation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> ObservationRepository:
    """Create an ObservationRepository instance with project context."""
    return ObservationRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def relation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> RelationRepository:
    """Create a RelationRepository instance with project context."""
    return RelationRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def project_repository(
    session_maker: async_sessionmaker[AsyncSession],
) -> ProjectRepository:
    """Create a ProjectRepository instance."""
    return ProjectRepository(session_maker)


@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
    """Create a test project to be used as context for other repositories."""
    project_data = {
        "name": "test-project",
        "description": "Project used as context for tests",
        "path": str(config_home),
        "is_active": True,
        "is_default": True,  # Explicitly set as the default project (for cli operations)
    }
    engine, session_maker = engine_factory
    project_repository = ProjectRepository(session_maker)
    project = await project_repository.create(project_data)
    return project
## Services
@pytest_asyncio.fixture
async def entity_service(
    entity_repository: EntityRepository,
    observation_repository: ObservationRepository,
    relation_repository: RelationRepository,
    entity_parser: EntityParser,
    file_service: FileService,
    link_resolver: LinkResolver,
    app_config: BasicMemoryConfig,
) -> EntityService:
    """Wire an EntityService from its collaborator fixtures."""
    dependencies = {
        "entity_parser": entity_parser,
        "entity_repository": entity_repository,
        "observation_repository": observation_repository,
        "relation_repository": relation_repository,
        "file_service": file_service,
        "link_resolver": link_resolver,
        "app_config": app_config,
    }
    return EntityService(**dependencies)
@pytest.fixture
def file_service(
    project_config: ProjectConfig, markdown_processor: MarkdownProcessor
) -> FileService:
    """Build a FileService rooted at the project's home directory."""
    service = FileService(project_config.home, markdown_processor)
    return service
@pytest.fixture
def markdown_processor(entity_parser: EntityParser) -> MarkdownProcessor:
    """Build the markdown writer used by the file service."""
    processor = MarkdownProcessor(entity_parser)
    return processor
@pytest.fixture
def link_resolver(entity_repository: EntityRepository, search_service: SearchService):
    """Build the wiki-link resolver backed by repository + search."""
    resolver = LinkResolver(entity_repository, search_service)
    return resolver
@pytest.fixture
def entity_parser(project_config):
    """Build the markdown entity parser rooted at the project home."""
    parser = EntityParser(project_config.home)
    return parser
@pytest_asyncio.fixture
async def sync_service(
    app_config: BasicMemoryConfig,
    entity_service: EntityService,
    entity_parser: EntityParser,
    project_repository: ProjectRepository,
    entity_repository: EntityRepository,
    relation_repository: RelationRepository,
    search_service: SearchService,
    file_service: FileService,
) -> SyncService:
    """Wire a SyncService from its collaborator fixtures."""
    dependencies = {
        "app_config": app_config,
        "entity_service": entity_service,
        "project_repository": project_repository,
        "entity_repository": entity_repository,
        "relation_repository": relation_repository,
        "entity_parser": entity_parser,
        "search_service": search_service,
        "file_service": file_service,
    }
    return SyncService(**dependencies)
@pytest_asyncio.fixture
async def directory_service(entity_repository, project_config) -> DirectoryService:
    """Create directory service for testing.

    NOTE(review): ``project_config`` is requested but never used in the body —
    presumably to force project setup before this fixture runs; confirm before
    removing the parameter.
    """
    return DirectoryService(
        entity_repository=entity_repository,
    )
@pytest_asyncio.fixture
async def search_repository(session_maker, test_project: Project):
    """Build a SearchRepository scoped to the test project."""
    repo = SearchRepository(session_maker, project_id=test_project.id)
    return repo
@pytest_asyncio.fixture(autouse=True)
async def init_search_index(search_service):
    # Autouse: guarantees the search index exists for every test, even ones
    # that never request search_service directly.
    # NOTE(review): search_service's own fixture also calls init_search_index,
    # so this appears to run twice — presumably idempotent; confirm.
    await search_service.init_search_index()
@pytest_asyncio.fixture
async def search_service(
    search_repository: SearchRepository,
    entity_repository: EntityRepository,
    file_service: FileService,
) -> SearchService:
    """Build a SearchService and ensure its index exists before use."""
    svc = SearchService(search_repository, entity_repository, file_service)
    await svc.init_search_index()
    return svc
@pytest_asyncio.fixture(scope="function")
async def sample_entity(entity_repository: EntityRepository) -> Entity:
    """Persist and return a minimal markdown entity for tests."""
    return await entity_repository.create(
        {
            "project_id": entity_repository.project_id,
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test-entity",
            "file_path": "test/test_entity.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )
@pytest_asyncio.fixture
async def project_service(
    project_repository: ProjectRepository,
) -> ProjectService:
    """Build a ProjectService over the project repository."""
    service = ProjectService(repository=project_repository)
    return service
@pytest_asyncio.fixture
async def full_entity(sample_entity, entity_repository, file_service, entity_service) -> Entity:
    """Create a search test entity with observations and relations.

    Depends on ``sample_entity`` so the ``[[Test Entity]]`` wiki-links in the
    content resolve to an existing entity; ``entity_repository`` and
    ``file_service`` are requested only for their setup side effects.
    """
    # Create test entity; the created/updated flag is irrelevant here.
    entity, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Search_Entity",
            folder="test",
            entity_type="test",
            content=dedent("""
                ## Observations
                - [tech] Tech note
                - [design] Design note

                ## Relations
                - out1 [[Test Entity]]
                - out2 [[Test Entity]]
                """),
        )
    )
    return entity
@pytest_asyncio.fixture
async def test_graph(
    entity_repository,
    relation_repository,
    observation_repository,
    search_service,
    file_service,
    entity_service,
):
    """Create a test knowledge graph with entities, relations and observations.

    Builds the chain Root -> Connected Entity 1 -> Connected Entity 2 ->
    Deep Entity -> Deeper Entity, indexes every entity for search, and
    returns handles to the created rows for assertions.
    """
    # Create some test entities in reverse order so they will be linked:
    # each [[wiki-link]] target already exists when its referrer is created.
    deeper, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deeper Entity",
            entity_type="deeper",
            folder="test",
            content=dedent("""
                # Deeper Entity
                """),
        )
    )

    deep, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deep Entity",
            entity_type="deep",
            folder="test",
            content=dedent("""
                # Deep Entity
                - deeper_connection [[Deeper Entity]]
                """),
        )
    )

    connected_2, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 2",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 2
                - deep_connection [[Deep Entity]]
                """),
        )
    )

    connected_1, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 1",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 1
                - [note] Connected 1 note
                - connected_to [[Connected Entity 2]]
                """),
        )
    )

    root, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Root",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Root Entity
                - [note] Root note 1
                - [tech] Root tech note
                - connects_to [[Connected Entity 1]]
                """),
        )
    )

    # get latest state from the DB (relations were created as a side effect)
    entities = await entity_repository.find_all()
    relations = await relation_repository.find_all()

    # Index everything for search
    for entity in entities:
        await search_service.index_entity(entity)

    return {
        "root": root,
        "connected1": connected_1,
        "connected2": connected_2,
        "deep": deep,
        "observations": [e.observations for e in entities],
        "relations": relations,
    }
@pytest.fixture
def watch_service(app_config: BasicMemoryConfig, project_repository) -> WatchService:
    """Build a WatchService over the test config and project repository."""
    service = WatchService(app_config=app_config, project_repository=project_repository)
    return service
@pytest.fixture
def test_files(project_config, project_root) -> dict[str, Path]:
    """Copy binary test fixtures into the project directory.

    Returns a dict mapping logical names ("pdf", "image") to the copied
    paths inside the project home.
    """
    # Source files relative to tests directory
    sources = {
        "pdf": Path(project_root / "tests/Non-MarkdownFileSupport.pdf"),
        "image": Path(project_root / "tests/Screenshot.png"),
    }

    copied: dict[str, Path] = {}
    for name, src in sources.items():
        dest = project_config.home / src.name
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(src.read_bytes())
        copied[name] = dest

    return copied
@pytest_asyncio.fixture
async def synced_files(sync_service, project_config, test_files):
    """Sync the copied test files into the DB, then return their paths."""
    # Initial sync - should create forward reference
    await sync_service.sync(project_config.home)
    return test_files
```
--------------------------------------------------------------------------------
/tests/utils/test_validate_project_path.py:
--------------------------------------------------------------------------------
```python
"""Tests for the validate_project_path security function."""
import pytest
from pathlib import Path
from basic_memory.utils import validate_project_path
class TestValidateProjectPathSafety:
    """Test that validate_project_path correctly identifies safe paths."""

    def test_valid_relative_paths(self, tmp_path):
        """Test that legitimate relative paths are allowed."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A representative spread of legitimate relative paths.
        for path in (
            "notes/meeting.md",
            "docs/readme.txt",
            "folder/subfolder/file.txt",
            "simple-file.md",
            "research/findings-2025.md",
            "projects/basic-memory/docs.md",
            "deep/nested/directory/structure/file.txt",
            "file-with-hyphens.md",
            "file_with_underscores.txt",
            "file123.md",
            "UPPERCASE.MD",
            "MixedCase.txt",
        ):
            assert validate_project_path(path, project_path), (
                f"Safe path '{path}' should be allowed"
            )

    def test_empty_and_current_directory(self, tmp_path):
        """Test handling of empty paths and current directory references."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Current directory and files within it are both safe.
        assert validate_project_path(".", project_path)
        assert validate_project_path("./file.txt", project_path)

    def test_nested_safe_paths(self, tmp_path):
        """Test deeply nested but safe paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "level1/level2/level3/level4/file.txt",
            "very/deeply/nested/directory/structure/with/many/levels/file.md",
            "a/b/c/d/e/f/g/h/i/j/file.txt",
        ):
            assert validate_project_path(path, project_path), (
                f"Nested path '{path}' should be allowed"
            )
class TestValidateProjectPathAttacks:
    """Test that validate_project_path blocks path traversal attacks."""

    def test_unix_path_traversal(self, tmp_path):
        """Test that Unix-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "../",
            "../../",
            "../../../",
            "../etc/passwd",
            "../../etc/passwd",
            "../../../etc/passwd",
            "../../../../etc/passwd",
            "../../.env",
            "../../../home/user/.ssh/id_rsa",
            "../../../../var/log/auth.log",
            "../../.bashrc",
            "../../../etc/shadow",
        ):
            result = validate_project_path(path, project_path)
            assert not result, f"Attack path '{path}' should be blocked"

    def test_windows_path_traversal(self, tmp_path):
        """Test that Windows-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\..\\..\\Windows\\System32\\config\\SAM",
            "..\\..\\..\\Users\\user\\.env",
            "..\\..\\..\\Windows\\System32\\drivers\\etc\\hosts",
            "..\\..\\Boot.ini",
            "\\Windows\\System32",
            "\\..\\..\\Windows",
        ):
            result = validate_project_path(path, project_path)
            assert not result, f"Windows attack path '{path}' should be blocked"

    def test_mixed_traversal_patterns(self, tmp_path):
        """Test paths that mix legitimate content with traversal."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "folder/subfolder/../../../etc/passwd",
            "legitimate/path/../../.ssh/id_rsa",
            "notes/../../../home/user/.bashrc",
            "documents/../../Windows/System32/config/SAM",
        ):
            result = validate_project_path(path, project_path)
            assert not result, f"Mixed attack path '{path}' should be blocked"

    def test_home_directory_access(self, tmp_path):
        """Test that home directory access patterns are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "~/",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/secrets.txt",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
        ):
            result = validate_project_path(path, project_path)
            assert not result, f"Home directory attack '{path}' should be blocked"

    def test_unc_and_network_paths(self, tmp_path):
        """Test that UNC and network paths are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for path in (
            "\\\\server\\share",
            "\\\\192.168.1.100\\c$",
            "\\\\evil-server\\malicious-share\\file.exe",
            "\\\\localhost\\c$\\Windows\\System32",
        ):
            result = validate_project_path(path, project_path)
            assert not result, f"Network path attack '{path}' should be blocked"

    def test_absolute_paths(self, tmp_path):
        """Test that absolute paths are blocked (if they contain traversal)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Note: Some absolute paths might be allowed by pathlib resolution,
        # but our function should catch traversal patterns first
        for path in (
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
        ):
            # These should be blocked either by traversal detection or pathlib resolution
            result = validate_project_path(path, project_path)
            assert not result, f"Absolute path '{path}' should be blocked"
class TestValidateProjectPathEdgeCases:
    """Test edge cases and error conditions."""

    def test_malformed_paths(self, tmp_path):
        """Test handling of malformed or unusual paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        malformed_paths = [
            "",  # Empty string
            " ",  # Whitespace only
            "\n",  # Newline
            "\t",  # Tab
            "\r\n",  # Windows line ending
            "file\x00name",  # Null byte (if it gets this far)
            "file\x01name",  # Other control characters
        ]

        for path in malformed_paths:
            # These should either be blocked or cause an exception that's handled
            try:
                result = validate_project_path(path, project_path)
                if path.strip():  # Non-empty paths with control chars should be blocked
                    assert not result, f"Malformed path '{repr(path)}' should be blocked"
            except (ValueError, OSError):
                # It's acceptable for these to raise exceptions
                pass

    def test_very_long_paths(self, tmp_path):
        """Test handling of very long paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a very long but legitimate path
        long_path = "/".join(["verylongdirectoryname" * 10 for _ in range(10)])

        # Should handle long paths gracefully (either allow or reject based on filesystem limits)
        try:
            result = validate_project_path(long_path, project_path)
            # Result can be True or False, just shouldn't crash
            assert isinstance(result, bool)
        except (ValueError, OSError):
            # It's acceptable for very long paths to raise exceptions
            pass

    def test_nonexistent_project_path(self):
        """Test behavior when project path doesn't exist."""
        nonexistent_project = Path("/this/path/does/not/exist")

        # Should still be able to validate relative paths
        assert validate_project_path("notes/file.txt", nonexistent_project)
        assert not validate_project_path("../../../etc/passwd", nonexistent_project)

    def test_unicode_and_special_characters(self, tmp_path):
        """Test paths with Unicode and special characters."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        unicode_paths = [
            "notes/文档.md",  # Chinese characters
            "docs/résumé.txt",  # Accented characters
            "files/naïve.md",  # Diaeresis
            "notes/café.txt",  # Acute accent
            "docs/日本語.md",  # Japanese
            "files/αβγ.txt",  # Greek
            "notes/файл.md",  # Cyrillic
        ]

        for path in unicode_paths:
            try:
                result = validate_project_path(path, project_path)
                assert isinstance(result, bool), f"Unicode path '{path}' should return boolean"
                # Unicode paths should generally be allowed if they don't contain traversal
                assert result, f"Unicode path '{path}' should be allowed"
            except (UnicodeError, OSError):
                # Some unicode handling issues might be acceptable
                pass

    def test_case_sensitivity(self, tmp_path):
        """Test case sensitivity of traversal detection."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # These should all be blocked regardless of case
        case_variations = [
            "../file.txt",
            "../FILE.TXT",
            "~/file.txt",
            "~/FILE.TXT",
        ]

        for path in case_variations:
            assert not validate_project_path(path, project_path), (
                f"Case variation '{path}' should be blocked"
            )

    def test_symbolic_link_behavior(self, tmp_path):
        """Test behavior with symbolic links (if supported by filesystem)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a directory outside the project
        outside_dir = tmp_path / "outside"
        outside_dir.mkdir()

        try:
            # Try to create a symlink inside the project pointing outside
            symlink_path = project_path / "symlink"
            symlink_path.symlink_to(outside_dir)

            # Paths through symlinks should be handled safely
            result = validate_project_path("symlink/file.txt", project_path)
            # The result can vary based on how pathlib handles symlinks,
            # but it shouldn't crash and should be a boolean
            assert isinstance(result, bool)
        except (OSError, NotImplementedError):
            # Symlinks might not be supported on this filesystem
            pytest.skip("Symbolic links not supported on this filesystem")

    def test_relative_path_edge_cases(self, tmp_path):
        """Test edge cases in relative path handling."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        edge_cases = [
            ".",  # Current directory
            "./",  # Current directory with slash
            "./file.txt",  # File in current directory
            "./folder/file.txt",  # Nested file through current directory
            "folder/./file.txt",  # Current directory in middle of path
            "folder/subfolder/.",  # Current directory at end
        ]

        for path in edge_cases:
            result = validate_project_path(path, project_path)
            # These should generally be safe as they don't escape the project
            assert result, f"Relative path edge case '{path}' should be allowed"
class TestValidateProjectPathPerformance:
    """Test performance characteristics of path validation."""

    def test_performance_with_many_paths(self, tmp_path):
        """Test that validation performs reasonably with many paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Test a mix of safe and dangerous paths
        test_paths = [f"folder{i}/file{i}.txt" for i in range(100)]
        test_paths += [f"../../../etc/passwd{i}" for i in range(100)]

        import time

        # Use perf_counter: it is monotonic and high-resolution, so the
        # elapsed measurement cannot go negative or jump if the system
        # clock is adjusted mid-test (time.time() can).
        start_time = time.perf_counter()

        for path in test_paths:
            result = validate_project_path(path, project_path)
            assert isinstance(result, bool)

        elapsed = time.perf_counter() - start_time

        # Should complete reasonably quickly (adjust threshold as needed)
        assert elapsed < 1.0, "Path validation should be fast"
class TestValidateProjectPathIntegration:
    """Integration tests with real filesystem scenarios."""

    def test_with_actual_filesystem_structure(self, tmp_path):
        """Test validation with actual files and directories."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Lay down a small real directory tree.
        (project_path / "notes").mkdir()
        (project_path / "docs").mkdir()
        (project_path / "notes" / "meeting.md").write_text("# Meeting Notes")
        (project_path / "docs" / "readme.txt").write_text("README")

        # Existing files inside the project validate.
        for existing in ("notes/meeting.md", "docs/readme.txt"):
            assert validate_project_path(existing, project_path)

        # Safe-but-nonexistent paths validate too.
        for new_path in ("notes/new-file.md", "new-folder/file.txt"):
            assert validate_project_path(new_path, project_path)

        # Traversal is still rejected even with a real tree present.
        for attack in ("../../../etc/passwd", "notes/../../../etc/passwd"):
            assert not validate_project_path(attack, project_path)

    def test_project_path_resolution_accuracy(self, tmp_path):
        """Test that path resolution works correctly with real paths."""
        # Create a more complex directory structure (parents before children).
        base_path = tmp_path / "workspace"
        project_path = base_path / "my-project"
        sibling_path = base_path / "other-project"
        for directory in (base_path, project_path, sibling_path):
            directory.mkdir()

        # Create a sensitive file in the sibling directory
        (sibling_path / "secrets.txt").write_text("secret data")

        # Try to access the sibling directory through traversal
        assert not validate_project_path("../other-project/secrets.txt", project_path)

        # Verify that legitimate access within project works
        assert validate_project_path("my-file.txt", project_path)
        assert validate_project_path("subdir/my-file.txt", project_path)
```