This is page 2 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/.claude/commands/release/release-check.md:
--------------------------------------------------------------------------------
```markdown
# /release-check - Pre-flight Release Validation
Comprehensive pre-flight check for release readiness without making any changes.
## Usage
```
/release-check [version]
```
**Parameters:**
- `version` (optional): Version to validate, e.g. `v0.13.0`. If not provided, the version is determined from context.
## Implementation
You are an expert QA engineer for the Basic Memory project. When the user runs `/release-check`, execute the following validation steps:
### Step 1: Environment Validation
1. **Git Status Check**
   - Verify working directory is clean
   - Confirm on `main` branch
   - Check if ahead/behind origin
2. **Version Validation**
   - Validate version format if provided
   - Check for existing tags with same version
   - Verify version increments properly from last release
### Step 2: Code Quality Gates
1. **Test Suite Validation**
   ```bash
   just test
   ```
   - All tests must pass
   - Check test coverage (target: 95%+)
   - Validate no skipped critical tests
2. **Code Quality Checks**
   ```bash
   just lint
   just type-check
   ```
   - No linting errors
   - No type checking errors
   - Code formatting is consistent
### Step 3: Documentation Validation
1. **Changelog Check**
   - CHANGELOG.md contains entry for target version
   - Entry includes all major features and fixes
   - Breaking changes are documented
2. **Documentation Currency**
   - README.md reflects current functionality
   - CLI reference is up to date
   - MCP tools are documented
### Step 4: Dependency Validation
1. **Security Scan**
   - No known vulnerabilities in dependencies
   - All dependencies are at appropriate versions
   - No conflicting dependency versions
2. **Build Validation**
   - Package builds successfully
   - All required files are included
   - No missing dependencies
### Step 5: Issue Tracking Validation
1. **GitHub Issues Check**
   - No critical open issues blocking release
   - All milestone issues are resolved
   - High-priority bugs are fixed
2. **Testing Coverage**
   - Integration tests pass
   - MCP tool tests pass
   - Cross-platform compatibility verified
## Report Format
Generate a comprehensive report:
```
🔍 Release Readiness Check for v0.13.0
✅ PASSED CHECKS:
├── Git status clean
├── On main branch  
├── All tests passing (744/744)
├── Test coverage: 98.2%
├── Type checking passed
├── Linting passed
├── CHANGELOG.md updated
└── No critical issues open
⚠️  WARNINGS:
├── 2 medium-priority issues still open
└── Documentation could be updated
❌ BLOCKING ISSUES:
└── None found
🎯 RELEASE READINESS: ✅ READY
Recommended next steps:
1. Address warnings if desired
2. Run `/release v0.13.0` when ready
```
## Validation Criteria
### Must Pass (Blocking)
- [ ] All tests pass
- [ ] No type errors
- [ ] No linting errors  
- [ ] Working directory clean
- [ ] On main branch
- [ ] CHANGELOG.md has version entry
- [ ] No critical open issues
### Should Pass (Warnings)
- [ ] Test coverage >95%
- [ ] No medium-priority open issues
- [ ] Documentation up to date
- [ ] No dependency vulnerabilities
## Context
- This is a read-only validation - makes no changes
- Provides confidence before running actual release
- Helps identify issues early in release process
- Can be run multiple times safely
```
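For reference, the blocking checks above could be scripted roughly as follows. This is a minimal sketch, assuming the `just test` and `just lint` recipes defined in the repository's justfile; the helper names (`run`, `release_ready`) are illustrative, not part of the codebase.
```python
"""Hypothetical pre-flight runner for the blocking checks."""
import subprocess

def run(cmd: list[str]) -> bool:
    """Return True if the command exits cleanly."""
    return subprocess.run(cmd, capture_output=True).returncode == 0

def release_ready() -> bool:
    checks = {
        "working tree clean": run(["git", "diff", "--quiet", "HEAD"]),
        "on main branch": subprocess.run(
            ["git", "branch", "--show-current"], capture_output=True, text=True
        ).stdout.strip() == "main",
        "tests pass": run(["just", "test"]),
        "lint passes": run(["just", "lint"]),
    }
    for name, ok in checks.items():
        print(f"{'✅' if ok else '❌'} {name}")
    return all(checks.values())

if __name__ == "__main__":
    raise SystemExit(0 if release_ready() else 1)
```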
--------------------------------------------------------------------------------
/tests/schemas/test_search.py:
--------------------------------------------------------------------------------
```python
"""Tests for search schemas."""
from datetime import datetime
from basic_memory.schemas.search import (
    SearchItemType,
    SearchQuery,
    SearchResult,
    SearchResponse,
)
def test_search_modes():
    """Test different search modes."""
    # Exact permalink
    query = SearchQuery(permalink="specs/search")
    assert query.permalink == "specs/search"
    assert query.text is None
    # Pattern match
    query = SearchQuery(permalink="specs/*")
    assert query.permalink == "specs/*"
    assert query.text is None
    # Text search
    query = SearchQuery(text="search implementation")
    assert query.text == "search implementation"
    assert query.permalink is None
def test_search_filters():
    """Test search result filtering."""
    query = SearchQuery(
        text="search",
        entity_types=[SearchItemType.ENTITY],
        types=["component"],
        after_date=datetime(2024, 1, 1),
    )
    assert query.entity_types == [SearchItemType.ENTITY]
    assert query.types == ["component"]
    assert query.after_date == "2024-01-01T00:00:00"
def test_search_result():
    """Test search result structure."""
    result = SearchResult(
        title="test",
        type=SearchItemType.ENTITY,
        entity="some_entity",
        score=0.8,
        metadata={"entity_type": "component"},
        permalink="specs/search",
        file_path="specs/search.md",
    )
    assert result.type == SearchItemType.ENTITY
    assert result.score == 0.8
    assert result.metadata == {"entity_type": "component"}
def test_observation_result():
    """Test observation result fields."""
    result = SearchResult(
        title="test",
        permalink="specs/search",
        file_path="specs/search.md",
        type=SearchItemType.OBSERVATION,
        score=0.5,
        metadata={},
        entity="some_entity",
        category="tech",
    )
    assert result.entity == "some_entity"
    assert result.category == "tech"
def test_relation_result():
    """Test relation result fields."""
    result = SearchResult(
        title="test",
        permalink="specs/search",
        file_path="specs/search.md",
        type=SearchItemType.RELATION,
        entity="some_entity",
        score=0.5,
        metadata={},
        from_entity="123",
        to_entity="456",
        relation_type="depends_on",
    )
    assert result.from_entity == "123"
    assert result.to_entity == "456"
    assert result.relation_type == "depends_on"
def test_search_response():
    """Test search response wrapper."""
    results = [
        SearchResult(
            title="test",
            permalink="specs/search",
            file_path="specs/search.md",
            type=SearchItemType.ENTITY,
            entity="some_entity",
            score=0.8,
            metadata={},
        ),
        SearchResult(
            title="test",
            permalink="specs/search",
            file_path="specs/search.md",
            type=SearchItemType.ENTITY,
            entity="some_entity",
            score=0.6,
            metadata={},
        ),
    ]
    response = SearchResponse(results=results, current_page=1, page_size=1)
    assert len(response.results) == 2
    assert response.results[0].score > response.results[1].score
```
--------------------------------------------------------------------------------
/src/basic_memory/markdown/utils.py:
--------------------------------------------------------------------------------
```python
"""Utilities for converting between markdown and entity models."""
from pathlib import Path
from typing import Any, Optional
from frontmatter import Post
from basic_memory.file_utils import has_frontmatter, remove_frontmatter, parse_frontmatter
from basic_memory.markdown import EntityMarkdown
from basic_memory.models import Entity
from basic_memory.models import Observation as ObservationModel
def entity_model_from_markdown(
    file_path: Path, markdown: EntityMarkdown, entity: Optional[Entity] = None
) -> Entity:
    """
    Convert markdown entity to model. Does not include relations.
    Args:
        file_path: Path to the markdown file
        markdown: Parsed markdown entity
        entity: Optional existing entity to update
    Returns:
        Entity model populated from markdown
    Raises:
        ValueError: If required datetime fields are missing from markdown
    """
    if not markdown.created or not markdown.modified:  # pragma: no cover
        raise ValueError("Both created and modified dates are required in markdown")
    # Create or update entity
    model = entity or Entity()
    # Update basic fields
    model.title = markdown.frontmatter.title
    model.entity_type = markdown.frontmatter.type
    # Only update permalink if it exists in frontmatter, otherwise preserve existing
    if markdown.frontmatter.permalink is not None:
        model.permalink = markdown.frontmatter.permalink
    model.file_path = file_path.as_posix()
    model.content_type = "text/markdown"
    model.created_at = markdown.created
    model.updated_at = markdown.modified
    # Handle metadata - ensure all values are strings and filter None
    metadata = markdown.frontmatter.metadata or {}
    model.entity_metadata = {k: str(v) for k, v in metadata.items() if v is not None}
    # Convert observations
    model.observations = [
        ObservationModel(
            content=obs.content,
            category=obs.category,
            context=obs.context,
            tags=obs.tags,
        )
        for obs in markdown.observations
    ]
    return model
async def schema_to_markdown(schema: Any) -> Post:
    """
    Convert schema to markdown Post object.
    Args:
        schema: Schema to convert (must have title, entity_type, and permalink attributes)
    Returns:
        Post object with frontmatter metadata
    """
    # Extract content and metadata
    content = schema.content or ""
    entity_metadata = dict(schema.entity_metadata or {})
    # if the content contains frontmatter, remove it and merge
    if has_frontmatter(content):
        content_frontmatter = parse_frontmatter(content)
        content = remove_frontmatter(content)
        # Merge content frontmatter with entity metadata
        # (entity_metadata takes precedence for conflicts)
        content_frontmatter.update(entity_metadata)
        entity_metadata = content_frontmatter
    # Remove special fields for ordered frontmatter
    for field in ["type", "title", "permalink"]:
        entity_metadata.pop(field, None)
    # Create Post with fields ordered by insert order
    post = Post(
        content,
        title=schema.title,
        type=schema.entity_type,
    )
    # set the permalink if passed in
    if schema.permalink:
        post.metadata["permalink"] = schema.permalink
    if entity_metadata:
        post.metadata.update(entity_metadata)
    return post
```
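A hedged round-trip sketch of `schema_to_markdown`'s merge behavior: in-content frontmatter is parsed out, entity metadata wins on conflicts, and the special `title`/`type`/`permalink` fields are re-emitted first. `FakeSchema` is a hypothetical stand-in for the real entity schema; serialization uses python-frontmatter's `dumps`.
```python
import asyncio
from dataclasses import dataclass, field
import frontmatter

from basic_memory.markdown.utils import schema_to_markdown

@dataclass
class FakeSchema:  # hypothetical stand-in, not a real basic_memory class
    title: str = "Search Design"
    entity_type: str = "spec"
    permalink: str = "specs/search"
    content: str = "---\ntags: [draft]\ntitle: old title\n---\nBody text."
    entity_metadata: dict = field(default_factory=lambda: {"status": "active"})

post = asyncio.run(schema_to_markdown(FakeSchema()))
# The in-content "title" is dropped in favor of the schema's, while
# "tags" and "status" survive as extra metadata after title/type/permalink.
print(frontmatter.dumps(post))
```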
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/mcp.py:
--------------------------------------------------------------------------------
```python
"""MCP server command with streamable HTTP transport."""
import asyncio
import os
import typer
from typing import Optional
from basic_memory.cli.app import app
from basic_memory.config import ConfigManager
# Import mcp instance
from basic_memory.mcp.server import mcp as mcp_server  # pragma: no cover
# Import mcp tools to register them
import basic_memory.mcp.tools  # noqa: F401  # pragma: no cover
# Import prompts to register them
import basic_memory.mcp.prompts  # noqa: F401  # pragma: no cover
from loguru import logger
import threading
from basic_memory.services.initialization import initialize_file_sync
config = ConfigManager().config
if not config.cloud_mode_enabled:
    @app.command()
    def mcp(
        transport: str = typer.Option(
            "stdio", help="Transport type: stdio, streamable-http, or sse"
        ),
        host: str = typer.Option(
            "0.0.0.0", help="Host for HTTP transports (use 0.0.0.0 to allow external connections)"
        ),
        port: int = typer.Option(8000, help="Port for HTTP transports"),
        path: str = typer.Option("/mcp", help="Path prefix for streamable-http transport"),
        project: Optional[str] = typer.Option(None, help="Restrict MCP server to single project"),
    ):  # pragma: no cover
        """Run the MCP server with configurable transport options.
        This command starts an MCP server using one of three transport options:
        - stdio: Standard I/O (the default; good for local usage)
        - streamable-http: Recommended for web deployments
        - sse: Server-Sent Events (for compatibility with existing clients)
        """
        # Validate and set project constraint if specified
        if project:
            config_manager = ConfigManager()
            project_name, _ = config_manager.get_project(project)
            if not project_name:
                typer.echo(f"No project found named: {project}", err=True)
                raise typer.Exit(1)
            # Set env var with validated project name
            os.environ["BASIC_MEMORY_MCP_PROJECT"] = project_name
            logger.info(f"MCP server constrained to project: {project_name}")
        app_config = ConfigManager().config
        def run_file_sync():
            """Run file sync in a separate thread with its own event loop."""
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(initialize_file_sync(app_config))
            except Exception as e:
                logger.error(f"File sync error: {e}", err=True)
            finally:
                loop.close()
        logger.info(f"Sync changes enabled: {app_config.sync_changes}")
        if app_config.sync_changes:
            # Start the sync thread
            sync_thread = threading.Thread(target=run_file_sync, daemon=True)
            sync_thread.start()
            logger.info("Started file sync in background")
        # Now run the MCP server (blocks)
        logger.info(f"Starting MCP server with {transport.upper()} transport")
        if transport == "stdio":
            mcp_server.run(
                transport=transport,
            )
        elif transport == "streamable-http" or transport == "sse":
            mcp_server.run(
                transport=transport,
                host=host,
                port=port,
                path=path,
                log_level="INFO",
            )
```
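The file-sync thread above uses a common pattern: give a coroutine its own event loop in a daemon thread so it never blocks the MCP server running in the main thread. A stripped-down sketch of that pattern, with illustrative names only:
```python
import asyncio
import threading

async def background_task() -> None:
    # stand-in for initialize_file_sync(app_config)
    while True:
        await asyncio.sleep(60)

def run_in_thread(coro_factory) -> threading.Thread:
    """Run a coroutine on its own event loop in a daemon thread."""
    def runner() -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(coro_factory())
        finally:
            loop.close()
    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return thread

sync_thread = run_in_thread(background_task)
# The main thread stays free to run the blocking server loop.
```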
--------------------------------------------------------------------------------
/CLA.md:
--------------------------------------------------------------------------------
```markdown
# Contributor License Agreement
## Copyright Assignment and License Grant
By signing this Contributor License Agreement ("Agreement"), you accept and agree to the following terms and conditions for your present and future Contributions submitted to Basic Machines LLC. Except for the license granted herein to Basic Machines LLC and recipients of software distributed by Basic Machines LLC, you reserve all right, title, and interest in and to your Contributions.
### 1. Definitions
"You" (or "Your") shall mean the copyright owner or legal entity authorized by the copyright owner that is making this
Agreement with Basic Machines LLC.
"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing work,
that is intentionally submitted by You to Basic
Machines LLC for inclusion in, or documentation of, any of the products owned or managed by Basic Machines LLC (the "
Work").
### 2. Grant of Copyright License
Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of software distributed by Basic Machines LLC a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Work, and to permit persons to whom the Work is furnished to do so.
### 3. Assignment of Copyright
You hereby assign to Basic Machines LLC all right, title, and interest worldwide in all Copyright covering your Contributions. Basic Machines LLC may license the Contributions under any license terms, including copyleft, permissive, commercial, or proprietary licenses.
### 4. Grant of Patent License
Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of software distributed by Basic Machines LLC a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work.
### 5. Developer Certificate of Origin
By making a Contribution to this project, You certify that:
(a) The Contribution was created in whole or in part by You and You have the right to submit it under this Agreement; or
(b) The Contribution is based upon previous work that, to the best of Your knowledge, is covered under an appropriate open source license and You have the right under that license to submit that work with modifications, whether created in whole or in part by You, under this Agreement; or

(c) The Contribution was provided directly to You by some other person who certified (a), (b) or (c) and You have not modified it.

(d) You understand and agree that this project and the Contribution are public and that a record of the Contribution (including all personal information You submit with it, including Your sign-off) is maintained indefinitely and may be redistributed consistent with this project or the open source license(s) involved.
### 6. Representations
You represent that you are legally entitled to grant the above license and assignment. If your employer(s) has rights to intellectual property that you create that includes your Contributions, you represent that you have received permission to make Contributions on behalf of that employer, or that your employer has waived such rights for your Contributions to Basic Machines LLC.
---
This Agreement is effective as of the date you first submit a Contribution to Basic Machines LLC.
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "basic-memory"
dynamic = ["version"]
description = "Local-first knowledge management combining Zettelkasten with knowledge graphs"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "AGPL-3.0-or-later" }
authors = [
    { name = "Basic Machines", email = "[email protected]" }
]
dependencies = [
    "sqlalchemy>=2.0.0",
    "pyyaml>=6.0.1",
    "typer>=0.9.0",
    "aiosqlite>=0.20.0",
    "greenlet>=3.1.1",
    "pydantic[email,timezone]>=2.10.3",
    "icecream>=2.1.3",
    "mcp>=1.2.0",
    "pydantic-settings>=2.6.1",
    "loguru>=0.7.3",
    "pyright>=1.1.390",
    "markdown-it-py>=3.0.0",
    "python-frontmatter>=1.1.0",
    "rich>=13.9.4",
    "unidecode>=1.3.8",
    "dateparser>=1.2.0",
    "watchfiles>=1.0.4",
    "fastapi[standard]>=0.115.8",
    "alembic>=1.14.1",
    "pillow>=11.1.0",
    "pybars3>=0.9.7",
    "fastmcp>=2.10.2",
    "pyjwt>=2.10.1",
    "python-dotenv>=1.1.0",
    "pytest-aio>=1.9.0",
    "aiofiles>=24.1.0", # Async file I/O
    "logfire>=0.73.0", # Optional observability (disabled by default via config)
]
[project.urls]
Homepage = "https://github.com/basicmachines-co/basic-memory"
Repository = "https://github.com/basicmachines-co/basic-memory"
Documentation = "https://github.com/basicmachines-co/basic-memory#readme"
[project.scripts]
basic-memory = "basic_memory.cli.main:app"
bm = "basic_memory.cli.main:app"
[build-system]
requires = ["hatchling", "uv-dynamic-versioning>=0.7.0"]
build-backend = "hatchling.build"
[tool.pytest.ini_options]
pythonpath = ["src", "tests"]
addopts = "--cov=basic_memory --cov-report term-missing"
testpaths = ["tests", "test-int"]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"
markers = [
    "benchmark: Performance benchmark tests (deselect with '-m \"not benchmark\"')",
    "slow: Slow-running tests (deselect with '-m \"not slow\"')",
]
[tool.ruff]
line-length = 100
target-version = "py312"
[dependency-groups]
dev = [
    "gevent>=24.11.1",
    "icecream>=2.1.3",
    "pytest>=8.3.4",
    "pytest-cov>=4.1.0",
    "pytest-mock>=3.12.0",
    "pytest-asyncio>=0.24.0",
    "pytest-xdist>=3.0.0",
    "ruff>=0.1.6",
    "freezegun>=1.5.5",
]
[tool.hatch.version]
source = "uv-dynamic-versioning"
[tool.uv-dynamic-versioning]
vcs = "git"
style = "pep440"
bump = true
fallback-version = "0.0.0"
[tool.pyright]
include = ["src/"]
exclude = ["**/__pycache__"]
ignore = ["test/"]
defineConstant = { DEBUG = true }
reportMissingImports = "error"
reportMissingTypeStubs = false
pythonVersion = "3.12"
[tool.coverage.run]
concurrency = ["thread", "gevent"]
[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "if self.debug:",
    "if settings.DEBUG",
    "raise AssertionError",
    "raise NotImplementedError",
    "if 0:",
    "if __name__ == .__main__.:",
    "class .*\\bProtocol\\):",
    "@(abc\\.)?abstractmethod",
]
# Exclude specific modules that are difficult to test comprehensively
omit = [
    "*/external_auth_provider.py",  # External HTTP calls to OAuth providers
    "*/supabase_auth_provider.py",  # External HTTP calls to Supabase APIs
    "*/watch_service.py",           # File system watching - complex integration testing
    "*/background_sync.py",         # Background processes
    "*/cli/main.py",               # CLI entry point
    "*/mcp/tools/project_management.py",  # Covered by integration tests
    "*/mcp/tools/sync_status.py",  # Covered by integration tests
    "*/services/migration_service.py", # Complex migration scenarios
]
[tool.logfire]
ignore_no_config = true
```
--------------------------------------------------------------------------------
/src/basic_memory/schemas/prompt.py:
--------------------------------------------------------------------------------
```python
"""Request and response schemas for prompt-related operations."""
from typing import Optional, List, Any, Dict
from pydantic import BaseModel, Field
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import EntitySummary, ObservationSummary, RelationSummary
class PromptContextItem(BaseModel):
    """Container for primary and related results to render in a prompt."""
    primary_results: List[EntitySummary]
    related_results: List[EntitySummary | ObservationSummary | RelationSummary]
class ContinueConversationRequest(BaseModel):
    """Request for generating a continue conversation prompt.
    Used to provide context for continuing a conversation on a specific topic
    or with recent activity from a given timeframe.
    """
    topic: Optional[str] = Field(None, description="Topic or keyword to search for")
    timeframe: Optional[TimeFrame] = Field(
        None, description="How far back to look for activity (e.g. '1d', '1 week')"
    )
    # Limit depth for performance reasons - higher values cause significant slowdown
    search_items_limit: int = Field(
        5,
        description="Maximum number of search results to include in context (max 10)",
        ge=1,
        le=10,
    )
    depth: int = Field(
        1,
        description="How many relationship 'hops' to follow when building context (max 5)",
        ge=1,
        le=5,
    )
    # Limit related items to prevent overloading the context
    related_items_limit: int = Field(
        5, description="Maximum number of related items to include in context (max 10)", ge=1, le=10
    )
class SearchPromptRequest(BaseModel):
    """Request for generating a search results prompt.
    Used to format search results into a prompt with context and suggestions.
    """
    query: str = Field(..., description="The search query text")
    timeframe: Optional[TimeFrame] = Field(
        None, description="Optional timeframe to limit results (e.g. '1d', '1 week')"
    )
class PromptMetadata(BaseModel):
    """Metadata about a prompt response.
    Contains statistical information about the prompt generation process
    and results, useful for debugging and UI display.
    """
    query: Optional[str] = Field(None, description="The original query or topic")
    timeframe: Optional[str] = Field(None, description="The timeframe used for filtering")
    search_count: int = Field(0, description="Number of search results found")
    context_count: int = Field(0, description="Number of context items retrieved")
    observation_count: int = Field(0, description="Total number of observations included")
    relation_count: int = Field(0, description="Total number of relations included")
    total_items: int = Field(0, description="Total number of all items included in the prompt")
    search_limit: int = Field(0, description="Maximum search results requested")
    context_depth: int = Field(0, description="Context depth used")
    related_limit: int = Field(0, description="Maximum related items requested")
    generated_at: str = Field(..., description="ISO timestamp when this prompt was generated")
class PromptResponse(BaseModel):
    """Response containing the rendered prompt.
    Includes both the rendered prompt text and the context that was used
    to render it, for potential client-side use.
    """
    prompt: str = Field(..., description="The rendered prompt text")
    context: Dict[str, Any] = Field(..., description="The context used to render the prompt")
    metadata: PromptMetadata = Field(
        ..., description="Metadata about the prompt generation process"
    )
```
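Because the limits are plain pydantic `ge`/`le` constraints, out-of-range values fail at construction time. A small sketch (the topic string is illustrative):
```python
from pydantic import ValidationError

from basic_memory.schemas.prompt import ContinueConversationRequest

req = ContinueConversationRequest(topic="search design", depth=2)
assert req.search_items_limit == 5  # defaults apply

try:
    ContinueConversationRequest(topic="search design", depth=6)  # exceeds le=5
except ValidationError as e:
    print(e.errors()[0]["type"])  # "less_than_equal"
```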
--------------------------------------------------------------------------------
/src/basic_memory/schemas/search.py:
--------------------------------------------------------------------------------
```python
"""Search schemas for Basic Memory.
The search system supports three primary modes:
1. Exact permalink lookup
2. Pattern matching with *
3. Full-text search across content
"""
from typing import Optional, List, Union
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, field_validator
from basic_memory.schemas.base import Permalink
class SearchItemType(str, Enum):
    """Types of searchable items."""
    ENTITY = "entity"
    OBSERVATION = "observation"
    RELATION = "relation"
class SearchQuery(BaseModel):
    """Search query parameters.
    Use ONE of these primary search modes:
    - permalink: Exact permalink match
    - permalink_match: Path pattern with *
    - text: Full-text search of title/content (supports boolean operators: AND, OR, NOT)
    Optionally filter results by:
    - types: Limit to specific item types
    - entity_types: Limit to specific entity types
    - after_date: Only items after date
    Boolean search examples:
    - "python AND flask" - Find items with both terms
    - "python OR django" - Find items with either term
    - "python NOT django" - Find items with python but not django
    - "(python OR flask) AND web" - Use parentheses for grouping
    """
    # Primary search modes (use ONE of these)
    permalink: Optional[str] = None  # Exact permalink match
    permalink_match: Optional[str] = None  # Glob permalink match
    text: Optional[str] = None  # Full-text search (now supports boolean operators)
    title: Optional[str] = None  # title only search
    # Optional filters
    types: Optional[List[str]] = None  # Filter by type
    entity_types: Optional[List[SearchItemType]] = None  # Filter by entity type
    after_date: Optional[Union[datetime, str]] = None  # Time-based filter
    @field_validator("after_date")
    @classmethod
    def validate_date(cls, v: Optional[Union[datetime, str]]) -> Optional[str]:
        """Convert datetime to ISO format if needed."""
        if isinstance(v, datetime):
            return v.isoformat()
        return v
    def no_criteria(self) -> bool:
        return (
            self.permalink is None
            and self.permalink_match is None
            and self.title is None
            and self.text is None
            and self.after_date is None
            and self.types is None
            and self.entity_types is None
        )
    def has_boolean_operators(self) -> bool:
        """Check if the text query contains boolean operators (AND, OR, NOT)."""
        if not self.text:  # pragma: no cover
            return False
        # Check for common boolean operators with correct word boundaries
        # to avoid matching substrings like "GRAND" containing "AND"
        boolean_patterns = [" AND ", " OR ", " NOT ", "(", ")"]
        text = f" {self.text} "  # Add spaces to ensure we match word boundaries
        return any(pattern in text for pattern in boolean_patterns)
class SearchResult(BaseModel):
    """Search result with score and metadata."""
    title: str
    type: SearchItemType
    score: float
    entity: Optional[Permalink] = None
    permalink: Optional[str]
    content: Optional[str] = None
    file_path: str
    metadata: Optional[dict] = None
    # Type-specific fields
    category: Optional[str] = None  # For observations
    from_entity: Optional[Permalink] = None  # For relations
    to_entity: Optional[Permalink] = None  # For relations
    relation_type: Optional[str] = None  # For relations
class SearchResponse(BaseModel):
    """Wrapper for search results."""
    results: List[SearchResult]
    current_page: int
    page_size: int
```
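A short sketch of the three primary modes and the boolean-operator detection; values are illustrative:
```python
from basic_memory.schemas.search import SearchQuery

# Three mutually exclusive primary modes
exact = SearchQuery(permalink="specs/search")
glob = SearchQuery(permalink_match="specs/*")
fts = SearchQuery(text="(python OR flask) AND web")

assert SearchQuery().no_criteria()
assert fts.has_boolean_operators()  # parentheses and " AND " both match
assert not SearchQuery(text="GRAND").has_boolean_operators()  # no substring hit
```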
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/cc7172b46608_update_search_index_schema.py:
--------------------------------------------------------------------------------
```python
"""Update search index schema
Revision ID: cc7172b46608
Revises: 502b60eaa905
Create Date: 2025-02-28 18:48:23.244941
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "cc7172b46608"
down_revision: Union[str, None] = "502b60eaa905"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade database schema to use new search index with content_stems and content_snippet."""
    # First, drop the existing search_index table
    op.execute("DROP TABLE IF EXISTS search_index")
    # Create new search_index with updated schema
    op.execute("""
    CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
        -- Core entity fields
        id UNINDEXED,          -- Row ID
        title,                 -- Title for searching
        content_stems,         -- Main searchable content split into stems
        content_snippet,       -- File content snippet for display
        permalink,             -- Stable identifier (now indexed for path search)
        file_path UNINDEXED,   -- Physical location
        type UNINDEXED,        -- entity/relation/observation
        
        -- Relation fields 
        from_id UNINDEXED,     -- Source entity
        to_id UNINDEXED,       -- Target entity
        relation_type UNINDEXED, -- Type of relation
        
        -- Observation fields
        entity_id UNINDEXED,   -- Parent entity
        category UNINDEXED,    -- Observation category
        
        -- Common fields
        metadata UNINDEXED,    -- JSON metadata
        created_at UNINDEXED,  -- Creation timestamp
        updated_at UNINDEXED,  -- Last update
        
        -- Configuration
        tokenize='unicode61 tokenchars 0x2F',  -- Hex code for /
        prefix='1,2,3,4'                    -- Support longer prefixes for paths
    );
    """)
def downgrade() -> None:
    """Downgrade database schema to use old search index."""
    # Drop the updated search_index table
    op.execute("DROP TABLE IF EXISTS search_index")
    # Recreate the original search_index schema
    op.execute("""
    CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
        -- Core entity fields
        id UNINDEXED,          -- Row ID
        title,                 -- Title for searching
        content,               -- Main searchable content
        permalink,             -- Stable identifier (now indexed for path search)
        file_path UNINDEXED,   -- Physical location
        type UNINDEXED,        -- entity/relation/observation
        
        -- Relation fields 
        from_id UNINDEXED,     -- Source entity
        to_id UNINDEXED,       -- Target entity
        relation_type UNINDEXED, -- Type of relation
        
        -- Observation fields
        entity_id UNINDEXED,   -- Parent entity
        category UNINDEXED,    -- Observation category
        
        -- Common fields
        metadata UNINDEXED,    -- JSON metadata
        created_at UNINDEXED,  -- Creation timestamp
        updated_at UNINDEXED,  -- Last update
        
        -- Configuration
        tokenize='unicode61 tokenchars 0x2F',  -- Hex code for /
        prefix='1,2,3,4'                    -- Support longer prefixes for paths
    );
    """)
    # Print instruction to manually reindex after migration
    print("\n------------------------------------------------------------------")
    print("IMPORTANT: After downgrade completes, manually run the reindex command:")
    print("basic-memory sync")
    print("------------------------------------------------------------------\n")
```
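The `tokenize`/`prefix` settings are the interesting part: keeping `/` (hex 0x2F) as a token character means a permalink such as `specs/search` indexes as a single token, so quoted prefix queries can anchor on a leading path segment. A self-contained sketch using the standard-library `sqlite3` module (requires an SQLite build with FTS5, which Python's bundled SQLite normally includes):
```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE VIRTUAL TABLE demo USING fts5("
    "permalink, tokenize='unicode61 tokenchars 0x2F', prefix='1,2,3,4')"
)
con.executemany("INSERT INTO demo VALUES (?)", [("specs/search",), ("docs/search",)])

# With '/' as a token character each permalink is one token, so a quoted
# prefix query can match on the leading path segment:
rows = con.execute(
    "SELECT permalink FROM demo WHERE demo MATCH '\"specs/\"*'"
).fetchall()
print(rows)  # [('specs/search',)] -- 'docs/search' is not matched
```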
--------------------------------------------------------------------------------
/src/basic_memory/repository/relation_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing Relation objects."""
from sqlalchemy import and_, delete
from typing import Sequence, List, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, aliased
from sqlalchemy.orm.interfaces import LoaderOption
from basic_memory import db
from basic_memory.models import Relation, Entity
from basic_memory.repository.repository import Repository
class RelationRepository(Repository[Relation]):
    """Repository for Relation model with memory-specific operations."""
    def __init__(self, session_maker: async_sessionmaker, project_id: int):
        """Initialize with session maker and project_id filter.
        Args:
            session_maker: SQLAlchemy session maker
            project_id: Project ID to filter all operations by
        """
        super().__init__(session_maker, Relation, project_id=project_id)
    async def find_relation(
        self, from_permalink: str, to_permalink: str, relation_type: str
    ) -> Optional[Relation]:
        """Find a relation by its from and to path IDs."""
        from_entity = aliased(Entity)
        to_entity = aliased(Entity)
        query = (
            select(Relation)
            .join(from_entity, Relation.from_id == from_entity.id)
            .join(to_entity, Relation.to_id == to_entity.id)
            .where(
                and_(
                    from_entity.permalink == from_permalink,
                    to_entity.permalink == to_permalink,
                    Relation.relation_type == relation_type,
                )
            )
        )
        return await self.find_one(query)
    async def find_by_entities(self, from_id: int, to_id: int) -> Sequence[Relation]:
        """Find all relations between two entities."""
        query = select(Relation).where((Relation.from_id == from_id) & (Relation.to_id == to_id))
        result = await self.execute_query(query)
        return result.scalars().all()
    async def find_by_type(self, relation_type: str) -> Sequence[Relation]:
        """Find all relations of a specific type."""
        query = select(Relation).filter(Relation.relation_type == relation_type)
        result = await self.execute_query(query)
        return result.scalars().all()
    async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
        """Delete outgoing relations for an entity.
        Only deletes relations where this entity is the source (from_id),
        as these are the ones owned by this entity's markdown file.
        """
        async with db.scoped_session(self.session_maker) as session:
            await session.execute(delete(Relation).where(Relation.from_id == entity_id))
    async def find_unresolved_relations(self) -> Sequence[Relation]:
        """Find all unresolved relations, where to_id is null."""
        query = select(Relation).filter(Relation.to_id.is_(None))
        result = await self.execute_query(query)
        return result.scalars().all()
    async def find_unresolved_relations_for_entity(self, entity_id: int) -> Sequence[Relation]:
        """Find unresolved relations for a specific entity.
        Args:
            entity_id: The entity whose unresolved outgoing relations to find.
        Returns:
            List of unresolved relations where this entity is the source.
        """
        query = select(Relation).filter(Relation.from_id == entity_id, Relation.to_id.is_(None))
        result = await self.execute_query(query)
        return result.scalars().all()
    def get_load_options(self) -> List[LoaderOption]:
        return [selectinload(Relation.from_entity), selectinload(Relation.to_entity)]
```
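`find_relation` joins the `entity` table twice, which is why two `aliased(Entity)` objects are needed: one for the relation's source row, one for its target. A sketch that builds the same query and prints the rendered SQL (the permalinks are illustrative); no database connection is required just to construct it:
```python
from sqlalchemy import and_, select
from sqlalchemy.orm import aliased

from basic_memory.models import Relation, Entity

# Two aliases of Entity, because the same table is joined twice.
from_entity, to_entity = aliased(Entity), aliased(Entity)
query = (
    select(Relation)
    .join(from_entity, Relation.from_id == from_entity.id)
    .join(to_entity, Relation.to_id == to_entity.id)
    .where(
        and_(
            from_entity.permalink == "specs/search",
            to_entity.permalink == "specs/search-design",
            Relation.relation_type == "depends_on",
        )
    )
)
print(query)  # renders the double self-join with entity_1 / entity_2 aliases
```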
--------------------------------------------------------------------------------
/src/basic_memory/alembic/alembic.ini:
--------------------------------------------------------------------------------
```
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = .
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires python>=3.9 or the backports.zoneinfo library, plus the tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions.  When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts.  See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
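The `sqlalchemy.url` above is only a placeholder; presumably `env.py` injects the real database URL at runtime. For reference, migrations can also be driven programmatically through Alembic's public API; the paths and URL below are illustrative, not taken from this repo's setup:
```python
from alembic import command
from alembic.config import Config

# Illustrative paths: the repo ships alembic.ini next to env.py.
cfg = Config("src/basic_memory/alembic/alembic.ini")
cfg.set_main_option("script_location", "src/basic_memory/alembic")
cfg.set_main_option("sqlalchemy.url", "sqlite:///basic_memory.db")

command.upgrade(cfg, "head")    # apply all migrations
# command.downgrade(cfg, "-1")  # step back one revision
```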
--------------------------------------------------------------------------------
/src/basic_memory/schemas/request.py:
--------------------------------------------------------------------------------
```python
"""Request schemas for interacting with the knowledge graph."""
from typing import List, Optional, Annotated, Literal
from annotated_types import MaxLen, MinLen
from pydantic import BaseModel, field_validator
from basic_memory.schemas.base import (
    Relation,
    Permalink,
)
class SearchNodesRequest(BaseModel):
    """Search for entities in the knowledge graph.
    The search looks across multiple fields:
    - Entity title
    - Entity types
    - summary
    - file content
    - Observations
    Features:
    - Case-insensitive matching
    - Partial word matches
    - Returns full entity objects with relations
    - Includes all matching entities
    - If a category is specified, only entities with that category are returned
    Example Queries:
    - "memory" - Find entities related to memory systems
    - "SQLite" - Find database-related components
    - "test" - Find test-related entities
    - "implementation" - Find concrete implementations
    - "service" - Find service components
    Note: Currently uses SQL ILIKE for matching. Wildcard (*) searches
    and full-text search capabilities are planned for future versions.
    """
    query: Annotated[str, MinLen(1), MaxLen(200)]
    category: Optional[str] = None
class GetEntitiesRequest(BaseModel):
    """Retrieve specific entities by their IDs.
    Used to load complete entity details including all observations
    and relations. Particularly useful for following relations
    discovered through search.
    """
    permalinks: Annotated[List[Permalink], MinLen(1), MaxLen(10)]
class CreateRelationsRequest(BaseModel):
    relations: List[Relation]
class EditEntityRequest(BaseModel):
    """Request schema for editing an existing entity's content.
    This allows for targeted edits without requiring the full entity content.
    Supports various operation types for different editing scenarios.
    """
    operation: Literal["append", "prepend", "find_replace", "replace_section"]
    content: str
    section: Optional[str] = None
    find_text: Optional[str] = None
    expected_replacements: int = 1
    @field_validator("section")
    @classmethod
    def validate_section_for_replace_section(cls, v, info):
        """Ensure section is provided for replace_section operation."""
        if info.data.get("operation") == "replace_section" and not v:
            raise ValueError("section parameter is required for replace_section operation")
        return v
    @field_validator("find_text")
    @classmethod
    def validate_find_text_for_find_replace(cls, v, info):
        """Ensure find_text is provided for find_replace operation."""
        if info.data.get("operation") == "find_replace" and not v:
            raise ValueError("find_text parameter is required for find_replace operation")
        return v
class MoveEntityRequest(BaseModel):
    """Request schema for moving an entity to a new file location.
    This allows moving notes to different paths while maintaining project
    consistency and optionally updating permalinks based on configuration.
    """
    identifier: Annotated[str, MinLen(1), MaxLen(200)]
    destination_path: Annotated[str, MinLen(1), MaxLen(500)]
    project: Optional[str] = None
    @field_validator("destination_path")
    @classmethod
    def validate_destination_path(cls, v):
        """Ensure destination path is relative and valid."""
        if v.startswith("/"):
            raise ValueError("destination_path must be relative, not absolute")
        if ".." in v:
            raise ValueError("destination_path cannot contain '..' path components")
        if not v.strip():
            raise ValueError("destination_path cannot be empty or whitespace only")
        return v.strip()
```
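The two `EditEntityRequest` validators only fire when their field is actually supplied: Pydantic v2 skips validation for omitted defaults, so constructing `operation="find_replace"` without passing `find_text` does not raise at the schema level. A minimal sketch of that behavior, assuming the schema above imports as shown:

```python
from pydantic import ValidationError

from basic_memory.schemas.request import EditEntityRequest

# Append needs no extra parameters.
ok = EditEntityRequest(operation="append", content="## Update\nMore text")
assert ok.expected_replacements == 1

# The explicit section=None is what trips the replace_section validator;
# omitting the field entirely would skip it (Pydantic v2 default behavior).
try:
    EditEntityRequest(operation="replace_section", content="x", section=None)
except ValidationError as exc:
    print(exc.errors()[0]["msg"])
    # -> Value error, section parameter is required for replace_section operation
```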
--------------------------------------------------------------------------------
/src/basic_memory/repository/project_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing projects in Basic Memory."""
from pathlib import Path
from typing import Optional, Sequence, Union
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from basic_memory import db
from basic_memory.models.project import Project
from basic_memory.repository.repository import Repository
class ProjectRepository(Repository[Project]):
    """Repository for Project model.
    Projects represent collections of knowledge entities grouped together.
    Each entity, observation, and relation belongs to a specific project.
    """
    def __init__(self, session_maker: async_sessionmaker[AsyncSession]):
        """Initialize with session maker."""
        super().__init__(session_maker, Project)
    async def get_by_name(self, name: str) -> Optional[Project]:
        """Get project by name.
        Args:
            name: Unique name of the project
        """
        query = self.select().where(Project.name == name)
        return await self.find_one(query)
    async def get_by_permalink(self, permalink: str) -> Optional[Project]:
        """Get project by permalink.
        Args:
            permalink: URL-friendly identifier for the project
        """
        query = self.select().where(Project.permalink == permalink)
        return await self.find_one(query)
    async def get_by_path(self, path: Union[Path, str]) -> Optional[Project]:
        """Get project by filesystem path.
        Args:
            path: Path to the project directory (will be converted to string internally)
        """
        query = self.select().where(Project.path == Path(path).as_posix())
        return await self.find_one(query)
    async def get_default_project(self) -> Optional[Project]:
        """Get the default project (the one marked as is_default=True)."""
        query = self.select().where(Project.is_default.is_not(None))
        return await self.find_one(query)
    async def get_active_projects(self) -> Sequence[Project]:
        """Get all active projects."""
        query = self.select().where(Project.is_active == True)  # noqa: E712
        result = await self.execute_query(query)
        return list(result.scalars().all())
    async def set_as_default(self, project_id: int) -> Optional[Project]:
        """Set a project as the default and unset previous default.
        Args:
            project_id: ID of the project to set as default
        Returns:
            The updated project if found, None otherwise
        """
        async with db.scoped_session(self.session_maker) as session:
            # First, clear the default flag for all projects using direct SQL
            await session.execute(
                text("UPDATE project SET is_default = NULL WHERE is_default IS NOT NULL")
            )
            await session.flush()
            # Set the new default project
            target_project = await self.select_by_id(session, project_id)
            if target_project:
                target_project.is_default = True
                await session.flush()
                return target_project
            return None  # pragma: no cover
    async def update_path(self, project_id: int, new_path: str) -> Optional[Project]:
        """Update project path.
        Args:
            project_id: ID of the project to update
            new_path: New filesystem path for the project
        Returns:
            The updated project if found, None otherwise
        """
        async with db.scoped_session(self.session_maker) as session:
            project = await self.select_by_id(session, project_id)
            if project:
                project.path = new_path
                await session.flush()
                return project
            return None
```
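A hedged usage sketch of the repository above; obtaining the `session_maker` is assumed to follow the engine/session setup in `basic_memory.db`, which is not shown on this page, and "main" is an illustrative project name:

```python
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from basic_memory.repository.project_repository import ProjectRepository


async def promote_to_default(session_maker: async_sessionmaker[AsyncSession]) -> None:
    repo = ProjectRepository(session_maker)

    # Look up a project by its unique name, then make it the default.
    project = await repo.get_by_name("main")
    if project:
        await repo.set_as_default(project.id)

    # The default is the single row where is_default is non-NULL (True);
    # set_as_default clears the flag on every other row first.
    default = await repo.get_default_project()
    print(default.name if default else "no default set")
```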
--------------------------------------------------------------------------------
/test-int/mcp/test_project_state_sync_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration test for project state synchronization between MCP session and CLI config.
This test validates the fix for GitHub issue #148 where MCP session and CLI commands
had inconsistent project state, causing "Project not found" errors and edit failures.
The test simulates the exact workflow reported in the issue:
1. MCP server starts with a default project
2. Default project is changed via CLI/API
3. MCP tools should immediately use the new project (no restart needed)
4. All operations should work consistently in the new project context
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_project_state_sync_after_default_change(
    mcp_server, app, config_manager, test_project
):
    """Test that MCP session stays in sync when default project is changed."""
    async with Client(mcp_server) as client:
        # Step 1: Create a second project that we can switch to
        create_result = await client.call_tool(
            "create_memory_project",
            {
                "project_name": "minerva",
                "project_path": "/tmp/minerva-test-project",
                "set_default": False,  # Don't set as default yet
            },
        )
        assert len(create_result.content) == 1
        assert "✓" in create_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "minerva" in create_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        # Step 2: Test that note operations work in the new project context
        # This validates that the identifier resolution works correctly
        write_result = await client.call_tool(
            "write_note",
            {
                "project": "minerva",
                "title": "Test Consistency Note",
                "folder": "test",
                "content": "# Test Note\n\nThis note tests project state consistency.\n\n- [test] Project state sync working",
                "tags": "test,consistency",
            },
        )
        assert len(write_result.content) == 1
        assert "Test Consistency Note" in write_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        # Step 3: Test that we can read the note we just created
        read_result = await client.call_tool(
            "read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
        )
        assert len(read_result.content) == 1
        assert "Test Consistency Note" in read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "project state sync working" in read_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]
        # Step 4: Test that edit operations work (this was failing in the original issue)
        edit_result = await client.call_tool(
            "edit_note",
            {
                "project": "minerva",
                "identifier": "Test Consistency Note",
                "operation": "append",
                "content": "\n\n## Update\n\nEdit operation successful after project switch!",
            },
        )
        assert len(edit_result.content) == 1
        assert (
            "added" in edit_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]
            and "lines" in edit_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]
        )
        # Step 5: Verify the edit was applied
        final_read_result = await client.call_tool(
            "read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
        )
        assert len(final_read_result.content) == 1
        final_content = final_read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "Edit operation successful" in final_content
```
--------------------------------------------------------------------------------
/tests/markdown/test_relation_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for edge cases in relation parsing."""
from markdown_it import MarkdownIt
from basic_memory.markdown.plugins import relation_plugin, parse_relation, parse_inline_relations
from basic_memory.markdown.schemas import Relation
def test_empty_targets():
    """Test handling of empty targets."""
    md = MarkdownIt().use(relation_plugin)
    # Empty brackets
    tokens = md.parse("- type [[]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None
    # Only spaces
    tokens = md.parse("- type [[ ]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None
    # Whitespace in brackets
    tokens = md.parse("- type [[   ]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None
def test_malformed_links():
    """Test handling of malformed wiki links."""
    md = MarkdownIt().use(relation_plugin)
    # Missing close brackets
    tokens = md.parse("- type [[Target")
    assert not any(t.meta and "relations" in t.meta for t in tokens)
    # Missing open brackets
    tokens = md.parse("- type Target]]")
    assert not any(t.meta and "relations" in t.meta for t in tokens)
    # Backwards brackets
    tokens = md.parse("- type ]]Target[[")
    assert not any(t.meta and "relations" in t.meta for t in tokens)
    # Nested brackets
    tokens = md.parse("- type [[Outer [[Inner]] ]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel is not None
    assert "Outer" in rel["target"]
def test_context_handling():
    """Test handling of contexts."""
    md = MarkdownIt().use(relation_plugin)
    # Unclosed context
    tokens = md.parse("- type [[Target]] (unclosed")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] is None
    # Multiple parens
    tokens = md.parse("- type [[Target]] (with (nested) parens)")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] == "with (nested) parens"
    # Empty context
    tokens = md.parse("- type [[Target]] ()")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] is None
def test_inline_relations():
    """Test inline relation detection."""
    md = MarkdownIt().use(relation_plugin)
    # Multiple links in text
    text = "Text with [[Link1]] and [[Link2]] and [[Link3]]"
    rels = parse_inline_relations(text)
    assert len(rels) == 3
    assert {r["target"] for r in rels} == {"Link1", "Link2", "Link3"}
    # Links with surrounding text
    text = "Before [[Target]] After"
    rels = parse_inline_relations(text)
    assert len(rels) == 1
    assert rels[0]["target"] == "Target"
    # Multiple links on same line
    tokens = md.parse("[[One]] [[Two]] [[Three]]")
    token = next(t for t in tokens if t.type == "inline")
    assert len(token.meta["relations"]) == 3
def test_unicode_targets():
    """Test handling of Unicode in targets."""
    md = MarkdownIt().use(relation_plugin)
    # Unicode in target
    tokens = md.parse("- type [[测试]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["target"] == "测试"
    # Unicode in type
    tokens = md.parse("- 使用 [[Target]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["type"] == "使用"
    # Unicode in context
    tokens = md.parse("- type [[Target]] (测试)")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] == "测试"
    # Model validation with Unicode
    relation = Relation.model_validate(rel)
    assert relation.type == "type"
    assert relation.target == "Target"
    assert relation.context == "测试"
```
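The grammar these tests exercise is `- type [[Target]] (context)`. A simplified regex sketch of the happy path; the real plugin parses markdown-it tokens and tolerates inputs this sketch rejects, such as unclosed or empty context parens, nested brackets, and inline relations:

```python
import re

# type, then a [[wiki link]] target, then an optional (context) trailer.
RELATION = re.compile(
    r"^(?P<type>\S+)\s+\[\[(?P<target>[^\[\]]+?)\]\](?:\s+\((?P<context>.+)\))?\s*$"
)


def sketch_parse_relation(line: str) -> dict | None:
    m = RELATION.match(line.lstrip("- ").strip())
    if not m or not m.group("target").strip():
        return None  # empty targets like "[[ ]]" parse to nothing
    return {
        "type": m.group("type"),
        "target": m.group("target").strip(),
        "context": m.group("context"),
    }


assert sketch_parse_relation("- uses [[SQLite]] (storage layer)") == {
    "type": "uses",
    "target": "SQLite",
    "context": "storage layer",
}
assert sketch_parse_relation("- type [[ ]]") is None
```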
--------------------------------------------------------------------------------
/test-int/BENCHMARKS.md:
--------------------------------------------------------------------------------
```markdown
# Performance Benchmarks
This directory contains performance benchmark tests for Basic Memory's sync/indexing operations.
## Purpose
These benchmarks measure baseline performance to track improvements from optimizations. They are particularly important for:
- Cloud deployments with ephemeral databases that need fast re-indexing
- Large repositories (100s to 1000s of files)
- Validating optimization efforts
## Running Benchmarks
### Run all benchmarks (excluding slow ones)
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m "benchmark and not slow"
```
### Run specific benchmark
```bash
# 100 files (fast, ~10-30 seconds)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_100_files -v
# 500 files (medium, ~1-3 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_500_files -v
# 1000 files (slow, ~3-10 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_1000_files -v
# Re-sync with no changes (tests scan performance)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_resync_no_changes -v
```
### Run all benchmarks including slow ones
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m benchmark
```
### Skip benchmarks in regular test runs
```bash
pytest -m "not benchmark"
```
## Benchmark Output
Each benchmark provides detailed metrics including:
- **Performance Metrics**:
  - Total sync time
  - Files processed per second
  - Milliseconds per file
- **Database Metrics**:
  - Initial database size
  - Final database size
  - Database growth (total and per file)
- **Operation Counts**:
  - New files indexed
  - Modified files processed
  - Deleted files handled
  - Moved files tracked
## Example Output
```
======================================================================
BENCHMARK: Sync 100 files (small repository)
======================================================================
Generating 100 test files...
  Created files 0-100 (100/100)
  File generation completed in 0.15s (666.7 files/sec)
Initial database size: 120.00 KB
Starting sync of 100 files...
----------------------------------------------------------------------
RESULTS:
----------------------------------------------------------------------
Files processed:      100
  New:                100
  Modified:           0
  Deleted:            0
  Moved:              0
Performance:
  Total time:         12.34s
  Files/sec:          8.1
  ms/file:            123.4
Database:
  Initial size:       120.00 KB
  Final size:         5.23 MB
  Growth:             5.11 MB
  Growth per file:    52.31 KB
======================================================================
```
## Interpreting Results
### Good Performance Indicators
- **Files/sec > 10**: Good indexing speed for small-medium repos
- **Files/sec > 5**: Acceptable for large repos with complex relations
- **DB growth < 100KB per file**: Reasonable index size
### Areas for Improvement
- **Files/sec < 5**: May benefit from batch operations
- **ms/file > 200**: High latency per file, check for N+1 queries
- **DB growth > 200KB per file**: Search index may be bloated (trigrams?)
## Tracking Improvements
Before making optimizations:
1. Run benchmarks to establish baseline
2. Save output for comparison
3. Note any particular pain points (e.g., slow search indexing)
After optimizations:
1. Run the same benchmarks
2. Compare metrics:
   - Files/sec should increase
   - ms/file should decrease
   - DB growth per file may decrease (with search optimizations)
3. Document improvements in PR
## Related Issues
- [#351: Performance: Optimize sync/indexing for cloud deployments](https://github.com/basicmachines-co/basic-memory/issues/351)
## Test File Generation
Benchmarks generate realistic markdown files with:
- YAML frontmatter with tags
- 3-10 observations per file with categories
- 1-3 relations per file (including forward references)
- Varying content to simulate real usage
- Files organized in category subdirectories
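For reference, a generated file looks roughly like this (illustrative sketch; the exact filenames and content come from the generator in the benchmark test file):
```markdown
---
title: note_0042
tags:
  - benchmark
  - category-2
---

# note_0042

- [tech] Observation about indexing behavior #perf
- [design] Another categorized observation (with context)

## Relations
- relates_to [[note_0017]]
- depends_on [[note_0108]]
```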
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/upload_command.py:
--------------------------------------------------------------------------------
```python
"""Upload CLI commands for basic-memory projects."""
import asyncio
from pathlib import Path
import typer
from rich.console import Console
from basic_memory.cli.app import cloud_app
from basic_memory.cli.commands.cloud.cloud_utils import (
    create_cloud_project,
    project_exists,
    sync_project,
)
from basic_memory.cli.commands.cloud.upload import upload_path
console = Console()
@cloud_app.command("upload")
def upload(
    path: Path = typer.Argument(
        ...,
        help="Path to local file or directory to upload",
        exists=True,
        readable=True,
        resolve_path=True,
    ),
    project: str = typer.Option(
        ...,
        "--project",
        "-p",
        help="Cloud project name (destination)",
    ),
    create_project: bool = typer.Option(
        False,
        "--create-project",
        "-c",
        help="Create project if it doesn't exist",
    ),
    sync: bool = typer.Option(
        True,
        "--sync/--no-sync",
        help="Sync project after upload (default: true)",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Show detailed information about file filtering and upload",
    ),
    no_gitignore: bool = typer.Option(
        False,
        "--no-gitignore",
        help="Skip .gitignore patterns (still respects .bmignore)",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Show what would be uploaded without actually uploading",
    ),
) -> None:
    """Upload local files or directories to cloud project via WebDAV.
    Examples:
      bm cloud upload ~/my-notes --project research
      bm cloud upload notes.md --project research --create-project
      bm cloud upload ~/docs --project work --no-sync
      bm cloud upload ./history --project proto --verbose
      bm cloud upload ./notes --project work --no-gitignore
      bm cloud upload ./files --project test --dry-run
    """
    async def _upload():
        # Check if project exists
        if not await project_exists(project):
            if create_project:
                console.print(f"[blue]Creating cloud project '{project}'...[/blue]")
                try:
                    await create_cloud_project(project)
                    console.print(f"[green]✓ Created project '{project}'[/green]")
                except Exception as e:
                    console.print(f"[red]Failed to create project: {e}[/red]")
                    raise typer.Exit(1)
            else:
                console.print(
                    f"[red]Project '{project}' does not exist.[/red]\n"
                    f"[yellow]Options:[/yellow]\n"
                    f"  1. Create it first: bm project add {project}\n"
                    f"  2. Use --create-project flag to create automatically"
                )
                raise typer.Exit(1)
        # Perform upload (or dry run)
        if dry_run:
            console.print(
                f"[yellow]DRY RUN: Showing what would be uploaded to '{project}'[/yellow]"
            )
        else:
            console.print(f"[blue]Uploading {path} to project '{project}'...[/blue]")
        success = await upload_path(
            path, project, verbose=verbose, use_gitignore=not no_gitignore, dry_run=dry_run
        )
        if not success:
            console.print("[red]Upload failed[/red]")
            raise typer.Exit(1)
        if dry_run:
            console.print("[yellow]DRY RUN complete - no files were uploaded[/yellow]")
        else:
            console.print(f"[green]✅ Successfully uploaded to '{project}'[/green]")
        # Sync project if requested (skip on dry run)
        if sync and not dry_run:
            console.print(f"[blue]Syncing project '{project}'...[/blue]")
            try:
                await sync_project(project)
            except Exception as e:
                console.print(f"[yellow]Warning: Sync failed: {e}[/yellow]")
                console.print("[dim]Files uploaded but may not be indexed yet[/dim]")
    asyncio.run(_upload())
```
--------------------------------------------------------------------------------
/tests/utils/test_permalink_formatting.py:
--------------------------------------------------------------------------------
```python
"""Test permalink formatting during sync."""
from pathlib import Path
import pytest
from basic_memory.config import ProjectConfig
from basic_memory.services import EntityService
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import generate_permalink
async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
@pytest.mark.asyncio
async def test_permalink_formatting(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that permalinks are properly formatted during sync.
    This ensures:
    - Underscores are converted to hyphens
    - Spaces are converted to hyphens
    - Mixed case is lowercased
    - Directory structure is preserved
    - Multiple directories work correctly
    """
    project_dir = project_config.home
    # Test cases with different filename formats
    test_cases = [
        # filename -> expected permalink
        ("my_awesome_feature.md", "my-awesome-feature"),
        ("MIXED_CASE_NAME.md", "mixed-case-name"),
        ("spaces and_underscores.md", "spaces-and-underscores"),
        ("design/model_refactor.md", "design/model-refactor"),
        (
            "test/multiple_word_directory/feature_name.md",
            "test/multiple-word-directory/feature-name",
        ),
    ]
    # Create test files
    for filename, _ in test_cases:
        content = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File
Testing permalink generation.
"""
        await create_test_file(project_dir / filename, content)
    # Run sync
    await sync_service.sync(project_config.home)
    # Verify permalinks
    for filename, expected_permalink in test_cases:
        entity = await entity_service.repository.get_by_file_path(filename)
        assert entity.permalink == expected_permalink, (
            f"File {filename} should have permalink {expected_permalink}"
        )
@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("test/Über File.md", "test/uber-file"),
        ("docs/résumé.md", "docs/resume"),
        ("notes/Déjà vu.md", "notes/deja-vu"),
        ("papers/Jürgen's Findings.md", "papers/jurgens-findings"),
        ("archive/François Müller.md", "archive/francois-muller"),
        ("research/Søren Kierkegård.md", "research/soren-kierkegard"),
        ("articles/El Niño.md", "articles/el-nino"),
        ("ArticlesElNiño.md", "articles-el-nino"),
        ("articleselniño.md", "articleselnino"),
        ("articles-El-Niño.md", "articles-el-nino"),
    ],
)
def test_latin_accents_transliteration(input_path, expected):
    """Test that Latin letters with accents are properly transliterated."""
    assert generate_permalink(input_path) == expected
@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("中文/测试文档.md", "中文/测试文档"),
        ("notes/北京市.md", "notes/北京市"),
        ("research/上海简介.md", "research/上海简介"),
        ("docs/中文 English Mixed.md", "docs/中文-english-mixed"),
        ("articles/东京Tokyo混合.md", "articles/东京-tokyo-混合"),
        ("papers/汉字_underscore_test.md", "papers/汉字-underscore-test"),
        ("projects/中文CamelCase测试.md", "projects/中文-camel-case-测试"),
    ],
)
def test_chinese_character_preservation(input_path, expected):
    """Test that Chinese characters are preserved in permalinks."""
    assert generate_permalink(input_path) == expected
@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("mixed/北京Café.md", "mixed/北京-cafe"),
        ("notes/东京Tōkyō.md", "notes/东京-tokyo"),
        ("research/München中文.md", "research/munchen-中文"),
        ("docs/Über测试.md", "docs/uber-测试"),
        ("complex/北京Beijing上海Shanghai.md", "complex/北京-beijing-上海-shanghai"),
        ("special/中文!@#$%^&*()_+.md", "special/中文"),
        ("punctuation/你好,世界!.md", "punctuation/你好世界"),
    ],
)
def test_mixed_character_sets(input_path, expected):
    """Test handling of mixed character sets and edge cases."""
    assert generate_permalink(input_path) == expected
```
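Taken together, these cases describe the permalink rules: lowercase with hyphen separators, camelCase and CJK/Latin boundaries split into words, Latin accents transliterated to ASCII, and CJK characters preserved. A rough standalone approximation under those assumptions; the real `generate_permalink` in `basic_memory.utils` covers more (for example "ø", which NFKD alone would drop rather than map to "o"):

```python
import re
import unicodedata

CJK = r"\u4e00-\u9fff"  # CJK Unified Ideographs


def sketch_permalink(path: str) -> str:
    parts = []
    for segment in re.sub(r"\.md$", "", path).split("/"):
        # Split camelCase and CJK/Latin boundaries into separate words.
        segment = re.sub(r"(?<=[a-z])(?=[A-Z])", " ", segment)
        segment = re.sub(rf"(?<=[{CJK}])(?=[A-Za-z])", " ", segment)
        segment = re.sub(rf"(?<=[A-Za-z])(?=[{CJK}])", " ", segment)
        # Transliterate accented Latin letters to ASCII; keep CJK intact.
        segment = "".join(
            ch
            if "\u4e00" <= ch <= "\u9fff"
            else unicodedata.normalize("NFKD", ch).encode("ascii", "ignore").decode()
            for ch in segment
        )
        # Lowercase, turn separators into hyphens, drop other punctuation.
        segment = re.sub(r"[_\s]+", "-", segment.lower())
        segment = re.sub(rf"[^a-z0-9{CJK}-]", "", segment)
        parts.append(re.sub(r"-{2,}", "-", segment).strip("-"))
    return "/".join(parts)


assert sketch_permalink("my_awesome_feature.md") == "my-awesome-feature"
assert sketch_permalink("docs/中文 English Mixed.md") == "docs/中文-english-mixed"
assert sketch_permalink("mixed/北京Café.md") == "mixed/北京-cafe"
```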
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/647e7a75e2cd_project_constraint_fix.py:
--------------------------------------------------------------------------------
```python
"""project constraint fix
Revision ID: 647e7a75e2cd
Revises: 5fe1ab1ccebe
Create Date: 2025-06-03 12:48:30.162566
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "647e7a75e2cd"
down_revision: Union[str, None] = "5fe1ab1ccebe"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Remove the problematic UNIQUE constraint on is_default column.
    The UNIQUE constraint prevents multiple projects from having is_default=FALSE,
    which breaks project creation when the service sets is_default=False.
    Since SQLite doesn't support dropping specific constraints easily, we'll
    recreate the table without the problematic constraint.
    """
    # For SQLite, we need to recreate the table without the UNIQUE constraint
    # Create a new table without the UNIQUE constraint on is_default
    op.create_table(
        "project_new",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_default", sa.Boolean(), nullable=True),  # No UNIQUE constraint!
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("name"),
        sa.UniqueConstraint("permalink"),
    )
    # Copy data from old table to new table
    op.execute("INSERT INTO project_new SELECT * FROM project")
    # Drop the old table
    op.drop_table("project")
    # Rename the new table
    op.rename_table("project_new", "project")
    # Recreate the indexes
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
        batch_op.create_index("ix_project_name", ["name"], unique=True)
        batch_op.create_index("ix_project_path", ["path"], unique=False)
        batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
        batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)
def downgrade() -> None:
    """Add back the UNIQUE constraint on is_default column.
    WARNING: This will break project creation again if multiple projects
    have is_default=FALSE.
    """
    # Recreate the table with the UNIQUE constraint
    op.create_table(
        "project_old",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_default", sa.Boolean(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("is_default"),  # Add back the problematic constraint
        sa.UniqueConstraint("name"),
        sa.UniqueConstraint("permalink"),
    )
    # Copy data (this may fail if multiple FALSE values exist)
    op.execute("INSERT INTO project_old SELECT * FROM project")
    # Drop the current table and rename
    op.drop_table("project")
    op.rename_table("project_old", "project")
    # Recreate indexes
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
        batch_op.create_index("ix_project_name", ["name"], unique=True)
        batch_op.create_index("ix_project_path", ["path"], unique=False)
        batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
        batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)
```
--------------------------------------------------------------------------------
/tests/services/test_project_service_operations.py:
--------------------------------------------------------------------------------
```python
"""Additional tests for ProjectService operations."""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from basic_memory.services.project_service import ProjectService
@pytest.mark.asyncio
async def test_get_project_from_database(project_service: ProjectService):
    """Test getting projects from the database."""
    # Generate unique project name for testing
    test_project_name = f"test-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_path = str(test_root / "test-project")
        # Make sure directory exists
        os.makedirs(test_path, exist_ok=True)
        try:
            # Add a project to the database
            project_data = {
                "name": test_project_name,
                "path": test_path,
                "permalink": test_project_name.lower().replace(" ", "-"),
                "is_active": True,
                "is_default": False,
            }
            await project_service.repository.create(project_data)
            # Verify we can get the project
            project = await project_service.repository.get_by_name(test_project_name)
            assert project is not None
            assert project.name == test_project_name
            assert project.path == test_path
        finally:
            # Clean up
            project = await project_service.repository.get_by_name(test_project_name)
            if project:
                await project_service.repository.delete(project.id)
@pytest.mark.asyncio
async def test_add_project_to_config(project_service: ProjectService, config_manager):
    """Test adding a project to the config manager."""
    # Generate unique project name for testing
    test_project_name = f"config-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_path = (test_root / "config-project").as_posix()
        # Make sure directory exists
        os.makedirs(test_path, exist_ok=True)
        try:
            # Add a project to config only (using ConfigManager directly)
            config_manager.add_project(test_project_name, test_path)
            # Verify it's in the config
            assert test_project_name in project_service.projects
            assert project_service.projects[test_project_name] == test_path
        finally:
            # Clean up
            if test_project_name in project_service.projects:
                config_manager.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_update_project_path(project_service: ProjectService, config_manager):
    """Test updating a project's path."""
    # Create a test project
    test_project = f"path-update-test-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        original_path = (test_root / "original-path").as_posix()
        new_path = (test_root / "new-path").as_posix()
        # Make sure directories exist
        os.makedirs(original_path, exist_ok=True)
        os.makedirs(new_path, exist_ok=True)
        try:
            # Add the project
            await project_service.add_project(test_project, original_path)
            # Mock the update_project method to avoid issues with complex DB updates
            with patch.object(project_service, "update_project"):
                # Just check if the project exists
                project = await project_service.repository.get_by_name(test_project)
                assert project is not None
                assert project.path == original_path
            # Since we mock the update_project method, we skip verifying path updates
        finally:
            # Clean up
            if test_project in project_service.projects:
                try:
                    project = await project_service.repository.get_by_name(test_project)
                    if project:
                        await project_service.repository.delete(project.id)
                    config_manager.remove_project(test_project)
                except Exception:
                    pass
```
--------------------------------------------------------------------------------
/test-int/cli/test_project_commands_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for project CLI commands."""
import tempfile
from pathlib import Path
from typer.testing import CliRunner
from basic_memory.cli.main import app
def test_project_list(app_config, test_project, config_manager):
    """Test 'bm project list' command shows projects."""
    runner = CliRunner()
    result = runner.invoke(app, ["project", "list"])
    if result.exit_code != 0:
        print(f"STDOUT: {result.stdout}")
        print(f"STDERR: {result.stderr}")
        print(f"Exception: {result.exception}")
    assert result.exit_code == 0
    assert "test-project" in result.stdout
    assert "✓" in result.stdout  # default marker
def test_project_info(app_config, test_project, config_manager):
    """Test 'bm project info' command shows project details."""
    runner = CliRunner()
    result = runner.invoke(app, ["project", "info", "test-project"])
    if result.exit_code != 0:
        print(f"STDOUT: {result.stdout}")
        print(f"STDERR: {result.stderr}")
    assert result.exit_code == 0
    assert "Basic Memory Project Info" in result.stdout
    assert "test-project" in result.stdout
    assert "Statistics" in result.stdout
def test_project_info_json(app_config, test_project, config_manager):
    """Test 'bm project info --json' command outputs valid JSON."""
    import json
    runner = CliRunner()
    result = runner.invoke(app, ["project", "info", "test-project", "--json"])
    if result.exit_code != 0:
        print(f"STDOUT: {result.stdout}")
        print(f"STDERR: {result.stderr}")
    assert result.exit_code == 0
    # Parse JSON to verify it's valid
    data = json.loads(result.stdout)
    assert data["project_name"] == "test-project"
    assert "statistics" in data
    assert "system" in data
def test_project_add_and_remove(app_config, config_manager):
    """Test adding and removing a project."""
    runner = CliRunner()
    # Use a separate temporary directory to avoid nested path conflicts
    with tempfile.TemporaryDirectory() as temp_dir:
        new_project_path = Path(temp_dir) / "new-project"
        new_project_path.mkdir()
        # Add project
        result = runner.invoke(app, ["project", "add", "new-project", str(new_project_path)])
        if result.exit_code != 0:
            print(f"STDOUT: {result.stdout}")
            print(f"STDERR: {result.stderr}")
        assert result.exit_code == 0
        assert (
            "Project 'new-project' added successfully" in result.stdout
            or "added" in result.stdout.lower()
        )
        # Verify it shows up in list
        result = runner.invoke(app, ["project", "list"])
        assert result.exit_code == 0
        assert "new-project" in result.stdout
        # Remove project
        result = runner.invoke(app, ["project", "remove", "new-project"])
        assert result.exit_code == 0
        assert "removed" in result.stdout.lower() or "deleted" in result.stdout.lower()
def test_project_set_default(app_config, config_manager):
    """Test setting default project."""
    runner = CliRunner()
    # Use a separate temporary directory to avoid nested path conflicts
    with tempfile.TemporaryDirectory() as temp_dir:
        new_project_path = Path(temp_dir) / "another-project"
        new_project_path.mkdir()
        # Add a second project
        result = runner.invoke(app, ["project", "add", "another-project", str(new_project_path)])
        if result.exit_code != 0:
            print(f"STDOUT: {result.stdout}")
            print(f"STDERR: {result.stderr}")
        assert result.exit_code == 0
        # Set as default
        result = runner.invoke(app, ["project", "default", "another-project"])
        if result.exit_code != 0:
            print(f"STDOUT: {result.stdout}")
            print(f"STDERR: {result.stderr}")
        assert result.exit_code == 0
        assert "default" in result.stdout.lower()
        # Verify in list
        result = runner.invoke(app, ["project", "list"])
        assert result.exit_code == 0
        # The new project should have the checkmark now
        lines = result.stdout.split("\n")
        for line in lines:
            if "another-project" in line:
                assert "✓" in line
```
--------------------------------------------------------------------------------
/tests/markdown/test_observation_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for edge cases in observation parsing."""
from markdown_it import MarkdownIt
from basic_memory.markdown.plugins import observation_plugin, parse_observation
from basic_memory.markdown.schemas import Observation
def test_empty_input():
    """Test handling of empty input."""
    md = MarkdownIt().use(observation_plugin)
    tokens = md.parse("")
    assert not any(t.meta and "observation" in t.meta for t in tokens)
    tokens = md.parse("   ")
    assert not any(t.meta and "observation" in t.meta for t in tokens)
    tokens = md.parse("\n")
    assert not any(t.meta and "observation" in t.meta for t in tokens)
def test_invalid_context():
    """Test handling of invalid context format."""
    md = MarkdownIt().use(observation_plugin)
    # Unclosed context
    tokens = md.parse("- [test] Content (unclosed")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["content"] == "Content (unclosed"
    assert obs["context"] is None
    # Multiple parens
    tokens = md.parse("- [test] Content (with) extra) parens)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["content"] == "Content"
    assert obs["context"] == "with) extra) parens"
def test_complex_format():
    """Test parsing complex observation formats."""
    md = MarkdownIt().use(observation_plugin)
    # Multiple hashtags together
    tokens = md.parse("- [complex test] This is #tag1#tag2 with #tag3 content")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["category"] == "complex test"
    assert set(obs["tags"]) == {"tag1", "tag2", "tag3"}
    assert obs["content"] == "This is #tag1#tag2 with #tag3 content"
    # Pydantic model validation
    observation = Observation.model_validate(obs)
    assert observation.category == "complex test"
    assert set(observation.tags) == {"tag1", "tag2", "tag3"}
    assert observation.content == "This is #tag1#tag2 with #tag3 content"
def test_malformed_category():
    """Test handling of malformed category brackets."""
    md = MarkdownIt().use(observation_plugin)
    # Empty category
    tokens = md.parse("- [] Empty category")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    assert observation.category is None
    assert observation.content == "Empty category"
    # Missing close bracket
    tokens = md.parse("- [test Content")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    # Should treat whole thing as content
    assert observation.category is None
    assert "test Content" in observation.content
def test_no_category():
    """Test handling of malformed category brackets."""
    md = MarkdownIt().use(observation_plugin)
    # Empty category
    tokens = md.parse("- No category")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    assert observation.category is None
    assert observation.content == "No category"
def test_unicode_content():
    """Test handling of Unicode content."""
    md = MarkdownIt().use(observation_plugin)
    # Emoji
    tokens = md.parse("- [test] Emoji test 👍 #emoji #test (Testing emoji)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert "👍" in obs["content"]
    assert "emoji" in obs["tags"]
    # Non-Latin scripts
    tokens = md.parse("- [中文] Chinese text 测试 #language (Script test)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["category"] == "中文"
    assert "测试" in obs["content"]
    # Mixed scripts and emoji
    tokens = md.parse("- [test] Mixed 中文 and 👍 #mixed")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert "中文" in obs["content"]
    assert "👍" in obs["content"]
    # Model validation with Unicode
    observation = Observation.model_validate(obs)
    assert "中文" in observation.content
    assert "👍" in observation.content
```
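Taken together, the cases above pin down a forgiving grammar: an optional `[category]` prefix, free-text content that keeps its inline `#tags`, and a trailing `(context)` running from the first open paren to a final close paren. A rough standalone sketch of that logic; the real `parse_observation` works on markdown-it tokens and may differ in detail:

```python
import re


def sketch_parse_observation(line: str) -> dict:
    text = line.lstrip("- ").strip()
    # Optional [category] prefix; "[]" counts as no category.
    category = None
    m = re.match(r"^\[(?P<cat>[^\]]*)\]\s*(?P<rest>.*)$", text)
    if m:
        category = m.group("cat").strip() or None
        text = m.group("rest")
    # Trailing (context): first "(" through a final ")"; unclosed parens
    # stay part of the content, matching test_invalid_context above.
    context = None
    content = text.strip()
    if text.endswith(")") and "(" in text:
        open_idx = text.index("(")
        context = text[open_idx + 1 : -1].strip() or None
        content = text[:open_idx].strip()
    # Tags are extracted but deliberately left in the content string.
    tags = re.findall(r"#([\w-]+)", content)
    return {"category": category, "content": content, "tags": tags, "context": context}


obs = sketch_parse_observation("- [test] Content (with) extra) parens)")
assert obs["content"] == "Content" and obs["context"] == "with) extra) parens"
```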
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/api_client.py:
--------------------------------------------------------------------------------
```python
"""Cloud API client utilities."""
from typing import Optional
import httpx
import typer
from rich.console import Console
from basic_memory.cli.auth import CLIAuth
from basic_memory.config import ConfigManager
console = Console()
class CloudAPIError(Exception):
    """Exception raised for cloud API errors."""
    def __init__(
        self, message: str, status_code: Optional[int] = None, detail: Optional[dict] = None
    ):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail or {}
class SubscriptionRequiredError(CloudAPIError):
    """Exception raised when user needs an active subscription."""
    def __init__(self, message: str, subscribe_url: str):
        super().__init__(message, status_code=403, detail={"error": "subscription_required"})
        self.subscribe_url = subscribe_url
def get_cloud_config() -> tuple[str, str, str]:
    """Get cloud OAuth configuration from config."""
    config_manager = ConfigManager()
    config = config_manager.config
    return config.cloud_client_id, config.cloud_domain, config.cloud_host
async def get_authenticated_headers() -> dict[str, str]:
    """
    Get authentication headers with a JWT token.
    Handles JWT refresh if needed.
    """
    client_id, domain, _ = get_cloud_config()
    auth = CLIAuth(client_id=client_id, authkit_domain=domain)
    token = await auth.get_valid_token()
    if not token:
        console.print("[red]Not authenticated. Please run 'basic-memory cloud login' first.[/red]")
        raise typer.Exit(1)
    return {"Authorization": f"Bearer {token}"}
async def make_api_request(
    method: str,
    url: str,
    headers: Optional[dict] = None,
    json_data: Optional[dict] = None,
    timeout: float = 30.0,
) -> httpx.Response:
    """Make an API request to the cloud service."""
    headers = headers or {}
    auth_headers = await get_authenticated_headers()
    headers.update(auth_headers)
    # Add debug headers to help with compression issues
    headers.setdefault("Accept-Encoding", "identity")  # Disable compression for debugging
    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.request(method=method, url=url, headers=headers, json=json_data)
            response.raise_for_status()
            return response
        except httpx.HTTPError as e:
            # Check if this is a response error with response details
            if hasattr(e, "response") and e.response is not None:  # pyright: ignore [reportAttributeAccessIssue]
                response = e.response  # type: ignore
                # Try to parse error detail from response
                error_detail = None
                try:
                    error_detail = response.json()
                except Exception:
                    # If JSON parsing fails, we'll handle it as a generic error
                    pass
                # Check for subscription_required error (403)
                if response.status_code == 403 and isinstance(error_detail, dict):
                    # Handle both FastAPI HTTPException format (nested under "detail")
                    # and direct format
                    detail_obj = error_detail.get("detail", error_detail)
                    if (
                        isinstance(detail_obj, dict)
                        and detail_obj.get("error") == "subscription_required"
                    ):
                        message = detail_obj.get("message", "Active subscription required")
                        subscribe_url = detail_obj.get(
                            "subscribe_url", "https://basicmemory.com/subscribe"
                        )
                        raise SubscriptionRequiredError(
                            message=message, subscribe_url=subscribe_url
                        ) from e
                # Raise generic CloudAPIError with status code and detail
                raise CloudAPIError(
                    f"API request failed: {e}",
                    status_code=response.status_code,
                    detail=error_detail if isinstance(error_detail, dict) else {},
                ) from e
            raise CloudAPIError(f"API request failed: {e}") from e
```
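A hedged usage sketch of the client above; the `/tenant/projects` path is illustrative only, not a documented cloud route:

```python
import asyncio

from basic_memory.cli.commands.cloud.api_client import (
    CloudAPIError,
    SubscriptionRequiredError,
    make_api_request,
)


async def list_projects(host: str) -> None:
    try:
        response = await make_api_request("GET", f"{host}/tenant/projects")
        print(response.json())
    except SubscriptionRequiredError as e:
        # A 403 carrying {"error": "subscription_required"} maps to this subclass.
        print(f"Subscription required: {e} (subscribe at {e.subscribe_url})")
    except CloudAPIError as e:
        print(f"API request failed ({e.status_code}): {e.detail}")


# asyncio.run(list_projects("https://cloud.example.com"))
```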
--------------------------------------------------------------------------------
/tests/importers/test_importer_base.py:
--------------------------------------------------------------------------------
```python
"""Tests for the base importer class."""
import pytest
from unittest.mock import AsyncMock
from basic_memory.importers.base import Importer
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown.schemas import EntityMarkdown
from basic_memory.schemas.importer import ImportResult
# Create a concrete implementation of the abstract class for testing
class TestImporter(Importer[ImportResult]):
    """Test implementation of Importer base class."""
    async def import_data(self, source_data, destination_folder: str, **kwargs):
        """Implement the abstract method for testing."""
        try:
            # Test implementation that returns success
            self.ensure_folder_exists(destination_folder)
            return ImportResult(
                import_count={"files": 1},
                success=True,
                error_message=None,
            )
        except Exception as e:
            return self.handle_error("Test import failed", e)
    def handle_error(self, message: str, error=None) -> ImportResult:
        """Implement the abstract handle_error method."""
        import logging
        logger = logging.getLogger(__name__)
        error_message = f"{message}"
        if error:
            error_message += f": {str(error)}"
        logger.error(error_message)
        return ImportResult(
            import_count={},
            success=False,
            error_message=error_message,
        )
@pytest.fixture
def mock_markdown_processor():
    """Mock MarkdownProcessor for testing."""
    processor = AsyncMock(spec=MarkdownProcessor)
    processor.write_file = AsyncMock()
    return processor
@pytest.fixture
def test_importer(tmp_path, mock_markdown_processor):
    """Create a TestImporter instance for testing."""
    return TestImporter(tmp_path, mock_markdown_processor)
@pytest.mark.asyncio
async def test_import_data_success(test_importer, tmp_path):
    """Test successful import_data implementation."""
    result = await test_importer.import_data({}, "test_folder")
    assert result.success
    assert result.import_count == {"files": 1}
    assert result.error_message is None
    # Verify folder was created
    folder_path = tmp_path / "test_folder"
    assert folder_path.exists()
    assert folder_path.is_dir()
@pytest.mark.asyncio
async def test_write_entity(test_importer, mock_markdown_processor, tmp_path):
    """Test write_entity method."""
    # Create test entity
    entity = EntityMarkdown(
        title="Test Entity",
        content="Test content",
        frontmatter={},
        observations=[],
        relations=[],
    )
    # Call write_entity
    file_path = tmp_path / "test_entity.md"
    await test_importer.write_entity(entity, file_path)
    # Verify markdown processor was called with correct arguments
    mock_markdown_processor.write_file.assert_called_once_with(file_path, entity)
def test_ensure_folder_exists(test_importer, tmp_path):
    """Test ensure_folder_exists method."""
    # Test with simple folder
    folder_path = test_importer.ensure_folder_exists("test_folder")
    assert folder_path.exists()
    assert folder_path.is_dir()
    assert folder_path == tmp_path / "test_folder"
    # Test with nested folder
    nested_path = test_importer.ensure_folder_exists("nested/folder/path")
    assert nested_path.exists()
    assert nested_path.is_dir()
    assert nested_path == tmp_path / "nested" / "folder" / "path"
    # Test with existing folder (should not raise error)
    existing_path = test_importer.ensure_folder_exists("test_folder")
    assert existing_path.exists()
    assert existing_path.is_dir()
@pytest.mark.asyncio
async def test_handle_error(test_importer):
    """Test handle_error method."""
    # Test with message only
    result = test_importer.handle_error("Test error message")
    assert not result.success
    assert result.error_message == "Test error message"
    assert result.import_count == {}
    # Test with message and exception
    test_exception = ValueError("Test exception")
    result = test_importer.handle_error("Error occurred", test_exception)
    assert not result.success
    assert "Error occurred" in result.error_message
    assert "Test exception" in result.error_message
    assert result.import_count == {}
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/3dae7c7b1564_initial_schema.py:
--------------------------------------------------------------------------------
```python
"""initial schema
Revision ID: 3dae7c7b1564
Revises:
Create Date: 2025-02-12 21:23:00.336344
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "3dae7c7b1564"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "entity",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("title", sa.String(), nullable=False),
        sa.Column("entity_type", sa.String(), nullable=False),
        sa.Column("entity_metadata", sa.JSON(), nullable=True),
        sa.Column("content_type", sa.String(), nullable=False),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("file_path", sa.String(), nullable=False),
        sa.Column("checksum", sa.String(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("permalink", name="uix_entity_permalink"),
    )
    op.create_index("ix_entity_created_at", "entity", ["created_at"], unique=False)
    op.create_index(op.f("ix_entity_file_path"), "entity", ["file_path"], unique=True)
    op.create_index(op.f("ix_entity_permalink"), "entity", ["permalink"], unique=True)
    op.create_index("ix_entity_title", "entity", ["title"], unique=False)
    op.create_index("ix_entity_type", "entity", ["entity_type"], unique=False)
    op.create_index("ix_entity_updated_at", "entity", ["updated_at"], unique=False)
    op.create_table(
        "observation",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("entity_id", sa.Integer(), nullable=False),
        sa.Column("content", sa.Text(), nullable=False),
        sa.Column("category", sa.String(), nullable=False),
        sa.Column("context", sa.Text(), nullable=True),
        sa.Column("tags", sa.JSON(), server_default="[]", nullable=True),
        sa.ForeignKeyConstraint(["entity_id"], ["entity.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index("ix_observation_category", "observation", ["category"], unique=False)
    op.create_index("ix_observation_entity_id", "observation", ["entity_id"], unique=False)
    op.create_table(
        "relation",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("from_id", sa.Integer(), nullable=False),
        sa.Column("to_id", sa.Integer(), nullable=True),
        sa.Column("to_name", sa.String(), nullable=False),
        sa.Column("relation_type", sa.String(), nullable=False),
        sa.Column("context", sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(["from_id"], ["entity.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["to_id"], ["entity.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("from_id", "to_id", "relation_type", name="uix_relation"),
    )
    op.create_index("ix_relation_from_id", "relation", ["from_id"], unique=False)
    op.create_index("ix_relation_to_id", "relation", ["to_id"], unique=False)
    op.create_index("ix_relation_type", "relation", ["relation_type"], unique=False)
    # ### end Alembic commands ###
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index("ix_relation_type", table_name="relation")
    op.drop_index("ix_relation_to_id", table_name="relation")
    op.drop_index("ix_relation_from_id", table_name="relation")
    op.drop_table("relation")
    op.drop_index("ix_observation_entity_id", table_name="observation")
    op.drop_index("ix_observation_category", table_name="observation")
    op.drop_table("observation")
    op.drop_index("ix_entity_updated_at", table_name="entity")
    op.drop_index("ix_entity_type", table_name="entity")
    op.drop_index("ix_entity_title", table_name="entity")
    op.drop_index(op.f("ix_entity_permalink"), table_name="entity")
    op.drop_index(op.f("ix_entity_file_path"), table_name="entity")
    op.drop_index("ix_entity_created_at", table_name="entity")
    op.drop_table("entity")
    # ### end Alembic commands ###
```
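Migrations like this one are normally applied via the Alembic CLI; a minimal sketch of driving the same upgrade programmatically (the ini path is an assumption, and the repo's `alembic/migrations.py` may already wrap this):
```python
from alembic import command
from alembic.config import Config

# Path to the project's alembic.ini is an assumption for illustration.
cfg = Config("src/basic_memory/alembic/alembic.ini")
command.upgrade(cfg, "head")  # applies 3dae7c7b1564 and all later revisions
```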
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/5fe1ab1ccebe_add_projects_table.py:
--------------------------------------------------------------------------------
```python
"""add projects table
Revision ID: 5fe1ab1ccebe
Revises: cc7172b46608
Create Date: 2025-05-14 09:05:18.214357
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "5fe1ab1ccebe"
down_revision: Union[str, None] = "cc7172b46608"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "project",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_default", sa.Boolean(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("is_default"),
        sa.UniqueConstraint("name"),
        sa.UniqueConstraint("permalink"),
        if_not_exists=True,
    )
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.create_index(
            "ix_project_created_at", ["created_at"], unique=False, if_not_exists=True
        )
        batch_op.create_index("ix_project_name", ["name"], unique=True, if_not_exists=True)
        batch_op.create_index("ix_project_path", ["path"], unique=False, if_not_exists=True)
        batch_op.create_index(
            "ix_project_permalink", ["permalink"], unique=True, if_not_exists=True
        )
        batch_op.create_index(
            "ix_project_updated_at", ["updated_at"], unique=False, if_not_exists=True
        )
    with op.batch_alter_table("entity", schema=None) as batch_op:
        batch_op.add_column(sa.Column("project_id", sa.Integer(), nullable=False))
        batch_op.drop_index(
            "uix_entity_permalink",
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.drop_index("ix_entity_file_path")
        batch_op.create_index(batch_op.f("ix_entity_file_path"), ["file_path"], unique=False)
        batch_op.create_index("ix_entity_project_id", ["project_id"], unique=False)
        batch_op.create_index(
            "uix_entity_file_path_project", ["file_path", "project_id"], unique=True
        )
        batch_op.create_index(
            "uix_entity_permalink_project",
            ["permalink", "project_id"],
            unique=True,
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.create_foreign_key("fk_entity_project_id", "project", ["project_id"], ["id"])
    # drop the search index table. it will be recreated
    op.drop_table("search_index")
    # ### end Alembic commands ###
def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("entity", schema=None) as batch_op:
        batch_op.drop_constraint("fk_entity_project_id", type_="foreignkey")
        batch_op.drop_index(
            "uix_entity_permalink_project",
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.drop_index("uix_entity_file_path_project")
        batch_op.drop_index("ix_entity_project_id")
        batch_op.drop_index(batch_op.f("ix_entity_file_path"))
        batch_op.create_index("ix_entity_file_path", ["file_path"], unique=1)
        batch_op.create_index(
            "uix_entity_permalink",
            ["permalink"],
            unique=1,
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.drop_column("project_id")
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.drop_index("ix_project_updated_at")
        batch_op.drop_index("ix_project_permalink")
        batch_op.drop_index("ix_project_path")
        batch_op.drop_index("ix_project_name")
        batch_op.drop_index("ix_project_created_at")
    op.drop_table("project")
    # ### end Alembic commands ###
```
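The `batch_alter_table` blocks above exist because SQLite cannot add foreign keys or drop constraints via plain `ALTER TABLE`; Alembic's batch mode recreates the table and copies the rows over. A minimal sketch of the pattern, with a purely illustrative column:
```python
import sqlalchemy as sa
from alembic import op

def upgrade() -> None:
    # Batch mode: Alembic builds a new table, copies data, and swaps it in,
    # which is the only way to apply these changes on SQLite.
    with op.batch_alter_table("entity") as batch_op:
        batch_op.add_column(sa.Column("example_col", sa.String(), nullable=True))
```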
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/canvas.py:
--------------------------------------------------------------------------------
```python
"""Canvas creation tool for Basic Memory MCP server.
This tool creates Obsidian canvas files (.canvas) using the JSON Canvas 1.0 spec.
"""
import json
from typing import Dict, List, Any, Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_put
@mcp.tool(
    description="Create an Obsidian canvas file to visualize concepts and connections.",
)
async def canvas(
    nodes: List[Dict[str, Any]],
    edges: List[Dict[str, Any]],
    title: str,
    folder: str,
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """Create an Obsidian canvas file with the provided nodes and edges.
    This tool creates a .canvas file compatible with Obsidian's Canvas feature,
    allowing visualization of relationships between concepts or documents.
    Project Resolution:
    Server resolves projects in this order: Single Project Mode → project parameter → default project.
    If project unknown, use list_memory_projects() or recent_activity() first.
    For the full JSON Canvas 1.0 specification, see the 'spec://canvas' resource.
    Args:
        project: Project name to create canvas in. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        nodes: List of node objects following JSON Canvas 1.0 spec
        edges: List of edge objects following JSON Canvas 1.0 spec
        title: The title of the canvas (will be saved as title.canvas)
        folder: Folder path relative to project root where the canvas should be saved.
                Use forward slashes (/) as separators. Examples: "diagrams", "projects/2025", "visual/maps"
        context: Optional FastMCP context for performance caching.
    Returns:
        A summary of the created canvas file
    Important Notes:
    - When referencing files, use the exact file path as shown in Obsidian
      Example: "folder/Document Name.md" (not permalink format)
    - For file nodes, the "file" attribute must reference an existing file
    - Nodes require id, type, x, y, width, height properties
    - Edges require id, fromNode, toNode properties
    - Position nodes in a logical layout (x,y coordinates in pixels)
    - Use color attributes ("1"-"6" or hex) for visual organization
    Basic Structure:
    ```json
    {
      "nodes": [
        {
          "id": "node1",
          "type": "file",  // Options: "file", "text", "link", "group"
          "file": "folder/Document.md",
          "x": 0,
          "y": 0,
          "width": 400,
          "height": 300
        }
      ],
      "edges": [
        {
          "id": "edge1",
          "fromNode": "node1",
          "toNode": "node2",
          "label": "connects to"
        }
      ]
    }
    ```
    Examples:
        # Create canvas in a specific project
        canvas(nodes=[...], edges=[...], title="My Canvas", folder="diagrams", project="my-project")
        # Create canvas in the work project
        canvas(nodes=[...], edges=[...], title="Process Flow", folder="visual/maps", project="work-project")
    Raises:
        ToolError: If project doesn't exist or folder path is invalid
    """
    async with get_client() as client:
        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url
        # Ensure path has .canvas extension
        file_title = title if title.endswith(".canvas") else f"{title}.canvas"
        file_path = f"{folder}/{file_title}"
        # Create canvas data structure
        canvas_data = {"nodes": nodes, "edges": edges}
        # Convert to JSON
        canvas_json = json.dumps(canvas_data, indent=2)
        # Write the file using the resource API
        logger.info(f"Creating canvas file: {file_path} in project {project}")
        response = await call_put(client, f"{project_url}/resource/{file_path}", json=canvas_json)
        # Parse response
        result = response.json()
        logger.debug(result)
        # Build summary
        action = "Created" if response.status_code == 201 else "Updated"
        summary = [f"# {action}: {file_path}", "\nThe canvas is ready to open in Obsidian."]
        return "\n".join(summary)
```
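For reference, a sketch of the payload shape this tool expects; invoking the decorated tool via `canvas.fn(...)` mirrors how the test suite calls other tools and is an assumption here:
```python
nodes = [
    {"id": "node1", "type": "text", "text": "Idea", "x": 0, "y": 0, "width": 250, "height": 120},
    {"id": "node2", "type": "file", "file": "notes/Plan.md", "x": 400, "y": 0, "width": 400, "height": 300},
]
edges = [{"id": "edge1", "fromNode": "node1", "toNode": "node2", "label": "expands into"}]

# Inside an async context:
# summary = await canvas.fn(nodes=nodes, edges=edges, title="Brainstorm",
#                           folder="diagrams", project="my-project")
```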
--------------------------------------------------------------------------------
/tests/schemas/test_base_timeframe_minimum.py:
--------------------------------------------------------------------------------
```python
"""Test minimum 1-day timeframe enforcement for timezone handling."""
from datetime import datetime, timedelta
import pytest
from freezegun import freeze_time
from basic_memory.schemas.base import parse_timeframe
class TestTimeframeMinimum:
    """Test that parse_timeframe enforces a minimum 1-day lookback."""
    @freeze_time("2025-01-15 15:00:00")
    def test_today_returns_one_day_ago(self):
        """Test that 'today' returns 1 day ago instead of start of today."""
        result = parse_timeframe("today")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago (within a second for test tolerance)
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago, got {result}"
    @freeze_time("2025-01-15 15:00:00")
    def test_one_hour_returns_one_day_minimum(self):
        """Test that '1h' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("1h")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago, not 1 hour ago
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '1h', got {result}"
    @freeze_time("2025-01-15 15:00:00")
    def test_six_hours_returns_one_day_minimum(self):
        """Test that '6h' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("6h")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago, not 6 hours ago
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '6h', got {result}"
    @freeze_time("2025-01-15 15:00:00")
    def test_one_day_returns_one_day(self):
        """Test that '1d' correctly returns approximately 1 day ago."""
        result = parse_timeframe("1d")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~1 day ago for '1d', got {result} (diff: {diff_hours} hours)"
        )
    @freeze_time("2025-01-15 15:00:00")
    def test_two_days_returns_two_days(self):
        """Test that '2d' correctly returns approximately 2 days ago (not affected by minimum)."""
        result = parse_timeframe("2d")
        now = datetime.now()
        two_days_ago = now - timedelta(days=2)
        # Should be approximately 2 days ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - two_days_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~2 days ago for '2d', got {result} (diff: {diff_hours} hours)"
        )
    @freeze_time("2025-01-15 15:00:00")
    def test_one_week_returns_one_week(self):
        """Test that '1 week' correctly returns approximately 1 week ago (not affected by minimum)."""
        result = parse_timeframe("1 week")
        now = datetime.now()
        one_week_ago = now - timedelta(weeks=1)
        # Should be approximately 1 week ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - one_week_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~1 week ago for '1 week', got {result} (diff: {diff_hours} hours)"
        )
    @freeze_time("2025-01-15 15:00:00")
    def test_zero_days_returns_one_day_minimum(self):
        """Test that '0d' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("0d")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago, not now
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '0d', got {result}"
    def test_timezone_awareness(self):
        """Test that returned datetime is timezone-aware."""
        result = parse_timeframe("1d")
        assert result.tzinfo is not None, "Expected timezone-aware datetime"
    def test_invalid_timeframe_raises_error(self):
        """Test that invalid timeframe strings raise ValueError."""
        with pytest.raises(ValueError, match="Could not parse timeframe"):
            parse_timeframe("invalid_timeframe")
```
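Taken together, the behavior under test amounts to a clamped lookback; a short sketch of the contract, assuming the semantics asserted above:
```python
from basic_memory.schemas.base import parse_timeframe

cutoff = parse_timeframe("6h")   # clamped: behaves like "1d"
window = parse_timeframe("2d")   # above the minimum: unaffected
assert cutoff.tzinfo is not None  # always timezone-aware
```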
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/importer_router.py:
--------------------------------------------------------------------------------
```python
"""Import router for Basic Memory API."""
import json
import logging
from fastapi import APIRouter, Form, HTTPException, UploadFile, status
from basic_memory.deps import (
    ChatGPTImporterDep,
    ClaudeConversationsImporterDep,
    ClaudeProjectsImporterDep,
    MemoryJsonImporterDep,
)
from basic_memory.importers import Importer
from basic_memory.schemas.importer import (
    ChatImportResult,
    EntityImportResult,
    ProjectImportResult,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/import", tags=["import"])
@router.post("/chatgpt", response_model=ChatImportResult)
async def import_chatgpt(
    importer: ChatGPTImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from ChatGPT JSON export.
    Args:
        importer: ChatGPT importer dependency.
        file: The ChatGPT conversations.json file.
        folder: The folder to place the files in.
    Returns:
        ChatImportResult with import statistics.
    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)
@router.post("/claude/conversations", response_model=ChatImportResult)
async def import_claude_conversations(
    importer: ClaudeConversationsImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from Claude conversations.json export.
    Args:
        importer: Claude conversations importer dependency.
        file: The Claude conversations.json file.
        folder: The folder to place the files in.
    Returns:
        ChatImportResult with import statistics.
    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)
@router.post("/claude/projects", response_model=ProjectImportResult)
async def import_claude_projects(
    importer: ClaudeProjectsImporterDep,
    file: UploadFile,
    folder: str = Form("projects"),
) -> ProjectImportResult:
    """Import projects from Claude projects.json export.
    Args:
        importer: Claude projects importer dependency.
        file: The Claude projects.json file.
        folder: The base folder to place the files in.
    Returns:
        ProjectImportResult with import statistics.
    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)
@router.post("/memory-json", response_model=EntityImportResult)
async def import_memory_json(
    importer: MemoryJsonImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> EntityImportResult:
    """Import entities and relations from a memory.json file.
    Args:
        importer: Memory JSON importer dependency.
        file: The memory.json file (one JSON object per line).
        folder: The folder to place the files in.
    Returns:
        EntityImportResult with import statistics.
    Raises:
        HTTPException: If import fails.
    """
    try:
        file_data = []
        file_bytes = await file.read()
        file_str = file_bytes.decode("utf-8")
        for line in file_str.splitlines():
            json_data = json.loads(line)
            file_data.append(json_data)
        result = await importer.import_data(file_data, folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        )
    return result
async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
    """Parse an uploaded JSON file and delegate to the given importer."""
    try:
        # Process file
        json_data = json.load(file.file)
        result = await importer.import_data(json_data, destination_folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        )
```
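A sketch of exercising one of these endpoints over HTTP with FastAPI's test client; that `basic_memory.api.app` exposes an `app` object is an assumption based on the repo layout:
```python
from fastapi.testclient import TestClient
from basic_memory.api.app import app  # assumed export

client = TestClient(app)
with open("conversations.json", "rb") as f:
    resp = client.post(
        "/import/chatgpt",
        files={"file": ("conversations.json", f, "application/json")},
        data={"folder": "conversations"},
    )
print(resp.status_code, resp.json())
```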
--------------------------------------------------------------------------------
/tests/cli/test_import_claude_projects.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_claude_projects command."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import app
from basic_memory.cli.commands.import_claude_projects import import_projects  # noqa
from basic_memory.config import get_project_config
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_project():
    """Sample project data for testing."""
    return {
        "uuid": "test-uuid",
        "name": "Test Project",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "prompt_template": "# Test Prompt\n\nThis is a test prompt.",
        "docs": [
            {
                "uuid": "doc-uuid-1",
                "filename": "Test Document",
                "content": "# Test Document\n\nThis is test content.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            },
            {
                "uuid": "doc-uuid-2",
                "filename": "Another Document",
                "content": "# Another Document\n\nMore test content.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            },
        ],
    }
@pytest.fixture
def sample_projects_json(tmp_path, sample_project):
    """Create a sample projects.json file."""
    json_file = tmp_path / "projects.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([sample_project], f)
    return json_file
def test_import_projects_command_file_not_found(tmp_path):
    """Test error handling for nonexistent file."""
    nonexistent = tmp_path / "nonexistent.json"
    result = runner.invoke(app, ["import", "claude", "projects", str(nonexistent)])
    assert result.exit_code == 1
    assert "File not found" in result.output
def test_import_projects_command_success(tmp_path, sample_projects_json, monkeypatch):
    """Test successful project import via command."""
    # Set up test environment
    config = get_project_config()
    config.home = tmp_path
    # Run import
    result = runner.invoke(app, ["import", "claude", "projects", str(sample_projects_json)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Imported 2 project documents" in result.output
    assert "Imported 1 prompt templates" in result.output
def test_import_projects_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")
    result = runner.invoke(app, ["import", "claude", "projects", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output
def test_import_projects_with_base_folder(tmp_path, sample_projects_json, monkeypatch):
    """Test import with custom base folder."""
    # Set up test environment
    config = get_project_config()
    config.home = tmp_path
    base_folder = "claude-exports"
    # Run import
    result = runner.invoke(
        app,
        [
            "import",
            "claude",
            "projects",
            str(sample_projects_json),
            "--base-folder",
            base_folder,
        ],
    )
    assert result.exit_code == 0
    # Check files in base folder
    project_dir = tmp_path / base_folder / "Test_Project"
    assert project_dir.exists()
    assert (project_dir / "docs").exists()
    assert (project_dir / "prompt-template.md").exists()
def test_import_project_without_prompt(tmp_path):
    """Test importing project without prompt template."""
    # Create project without prompt
    project = {
        "uuid": "test-uuid",
        "name": "No Prompt Project",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "docs": [
            {
                "uuid": "doc-uuid-1",
                "filename": "Test Document",
                "content": "# Test Document\n\nContent.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            }
        ],
    }
    json_file = tmp_path / "no_prompt.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([project], f)
    # Set up environment
    config = get_project_config()
    config.home = tmp_path
    # Run import
    result = runner.invoke(app, ["import", "claude", "projects", str(json_file)])
    assert result.exit_code == 0
    assert "Imported 1 project documents" in result.output
    assert "Imported 0 prompt templates" in result.output
```
--------------------------------------------------------------------------------
/src/basic_memory/services/link_resolver.py:
--------------------------------------------------------------------------------
```python
"""Service for resolving markdown links to permalinks."""
from typing import Optional, Tuple
from loguru import logger
from basic_memory.models import Entity
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.services.search_service import SearchService
class LinkResolver:
    """Service for resolving markdown links to permalinks.
    Uses a combination of exact matching and search-based resolution:
    1. Try exact permalink match (fastest)
    2. Try exact title match
    3. Try exact file path match
    4. Try file path with .md extension (for folder/title patterns)
    5. Fall back to search for fuzzy matching
    """
    def __init__(self, entity_repository: EntityRepository, search_service: SearchService):
        """Initialize with repositories."""
        self.entity_repository = entity_repository
        self.search_service = search_service
    async def resolve_link(
        self, link_text: str, use_search: bool = True, strict: bool = False
    ) -> Optional[Entity]:
        """Resolve a markdown link to a permalink.
        Args:
            link_text: The link text to resolve
            use_search: Whether to use search-based fuzzy matching as fallback
            strict: If True, only exact matches are allowed (no fuzzy search fallback)
        """
        logger.trace(f"Resolving link: {link_text}")
        # Clean link text and extract any alias
        clean_text, alias = self._normalize_link_text(link_text)
        # 1. Try exact permalink match first (most efficient)
        entity = await self.entity_repository.get_by_permalink(clean_text)
        if entity:
            logger.debug(f"Found exact permalink match: {entity.permalink}")
            return entity
        # 2. Try exact title match
        found = await self.entity_repository.get_by_title(clean_text)
        if found:
            # Return first match if there are duplicates (consistent behavior)
            entity = found[0]
            logger.debug(f"Found title match: {entity.title}")
            return entity
        # 3. Try file path
        found_path = await self.entity_repository.get_by_file_path(clean_text)
        if found_path:
            logger.debug(f"Found entity with path: {found_path.file_path}")
            return found_path
        # 4. Try file path with .md extension if not already present
        if not clean_text.endswith(".md") and "/" in clean_text:
            file_path_with_md = f"{clean_text}.md"
            found_path_md = await self.entity_repository.get_by_file_path(file_path_with_md)
            if found_path_md:
                logger.debug(f"Found entity with path (with .md): {found_path_md.file_path}")
                return found_path_md
        # In strict mode, don't try fuzzy search - return None if no exact match found
        if strict:
            return None
        # 5. Fall back to search for fuzzy matching (only if not in strict mode)
        if use_search and "*" not in clean_text:
            results = await self.search_service.search(
                query=SearchQuery(text=clean_text, entity_types=[SearchItemType.ENTITY]),
            )
            if results:
                # Look for best match
                best_match = min(results, key=lambda x: x.score)  # pyright: ignore
                logger.trace(
                    f"Selected best match from {len(results)} results: {best_match.permalink}"
                )
                if best_match.permalink:
                    return await self.entity_repository.get_by_permalink(best_match.permalink)
        # if we couldn't find anything then return None
        return None
    def _normalize_link_text(self, link_text: str) -> Tuple[str, Optional[str]]:
        """Normalize link text and extract alias if present.
        Args:
            link_text: Raw link text from markdown
        Returns:
            Tuple of (normalized_text, alias or None)
        """
        # Strip whitespace
        text = link_text.strip()
        # Remove enclosing brackets if present
        if text.startswith("[[") and text.endswith("]]"):
            text = text[2:-2]
        # Handle Obsidian-style aliases (format: [[actual|alias]])
        alias = None
        if "|" in text:
            text, alias = text.split("|", 1)
            text = text.strip()
            alias = alias.strip()
        else:
            # Strip whitespace from text even if no alias
            text = text.strip()
        return text, alias
```
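A minimal usage sketch, assuming already-constructed repository and search-service instances from this codebase:
```python
from basic_memory.services.link_resolver import LinkResolver

async def resolve_example(entity_repository, search_service):
    resolver = LinkResolver(entity_repository, search_service)
    # Obsidian-style alias links are normalized before lookup.
    entity = await resolver.resolve_link("[[Project Plan|plan]]")
    return entity.permalink if entity else None
```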
--------------------------------------------------------------------------------
/tests/mcp/test_tool_delete_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for delete_note MCP tool."""
from basic_memory.mcp.tools.delete_note import _format_delete_error_response
class TestDeleteNoteErrorFormatting:
    """Test the error formatting function for better user experience."""
    def test_format_delete_error_note_not_found(self, test_project):
        """Test formatting for note not found errors."""
        result = _format_delete_error_response(test_project.name, "entity not found", "test-note")
        assert "# Delete Failed - Note Not Found" in result
        assert "The note 'test-note' could not be found" in result
        assert 'search_notes("test-project", "test-note")' in result
        assert "Already deleted" in result
        assert "Wrong identifier" in result
    def test_format_delete_error_permission_denied(self, test_project):
        """Test formatting for permission errors."""
        result = _format_delete_error_response(test_project.name, "permission denied", "test-note")
        assert "# Delete Failed - Permission Error" in result
        assert "You don't have permission to delete 'test-note'" in result
        assert "Check permissions" in result
        assert "File locks" in result
        assert "list_memory_projects()" in result
    def test_format_delete_error_access_forbidden(self, test_project):
        """Test formatting for access forbidden errors."""
        result = _format_delete_error_response(test_project.name, "access forbidden", "test-note")
        assert "# Delete Failed - Permission Error" in result
        assert "You don't have permission to delete 'test-note'" in result
    def test_format_delete_error_server_error(self, test_project):
        """Test formatting for server errors."""
        result = _format_delete_error_response(
            test_project.name, "server error occurred", "test-note"
        )
        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result
        assert "Try again" in result
        assert "Check file status" in result
    def test_format_delete_error_filesystem_error(self, test_project):
        """Test formatting for filesystem errors."""
        result = _format_delete_error_response(test_project.name, "filesystem error", "test-note")
        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result
    def test_format_delete_error_disk_error(self, test_project):
        """Test formatting for disk errors."""
        result = _format_delete_error_response(test_project.name, "disk full", "test-note")
        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result
    def test_format_delete_error_database_error(self, test_project):
        """Test formatting for database errors."""
        result = _format_delete_error_response(test_project.name, "database error", "test-note")
        assert "# Delete Failed - Database Error" in result
        assert "A database error occurred while deleting 'test-note'" in result
        assert "Sync conflict" in result
        assert "Database lock" in result
    def test_format_delete_error_sync_error(self, test_project):
        """Test formatting for sync errors."""
        result = _format_delete_error_response(test_project.name, "sync failed", "test-note")
        assert "# Delete Failed - Database Error" in result
        assert "A database error occurred while deleting 'test-note'" in result
    def test_format_delete_error_generic(self, test_project):
        """Test formatting for generic errors."""
        result = _format_delete_error_response(test_project.name, "unknown error", "test-note")
        assert "# Delete Failed" in result
        assert "Error deleting note 'test-note': unknown error" in result
        assert "General troubleshooting" in result
        assert "Verify the note exists" in result
    def test_format_delete_error_with_complex_identifier(self, test_project):
        """Test formatting with complex identifiers (permalinks)."""
        result = _format_delete_error_response(
            test_project.name, "entity not found", "folder/note-title"
        )
        assert 'search_notes("test-project", "note-title")' in result
        assert "Note Title" in result  # Title format
        assert "folder/note-title" in result  # Permalink format
# Integration tests removed to focus on error formatting coverage
# The error formatting tests above provide the necessary coverage for MCP tool error messaging
```
--------------------------------------------------------------------------------
/src/basic_memory/importers/memory_json_importer.py:
--------------------------------------------------------------------------------
```python
"""Memory JSON import service for Basic Memory."""
import logging
from typing import Any, Dict, List
from basic_memory.config import get_project_config
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown, Observation, Relation
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import EntityImportResult
logger = logging.getLogger(__name__)
class MemoryJsonImporter(Importer[EntityImportResult]):
    """Service for importing memory.json format data."""
    async def import_data(
        self, source_data, destination_folder: str = "", **kwargs: Any
    ) -> EntityImportResult:
        """Import entities and relations from a memory.json file.
        Args:
            source_data: Parsed JSON objects from the memory.json file, one per line.
            destination_folder: Optional destination folder within the project.
            **kwargs: Additional keyword arguments.
        Returns:
            EntityImportResult containing statistics and status of the import.
        """
        config = get_project_config()
        try:
            # First pass - collect all relations by source entity
            entity_relations: Dict[str, List[Relation]] = {}
            entities: Dict[str, Dict[str, Any]] = {}
            skipped_entities: int = 0
            # Ensure the base path exists
            base_path = config.home  # pragma: no cover
            if destination_folder:  # pragma: no cover
                base_path = self.ensure_folder_exists(destination_folder)
            # First pass - collect entities and relations
            for data in source_data:
                if data["type"] == "entity":
                    # Handle different possible name keys
                    entity_name = data.get("name") or data.get("entityName") or data.get("id")
                    if not entity_name:
                        logger.warning(f"Entity missing name field: {data}")
                        skipped_entities += 1
                        continue
                    entities[entity_name] = data
                elif data["type"] == "relation":
                    # Store relation with its source entity
                    source = data.get("from") or data.get("from_id")
                    if source not in entity_relations:
                        entity_relations[source] = []
                    entity_relations[source].append(
                        Relation(
                            type=data.get("relationType") or data.get("relation_type"),
                            target=data.get("to") or data.get("to_id"),
                        )
                    )
            # Second pass - create and write entities
            entities_created = 0
            for name, entity_data in entities.items():
                # Get entity type with fallback
                entity_type = entity_data.get("entityType") or entity_data.get("type") or "entity"
                # Ensure entity type directory exists
                entity_type_dir = base_path / entity_type
                entity_type_dir.mkdir(parents=True, exist_ok=True)
                # Get observations with fallback to empty list
                observations = entity_data.get("observations", [])
                entity = EntityMarkdown(
                    frontmatter=EntityFrontmatter(
                        metadata={
                            "type": entity_type,
                            "title": name,
                            "permalink": f"{entity_type}/{name}",
                        }
                    ),
                    content=f"# {name}\n",
                    observations=[Observation(content=obs) for obs in observations],
                    relations=entity_relations.get(name, []),
                )
                # Write entity file
                file_path = base_path / f"{entity_type}/{name}.md"
                await self.write_entity(entity, file_path)
                entities_created += 1
            relations_count = sum(len(rels) for rels in entity_relations.values())
            return EntityImportResult(
                import_count={"entities": entities_created, "relations": relations_count},
                success=True,
                entities=entities_created,
                relations=relations_count,
                skipped_entities=skipped_entities,
            )
        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import memory.json")
            return self.handle_error("Failed to import memory.json", e)  # pyright: ignore [reportReturnType]
```
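The importer consumes already-parsed JSONL records rather than a file path; a sketch of the accepted shapes, based on the key fallbacks handled above:
```python
records = [
    {"type": "entity", "name": "test_entity", "entityType": "test",
     "observations": ["Test observation"]},
    {"type": "relation", "from": "test_entity", "to": "other_entity",
     "relationType": "relates_to"},
]
# Inside an async context, with an importer instance:
# result = await importer.import_data(records, destination_folder="imports")
```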
--------------------------------------------------------------------------------
/.claude/commands/release/changelog.md:
--------------------------------------------------------------------------------
```markdown
# /changelog - Generate or Update Changelog Entry
Analyze commits and generate formatted changelog entry for a version.
## Usage
```
/changelog <version> [type]
```
**Parameters:**
- `version` (required): Version like `v0.14.0` or `v0.14.0b1`
- `type` (optional): `beta`, `rc`, or `stable` (default: `stable`)
## Implementation
You are an expert technical writer for the Basic Memory project. When the user runs `/changelog`, execute the following steps:
### Step 1: Version Analysis
1. **Determine Commit Range**
   ```bash
   # Find last release tag
   git tag -l "v*" --sort=-version:refname | grep -v "b\|rc" | head -1
   
   # Get commits since last release
   git log --oneline ${last_tag}..HEAD
   ```
2. **Parse Conventional Commits**
   - Extract feat: (features)
   - Extract fix: (bug fixes)  
   - Extract BREAKING CHANGE: (breaking changes)
   - Extract chore:, docs:, test: (other improvements)
### Step 2: Categorize Changes
1. **Features (feat:)**
   - New MCP tools
   - New CLI commands
   - New API endpoints
   - Major functionality additions
2. **Bug Fixes (fix:)**
   - User-facing bug fixes
   - Critical issues resolved
   - Performance improvements
   - Security fixes
3. **Technical Improvements**
   - Test coverage improvements
   - Code quality enhancements
   - Dependency updates
   - Documentation updates
4. **Breaking Changes**
   - API changes
   - Configuration changes
   - Behavior changes
   - Migration requirements
### Step 3: Generate Changelog Entry
Create formatted entry following existing CHANGELOG.md style:
Example:
```markdown
## <version> (<date>)
### Features
- **Multi-Project Management System** - Switch between projects instantly during conversations
  ([`993e88a`](https://github.com/basicmachines-co/basic-memory/commit/993e88a)) 
  - Instant project switching with session context
  - Project-specific operations and isolation
  - Project discovery and management tools
- **Advanced Note Editing** - Incremental editing with append, prepend, find/replace, and section operations
  ([`6fc3904`](https://github.com/basicmachines-co/basic-memory/commit/6fc3904))
  - `edit_note` tool with multiple operation types
  - Smart frontmatter-aware editing
  - Validation and error handling
### Bug Fixes
- **#118**: Fix YAML tag formatting to follow standard specification
  ([`2dc7e27`](https://github.com/basicmachines-co/basic-memory/commit/2dc7e27))
- **#110**: Make --project flag work consistently across CLI commands
  ([`02dd91a`](https://github.com/basicmachines-co/basic-memory/commit/02dd91a))
### Technical Improvements
- **Comprehensive Testing** - 100% test coverage with integration testing
  ([`468a22f`](https://github.com/basicmachines-co/basic-memory/commit/468a22f))
  - MCP integration test suite
  - End-to-end testing framework
  - Performance and edge case validation
### Breaking Changes
- **Database Migration**: Automatic migration from per-project to unified database.
    Data will be re-indexed from the filesystem, resulting in no data loss.
- **Configuration Changes**: Projects now synced between config.json and database
- **Full Backward Compatibility**: All existing setups continue to work seamlessly
```
### Step 4: Integration
1. **Update CHANGELOG.md**
   - Insert new entry at top
   - Maintain consistent formatting
   - Include commit links and issue references
2. **Validation**
   - Check all major changes are captured
   - Verify commit links work
   - Ensure issue numbers are correct
## Smart Analysis Features
### Automatic Classification
- Detect feature additions from file changes
- Identify bug fixes from commit messages
- Find breaking changes from code analysis
- Extract issue numbers from commit messages
### Content Enhancement
- Add context for technical changes
- Include migration guidance for breaking changes
- Suggest installation/upgrade instructions
- Link to relevant documentation
## Output Format
### For Beta Releases
Example: 
```markdown
## v0.13.0b4 (2025-06-03)
### Beta Changes Since v0.13.0b3
- Fix FastMCP API compatibility issues
- Update dependencies to latest versions  
- Resolve setuptools import error
### Installation
```bash
uv tool install basic-memory --prerelease=allow
```
### Known Issues
- [List any known issues for beta testing]
```
### For Stable Releases
Full changelog with complete feature list, organized by impact and category.
## Context
- Follows existing CHANGELOG.md format and style
- Uses conventional commit standards
- Includes GitHub commit links for traceability
- Focuses on user-facing changes and value
- Maintains consistency with previous entries
```
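A sketch of the commit classification step described above (the tag range is illustrative):
```python
import re
import subprocess

log = subprocess.run(
    ["git", "log", "--oneline", "v0.13.0..HEAD"],  # illustrative range
    capture_output=True, text=True, check=True,
).stdout.splitlines()

# Bucket commits by conventional-commit prefix.
categories: dict[str, list[tuple[str, str]]] = {"feat": [], "fix": [], "other": []}
for line in log:
    sha, _, msg = line.partition(" ")
    match = re.match(r"(feat|fix)(\(.+\))?!?:", msg)
    categories[match.group(1) if match else "other"].append((sha, msg))
```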
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/build_context.py:
--------------------------------------------------------------------------------
```python
"""Build context tool for Basic Memory MCP server."""
from typing import Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
    GraphContext,
    MemoryUrl,
    memory_url_path,
)
type StringOrInt = str | int
@mcp.tool(
    description="""Build context from a memory:// URI to continue conversations naturally.
    Use this to follow up on previous discussions or explore related topics.
    Memory URL Format:
    - Use paths like "folder/note" or "memory://folder/note"
    - Pattern matching: "folder/*" matches all notes in folder
    - Valid characters: letters, numbers, hyphens, underscores, forward slashes
    - Avoid: double slashes (//), angle brackets (<>), quotes, pipes (|)
    - Examples: "specs/search", "projects/basic-memory", "notes/*"
    Timeframes support natural language like:
    - "2 days ago", "last week", "today", "3 months ago"
    - Or standard formats like "7d", "24h"
    """,
)
async def build_context(
    url: MemoryUrl,
    project: Optional[str] = None,
    depth: Optional[StringOrInt] = 1,
    timeframe: Optional[TimeFrame] = "7d",
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
    context: Context | None = None,
) -> GraphContext:
    """Get context needed to continue a discussion within a specific project.
    This tool enables natural continuation of discussions by loading relevant context
    from memory:// URIs. It uses pattern matching to find relevant content and builds
    a rich context graph of related information.
    Project Resolution:
    Server resolves projects in this order: Single Project Mode → project parameter → default project.
    If project unknown, use list_memory_projects() or recent_activity() first.
    Args:
        project: Project name to build context from. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        url: memory:// URI pointing to discussion content (e.g. memory://specs/search)
        depth: How many relation hops to traverse (1-3 recommended for performance)
        timeframe: How far back to look. Supports natural language like "2 days ago", "last week"
        page: Page number of results to return (default: 1)
        page_size: Number of results to return per page (default: 10)
        max_related: Maximum number of related results to return (default: 10)
        context: Optional FastMCP context for performance caching.
    Returns:
        GraphContext containing:
            - primary_results: Content matching the memory:// URI
            - related_results: Connected content via relations
            - metadata: Context building details
    Examples:
        # Continue a specific discussion
        build_context("memory://specs/search", project="my-project")
        # Get deeper context about a component
        build_context("memory://components/memory-service", project="work-docs", depth=2)
        # Look at recent changes to a specification
        build_context("memory://specs/document-format", project="research", timeframe="today")
        # Research the history of a feature
        build_context("memory://features/knowledge-graph", project="dev-notes", timeframe="3 months ago")
    Raises:
        ToolError: If project doesn't exist or depth parameter is invalid
    """
    logger.info(f"Building context from {url} in project {project}")
    # Convert string depth to integer if needed
    if isinstance(depth, str):
        try:
            depth = int(depth)
        except ValueError:
            from mcp.server.fastmcp.exceptions import ToolError
            raise ToolError(f"Invalid depth parameter: '{depth}' is not a valid integer")
    # URL is already validated and normalized by MemoryUrl type annotation
    async with get_client() as client:
        # Get the active project using the new stateless approach
        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url
        response = await call_get(
            client,
            f"{project_url}/memory/{memory_url_path(url)}",
            params={
                "depth": depth,
                "timeframe": timeframe,
                "page": page,
                "page_size": page_size,
                "max_related": max_related,
            },
        )
        return GraphContext.model_validate(response.json())
```
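A usage sketch; calling the decorated tool via `.fn(...)` mirrors the pattern the test suite uses for other tools and is an assumption here:
```python
async def example():
    graph = await build_context.fn(
        "memory://specs/search", project="my-project", depth=2, timeframe="last week"
    )
    return graph
```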
--------------------------------------------------------------------------------
/tests/cli/test_import_memory_json.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_memory_json command."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import import_app
from basic_memory.cli.commands import import_memory_json  # noqa
from basic_memory.markdown import MarkdownProcessor
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_entities():
    """Sample entities for testing."""
    return [
        {
            "type": "entity",
            "name": "test_entity",
            "entityType": "test",
            "observations": ["Test observation 1", "Test observation 2"],
        },
        {
            "type": "relation",
            "from": "test_entity",
            "to": "related_entity",
            "relationType": "test_relation",
        },
    ]
@pytest.fixture
def sample_json_file(tmp_path, sample_entities):
    """Create a sample memory.json file."""
    json_file = tmp_path / "memory.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for entity in sample_entities:
            f.write(json.dumps(entity) + "\n")
    return json_file
@pytest.mark.asyncio
async def test_get_markdown_processor(tmp_path, monkeypatch):
    """Test getting markdown processor."""
    monkeypatch.setenv("HOME", str(tmp_path))
    processor = await import_memory_json.get_markdown_processor()
    assert isinstance(processor, MarkdownProcessor)
def test_import_json_command_file_not_found(tmp_path):
    """Test error handling for nonexistent file."""
    nonexistent = tmp_path / "nonexistent.json"
    result = runner.invoke(import_app, ["memory-json", str(nonexistent)])
    assert result.exit_code == 1
    assert "File not found" in result.output
def test_import_json_command_success(tmp_path, sample_json_file, monkeypatch):
    """Test successful JSON import via command."""
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))
    # Run import
    result = runner.invoke(import_app, ["memory-json", str(sample_json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Created 1 entities" in result.output
    assert "Added 1 relations" in result.output
def test_import_json_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")
    result = runner.invoke(import_app, ["memory-json", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output
def test_import_json_command_handle_old_format(tmp_path, monkeypatch):
    """Test handling old format JSON with from_id/to_id."""
    # Create JSON with old format
    old_format = [
        {
            "type": "entity",
            "name": "test_entity",
            "entityType": "test",
            "observations": ["Test observation"],
        },
        {
            "type": "relation",
            "from_id": "test_entity",
            "to_id": "other_entity",
            "relation_type": "test_relation",
        },
    ]
    json_file = tmp_path / "old_format.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for item in old_format:
            f.write(json.dumps(item) + "\n")
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))
    # Run import
    result = runner.invoke(import_app, ["memory-json", str(json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
def test_import_json_command_missing_name_key(tmp_path, monkeypatch):
    """Test handling JSON with missing 'name' key using 'id' instead."""
    # Create JSON with id instead of name (common in Knowledge Graph Memory Server)
    data_with_id = [
        {
            "type": "entity",
            "id": "test_entity_id",
            "entityType": "test",
            "observations": ["Test observation with id"],
        },
        {
            "type": "entity",
            "entityName": "test_entity_2",
            "entityType": "test",
            "observations": ["Test observation with entityName"],
        },
        {
            "type": "entity",
            "name": "test_entity_title",
            "entityType": "test",
            "observations": ["Test observation with name"],
        },
    ]
    json_file = tmp_path / "missing_name.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for item in data_with_id:
            f.write(json.dumps(item) + "\n")
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))
    # Run import - should not fail even without 'name' key
    result = runner.invoke(import_app, ["memory-json", str(json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Created 3 entities" in result.output
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/project_context.py:
--------------------------------------------------------------------------------
```python
"""Project context utilities for Basic Memory MCP server.
Provides project lookup utilities for MCP tools.
Handles project validation and context management in one place.
"""
import os
from typing import Optional, List
from httpx import AsyncClient
from httpx._types import (
    HeaderTypes,
)
from loguru import logger
from fastmcp import Context
from basic_memory.config import ConfigManager
from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.project_info import ProjectItem, ProjectList
from basic_memory.utils import generate_permalink
async def resolve_project_parameter(project: Optional[str] = None) -> Optional[str]:
    """Resolve project parameter using three-tier hierarchy.
    if config.cloud_mode:
        project is required
    else:
        Resolution order:
        1. Single Project Mode  (--project cli arg, or BASIC_MEMORY_MCP_PROJECT env var) - highest priority
        2. Explicit project parameter - medium priority
        3. Default project if default_project_mode=true - lowest priority
    Args:
        project: Optional explicit project parameter
    Returns:
        Resolved project name or None if no resolution possible
    """
    config = ConfigManager().config
    # if cloud_mode, project is required
    if config.cloud_mode:
        if project:
            logger.debug(f"project: {project}, cloud_mode: {config.cloud_mode}")
            return project
        else:
            raise ValueError("No project specified. Project is required for cloud mode.")
    # Priority 1: CLI constraint overrides everything (--project arg sets env var)
    constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
    if constrained_project:
        logger.debug(f"Using CLI constrained project: {constrained_project}")
        return constrained_project
    # Priority 2: Explicit project parameter
    if project:
        logger.debug(f"Using explicit project parameter: {project}")
        return project
    # Priority 3: Default project mode
    if config.default_project_mode:
        logger.debug(f"Using default project from config: {config.default_project}")
        return config.default_project
    # No resolution possible
    return None
async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
    response = await call_get(client, "/projects/projects", headers=headers)
    project_list = ProjectList.model_validate(response.json())
    return [project.name for project in project_list.projects]
async def get_active_project(
    client: AsyncClient,
    project: Optional[str] = None,
    context: Optional[Context] = None,
    headers: HeaderTypes | None = None,
) -> ProjectItem:
    """Get and validate project, setting it in context if available.
    Args:
        client: HTTP client for API calls
        project: Optional project name (resolved using hierarchy)
        context: Optional FastMCP context used to cache the result
        headers: Optional HTTP headers to forward to the API
    Returns:
        The validated project item
    Raises:
        ValueError: If no project can be resolved
        HTTPError: If project doesn't exist or is inaccessible
    """
    resolved_project = await resolve_project_parameter(project)
    if not resolved_project:
        project_names = await get_project_names(client, headers)
        raise ValueError(
            "No project specified. "
            "Either set 'default_project_mode=true' in config, or use 'project' argument.\n"
            f"Available projects: {project_names}"
        )
    project = resolved_project
    # Check if already cached in context
    if context:
        cached_project = context.get_state("active_project")
        if cached_project and cached_project.name == project:
            logger.debug(f"Using cached project from context: {project}")
            return cached_project
    # Validate project exists by calling API
    logger.debug(f"Validating project: {project}")
    permalink = generate_permalink(project)
    response = await call_get(client, f"/{permalink}/project/item", headers=headers)
    active_project = ProjectItem.model_validate(response.json())
    # Cache in context if available
    if context:
        context.set_state("active_project", active_project)
        logger.debug(f"Cached project in context: {project}")
    logger.debug(f"Validated project: {active_project.name}")
    return active_project
def add_project_metadata(result: str, project_name: str) -> str:
    """Add project context as metadata footer for assistant session tracking.
    Provides clear project context to help the assistant remember which
    project is being used throughout the conversation session.
    Args:
        result: The tool result string
        project_name: The project name that was used
    Returns:
        Result with project session tracking metadata
    """
    return f"{result}\n\n[Session: Using project '{project_name}']"
```
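A minimal sketch of the resolution order above, assuming a local (non-cloud) config with `default_project_mode=false`; the project names are illustrative:
```python
import asyncio
import os

from basic_memory.mcp.project_context import resolve_project_parameter

async def demo() -> None:
    # Explicit parameter wins when no CLI constraint is set.
    print(await resolve_project_parameter("my-notes"))  # -> "my-notes"
    # A CLI-constrained project (--project sets this env var) overrides it.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = "work"
    print(await resolve_project_parameter("my-notes"))  # -> "work"
    del os.environ["BASIC_MEMORY_MCP_PROJECT"]

asyncio.run(demo())
```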
--------------------------------------------------------------------------------
/tests/mcp/test_tool_recent_activity.py:
--------------------------------------------------------------------------------
```python
"""Tests for discussion context MCP tool."""
import pytest
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools import recent_activity
from basic_memory.schemas.search import SearchItemType
# Test data for different timeframe formats
valid_timeframes = [
    "7d",  # Standard format
    "yesterday",  # Natural language
    "0d",  # Zero duration
]
invalid_timeframes = [
    "invalid",  # Nonsense string
    # NOTE: "tomorrow" now returns 1 day ago due to timezone safety - no longer invalid
]
@pytest.mark.asyncio
async def test_recent_activity_timeframe_formats(client, test_project, test_graph):
    """Test that recent_activity accepts various timeframe formats."""
    # Test each valid timeframe with project-specific mode
    for timeframe in valid_timeframes:
        try:
            result = await recent_activity.fn(
                project=test_project.name,
                type=["entity"],
                timeframe=timeframe,
            )
            assert result is not None
            assert isinstance(result, str)
            assert "Recent Activity:" in result
            assert timeframe in result
        except Exception as e:
            pytest.fail(f"Failed with valid timeframe '{timeframe}': {str(e)}")
    # Test invalid timeframes should raise ValidationError
    for timeframe in invalid_timeframes:
        with pytest.raises(ToolError):
            await recent_activity.fn(project=test_project.name, timeframe=timeframe)
@pytest.mark.asyncio
async def test_recent_activity_type_filters(client, test_project, test_graph):
    """Test that recent_activity correctly filters by types."""
    # Test single enum type
    result = await recent_activity.fn(project=test_project.name, type=SearchItemType.ENTITY)
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    assert "Recent Notes & Documents" in result
    # Test single string type
    result = await recent_activity.fn(project=test_project.name, type="entity")
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    assert "Recent Notes & Documents" in result
    # Test single-item list type
    result = await recent_activity.fn(project=test_project.name, type=["entity"])
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    assert "Recent Notes & Documents" in result
    # Test multiple types
    result = await recent_activity.fn(project=test_project.name, type=["entity", "observation"])
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    # Should contain sections for both types
    assert "Recent Notes & Documents" in result or "Recent Observations" in result
    # Test multiple enum types
    result = await recent_activity.fn(
        project=test_project.name, type=[SearchItemType.ENTITY, SearchItemType.OBSERVATION]
    )
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    # Should contain sections for both types
    assert "Recent Notes & Documents" in result or "Recent Observations" in result
    # Test all types
    result = await recent_activity.fn(
        project=test_project.name, type=["entity", "observation", "relation"]
    )
    assert result is not None
    assert isinstance(result, str)
    assert "Recent Activity:" in result
    assert "Activity Summary:" in result
@pytest.mark.asyncio
async def test_recent_activity_type_invalid(client, test_project, test_graph):
    """Test that recent_activity correctly filters by types."""
    # Test single invalid string type
    with pytest.raises(ValueError) as e:
        await recent_activity.fn(project=test_project.name, type="note")
    assert (
        str(e.value) == "Invalid type: note. Valid types are: ['entity', 'observation', 'relation']"
    )
    # Test invalid string array type
    with pytest.raises(ValueError) as e:
        await recent_activity.fn(project=test_project.name, type=["note"])
    assert (
        str(e.value) == "Invalid type: note. Valid types are: ['entity', 'observation', 'relation']"
    )
@pytest.mark.asyncio
async def test_recent_activity_discovery_mode(client, test_project, test_graph):
    """Test that recent_activity discovery mode works without project parameter."""
    # Test discovery mode (no project parameter)
    result = await recent_activity.fn()
    assert result is not None
    assert isinstance(result, str)
    # Check that we get a formatted summary
    assert "Recent Activity Summary" in result
    assert "Most Active Project:" in result or "Other Active Projects:" in result
    assert "Summary:" in result
    assert "active projects" in result
    # Should contain project discovery guidance
    assert "Suggested project:" in result or "Multiple active projects" in result
    assert "Session reminder:" in result
```
--------------------------------------------------------------------------------
/src/basic_memory/markdown/markdown_processor.py:
--------------------------------------------------------------------------------
```python
from pathlib import Path
from typing import Optional
from collections import OrderedDict
from frontmatter import Post
from loguru import logger
from basic_memory import file_utils
from basic_memory.file_utils import dump_frontmatter
from basic_memory.markdown.entity_parser import EntityParser
from basic_memory.markdown.schemas import EntityMarkdown, Observation, Relation
class DirtyFileError(Exception):
    """Raised when attempting to write to a file that has been modified."""
    pass
class MarkdownProcessor:
    """Process markdown files while preserving content and structure.
    Used only for import operations.
    This class handles the file I/O aspects of our markdown processing. It:
    1. Uses EntityParser for reading/parsing files into our schema
    2. Handles writing files with proper frontmatter
    3. Formats structured sections (observations/relations) consistently
    4. Preserves user content exactly as written
    5. Performs atomic writes using temp files
    It does NOT:
    1. Modify the schema directly (that's done by services)
    2. Handle in-place updates (everything is read->modify->write)
    3. Track schema changes (that's done by the database)
    """
    def __init__(self, entity_parser: EntityParser):
        """Initialize processor with base path and parser."""
        self.entity_parser = entity_parser
    async def read_file(self, path: Path) -> EntityMarkdown:
        """Read and parse file into EntityMarkdown schema.
        This is step 1 of our read->modify->write pattern.
        We use EntityParser to handle all the markdown parsing.
        """
        return await self.entity_parser.parse_file(path)
    async def write_file(
        self,
        path: Path,
        markdown: EntityMarkdown,
        expected_checksum: Optional[str] = None,
    ) -> str:
        """Write EntityMarkdown schema back to file.
        This is step 3 of our read->modify->write pattern.
        The entire file is rewritten atomically on each update.
        File Structure:
        ---
        frontmatter fields
        ---
        user content area (preserved exactly)
        ## Observations (if any)
        formatted observations
        ## Relations (if any)
        formatted relations
        Args:
            path: Where to write the file
            markdown: Complete schema to write
            expected_checksum: If provided, verify file hasn't changed
        Returns:
            Checksum of written file
        Raises:
            DirtyFileError: If file has been modified (when expected_checksum provided)
        """
        # Dirty check if needed
        if expected_checksum is not None:
            current_content = path.read_text(encoding="utf-8")
            current_checksum = await file_utils.compute_checksum(current_content)
            if current_checksum != expected_checksum:
                raise DirtyFileError(f"File {path} has been modified")
        # Convert frontmatter to dict
        frontmatter_dict = OrderedDict()
        frontmatter_dict["title"] = markdown.frontmatter.title
        frontmatter_dict["type"] = markdown.frontmatter.type
        frontmatter_dict["permalink"] = markdown.frontmatter.permalink
        metadata = markdown.frontmatter.metadata or {}
        for k, v in metadata.items():
            frontmatter_dict[k] = v
        # Start with user content (or minimal title for new files)
        content = markdown.content or f"# {markdown.frontmatter.title}\n"
        # Add structured sections with proper spacing
        content = content.rstrip()  # Remove trailing whitespace
        # add a blank line if we have semantic content
        if markdown.observations or markdown.relations:
            content += "\n"
        if markdown.observations:
            content += self.format_observations(markdown.observations)
        if markdown.relations:
            content += self.format_relations(markdown.relations)
        # Create Post object for frontmatter
        post = Post(content, **frontmatter_dict)
        final_content = dump_frontmatter(post)
        logger.debug(f"writing file {path} with content:\n{final_content}")
        # Write atomically and return checksum of updated file
        path.parent.mkdir(parents=True, exist_ok=True)
        await file_utils.write_file_atomic(path, final_content)
        return await file_utils.compute_checksum(final_content)
    def format_observations(self, observations: list[Observation]) -> str:
        """Format observations section in standard way.
        Format: - [category] content #tag1 #tag2 (context)
        """
        lines = [f"{obs}" for obs in observations]
        return "\n".join(lines) + "\n"
    def format_relations(self, relations: list[Relation]) -> str:
        """Format relations section in standard way.
        Format: - relation_type [[target]] (context)
        """
        lines = [f"{rel}" for rel in relations]
        return "\n".join(lines) + "\n"
```
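The read->modify->write pattern documented above can be driven end to end; a minimal sketch with a hypothetical note path:
```python
import asyncio
from pathlib import Path

from basic_memory.markdown.entity_parser import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor

async def retitle(path: Path) -> None:
    processor = MarkdownProcessor(EntityParser(path.parent))
    markdown = await processor.read_file(path)  # step 1: read/parse
    markdown.frontmatter.title = "Updated Title"  # step 2: modify schema
    checksum = await processor.write_file(path, markdown)  # step 3: atomic rewrite
    print(checksum)

asyncio.run(retitle(Path("notes/example.md")))
```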
--------------------------------------------------------------------------------
/tests/api/test_prompt_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for the prompt router endpoints."""
import pytest
import pytest_asyncio
from httpx import AsyncClient
from basic_memory.services.context_service import ContextService
@pytest_asyncio.fixture
async def context_service(entity_repository, search_service, observation_repository):
    """Create a real context service for testing."""
    return ContextService(entity_repository, search_service, observation_repository)
@pytest.mark.asyncio
async def test_continue_conversation_endpoint(
    client: AsyncClient,
    entity_service,
    search_service,
    context_service,
    entity_repository,
    test_graph,
    project_url,
):
    """Test the continue_conversation endpoint with real services."""
    # Create request data
    request_data = {
        "topic": "Root",  # This should match our test entity in test_graph
        "timeframe": "7d",
        "depth": 1,
        "related_items_limit": 2,
    }
    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)
    # Verify response
    assert response.status_code == 200
    result = response.json()
    assert "prompt" in result
    assert "context" in result
    # Check content of context
    context = result["context"]
    assert context["topic"] == "Root"
    assert context["timeframe"] == "7d"
    assert context["has_results"] is True
    assert len(context["hierarchical_results"]) > 0
    # Check content of prompt
    prompt = result["prompt"]
    assert "Continuing conversation on: Root" in prompt
    assert "memory retrieval session" in prompt
    # Test without topic - should use recent activity
    request_data = {"timeframe": "1d", "depth": 1, "related_items_limit": 2}
    response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)
    assert response.status_code == 200
    result = response.json()
    assert "Recent Activity" in result["context"]["topic"]
@pytest.mark.asyncio
async def test_search_prompt_endpoint(
    client: AsyncClient, entity_service, search_service, test_graph, project_url
):
    """Test the search_prompt endpoint with real services."""
    # Create request data
    request_data = {
        "query": "Root",  # This should match our test entity
        "timeframe": "7d",
    }
    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/search", json=request_data)
    # Verify response
    assert response.status_code == 200
    result = response.json()
    assert "prompt" in result
    assert "context" in result
    # Check content of context
    context = result["context"]
    assert context["query"] == "Root"
    assert context["timeframe"] == "7d"
    assert context["has_results"] is True
    assert len(context["results"]) > 0
    # Check content of prompt
    prompt = result["prompt"]
    assert 'Search Results for: "Root"' in prompt
    assert "This is a memory search session" in prompt
@pytest.mark.asyncio
async def test_search_prompt_no_results(
    client: AsyncClient, entity_service, search_service, project_url
):
    """Test the search_prompt endpoint with a query that returns no results."""
    # Create request data with a query that shouldn't match anything
    request_data = {"query": "NonExistentQuery12345", "timeframe": "7d"}
    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/search", json=request_data)
    # Verify response
    assert response.status_code == 200
    result = response.json()
    # Check content of context
    context = result["context"]
    assert context["query"] == "NonExistentQuery12345"
    assert context["has_results"] is False
    assert len(context["results"]) == 0
    # Check content of prompt
    prompt = result["prompt"]
    assert 'Search Results for: "NonExistentQuery12345"' in prompt
    assert "I couldn't find any results for this query" in prompt
    assert "Opportunity to Capture Knowledge" in prompt
@pytest.mark.asyncio
async def test_error_handling(client: AsyncClient, monkeypatch, project_url):
    """Test error handling in the endpoints by breaking the template loader."""
    # Patch the template loader to raise an exception
    def mock_render(*args, **kwargs):
        raise Exception("Template error")
    # Apply the patch
    monkeypatch.setattr("basic_memory.api.template_loader.TemplateLoader.render", mock_render)
    # Test continue_conversation error handling
    response = await client.post(
        f"{project_url}/prompt/continue-conversation",
        json={"topic": "test error", "timeframe": "7d"},
    )
    assert response.status_code == 500
    assert "detail" in response.json()
    assert "Template error" in response.json()["detail"]
    # Test search_prompt error handling
    response = await client.post(
        f"{project_url}/prompt/search", json={"query": "test error", "timeframe": "7d"}
    )
    assert response.status_code == 500
    assert "detail" in response.json()
    assert "Template error" in response.json()["detail"]
```
--------------------------------------------------------------------------------
/tests/markdown/test_parser_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for markdown parser edge cases."""
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.markdown.entity_parser import EntityParser
@pytest.mark.asyncio
async def test_unicode_content(tmp_path):
    """Test handling of Unicode content including emoji and non-Latin scripts."""
    content = dedent("""
        ---
        type: test
        id: test/unicode
        created: 2024-12-21T14:00:00Z
        modified: 2024-12-21T14:00:00Z
        tags: [unicode, 测试]
        ---
        
        # Unicode Test 🧪
        
        ## Observations
        - [test] Emoji test 👍 #emoji #test (Testing emoji)
        - [中文] Chinese text 测试 #language (Script test)
        - [русский] Russian привет #language (More scripts)
        - [note] Emoji in text 😀 #meta (Category test)
        
        ## Relations
        - tested_by [[测试组件]] (Unicode test)
        - depends_on [[компонент]] (Another test)
        """)
    test_file = tmp_path / "unicode.md"
    test_file.write_text(content, encoding="utf-8")
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert "测试" in entity.frontmatter.metadata["tags"]
    assert "chinese" not in entity.frontmatter.metadata["tags"]
    assert "🧪" in entity.content
    # Verify Unicode in observations
    assert any(o.content == "Emoji test 👍 #emoji #test" for o in entity.observations)
    assert any(o.category == "中文" for o in entity.observations)
    assert any(o.category == "русский" for o in entity.observations)
    # Verify Unicode in relations
    assert any(r.target == "测试组件" for r in entity.relations)
    assert any(r.target == "компонент" for r in entity.relations)
@pytest.mark.asyncio
async def test_empty_file(tmp_path):
    """Test handling of empty files."""
    empty_file = tmp_path / "empty.md"
    empty_file.write_text("")
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(empty_file)
    assert entity.observations == []
    assert entity.relations == []
@pytest.mark.asyncio
async def test_missing_sections(tmp_path):
    """Test handling of files with missing sections."""
    content = dedent("""
        ---
        type: test
        id: test/missing
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---
        
        Just some content
        with [[links]] but no sections
        """)
    test_file = tmp_path / "missing.md"
    test_file.write_text(content)
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert len(entity.relations) == 1
    assert entity.relations[0].target == "links"
    assert entity.relations[0].type == "links to"
@pytest.mark.asyncio
async def test_tasks_are_not_observations(tmp_path):
    """Test handling of plain observations without categories."""
    content = dedent("""
        ---
        type: test
        id: test/missing
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---
        - [ ] one
        -[ ] two
        - [x] done
        - [-] not done
        """)
    test_file = tmp_path / "missing.md"
    test_file.write_text(content)
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert len(entity.observations) == 0
@pytest.mark.asyncio
async def test_nested_content(tmp_path):
    """Test handling of deeply nested content."""
    content = dedent("""
        ---
        type: test
        id: test/nested
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---
        
        # Test
        
        ## Level 1
        - [test] Level 1 #test (First level)
        - implements [[One]]
            
            ### Level 2
            - [test] Level 2 #test (Second level)
            - uses [[Two]]
                
                #### Level 3
                - [test] Level 3 #test (Third level)
                - needs [[Three]]
        """)
    test_file = tmp_path / "nested.md"
    test_file.write_text(content)
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    # Should find all observations and relations regardless of nesting
    assert len(entity.observations) == 3
    assert len(entity.relations) == 3
    assert {r.target for r in entity.relations} == {"One", "Two", "Three"}
@pytest.mark.asyncio
async def test_malformed_frontmatter(tmp_path):
    """Test handling of malformed frontmatter."""
    # Missing fields
    content = dedent("""
        ---
        type: test
        ---
        
        # Test
        """)
    test_file = tmp_path / "malformed.md"
    test_file.write_text(content)
    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert entity.frontmatter.permalink is None
@pytest.mark.asyncio
async def test_file_not_found():
    """Test handling of non-existent files."""
    parser = EntityParser(Path("/tmp"))
    with pytest.raises(FileNotFoundError):
        await parser.parse_file(Path("nonexistent.md"))
```
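For reference, a standalone run of the parser over the observation/relation syntax these tests exercise; the content and paths are illustrative:
```python
import asyncio
from pathlib import Path
from textwrap import dedent

from basic_memory.markdown.entity_parser import EntityParser

async def main() -> None:
    base = Path("/tmp/bm-parser-demo")
    base.mkdir(exist_ok=True)
    note = base / "demo.md"
    note.write_text(
        dedent("""\
            ---
            type: test
            ---

            # Demo

            ## Observations
            - [design] Keep parsing lenient #parser (edge cases)

            ## Relations
            - depends_on [[EntityParser]]
            """),
        encoding="utf-8",
    )
    entity = await EntityParser(base).parse_file(note)
    print([o.content for o in entity.observations])  # ["Keep parsing lenient #parser"]
    print([r.target for r in entity.relations])  # ["EntityParser"]

asyncio.run(main())
```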
--------------------------------------------------------------------------------
/tests/repository/test_entity_upsert_issue_187.py:
--------------------------------------------------------------------------------
```python
"""Tests for issue #187 - UNIQUE constraint violation on file_path during sync."""
import pytest
from datetime import datetime, timezone
from basic_memory.models.knowledge import Entity, Observation
from basic_memory.repository.entity_repository import EntityRepository
@pytest.mark.asyncio
async def test_upsert_entity_with_observations_conflict(entity_repository: EntityRepository):
    """Test upserting an entity that already exists with observations.
    This reproduces issue #187 where sync fails with UNIQUE constraint violations
    when trying to update entities that already exist with observations.
    """
    # Create initial entity with observations
    entity1 = Entity(
        project_id=entity_repository.project_id,
        title="Original Title",
        entity_type="note",
        permalink="debugging/backup-system/coderabbit-feedback-resolution",
        file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    # Add observations to the entity
    obs1 = Observation(
        content="This is a test observation",
        category="testing",
        tags=["test"],
    )
    entity1.observations.append(obs1)
    result1 = await entity_repository.upsert_entity(entity1)
    original_id = result1.id
    # Verify entity was created with observations
    assert result1.id is not None
    assert len(result1.observations) == 1
    # Now try to upsert the same file_path with different content/observations
    # This simulates a file being modified and re-synced
    entity2 = Entity(
        project_id=entity_repository.project_id,
        title="Updated Title",
        entity_type="note",
        permalink="debugging/backup-system/coderabbit-feedback-resolution",  # Same permalink
        file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md",  # Same file_path
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    # Add different observations
    obs2 = Observation(
        content="This is an updated observation",
        category="updated",
        tags=["updated"],
    )
    obs3 = Observation(
        content="This is a second observation",
        category="second",
        tags=["second"],
    )
    entity2.observations.extend([obs2, obs3])
    # This should UPDATE the existing entity, not fail with IntegrityError
    result2 = await entity_repository.upsert_entity(entity2)
    # Should update existing entity (same ID)
    assert result2.id == original_id
    assert result2.title == "Updated Title"
    assert result2.file_path == entity1.file_path
    assert result2.permalink == entity1.permalink
    # Observations should be updated
    assert len(result2.observations) == 2
    assert result2.observations[0].content == "This is an updated observation"
    assert result2.observations[1].content == "This is a second observation"
@pytest.mark.asyncio
async def test_upsert_entity_repeated_sync_same_file(entity_repository: EntityRepository):
    """Test that syncing the same file multiple times doesn't cause IntegrityError.
    This tests the specific scenario from issue #187 where files are being
    synced repeatedly and hitting UNIQUE constraint violations.
    """
    file_path = "processes/Complete Process for Uploading New Training Videos.md"
    permalink = "processes/complete-process-for-uploading-new-training-videos"
    # Create initial entity
    entity1 = Entity(
        project_id=entity_repository.project_id,
        title="Complete Process for Uploading New Training Videos",
        entity_type="note",
        permalink=permalink,
        file_path=file_path,
        content_type="text/markdown",
        checksum="abc123",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    result1 = await entity_repository.upsert_entity(entity1)
    first_id = result1.id
    # Simulate multiple sync attempts (like the infinite retry loop in the issue)
    for i in range(5):
        entity_new = Entity(
            project_id=entity_repository.project_id,
            title="Complete Process for Uploading New Training Videos",
            entity_type="note",
            permalink=permalink,
            file_path=file_path,
            content_type="text/markdown",
            checksum=f"def{456 + i}",  # Different checksum each time
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        # Each upsert should succeed and update the existing entity
        result = await entity_repository.upsert_entity(entity_new)
        # Should always return the same entity (updated)
        assert result.id == first_id
        assert result.checksum == entity_new.checksum
        assert result.file_path == file_path
        assert result.permalink == permalink
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_build_context.py:
--------------------------------------------------------------------------------
```python
"""Tests for discussion context MCP tool."""
import pytest
from datetime import datetime
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools import build_context
from basic_memory.schemas.memory import (
    GraphContext,
)
@pytest.mark.asyncio
async def test_get_basic_discussion_context(client, test_graph, test_project):
    """Test getting basic discussion context."""
    context = await build_context.fn(project=test_project.name, url="memory://test/root")
    assert isinstance(context, GraphContext)
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0
    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert context.metadata.timeframe is not None
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count == 1
    if context.metadata.related_count:
        assert context.metadata.related_count > 0
@pytest.mark.asyncio
async def test_get_discussion_context_pattern(client, test_graph, test_project):
    """Test getting context with pattern matching."""
    context = await build_context.fn(project=test_project.name, url="memory://test/*", depth=1)
    assert isinstance(context, GraphContext)
    assert len(context.results) > 1  # Should match multiple test/* paths
    assert all("test/" in item.primary_result.permalink for item in context.results)  # pyright: ignore [reportOperatorIssue]
    assert context.metadata.depth == 1
@pytest.mark.asyncio
async def test_get_discussion_context_timeframe(client, test_graph, test_project):
    """Test timeframe parameter filtering."""
    # Get recent context
    recent_context = await build_context.fn(
        project=test_project.name,
        url="memory://test/root",
        timeframe="1d",  # Last 24 hours
    )
    # Get older context
    older_context = await build_context.fn(
        project=test_project.name,
        url="memory://test/root",
        timeframe="30d",  # Last 30 days
    )
    # Calculate total related items
    total_recent_related = (
        sum(len(item.related_results) for item in recent_context.results)
        if recent_context.results
        else 0
    )
    total_older_related = (
        sum(len(item.related_results) for item in older_context.results)
        if older_context.results
        else 0
    )
    assert total_older_related >= total_recent_related
@pytest.mark.asyncio
async def test_get_discussion_context_not_found(client, test_project):
    """Test handling of non-existent URIs."""
    context = await build_context.fn(project=test_project.name, url="memory://test/does-not-exist")
    assert isinstance(context, GraphContext)
    assert len(context.results) == 0
    assert context.metadata.primary_count == 0
    assert context.metadata.related_count == 0
# Test data for different timeframe formats
valid_timeframes = [
    "7d",  # Standard format
    "yesterday",  # Natural language
    "0d",  # Zero duration
]
invalid_timeframes = [
    "invalid",  # Nonsense string
    # NOTE: "tomorrow" now returns 1 day ago due to timezone safety - no longer invalid
]
@pytest.mark.asyncio
async def test_build_context_timeframe_formats(client, test_graph, test_project):
    """Test that build_context accepts various timeframe formats."""
    test_url = "memory://specs/test"
    # Test each valid timeframe
    for timeframe in valid_timeframes:
        try:
            result = await build_context.fn(
                project=test_project.name,
                url=test_url,
                timeframe=timeframe,
                page=1,
                page_size=10,
                max_related=10,
            )
            assert result is not None
        except Exception as e:
            pytest.fail(f"Failed with valid timeframe '{timeframe}': {str(e)}")
    # Test invalid timeframes should raise ValidationError
    for timeframe in invalid_timeframes:
        with pytest.raises(ToolError):
            await build_context.fn(project=test_project.name, url=test_url, timeframe=timeframe)
@pytest.mark.asyncio
async def test_build_context_string_depth_parameter(client, test_graph, test_project):
    """Test that build_context handles string depth parameter correctly."""
    test_url = "memory://test/root"
    # Test valid string depth parameter - should either raise ToolError or convert to int
    try:
        result = await build_context.fn(url=test_url, depth="2", project=test_project.name)
        # If it succeeds, verify the depth was converted to an integer
        assert isinstance(result.metadata.depth, int)
        assert result.metadata.depth == 2
    except ToolError:
        # This is also acceptable behavior - type validation should catch it
        pass
    # Test invalid string depth parameter - should raise ToolError
    with pytest.raises(ToolError):
        await build_context.fn(url=test_url, depth="invalid", project=test_project.name)
```
--------------------------------------------------------------------------------
/tests/api/test_continue_conversation_template.py:
--------------------------------------------------------------------------------
```python
"""Tests for the continue_conversation template rendering."""
import datetime
import pytest
from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.memory import EntitySummary
from basic_memory.schemas.search import SearchItemType
@pytest.fixture
def template_loader():
    """Return a TemplateLoader instance for testing."""
    return TemplateLoader()
@pytest.fixture
def entity_summary():
    """Create a sample EntitySummary for testing."""
    return EntitySummary(
        title="Test Entity",
        permalink="test/entity",
        type=SearchItemType.ENTITY,
        content="This is a test entity with some content.",
        file_path="/path/to/test/entity.md",
        created_at=datetime.datetime(2023, 1, 1, 12, 0),
    )
@pytest.fixture
def context_with_results(entity_summary):
    """Create a sample context with results for testing."""
    from basic_memory.schemas.memory import ObservationSummary, ContextResult
    # Create an observation for the entity
    observation = ObservationSummary(
        title="Test Observation",
        permalink="test/entity/observations/1",
        category="test",
        content="This is a test observation.",
        file_path="/path/to/test/entity.md",
        created_at=datetime.datetime(2023, 1, 1, 12, 0),
    )
    # Create a context result with primary_result, observations, and related_results
    context_item = ContextResult(
        primary_result=entity_summary,
        observations=[observation],
        related_results=[entity_summary],
    )
    return {
        "topic": "Test Topic",
        "timeframe": "7d",
        "has_results": True,
        "hierarchical_results": [context_item],
    }
@pytest.fixture
def context_without_results():
    """Create a sample context without results for testing."""
    return {
        "topic": "Empty Topic",
        "timeframe": "1d",
        "has_results": False,
        "hierarchical_results": [],
    }
@pytest.mark.asyncio
async def test_continue_conversation_with_results(template_loader, context_with_results):
    """Test rendering the continue_conversation template with results."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
    # Check that key elements are present
    assert "Continuing conversation on: Test Topic" in result
    assert "memory://test/entity" in result
    assert "Test Entity" in result
    assert "This is a test entity with some content." in result
    assert "Related Context" in result
    assert "read_note" in result
    assert "Next Steps" in result
    assert "Knowledge Capture Recommendation" in result
@pytest.mark.asyncio
async def test_continue_conversation_without_results(template_loader, context_without_results):
    """Test rendering the continue_conversation template without results."""
    result = await template_loader.render(
        "prompts/continue_conversation.hbs", context_without_results
    )
    # Check that key elements are present
    assert "Continuing conversation on: Empty Topic" in result
    assert "The supplied query did not return any information" in result
    assert "Opportunity to Capture New Knowledge!" in result
    assert 'title="Empty Topic"' in result
    assert "Next Steps" in result
    assert "Knowledge Capture Recommendation" in result
@pytest.mark.asyncio
async def test_next_steps_section(template_loader, context_with_results):
    """Test that the next steps section is rendered correctly."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
    assert "Next Steps" in result
    assert 'Explore more with: `search_notes("Test Topic")`' in result
    assert (
        f'See what\'s changed: `recent_activity(timeframe="{context_with_results["timeframe"]}")`'
        in result
    )
    assert "Record new learnings or decisions from this conversation" in result
@pytest.mark.asyncio
async def test_knowledge_capture_recommendation(template_loader, context_with_results):
    """Test that the knowledge capture recommendation is rendered."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
    assert "Knowledge Capture Recommendation" in result
    assert "actively look for opportunities to:" in result
    assert "Record key information, decisions, or insights" in result
    assert "Link new knowledge to existing topics" in result
    assert "Suggest capturing important context" in result
    assert "one of the most valuable aspects of Basic Memory" in result
@pytest.mark.asyncio
async def test_timeframe_default_value(template_loader, context_with_results):
    """Test that the timeframe uses the default value when not provided."""
    # Remove the timeframe from the context
    context_without_timeframe = context_with_results.copy()
    context_without_timeframe["timeframe"] = None
    result = await template_loader.render(
        "prompts/continue_conversation.hbs", context_without_timeframe
    )
    # Check that the default value is used
    assert 'recent_activity(timeframe="7d")' in result
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/utils.py:
--------------------------------------------------------------------------------
```python
from typing import Optional, List
from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import (
    EntitySummary,
    ObservationSummary,
    RelationSummary,
    MemoryMetadata,
    GraphContext,
    ContextResult,
)
from basic_memory.schemas.search import SearchItemType, SearchResult
from basic_memory.services import EntityService
from basic_memory.services.context_service import (
    ContextResultRow,
    ContextResult as ServiceContextResult,
)
async def to_graph_context(
    context_result: ServiceContextResult,
    entity_repository: EntityRepository,
    page: Optional[int] = None,
    page_size: Optional[int] = None,
) -> GraphContext:
    """Convert a service-layer hierarchical context result into an API GraphContext."""
    # Helper function to convert items to summaries
    async def to_summary(item: SearchIndexRow | ContextResultRow):
        match item.type:
            case SearchItemType.ENTITY:
                return EntitySummary(
                    title=item.title,  # pyright: ignore
                    permalink=item.permalink,
                    content=item.content,
                    file_path=item.file_path,
                    created_at=item.created_at,
                )
            case SearchItemType.OBSERVATION:
                return ObservationSummary(
                    title=item.title,  # pyright: ignore
                    file_path=item.file_path,
                    category=item.category,  # pyright: ignore
                    content=item.content,  # pyright: ignore
                    permalink=item.permalink,  # pyright: ignore
                    created_at=item.created_at,
                )
            case SearchItemType.RELATION:
                from_entity = await entity_repository.find_by_id(item.from_id)  # pyright: ignore
                to_entity = await entity_repository.find_by_id(item.to_id) if item.to_id else None
                return RelationSummary(
                    title=item.title,  # pyright: ignore
                    file_path=item.file_path,
                    permalink=item.permalink,  # pyright: ignore
                    relation_type=item.relation_type,  # pyright: ignore
                    from_entity=from_entity.title if from_entity else None,
                    to_entity=to_entity.title if to_entity else None,
                    created_at=item.created_at,
                )
            case _:  # pragma: no cover
                raise ValueError(f"Unexpected type: {item.type}")
    # Process the hierarchical results
    hierarchical_results = []
    for context_item in context_result.results:
        # Process primary result
        primary_result = await to_summary(context_item.primary_result)
        # Process observations
        observations = []
        for obs in context_item.observations:
            observations.append(await to_summary(obs))
        # Process related results
        related = []
        for rel in context_item.related_results:
            related.append(await to_summary(rel))
        # Add to hierarchical results
        hierarchical_results.append(
            ContextResult(
                primary_result=primary_result,
                observations=observations,
                related_results=related,
            )
        )
    # Create schema metadata from service metadata
    metadata = MemoryMetadata(
        uri=context_result.metadata.uri,
        types=context_result.metadata.types,
        depth=context_result.metadata.depth,
        timeframe=context_result.metadata.timeframe,
        generated_at=context_result.metadata.generated_at,
        primary_count=context_result.metadata.primary_count,
        related_count=context_result.metadata.related_count,
        total_results=context_result.metadata.primary_count + context_result.metadata.related_count,
        total_relations=context_result.metadata.total_relations,
        total_observations=context_result.metadata.total_observations,
    )
    # Return new GraphContext with just hierarchical results
    return GraphContext(
        results=hierarchical_results,
        metadata=metadata,
        page=page,
        page_size=page_size,
    )
async def to_search_results(entity_service: EntityService, results: List[SearchIndexRow]):
    """Convert search index rows into API SearchResult objects."""
    search_results = []
    for r in results:
        entities = await entity_service.get_entities_by_id([r.entity_id, r.from_id, r.to_id])  # pyright: ignore
        search_results.append(
            SearchResult(
                title=r.title,  # pyright: ignore
                type=r.type,  # pyright: ignore
                permalink=r.permalink,
                score=r.score,  # pyright: ignore
                entity=entities[0].permalink if entities else None,
                content=r.content,
                file_path=r.file_path,
                metadata=r.metadata,
                category=r.category,
                from_entity=entities[0].permalink if entities else None,
                to_entity=entities[1].permalink if len(entities) > 1 else None,
                relation_type=r.relation_type,
            )
        )
    return search_results
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/async_client.py:
--------------------------------------------------------------------------------
```python
from contextlib import asynccontextmanager, AbstractAsyncContextManager
from typing import AsyncIterator, Callable, Optional
from httpx import ASGITransport, AsyncClient, Timeout
from loguru import logger
from basic_memory.api.app import app as fastapi_app
from basic_memory.config import ConfigManager
# Optional factory override for dependency injection
_client_factory: Optional[Callable[[], AbstractAsyncContextManager[AsyncClient]]] = None
def set_client_factory(factory: Callable[[], AbstractAsyncContextManager[AsyncClient]]) -> None:
    """Override the default client factory (for cloud app, testing, etc).
    Args:
        factory: An async context manager that yields an AsyncClient
    Example:
        @asynccontextmanager
        async def custom_client_factory():
            async with AsyncClient(...) as client:
                yield client
        set_client_factory(custom_client_factory)
    """
    global _client_factory
    _client_factory = factory
@asynccontextmanager
async def get_client() -> AsyncIterator[AsyncClient]:
    """Get an AsyncClient as a context manager.
    This function provides proper resource management for HTTP clients,
    ensuring connections are closed after use. It supports three modes:
    1. **Factory injection** (cloud app, tests):
       If a custom factory is set via set_client_factory(), use that.
    2. **CLI cloud mode**:
       When cloud_mode_enabled is True, create HTTP client with auth
       token from CLIAuth for requests to cloud proxy endpoint.
    3. **Local mode** (default):
       Use ASGI transport for in-process requests to local FastAPI app.
    Usage:
        async with get_client() as client:
            response = await client.get("/path")
    Yields:
        AsyncClient: Configured HTTP client for the current mode
    Raises:
        RuntimeError: If cloud mode is enabled but user is not authenticated
    """
    if _client_factory:
        # Use injected factory (cloud app, tests)
        async with _client_factory() as client:
            yield client
    else:
        # Default: create based on config
        config = ConfigManager().config
        timeout = Timeout(
            connect=10.0,  # 10 seconds for connection
            read=30.0,  # 30 seconds for reading response
            write=30.0,  # 30 seconds for writing request
            pool=30.0,  # 30 seconds for connection pool
        )
        if config.cloud_mode_enabled:
            # CLI cloud mode: inject auth when creating client
            from basic_memory.cli.auth import CLIAuth
            auth = CLIAuth(client_id=config.cloud_client_id, authkit_domain=config.cloud_domain)
            token = await auth.get_valid_token()
            if not token:
                raise RuntimeError(
                    "Cloud mode enabled but not authenticated. "
                    "Run 'basic-memory cloud login' first."
                )
            # Auth header set ONCE at client creation
            proxy_base_url = f"{config.cloud_host}/proxy"
            logger.info(f"Creating HTTP client for cloud proxy at: {proxy_base_url}")
            async with AsyncClient(
                base_url=proxy_base_url,
                headers={"Authorization": f"Bearer {token}"},
                timeout=timeout,
            ) as client:
                yield client
        else:
            # Local mode: ASGI transport for in-process calls
            logger.info("Creating ASGI client for local Basic Memory API")
            async with AsyncClient(
                transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
            ) as client:
                yield client
def create_client() -> AsyncClient:
    """Create an HTTP client based on configuration.
    DEPRECATED: Use get_client() context manager instead for proper resource management.
    This function is kept for backward compatibility but will be removed in a future version.
    The returned client should be closed manually by calling await client.aclose().
    Returns:
        AsyncClient configured for either local ASGI or remote proxy
    """
    config_manager = ConfigManager()
    config = config_manager.config
    # Configure timeout for longer operations like write_note
    # Default httpx timeout is 5 seconds which is too short for file operations
    timeout = Timeout(
        connect=10.0,  # 10 seconds for connection
        read=30.0,  # 30 seconds for reading response
        write=30.0,  # 30 seconds for writing request
        pool=30.0,  # 30 seconds for connection pool
    )
    if config.cloud_mode_enabled:
        # Use HTTP transport to proxy endpoint
        proxy_base_url = f"{config.cloud_host}/proxy"
        logger.info(f"Creating HTTP client for proxy at: {proxy_base_url}")
        return AsyncClient(base_url=proxy_base_url, timeout=timeout)
    else:
        # Default: use ASGI transport for local API (development mode)
        logger.info("Creating ASGI client for local Basic Memory API")
        return AsyncClient(
            transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
        )
```
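Factory injection in practice: a sketch of a test-style override following the pattern shown in set_client_factory's docstring (wiring the local FastAPI app is just one choice):
```python
import asyncio
from contextlib import asynccontextmanager

from httpx import ASGITransport, AsyncClient

from basic_memory.api.app import app as fastapi_app
from basic_memory.mcp.async_client import get_client, set_client_factory

@asynccontextmanager
async def test_client_factory():
    # In-process client: no network, no auth token required.
    async with AsyncClient(transport=ASGITransport(app=fastapi_app), base_url="http://test") as client:
        yield client

set_client_factory(test_client_factory)

async def use() -> None:
    # Callers are unchanged; get_client() now yields the injected client.
    async with get_client() as client:
        response = await client.get("/projects/projects")
        print(response.status_code)

asyncio.run(use())
```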
--------------------------------------------------------------------------------
/test-int/test_db_wal_mode.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for WAL mode and Windows-specific SQLite optimizations.
These tests use real filesystem databases (not in-memory) to verify WAL mode
and other SQLite configuration settings work correctly in production scenarios.
"""
import pytest
from unittest.mock import patch
from sqlalchemy import text
@pytest.mark.asyncio
async def test_wal_mode_enabled(engine_factory):
    """Test that WAL mode is enabled on filesystem database connections."""
    engine, _ = engine_factory
    # Execute a query to verify WAL mode is enabled
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA journal_mode"))
        journal_mode = result.fetchone()[0]
        # WAL mode should be enabled for filesystem databases
        assert journal_mode.upper() == "WAL"
@pytest.mark.asyncio
async def test_busy_timeout_configured(engine_factory):
    """Test that busy timeout is configured for database connections."""
    engine, _ = engine_factory
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA busy_timeout"))
        busy_timeout = result.fetchone()[0]
        # Busy timeout should be 10 seconds (10000 milliseconds)
        assert busy_timeout == 10000
@pytest.mark.asyncio
async def test_synchronous_mode_configured(engine_factory):
    """Test that synchronous mode is set to NORMAL for performance."""
    engine, _ = engine_factory
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA synchronous"))
        synchronous = result.fetchone()[0]
        # Synchronous should be NORMAL (1)
        assert synchronous == 1
@pytest.mark.asyncio
async def test_cache_size_configured(engine_factory):
    """Test that cache size is configured for performance."""
    engine, _ = engine_factory
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA cache_size"))
        cache_size = result.fetchone()[0]
        # Cache size should be -64000 (64MB)
        assert cache_size == -64000
@pytest.mark.asyncio
async def test_temp_store_configured(engine_factory):
    """Test that temp_store is set to MEMORY."""
    engine, _ = engine_factory
    async with engine.connect() as conn:
        result = await conn.execute(text("PRAGMA temp_store"))
        temp_store = result.fetchone()[0]
        # temp_store should be MEMORY (2)
        assert temp_store == 2
@pytest.mark.asyncio
async def test_windows_locking_mode_when_on_windows(tmp_path):
    """Test that Windows-specific locking mode is set when running on Windows."""
    from basic_memory.db import engine_session_factory, DatabaseType
    db_path = tmp_path / "test_windows.db"
    with patch("os.name", "nt"):
        # Need to patch at module level where it's imported
        with patch("basic_memory.db.os.name", "nt"):
            async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
                engine,
                _,
            ):
                async with engine.connect() as conn:
                    result = await conn.execute(text("PRAGMA locking_mode"))
                    locking_mode = result.fetchone()[0]
                    # Locking mode should be NORMAL on Windows
                    assert locking_mode.upper() == "NORMAL"
@pytest.mark.asyncio
async def test_null_pool_on_windows(tmp_path):
    """Test that NullPool is used on Windows to avoid connection pooling issues."""
    from basic_memory.db import engine_session_factory, DatabaseType
    from sqlalchemy.pool import NullPool
    db_path = tmp_path / "test_windows_pool.db"
    with patch("basic_memory.db.os.name", "nt"):
        async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (engine, _):
            # Engine should be using NullPool on Windows
            assert isinstance(engine.pool, NullPool)
@pytest.mark.asyncio
async def test_regular_pool_on_non_windows(tmp_path):
    """Test that regular pooling is used on non-Windows platforms."""
    from basic_memory.db import engine_session_factory, DatabaseType
    from sqlalchemy.pool import NullPool
    db_path = tmp_path / "test_posix_pool.db"
    with patch("basic_memory.db.os.name", "posix"):
        async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (engine, _):
            # Engine should NOT be using NullPool on non-Windows
            assert not isinstance(engine.pool, NullPool)
@pytest.mark.asyncio
async def test_memory_database_no_null_pool_on_windows(tmp_path):
    """Test that in-memory databases do NOT use NullPool even on Windows.
    NullPool closes connections immediately, which destroys in-memory databases.
    This test ensures in-memory databases maintain connection pooling.
    """
    from basic_memory.db import engine_session_factory, DatabaseType
    from sqlalchemy.pool import NullPool
    db_path = tmp_path / "test_memory.db"
    with patch("basic_memory.db.os.name", "nt"):
        async with engine_session_factory(db_path, DatabaseType.MEMORY) as (engine, _):
            # In-memory databases should NOT use NullPool on Windows
            assert not isinstance(engine.pool, NullPool)
```
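For context, PRAGMAs like the ones asserted above are typically applied per-connection through a SQLAlchemy connect event; a minimal sketch (basic_memory.db's actual wiring may differ):
```python
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("sqlite+aiosqlite:///example.db")

@event.listens_for(engine.sync_engine, "connect")
def set_sqlite_pragmas(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")  # readers don't block the writer
    cursor.execute("PRAGMA busy_timeout=10000")  # wait up to 10s on locks
    cursor.execute("PRAGMA synchronous=NORMAL")  # fewer fsyncs, still safe with WAL
    cursor.execute("PRAGMA cache_size=-64000")  # 64MB page cache
    cursor.execute("PRAGMA temp_store=2")  # keep temp tables in memory
    cursor.close()
```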