This is page 4 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── mount_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── sync.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ ├── test_sync_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_bisync_commands.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_cloud_utils.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/services/initialization.py:
--------------------------------------------------------------------------------
```python
"""Shared initialization service for Basic Memory.
This module provides shared initialization functions used by both CLI and API
to ensure consistent application startup across all entry points.
"""
import asyncio
from pathlib import Path
from loguru import logger
from basic_memory import db
from basic_memory.config import BasicMemoryConfig
from basic_memory.models import Project
from basic_memory.repository import (
ProjectRepository,
)
async def initialize_database(app_config: BasicMemoryConfig) -> None:
"""Initialize database with migrations handled automatically by get_or_create_db.
Args:
app_config: The Basic Memory project configuration
Note:
Database migrations are now handled automatically when the database
connection is first established via get_or_create_db().
"""
# Trigger database initialization and migrations by getting the database connection
try:
await db.get_or_create_db(app_config.database_path)
logger.info("Database initialization completed")
except Exception as e:
logger.error(f"Error initializing database: {e}")
# Allow application to continue - it might still work
# depending on what the error was, and will fail with a
# more specific error if the database is actually unusable
async def reconcile_projects_with_config(app_config: BasicMemoryConfig):
"""Ensure all projects in config.json exist in the projects table and vice versa.
This uses the ProjectService's synchronize_projects method to ensure bidirectional
synchronization between the configuration file and the database.
Args:
app_config: The Basic Memory application configuration
"""
logger.info("Reconciling projects from config with database...")
# Get database session - migrations handled centrally
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path,
db_type=db.DatabaseType.FILESYSTEM,
ensure_migrations=False,
)
project_repository = ProjectRepository(session_maker)
# Import ProjectService here to avoid circular imports
from basic_memory.services.project_service import ProjectService
try:
# Create project service and synchronize projects
project_service = ProjectService(repository=project_repository)
await project_service.synchronize_projects()
logger.info("Projects successfully reconciled between config and database")
except Exception as e:
# Log the error but continue with initialization
logger.error(f"Error during project synchronization: {e}")
logger.info("Continuing with initialization despite synchronization error")
async def initialize_file_sync(
app_config: BasicMemoryConfig,
):
"""Initialize file synchronization services. This function starts the watch service and does not return
Args:
app_config: The Basic Memory project configuration
Returns:
The watch service task that's monitoring file changes
"""
# delay import
from basic_memory.sync import WatchService
# Load app configuration - migrations handled centrally
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path,
db_type=db.DatabaseType.FILESYSTEM,
ensure_migrations=False,
)
project_repository = ProjectRepository(session_maker)
# Initialize watch service
watch_service = WatchService(
app_config=app_config,
project_repository=project_repository,
quiet=True,
)
# Get active projects
active_projects = await project_repository.get_active_projects()
# Start sync for all projects as background tasks (non-blocking)
async def sync_project_background(project: Project):
"""Sync a single project in the background."""
# avoid circular imports
from basic_memory.sync.sync_service import get_sync_service
logger.info(f"Starting background sync for project: {project.name}")
try:
# Create sync service
sync_service = await get_sync_service(project)
sync_dir = Path(project.path)
await sync_service.sync(sync_dir, project_name=project.name)
logger.info(f"Background sync completed successfully for project: {project.name}")
except Exception as e: # pragma: no cover
logger.error(f"Error in background sync for project {project.name}: {e}")
# Create background tasks for all project syncs (non-blocking)
sync_tasks = [
asyncio.create_task(sync_project_background(project)) for project in active_projects
]
logger.info(f"Created {len(sync_tasks)} background sync tasks")
# Don't await the tasks - let them run in background while we continue
# Then start the watch service in the background
logger.info("Starting watch service for all projects")
# run the watch service
try:
await watch_service.run()
logger.info("Watch service started")
except Exception as e: # pragma: no cover
logger.error(f"Error starting watch service: {e}")
return None
async def initialize_app(
app_config: BasicMemoryConfig,
):
"""Initialize the Basic Memory application.
This function handles all initialization steps:
- Running database migrations
- Reconciling projects from config.json with projects table
- Setting up file synchronization
- Starting background migration for legacy project data
Args:
app_config: The Basic Memory project configuration
"""
logger.info("Initializing app...")
# Initialize database first
await initialize_database(app_config)
# Reconcile projects from config.json with projects table
await reconcile_projects_with_config(app_config)
logger.info("App initialization completed (migration running in background if needed)")
def ensure_initialization(app_config: BasicMemoryConfig) -> None:
"""Ensure initialization runs in a synchronous context.
This is a wrapper for the async initialize_app function that can be
called from synchronous code like CLI entry points.
No-op if app_config.cloud_mode == True. Cloud Basic Memory manages its own projects.
Args:
app_config: The Basic Memory project configuration
"""
# Skip initialization in cloud mode - cloud manages its own projects
if app_config.cloud_mode_enabled:
logger.debug("Skipping initialization in cloud mode - projects managed by cloud")
return
try:
result = asyncio.run(initialize_app(app_config))
logger.info(f"Initialization completed successfully: result={result}")
except Exception as e: # pragma: no cover
logger.exception(f"Error during initialization: {e}")
# Continue execution even if initialization fails
# The command might still work, or will fail with a
# more specific error message
```
--------------------------------------------------------------------------------
/specs/SPEC-9 Signed Header Tenant Information.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-9: Signed Header Tenant Information'
type: spec
permalink: specs/spec-9-signed-header-tenant-information
tags:
- authentication
- tenant-isolation
- proxy
- security
- mcp
---
# SPEC-9: Signed Header Tenant Information
## Why
WorkOS JWT templates don't work with MCP's dynamic client registration requirement, preventing us from getting tenant information directly in JWT tokens. We need an alternative secure method to pass tenant context from the Cloud Proxy Service to tenant instances.
**Problem Context:**
- MCP spec requires dynamic client registration
- WorkOS JWT templates only apply to statically configured clients
- Without tenant information, we can't properly route requests or isolate tenant data
- Current JWT tokens only contain standard OIDC claims (sub, email, etc.)
**Affected Areas:**
- Cloud Proxy Service (`apps/cloud`) - request forwarding
- Tenant API instances (`apps/api`) - tenant context validation
- MCP Gateway (`apps/mcp`) - authentication flow
- Overall tenant isolation security model
## What
Implement HMAC-signed headers that the Cloud Proxy Service adds when forwarding requests to tenant instances. This provides secure, tamper-proof tenant information without relying on JWT custom claims.
**Components:**
- Header signing utility in Cloud Proxy Service
- Header validation middleware in Tenant API instances
- Shared secret configuration across services
- Fallback mechanisms for development and error cases
## How (High Level)
### 1. Header Format
Add these signed headers to all proxied requests:
```
X-BM-Tenant-ID: {tenant_id}
X-BM-Timestamp: {unix_timestamp}
X-BM-Signature: {hmac_sha256_signature}
```
### 2. Signature Algorithm
```python
import hashlib
import hmac
# tenant_id, timestamp, and shared_secret come from the proxy's request context and configuration
# Canonical message format
message = f"{tenant_id}:{timestamp}"
# HMAC-SHA256 signature
signature = hmac.new(
key=shared_secret.encode('utf-8'),
msg=message.encode('utf-8'),
digestmod=hashlib.sha256
).hexdigest()
```
### 3. Implementation Flow
#### Cloud Proxy Service (`apps/cloud`)
1. Extract `tenant_id` from authenticated user profile
2. Generate timestamp and canonical message
3. Sign message with shared secret
4. Add headers to request before forwarding to tenant instance
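A minimal sketch of steps 1-4, reusing the signature algorithm from section 2 (the helper name and the way `shared_secret` is passed are illustrative, not the actual `ProxyService` code):
```python
import hashlib
import hmac
import time
def build_tenant_headers(tenant_id: str, shared_secret: str) -> dict[str, str]:
    """Sign tenant context for a forwarded request (hypothetical helper)."""
    timestamp = str(int(time.time()))
    message = f"{tenant_id}:{timestamp}"
    signature = hmac.new(
        key=shared_secret.encode("utf-8"),
        msg=message.encode("utf-8"),
        digestmod=hashlib.sha256,
    ).hexdigest()
    return {
        "X-BM-Tenant-ID": tenant_id,
        "X-BM-Timestamp": timestamp,
        "X-BM-Signature": signature,
    }
```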
#### Tenant API Instances (`apps/api`)
1. Middleware validates headers on all incoming requests
2. Extract tenant_id, timestamp from headers
3. Verify timestamp is within acceptable window (5 minutes)
4. Recompute signature and compare in constant time
5. If valid, make tenant context available to Basic Memory tools
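A minimal validation sketch for the middleware steps above, assuming a 5-minute timestamp window and the header names from section 1 (function and variable names are illustrative, not the final middleware API):
```python
import hashlib
import hmac
import time
MAX_SKEW_SECONDS = 300  # 5-minute acceptance window
def validate_tenant_headers(headers: dict[str, str], shared_secret: str) -> str | None:
    """Return the tenant_id if the signed headers verify, otherwise None."""
    tenant_id = headers.get("X-BM-Tenant-ID")
    timestamp = headers.get("X-BM-Timestamp")
    signature = headers.get("X-BM-Signature")
    if not (tenant_id and timestamp and signature):
        return None
    try:
        ts = int(timestamp)
    except ValueError:
        return None
    # Replay protection: reject stale or future-dated timestamps
    if abs(time.time() - ts) > MAX_SKEW_SECONDS:
        return None
    expected = hmac.new(
        key=shared_secret.encode("utf-8"),
        msg=f"{tenant_id}:{timestamp}".encode("utf-8"),
        digestmod=hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison to avoid timing attacks
    return tenant_id if hmac.compare_digest(expected, signature) else None
```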
### 4. Security Properties
- **Authenticity**: Only services with shared secret can create valid signatures
- **Integrity**: Header tampering invalidates signature
- **Replay Protection**: Timestamp prevents reuse of old signatures
- **Non-repudiation**: Each request is cryptographically tied to specific tenant
### 5. Configuration
```bash
# Shared across Cloud Proxy and Tenant instances
BM_TENANT_HEADER_SECRET=randomly-generated-256-bit-secret
# Tenant API configuration
BM_TENANT_HEADER_VALIDATION=true # true (production) | false (dev only)
```
## How to Evaluate
### Unit Tests
- [ ] Header signing utility generates correct signatures
- [ ] Header validation correctly accepts/rejects signatures
- [ ] Timestamp validation within acceptable windows
- [ ] Constant-time signature comparison prevents timing attacks
### Integration Tests
- [ ] End-to-end request flow from MCP client → proxy → tenant
- [ ] Tenant isolation verified with signed headers
- [ ] Error handling for missing/invalid headers
- [ ] Disabled validation in development environment
### Security Validation
- [ ] Shared secret rotation procedure
- [ ] Header tampering detection
- [ ] Clock skew tolerance testing
- [ ] Performance impact measurement
### Production Readiness
- [ ] Logging and monitoring of header validation
- [ ] Graceful degradation for header validation failures
- [ ] Documentation for secret management
- [ ] Deployment configuration templates
## Implementation Notes
### Shared Secret Management
- Generate cryptographically secure 256-bit secret
- Same secret deployed to Cloud Proxy and all Tenant instances
- Consider secret rotation strategy for production
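One way to generate such a secret (32 random bytes is 256 bits; shown as a sketch, not a mandated procedure):
```python
import secrets
# 32 random bytes = 256 bits, hex-encoded for BM_TENANT_HEADER_SECRET
print(secrets.token_hex(32))
```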
### Error Handling
```python
# Strict mode (production)
if not validate_headers(request):
raise HTTPException(status_code=401, detail="Invalid tenant headers")
# Fallback mode (development)
if not validate_headers(request):
logger.warning("Invalid headers, falling back to default tenant")
tenant_id = "default"
```
### Performance Considerations
- HMAC-SHA256 computation is fast (~microseconds)
- Headers add ~200 bytes to each request
- Validation happens once per request in middleware
## Benefits
✅ **Works with MCP dynamic client registration** - No dependency on JWT custom claims
✅ **Simple and reliable** - Standard HMAC signature approach
✅ **Secure by design** - Cryptographic authenticity and integrity
✅ **Infrastructure controlled** - No external service dependencies
✅ **Easy to implement** - Clear signature algorithm and validation
## Trade-offs
⚠️ **Shared secret management** - Need secure distribution and rotation
⚠️ **Clock synchronization** - Timestamp validation requires reasonably synced clocks
⚠️ **Header visibility** - Headers visible in logs (tenant_id not sensitive)
⚠️ **Additional complexity** - More moving parts in proxy forwarding
## Implementation Tasks
### Cloud Service (Header Signing)
- [ ] Create `utils/header_signing.py` with HMAC-SHA256 signing function
- [ ] Add `bm_tenant_header_secret` to Cloud service configuration
- [ ] Update `ProxyService.forward_request()` to call signing utility
- [ ] Add signed headers (X-BM-Tenant-ID, X-BM-Timestamp, X-BM-Signature)
### Tenant API (Header Validation)
- [ ] Create `utils/header_validation.py` with signature verification
- [ ] Add `bm_tenant_header_secret` to API service configuration
- [ ] Create `TenantHeaderValidationMiddleware` class
- [ ] Add middleware to FastAPI app (before other middleware)
- [ ] Skip validation for `/health` endpoint
- [ ] Store validated tenant_id in request.state
### Testing
- [ ] Unit test for header signing utility
- [ ] Unit test for header validation utility
- [ ] Integration test for proxy → tenant flow
- [ ] Test invalid/missing header handling
- [ ] Test timestamp window validation
- [ ] Test signature tampering detection
### Configuration & Deployment
- [ ] Update `.env.example` with BM_TENANT_HEADER_SECRET
- [ ] Generate secure 256-bit secret for production
- [ ] Update Fly.io secrets for both services
- [ ] Document secret rotation procedure
## Status
- [x] **Specification Complete** - Design finalized and documented
- [ ] **Implementation Started** - Header signing utility development
- [ ] **Cloud Proxy Updated** - ProxyService adds signed headers
- [ ] **Tenant Validation Added** - Middleware validates headers
- [ ] **Testing Complete** - All validation criteria met
- [ ] **Production Deployed** - Live with tenant isolation via headers
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/upload.py:
--------------------------------------------------------------------------------
```python
"""WebDAV upload functionality for basic-memory projects."""
import os
from pathlib import Path
import aiofiles
import httpx
from basic_memory.ignore_utils import load_gitignore_patterns, should_ignore_path
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_put
async def upload_path(
local_path: Path,
project_name: str,
verbose: bool = False,
use_gitignore: bool = True,
dry_run: bool = False,
) -> bool:
"""
Upload a file or directory to cloud project via WebDAV.
Args:
local_path: Path to local file or directory
project_name: Name of cloud project (destination)
verbose: Show detailed information about filtering and upload
use_gitignore: If False, skip .gitignore patterns (still use .bmignore)
dry_run: If True, show what would be uploaded without uploading
Returns:
True if upload succeeded, False otherwise
"""
try:
# Resolve path
local_path = local_path.resolve()
# Check if path exists
if not local_path.exists():
print(f"Error: Path does not exist: {local_path}")
return False
# Get files to upload
if local_path.is_file():
files_to_upload = [(local_path, local_path.name)]
if verbose:
print(f"Uploading single file: {local_path.name}")
else:
files_to_upload = _get_files_to_upload(local_path, verbose, use_gitignore)
if not files_to_upload:
print("No files found to upload")
if verbose:
print(
"\nTip: Use --verbose to see which files are being filtered, "
"or --no-gitignore to skip .gitignore patterns"
)
return True
print(f"Found {len(files_to_upload)} file(s) to upload")
# Calculate total size
total_bytes = sum(file_path.stat().st_size for file_path, _ in files_to_upload)
# If dry run, just show what would be uploaded
if dry_run:
print("\nFiles that would be uploaded:")
for file_path, relative_path in files_to_upload:
size = file_path.stat().st_size
if size < 1024:
size_str = f"{size} bytes"
elif size < 1024 * 1024:
size_str = f"{size / 1024:.1f} KB"
else:
size_str = f"{size / (1024 * 1024):.1f} MB"
print(f" {relative_path} ({size_str})")
else:
# Upload files using httpx
async with get_client() as client:
for i, (file_path, relative_path) in enumerate(files_to_upload, 1):
# Build remote path: /webdav/{project_name}/{relative_path}
remote_path = f"/webdav/{project_name}/{relative_path}"
print(f"Uploading {relative_path} ({i}/{len(files_to_upload)})")
# Get file modification time
file_stat = file_path.stat()
mtime = int(file_stat.st_mtime)
# Read file content asynchronously
async with aiofiles.open(file_path, "rb") as f:
content = await f.read()
# Upload via HTTP PUT to WebDAV endpoint with mtime header
# Using X-OC-Mtime (ownCloud/Nextcloud standard)
response = await call_put(
client, remote_path, content=content, headers={"X-OC-Mtime": str(mtime)}
)
response.raise_for_status()
# Format total size based on magnitude
if total_bytes < 1024:
size_str = f"{total_bytes} bytes"
elif total_bytes < 1024 * 1024:
size_str = f"{total_bytes / 1024:.1f} KB"
else:
size_str = f"{total_bytes / (1024 * 1024):.1f} MB"
if dry_run:
print(f"\nTotal: {len(files_to_upload)} file(s) ({size_str})")
else:
print(f"✓ Upload complete: {len(files_to_upload)} file(s) ({size_str})")
return True
except httpx.HTTPStatusError as e:
print(f"Upload failed: HTTP {e.response.status_code} - {e.response.text}")
return False
except Exception as e:
print(f"Upload failed: {e}")
return False
def _get_files_to_upload(
directory: Path, verbose: bool = False, use_gitignore: bool = True
) -> list[tuple[Path, str]]:
"""
Get list of files to upload from directory.
Uses .bmignore and optionally .gitignore patterns for filtering.
Args:
directory: Directory to scan
verbose: Show detailed filtering information
use_gitignore: If False, skip .gitignore patterns (still use .bmignore)
Returns:
List of (absolute_path, relative_path) tuples
"""
files = []
ignored_files = []
# Load ignore patterns from .bmignore and optionally .gitignore
ignore_patterns = load_gitignore_patterns(directory, use_gitignore=use_gitignore)
if verbose:
gitignore_path = directory / ".gitignore"
gitignore_exists = gitignore_path.exists() and use_gitignore
print(f"\nScanning directory: {directory}")
print("Using .bmignore: Yes")
print(f"Using .gitignore: {'Yes' if gitignore_exists else 'No'}")
print(f"Ignore patterns loaded: {len(ignore_patterns)}")
if ignore_patterns and len(ignore_patterns) <= 20:
print(f"Patterns: {', '.join(sorted(ignore_patterns))}")
print()
# Walk through directory
for root, dirs, filenames in os.walk(directory):
root_path = Path(root)
# Filter directories based on ignore patterns
filtered_dirs = []
for d in dirs:
dir_path = root_path / d
if should_ignore_path(dir_path, directory, ignore_patterns):
if verbose:
rel_path = dir_path.relative_to(directory)
print(f" [IGNORED DIR] {rel_path}/")
else:
filtered_dirs.append(d)
dirs[:] = filtered_dirs
# Process files
for filename in filenames:
file_path = root_path / filename
# Calculate relative path for display/remote
rel_path = file_path.relative_to(directory)
remote_path = str(rel_path).replace("\\", "/")
# Check if file should be ignored
if should_ignore_path(file_path, directory, ignore_patterns):
ignored_files.append(remote_path)
if verbose:
print(f" [IGNORED] {remote_path}")
continue
if verbose:
print(f" [INCLUDE] {remote_path}")
files.append((file_path, remote_path))
if verbose:
print("\nSummary:")
print(f" Files to upload: {len(files)}")
print(f" Files ignored: {len(ignored_files)}")
return files
```
--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_validation.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for build_context memory URL validation."""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_build_context_valid_urls(mcp_server, app, test_project):
"""Test that build_context works with valid memory URLs."""
async with Client(mcp_server) as client:
# Create a test note to ensure we have something to find
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "URL Validation Test",
"folder": "testing",
"content": "# URL Validation Test\n\nThis note tests URL validation.",
"tags": "test,validation",
},
)
# Test various valid URL formats
valid_urls = [
"memory://testing/url-validation-test", # Full memory URL
"testing/url-validation-test", # Relative path
"testing/*", # Pattern matching
]
for url in valid_urls:
result = await client.call_tool(
"build_context", {"project": test_project.name, "url": url}
)
# Should return a valid GraphContext response
assert len(result.content) == 1
response = result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert '"results"' in response # Should contain results structure
assert '"metadata"' in response # Should contain metadata
@pytest.mark.asyncio
async def test_build_context_invalid_urls_fail_validation(mcp_server, app, test_project):
"""Test that build_context properly validates and rejects invalid memory URLs."""
async with Client(mcp_server) as client:
# Test cases: (invalid_url, expected_error_fragment)
invalid_test_cases = [
("memory//test", "double slashes"),
("invalid://test", "protocol scheme"),
("notes<brackets>", "invalid characters"),
('notes"quotes"', "invalid characters"),
]
for invalid_url, expected_error in invalid_test_cases:
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"build_context", {"project": test_project.name, "url": invalid_url}
)
error_message = str(exc_info.value).lower()
assert expected_error in error_message, (
f"URL '{invalid_url}' should fail with '{expected_error}' error"
)
@pytest.mark.asyncio
async def test_build_context_empty_urls_fail_validation(mcp_server, app, test_project):
"""Test that empty or whitespace-only URLs fail validation."""
async with Client(mcp_server) as client:
# These should fail validation
empty_urls = [
"", # Empty string
" ", # Whitespace only
]
for empty_url in empty_urls:
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"build_context", {"project": test_project.name, "url": empty_url}
)
error_message = str(exc_info.value)
# Should fail with validation error
assert (
"cannot be empty" in error_message
or "empty or whitespace" in error_message
or "value_error" in error_message
or "should be non-empty" in error_message
)
@pytest.mark.asyncio
async def test_build_context_nonexistent_urls_return_empty_results(mcp_server, app, test_project):
"""Test that valid but nonexistent URLs return empty results (not errors)."""
async with Client(mcp_server) as client:
# These are valid URL formats but don't exist in the system
nonexistent_valid_urls = [
"memory://nonexistent/note",
"nonexistent/note",
"missing/*",
]
for url in nonexistent_valid_urls:
result = await client.call_tool(
"build_context", {"project": test_project.name, "url": url}
)
# Should return valid response with empty results
assert len(result.content) == 1
response = result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert '"results":[]' in response # Empty results
assert '"total_results":0' in response # Zero count
assert '"metadata"' in response # But should have metadata
@pytest.mark.asyncio
async def test_build_context_error_messages_are_helpful(mcp_server, app, test_project):
"""Test that validation error messages provide helpful guidance."""
async with Client(mcp_server) as client:
# Test double slash error message
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"build_context", {"project": test_project.name, "url": "memory//bad"}
)
error_msg = str(exc_info.value).lower()
# Should contain validation error info
assert (
"double slashes" in error_msg
or "value_error" in error_msg
or "validation error" in error_msg
)
# Test protocol scheme error message
with pytest.raises(Exception) as exc_info:
await client.call_tool(
"build_context", {"project": test_project.name, "url": "http://example.com"}
)
error_msg = str(exc_info.value).lower()
assert (
"protocol scheme" in error_msg
or "protocol" in error_msg
or "value_error" in error_msg
or "validation error" in error_msg
)
@pytest.mark.asyncio
async def test_build_context_pattern_matching_works(mcp_server, app, test_project):
"""Test that valid pattern matching URLs work correctly."""
async with Client(mcp_server) as client:
# Create multiple test notes
test_notes = [
("Pattern Test One", "patterns", "# Pattern Test One\n\nFirst pattern test."),
("Pattern Test Two", "patterns", "# Pattern Test Two\n\nSecond pattern test."),
("Other Note", "other", "# Other Note\n\nNot a pattern match."),
]
for title, folder, content in test_notes:
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": title,
"folder": folder,
"content": content,
},
)
# Test pattern matching
result = await client.call_tool(
"build_context", {"project": test_project.name, "url": "patterns/*"}
)
assert len(result.content) == 1
response = result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Should find the pattern matches but not the other note
assert '"total_results":2' in response or '"primary_count":2' in response
assert "Pattern Test" in response
assert "Other Note" not in response
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_resource.py:
--------------------------------------------------------------------------------
```python
"""Tests for resource tools that exercise the full stack with SQLite."""
import io
import base64
from PIL import Image as PILImage
import pytest
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools import read_content, write_note
from basic_memory.mcp.tools.read_content import (
calculate_target_params,
resize_image,
optimize_image,
)
@pytest.mark.asyncio
async def test_read_file_text_file(app, synced_files, test_project):
"""Test reading a text file.
Should:
- Correctly identify text content
- Return the content as text
- Include correct metadata
"""
# First create a text file via notes
result = await write_note.fn(
project=test_project.name,
title="Text Resource",
folder="test",
content="This is a test text resource",
tags=["test", "resource"],
)
assert result is not None
# Now read it as a resource
response = await read_content.fn("test/text-resource", project=test_project.name)
assert response["type"] == "text"
assert "This is a test text resource" in response["text"]
assert response["content_type"].startswith("text/")
assert response["encoding"] == "utf-8"
@pytest.mark.asyncio
async def test_read_content_file_path(app, synced_files, test_project):
"""Test reading a text file.
Should:
- Correctly identify text content
- Return the content as text
- Include correct metadata
"""
# First create a text file via notes
result = await write_note.fn(
project=test_project.name,
title="Text Resource",
folder="test",
content="This is a test text resource",
tags=["test", "resource"],
)
assert result is not None
# Now read it as a resource
response = await read_content.fn("test/Text Resource.md", project=test_project.name)
assert response["type"] == "text"
assert "This is a test text resource" in response["text"]
assert response["content_type"].startswith("text/")
assert response["encoding"] == "utf-8"
@pytest.mark.asyncio
async def test_read_file_image_file(app, synced_files, test_project):
"""Test reading an image file.
Should:
- Correctly identify image content
- Optimize the image
- Return base64 encoded image data
"""
# Get the path to the synced image file
image_path = synced_files["image"].name
# Read it as a resource
response = await read_content.fn(image_path, project=test_project.name)
assert response["type"] == "image"
assert response["source"]["type"] == "base64"
assert response["source"]["media_type"] == "image/jpeg"
# Verify the image data is valid base64 that can be decoded
img_data = base64.b64decode(response["source"]["data"])
assert len(img_data) > 0
# Should be able to open as an image
img = PILImage.open(io.BytesIO(img_data))
assert img.width > 0
assert img.height > 0
@pytest.mark.asyncio
async def test_read_file_pdf_file(app, synced_files, test_project):
"""Test reading a PDF file.
Should:
- Correctly identify PDF content
- Return base64 encoded PDF data
"""
# Get the path to the synced PDF file
pdf_path = synced_files["pdf"].name
# Read it as a resource
response = await read_content.fn(pdf_path, project=test_project.name)
assert response["type"] == "document"
assert response["source"]["type"] == "base64"
assert response["source"]["media_type"] == "application/pdf"
# Verify the PDF data is valid base64 that can be decoded
pdf_data = base64.b64decode(response["source"]["data"])
assert len(pdf_data) > 0
assert pdf_data.startswith(b"%PDF") # PDF signature
@pytest.mark.asyncio
async def test_read_file_not_found(app, test_project):
"""Test trying to read a non-existent"""
with pytest.raises(ToolError, match="Resource not found"):
await read_content.fn("does-not-exist", project=test_project.name)
@pytest.mark.asyncio
async def test_read_file_memory_url(app, synced_files, test_project):
"""Test reading a resource using a memory:// URL."""
# Create a text file via notes
await write_note.fn(
project=test_project.name,
title="Memory URL Test",
folder="test",
content="Testing memory:// URL handling for resources",
)
# Read it with a memory:// URL
memory_url = "memory://test/memory-url-test"
response = await read_content.fn(memory_url, project=test_project.name)
assert response["type"] == "text"
assert "Testing memory:// URL handling for resources" in response["text"]
@pytest.mark.asyncio
async def test_image_optimization_functions(app):
"""Test the image optimization helper functions."""
# Create a test image
img = PILImage.new("RGB", (1000, 800), color="white")
# Test calculate_target_params function
# Small image
quality, size = calculate_target_params(100000)
assert quality == 70
assert size == 1000
# Medium image
quality, size = calculate_target_params(800000)
assert quality == 60
assert size == 800
# Large image
quality, size = calculate_target_params(2000000)
assert quality == 50
assert size == 600
# Test resize_image function
# Image that needs resizing
resized = resize_image(img, 500)
assert resized.width <= 500
assert resized.height <= 500
# Image that doesn't need resizing
small_img = PILImage.new("RGB", (300, 200), color="white")
resized = resize_image(small_img, 500)
assert resized.width == 300
assert resized.height == 200
# Test optimize_image function
img_bytes = io.BytesIO()
img.save(img_bytes, format="PNG")
img_bytes.seek(0)
content_length = len(img_bytes.getvalue())
# In a small test image, optimization might make the image larger
# because of JPEG overhead. Let's just test that it returns something
optimized = optimize_image(img, content_length)
assert len(optimized) > 0
@pytest.mark.asyncio
async def test_image_conversion(app, synced_files, test_project):
"""Test reading an image and verify conversion works.
Should:
- Handle image content correctly
- Return optimized image data
"""
# Use the synced image file that's already part of our test fixtures
image_path = synced_files["image"].name
# Test reading the resource
response = await read_content.fn(image_path, project=test_project.name)
assert response["type"] == "image"
assert response["source"]["media_type"] == "image/jpeg"
# Verify the image data is valid
img_data = base64.b64decode(response["source"]["data"])
img = PILImage.open(io.BytesIO(img_data))
assert img.width > 0
assert img.height > 0
assert img.mode == "RGB" # Should be in RGB mode
# Skip testing the large document size handling since it would require
# complex mocking of internal logic. We've already tested the happy path
# with the PDF file, and the error handling with our updated tool_utils tests.
# We have 100% coverage of this code in read_file.py according to the coverage report.
```
--------------------------------------------------------------------------------
/tests/services/test_entity_service_disable_permalinks.py:
--------------------------------------------------------------------------------
```python
"""Tests for EntityService with disable_permalinks flag."""
from textwrap import dedent
import pytest
import yaml
from basic_memory.config import BasicMemoryConfig
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService
@pytest.mark.asyncio
async def test_create_entity_with_permalinks_disabled(
entity_repository,
observation_repository,
relation_repository,
entity_parser,
file_service: FileService,
link_resolver,
):
"""Test that entities created with disable_permalinks=True don't have permalinks."""
# Create entity service with permalinks disabled
app_config = BasicMemoryConfig(disable_permalinks=True)
entity_service = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
entity_data = EntitySchema(
title="Test Entity",
folder="test",
entity_type="note",
content="Test content",
)
# Create entity
entity = await entity_service.create_entity(entity_data)
# Assert entity has no permalink
assert entity.permalink is None
# Verify file frontmatter doesn't contain permalink
file_path = file_service.get_entity_path(entity)
file_content, _ = await file_service.read_file(file_path)
_, frontmatter, doc_content = file_content.split("---", 2)
metadata = yaml.safe_load(frontmatter)
assert "permalink" not in metadata
assert metadata["title"] == "Test Entity"
assert metadata["type"] == "note"
@pytest.mark.asyncio
async def test_update_entity_with_permalinks_disabled(
entity_repository,
observation_repository,
relation_repository,
entity_parser,
file_service: FileService,
link_resolver,
):
"""Test that entities updated with disable_permalinks=True don't get permalinks added."""
# First create with permalinks enabled
app_config_enabled = BasicMemoryConfig(disable_permalinks=False)
entity_service_enabled = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config_enabled,
)
entity_data = EntitySchema(
title="Test Entity",
folder="test",
entity_type="note",
content="Original content",
)
# Create entity with permalinks enabled
entity = await entity_service_enabled.create_entity(entity_data)
assert entity.permalink is not None
original_permalink = entity.permalink
# Now create service with permalinks disabled
app_config_disabled = BasicMemoryConfig(disable_permalinks=True)
entity_service_disabled = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config_disabled,
)
# Update entity with permalinks disabled
entity_data.content = "Updated content"
updated = await entity_service_disabled.update_entity(entity, entity_data)
# Permalink should remain unchanged (not removed, just not updated)
assert updated.permalink == original_permalink
# Verify file still has the original permalink
file_path = file_service.get_entity_path(updated)
file_content, _ = await file_service.read_file(file_path)
assert "Updated content" in file_content
assert f"permalink: {original_permalink}" in file_content
@pytest.mark.asyncio
async def test_create_entity_with_content_frontmatter_permalinks_disabled(
entity_repository,
observation_repository,
relation_repository,
entity_parser,
file_service: FileService,
link_resolver,
):
"""Test that content frontmatter permalinks are ignored when disabled."""
# Create entity service with permalinks disabled
app_config = BasicMemoryConfig(disable_permalinks=True)
entity_service = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
# Content with frontmatter containing permalink
content = dedent(
"""
---
permalink: custom-permalink
---
# Test Content
"""
).strip()
entity_data = EntitySchema(
title="Test Entity",
folder="test",
entity_type="note",
content=content,
)
# Create entity
entity = await entity_service.create_entity(entity_data)
# Entity should not have a permalink set
assert entity.permalink is None
# Verify file doesn't have permalink in frontmatter
file_path = file_service.get_entity_path(entity)
file_content, _ = await file_service.read_file(file_path)
_, frontmatter, doc_content = file_content.split("---", 2)
metadata = yaml.safe_load(frontmatter)
# The permalink from content frontmatter should not be present
assert "permalink" not in metadata
@pytest.mark.asyncio
async def test_move_entity_with_permalinks_disabled(
entity_repository,
observation_repository,
relation_repository,
entity_parser,
file_service: FileService,
link_resolver,
project_config,
):
"""Test that moving an entity with disable_permalinks=True doesn't update permalinks."""
# First create with permalinks enabled
app_config = BasicMemoryConfig(disable_permalinks=False, update_permalinks_on_move=True)
entity_service = EntityService(
entity_parser=entity_parser,
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
entity_data = EntitySchema(
title="Test Entity",
folder="test",
entity_type="note",
content="Test content",
)
# Create entity
entity = await entity_service.create_entity(entity_data)
original_permalink = entity.permalink
# Now disable permalinks
app_config_disabled = BasicMemoryConfig(disable_permalinks=True, update_permalinks_on_move=True)
# Move entity
moved = await entity_service.move_entity(
identifier=entity.permalink,
destination_path="new_folder/test_entity.md",
project_config=project_config,
app_config=app_config_disabled,
)
# Permalink should remain unchanged even though update_permalinks_on_move is True
assert moved.permalink == original_permalink
```
--------------------------------------------------------------------------------
/tests/db/test_issue_254_foreign_key_constraints.py:
--------------------------------------------------------------------------------
```python
"""Test to verify that issue #254 is fixed.
Issue #254: Foreign key constraint failures when deleting projects with related entities.
The issue was that when migration 647e7a75e2cd recreated the project table,
it did not re-establish the foreign key constraint from entity.project_id to project.id
with CASCADE DELETE, causing foreign key constraint failures when trying to delete
projects that have related entities.
Migration a1b2c3d4e5f6 was created to fix this by adding the missing foreign key
constraint with CASCADE DELETE behavior.
This test file verifies that the fix works correctly in production databases
that have had the migration applied.
"""
import tempfile
from datetime import datetime, timezone
from pathlib import Path
import pytest
from basic_memory.services.project_service import ProjectService
# @pytest.mark.skip(reason="Issue #254 not fully resolved yet - foreign key constraint errors still occur")
@pytest.mark.asyncio
async def test_issue_254_foreign_key_constraint_fix(project_service: ProjectService):
"""Test to verify issue #254 is fixed: project removal with foreign key constraints.
This test reproduces the exact scenario from issue #254:
1. Create a project
2. Create entities, observations, and relations linked to that project
3. Attempt to remove the project
4. Verify it succeeds without "FOREIGN KEY constraint failed" errors
5. Verify all related data is properly cleaned up via CASCADE DELETE
Once issue #254 is fully fixed, remove the @pytest.mark.skip decorator.
"""
test_project_name = "issue-254-verification"
with tempfile.TemporaryDirectory() as temp_dir:
test_root = Path(temp_dir)
test_project_path = str(test_root / "issue-254-verification")
# Step 1: Create test project
await project_service.add_project(test_project_name, test_project_path)
project = await project_service.get_project(test_project_name)
assert project is not None, "Project should be created successfully"
# Step 2: Create related entities that would cause foreign key constraint issues
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.relation_repository import RelationRepository
entity_repo = EntityRepository(
project_service.repository.session_maker, project_id=project.id
)
obs_repo = ObservationRepository(
project_service.repository.session_maker, project_id=project.id
)
rel_repo = RelationRepository(
project_service.repository.session_maker, project_id=project.id
)
# Create entity
entity_data = {
"title": "Issue 254 Test Entity",
"entity_type": "note",
"content_type": "text/markdown",
"project_id": project.id,
"permalink": "issue-254-entity",
"file_path": "issue-254-entity.md",
"checksum": "issue254test",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
entity = await entity_repo.create(entity_data)
# Create observation linked to entity
observation_data = {
"entity_id": entity.id,
"content": "This observation should be cascade deleted",
"category": "test",
}
observation = await obs_repo.create(observation_data)
# Create relation involving the entity
relation_data = {
"from_id": entity.id,
"to_name": "some-other-entity",
"relation_type": "relates-to",
}
relation = await rel_repo.create(relation_data)
# Step 3: Attempt to remove the project
# This is where issue #254 manifested - should NOT raise "FOREIGN KEY constraint failed"
try:
await project_service.remove_project(test_project_name)
except Exception as e:
if "FOREIGN KEY constraint failed" in str(e):
pytest.fail(
f"Issue #254 not fixed - foreign key constraint error still occurs: {e}. "
f"The migration a1b2c3d4e5f6 may not have been applied correctly or "
f"the CASCADE DELETE constraint is not working as expected."
)
else:
# Re-raise unexpected errors
raise
# Step 4: Verify project was successfully removed
removed_project = await project_service.get_project(test_project_name)
assert removed_project is None, "Project should have been removed"
# Step 5: Verify related data was cascade deleted
remaining_entity = await entity_repo.find_by_id(entity.id)
assert remaining_entity is None, "Entity should have been cascade deleted"
remaining_observation = await obs_repo.find_by_id(observation.id)
assert remaining_observation is None, "Observation should have been cascade deleted"
remaining_relation = await rel_repo.find_by_id(relation.id)
assert remaining_relation is None, "Relation should have been cascade deleted"
@pytest.mark.asyncio
async def test_issue_254_reproduction(project_service: ProjectService):
"""Test that reproduces issue #254 to document the current state.
This test demonstrates the current behavior and will fail until the issue is fixed.
It serves as documentation of what the problem was.
"""
test_project_name = "issue-254-reproduction"
with tempfile.TemporaryDirectory() as temp_dir:
test_root = Path(temp_dir)
test_project_path = str(test_root / "issue-254-reproduction")
# Create project and entity
await project_service.add_project(test_project_name, test_project_path)
project = await project_service.get_project(test_project_name)
from basic_memory.repository.entity_repository import EntityRepository
entity_repo = EntityRepository(
project_service.repository.session_maker, project_id=project.id
)
entity_data = {
"title": "Reproduction Entity",
"entity_type": "note",
"content_type": "text/markdown",
"project_id": project.id,
"permalink": "reproduction-entity",
"file_path": "reproduction-entity.md",
"checksum": "repro123",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
await entity_repo.create(entity_data)
# This should eventually work without errors once issue #254 is fixed
# with pytest.raises(Exception) as exc_info:
await project_service.remove_project(test_project_name)
# Document the current error for tracking
# error_message = str(exc_info.value)
# assert any(keyword in error_message for keyword in [
# "FOREIGN KEY constraint failed",
# "constraint",
# "integrity"
# ]), f"Expected foreign key or integrity constraint error, got: {error_message}"
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/resources/ai_assistant_guide.md:
--------------------------------------------------------------------------------
```markdown
# AI Assistant Guide for Basic Memory
Quick reference for using Basic Memory tools effectively through MCP.
**For comprehensive coverage**: See the [Extended AI Assistant Guide](https://github.com/basicmachines-co/basic-memory/blob/main/docs/ai-assistant-guide-extended.md) with detailed examples, advanced patterns, and self-contained sections.
## Overview
Basic Memory creates a semantic knowledge graph from markdown files. Focus on building rich connections between notes.
- **Local-First**: Plain text files on user's computer
- **Persistent**: Knowledge survives across sessions
- **Semantic**: Observations and relations create a knowledge graph
**Your role**: You're helping humans build enduring knowledge they'll own forever. The semantic graph (observations, relations, context) helps you provide better assistance by understanding connections and maintaining continuity. Think: lasting insights worth keeping, not disposable chat logs.
## Project Management
All tools require explicit project specification.
**Three-tier resolution:**
1. CLI constraint: `--project name` (highest priority)
2. Explicit parameter: `project="name"` in tool calls
3. Default mode: `default_project_mode=true` in config (fallback)
### Quick Setup Check
```python
# Discover projects
projects = await list_memory_projects()
# Check if default_project_mode enabled
# If yes: project parameter optional
# If no: project parameter required
```
### Default Project Mode
When `default_project_mode=true`:
```python
# These are equivalent:
await write_note("Note", "Content", "folder")
await write_note("Note", "Content", "folder", project="main")
```
When `default_project_mode=false` (default):
```python
# Project required:
await write_note("Note", "Content", "folder", project="main") # ✓
await write_note("Note", "Content", "folder") # ✗ Error
```
## Core Tools
### Writing Knowledge
```python
await write_note(
title="Topic",
content="# Topic\n## Observations\n- [category] fact\n## Relations\n- relates_to [[Other]]",
folder="notes",
project="main" # Required unless default_project_mode=true
)
```
### Reading Knowledge
```python
# By identifier
content = await read_note("Topic", project="main")
# By memory:// URL
content = await read_note("memory://folder/topic", project="main")
```
### Searching
```python
results = await search_notes(
query="authentication",
project="main",
page_size=10
)
```
### Building Context
```python
context = await build_context(
url="memory://specs/auth",
project="main",
depth=2,
timeframe="1 week"
)
```
## Knowledge Graph Essentials
### Observations
Categorized facts with optional tags:
```markdown
- [decision] Use JWT for authentication #security
- [technique] Hash passwords with bcrypt #best-practice
- [requirement] Support OAuth 2.0 providers
```
### Relations
Directional links between entities:
```markdown
- implements [[Authentication Spec]]
- requires [[User Database]]
- extends [[Base Security Model]]
```
**Common relation types:** `relates_to`, `implements`, `requires`, `extends`, `part_of`, `contrasts_with`
### Forward References
Reference entities that don't exist yet:
```python
# Create note with forward reference
await write_note(
title="Login Flow",
content="## Relations\n- requires [[OAuth Provider]]", # Doesn't exist yet
folder="auth",
project="main"
)
# Later, create referenced entity
await write_note(
title="OAuth Provider",
content="# OAuth Provider\n...",
folder="auth",
project="main"
)
# → Relation automatically resolved
```
## Best Practices
### 1. Project Management
**Single-project users:**
- Enable `default_project_mode=true`
- Simpler tool calls
**Multi-project users:**
- Keep `default_project_mode=false`
- Always specify project explicitly
**Discovery:**
```python
# Start with discovery
projects = await list_memory_projects()
# Cross-project activity (no project param = all projects)
activity = await recent_activity()
# Or specific project
activity = await recent_activity(project="main")
```
### 2. Building Rich Graphs
**Always include:**
- 3-5 observations per note
- 2-3 relations per note
- Meaningful categories and relation types
**Search before creating:**
```python
# Find existing entities to reference
results = await search_notes(query="authentication", project="main")
# Use exact titles in [[WikiLinks]]
```
### 3. Writing Effective Notes
**Structure:**
```markdown
# Title
## Context
Background information
## Observations
- [category] Fact with #tags
- [category] Another fact
## Relations
- relation_type [[Exact Entity Title]]
```
**Categories:** `[idea]`, `[decision]`, `[fact]`, `[technique]`, `[requirement]`
### 4. Error Handling
**Missing project:**
```python
try:
await search_notes(query="test") # Missing project parameter - will error
except Exception:
# Show available projects
projects = await list_memory_projects()
# Then retry with project
results = await search_notes(query="test", project=projects[0].name)
```
**Forward references:**
```python
# Check response for unresolved relations
response = await write_note(
title="New Topic",
content="## Relations\n- relates_to [[Future Topic]]",
folder="notes",
project="main"
)
# Forward refs will resolve when target created
```
### 5. Recording Context
**Ask permission:**
> "Would you like me to save our discussion about [topic] to Basic Memory?"
**Confirm when done:**
> "I've saved our discussion to Basic Memory."
**What to record:**
- Decisions and rationales
- Important discoveries
- Action items and plans
- Connected topics
## Common Patterns
### Capture Decision
```python
await write_note(
title="DB Choice",
content="""# DB Choice\n## Decision\nUse PostgreSQL\n## Observations\n- [requirement] ACID compliance #reliability\n- [decision] PostgreSQL over MySQL\n## Relations\n- implements [[Data Architecture]]""",
folder="decisions",
project="main"
)
```
### Link Topics & Build Context
```python
# Link bidirectionally
await write_note(title="API Auth", content="## Relations\n- part_of [[API Design]]", folder="api", project="main")
await edit_note(identifier="API Design", operation="append", content="\n- includes [[API Auth]]", project="main")
# Search and build context
results = await search_notes(query="authentication", project="main")
context = await build_context(url=f"memory://{results[0].permalink}", project="main", depth=2)
```
## Tool Quick Reference
| Tool | Purpose | Key Params |
|------|---------|------------|
| `write_note` | Create/update | title, content, folder, project |
| `read_note` | Read content | identifier, project |
| `edit_note` | Modify existing | identifier, operation, content, project |
| `search_notes` | Find notes | query, project |
| `build_context` | Graph traversal | url, depth, project |
| `recent_activity` | Recent changes | timeframe, project |
| `list_memory_projects` | Show projects | (none) |
## memory:// URL Format
- `memory://title` - By title
- `memory://folder/title` - By folder + title
- `memory://permalink` - By permalink
- `memory://folder/*` - All in folder
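Wildcard example (illustrative use of the folder form with `build_context`):
```python
# Build context for every note under a folder using the wildcard URL form
context = await build_context(url="memory://specs/*", project="main", depth=1)
```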
For full documentation: https://docs.basicmemory.com
Built with ♥️ by Basic Machines
```
--------------------------------------------------------------------------------
/docs/character-handling.md:
--------------------------------------------------------------------------------
```markdown
# Character Handling and Conflict Resolution
Basic Memory handles various character encoding scenarios and file naming conventions to provide consistent permalink generation and conflict resolution. This document explains how the system works and how to resolve common character-related issues.
## Overview
Basic Memory uses a sophisticated system to generate permalinks from file paths while maintaining consistency across different operating systems and character encodings. The system normalizes file paths and generates unique permalinks to prevent conflicts.
## Character Normalization Rules
### 1. Permalink Generation
When Basic Memory processes a file path, it applies these normalization rules:
```
Original: "Finance/My Investment Strategy.md"
Permalink: "finance/my-investment-strategy"
```
**Transformation process:**
1. Remove file extension (`.md`)
2. Convert to lowercase (case-insensitive)
3. Replace spaces with hyphens
4. Replace underscores with hyphens
5. Handle international characters (transliteration for Latin, preservation for non-Latin)
6. Convert camelCase to kebab-case
### 2. International Character Support
**Latin characters with diacritics** are transliterated:
- `ø` → `o` (Søren → soren)
- `ü` → `u` (Müller → muller)
- `é` → `e` (Café → cafe)
- `ñ` → `n` (Niño → nino)
**Non-Latin characters** are preserved:
- Chinese: `中文/测试文档.md` → `中文/测试文档`
- Japanese: `日本語/文書.md` → `日本語/文書`
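For illustration only, the rules above can be sketched in a few lines of Python; the actual implementation lives in Basic Memory's permalink utilities and may differ in detail:
```python
# Illustrative sketch of the documented rules, not the real implementation.
import re
import unicodedata

def sketch_permalink(path: str) -> str:
    """Approximate the permalink normalization rules described above."""
    # 1. Remove the file extension
    stem = re.sub(r"\.md$", "", path)
    # 2. Split camelCase into kebab-case before lowercasing
    stem = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "-", stem)
    # 3. Transliterate Latin diacritics (é -> e); non-Latin text passes through
    stem = stem.replace("ø", "o").replace("Ø", "O")  # ø has no NFKD decomposition
    stem = "".join(
        c for c in unicodedata.normalize("NFKD", stem)
        if not unicodedata.combining(c)
    )
    # 4. Lowercase and replace spaces/underscores with hyphens
    return stem.lower().replace(" ", "-").replace("_", "-")

print(sketch_permalink("Finance/My Investment Strategy.md"))
# finance/my-investment-strategy
```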
## Common Conflict Scenarios
### 1. Hyphen vs Space Conflicts
**Problem:** Files with existing hyphens conflict with generated permalinks from spaces.
**Example:**
```
File 1: "basic memory bug.md" → permalink: "basic-memory-bug"
File 2: "basic-memory-bug.md" → permalink: "basic-memory-bug" (CONFLICT!)
```
**Resolution:** The system automatically resolves this by adding suffixes:
```
File 1: "basic memory bug.md" → permalink: "basic-memory-bug"
File 2: "basic-memory-bug.md" → permalink: "basic-memory-bug-1"
```
**Best Practice:** Choose consistent naming conventions within your project.
### 2. Case Sensitivity Conflicts
**Problem:** Different case variations that normalize to the same permalink.
**Example on macOS:**
```
Directory: Finance/investment.md
Directory: finance/investment.md (differs only in case — same permalink, and the same path on macOS's case-insensitive filesystem)
```
**Resolution:** Basic Memory detects case conflicts and prevents them during sync operations with helpful error messages.
**Best Practice:** Use consistent casing for directory and file names.
### 3. Character Encoding Conflicts
**Problem:** Different Unicode normalizations of the same logical character.
**Example:**
```
File 1: "café.md" (é as single character)
File 2: "café.md" (e + combining accent)
```
**Resolution:** Basic Memory normalizes Unicode characters using NFD normalization to detect these conflicts.
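For example, Python's standard `unicodedata` module shows how the two encodings of `café.md` compare equal once normalized:
```python
# Standard-library illustration of the normalization check described above.
import unicodedata

composed = "caf\u00e9.md"       # é as a single code point
decomposed = "cafe\u0301.md"    # e + combining acute accent

assert composed != decomposed   # byte-for-byte different file names
assert unicodedata.normalize("NFD", composed) == unicodedata.normalize("NFD", decomposed)
```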
### 4. Forward Slash Conflicts
**Problem:** Forward slashes in frontmatter or file names interpreted as path separators.
**Example:**
```yaml
---
permalink: finance/investment/strategy
---
```
**Resolution:** Basic Memory validates frontmatter permalinks and warns about path separator conflicts.
## Error Messages and Troubleshooting
### "UNIQUE constraint failed: entity.file_path, entity.project_id"
**Cause:** Two entities trying to use the same file path within a project.
**Common scenarios:**
1. File move operation where destination is already occupied
2. Case sensitivity differences on macOS
3. Character encoding conflicts
4. Concurrent file operations
**Resolution steps:**
1. Check for duplicate file names with different cases
2. Look for files with similar names but different character encodings
3. Rename conflicting files to have unique names
4. Run sync again after resolving conflicts
### "File path conflict detected during move"
**Cause:** Enhanced conflict detection preventing potential database integrity violations.
**What this means:** The system detected that moving a file would create a conflict before attempting the database operation.
**Resolution:** Follow the specific guidance in the error message, which will indicate the type of conflict detected.
## Best Practices
### 1. File Naming Conventions
**Recommended patterns:**
- Use consistent casing (prefer lowercase)
- Use hyphens instead of spaces for multi-word files
- Avoid special characters that could conflict with path separators
- Be consistent with directory structure casing
**Examples:**
```
✅ Good:
- finance/investment-strategy.md
- projects/basic-memory-features.md
- docs/api-reference.md
❌ Problematic:
- Finance/Investment Strategy.md (mixed case, spaces)
- finance/Investment Strategy.md (inconsistent case)
- docs/API/Reference.md (mixed case directories)
```
### 2. Permalink Management
**Custom permalinks in frontmatter:**
```yaml
---
type: knowledge
permalink: custom-permalink-name
---
```
**Guidelines:**
- Use lowercase permalinks
- Use hyphens for word separation
- Avoid path separators unless creating sub-paths
- Ensure uniqueness within your project
### 3. Directory Structure
**Consistent casing:**
```
✅ Good:
finance/
  investment-strategies.md
  portfolio-management.md

❌ Problematic:
Finance/    (capital F)
  investment-strategies.md
finance/    (lowercase f)
  portfolio-management.md
```
## Migration and Cleanup
### Identifying Conflicts
Use Basic Memory's built-in conflict detection:
```bash
# Sync will report conflicts
basic-memory sync
# Check sync status for warnings
basic-memory status
```
### Resolving Existing Conflicts
1. **Identify conflicting files** from sync error messages
2. **Choose consistent naming convention** for your project
3. **Rename files** to follow the convention
4. **Re-run sync** to verify resolution
### Bulk Renaming Strategy
For projects with many conflicts:
1. **Backup your project** before making changes
2. **Standardize on lowercase** file and directory names
3. **Replace spaces with hyphens** in file names
4. **Use consistent character encoding** (UTF-8)
5. **Test sync after each batch** of changes
## System Enhancements
### Recent Improvements (v0.13+)
1. **Enhanced conflict detection** before database operations
2. **Improved error messages** with specific resolution guidance
3. **Character normalization utilities** for consistent handling
4. **File swap detection** for complex move scenarios
5. **Proactive conflict warnings** during permalink resolution
### Monitoring and Logging
The system now provides detailed logging for conflict resolution:
```
DEBUG: Detected potential file path conflicts for 'Finance/Investment.md': ['finance/investment.md']
WARNING: File path conflict detected during move: entity_id=123 trying to move from 'old.md' to 'new.md'
```
These logs help identify and resolve conflicts before they cause sync failures.
## Support and Resources
If you encounter character-related conflicts not covered in this guide:
1. **Check the logs** for specific conflict details
2. **Review error messages** for resolution guidance
3. **Report issues** with examples of the conflicting files
4. **Consider the file naming best practices** outlined above
The Basic Memory system is designed to handle most character conflicts automatically while providing clear guidance for manual resolution when needed.
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_installer.py:
--------------------------------------------------------------------------------
```python
"""Cross-platform rclone installation utilities."""
import platform
import shutil
import subprocess
from typing import Optional
from rich.console import Console
console = Console()
class RcloneInstallError(Exception):
"""Exception raised for rclone installation errors."""
pass
def is_rclone_installed() -> bool:
"""Check if rclone is already installed and available in PATH."""
return shutil.which("rclone") is not None
def get_platform() -> str:
"""Get the current platform identifier."""
system = platform.system().lower()
if system == "darwin":
return "macos"
elif system == "linux":
return "linux"
elif system == "windows":
return "windows"
else:
raise RcloneInstallError(f"Unsupported platform: {system}")
def run_command(command: list[str], check: bool = True) -> subprocess.CompletedProcess:
"""Run a command with proper error handling."""
try:
console.print(f"[dim]Running: {' '.join(command)}[/dim]")
result = subprocess.run(command, capture_output=True, text=True, check=check)
if result.stdout:
console.print(f"[dim]Output: {result.stdout.strip()}[/dim]")
return result
except subprocess.CalledProcessError as e:
console.print(f"[red]Command failed: {e}[/red]")
if e.stderr:
console.print(f"[red]Error output: {e.stderr}[/red]")
raise RcloneInstallError(f"Command failed: {e}") from e
except FileNotFoundError as e:
raise RcloneInstallError(f"Command not found: {' '.join(command)}") from e
def install_rclone_macos() -> None:
"""Install rclone on macOS using Homebrew or official script."""
# Try Homebrew first
if shutil.which("brew"):
try:
console.print("[blue]Installing rclone via Homebrew...[/blue]")
run_command(["brew", "install", "rclone"])
console.print("[green]✓ rclone installed via Homebrew[/green]")
return
except RcloneInstallError:
console.print(
"[yellow]Homebrew installation failed, trying official script...[/yellow]"
)
# Fallback to official script
console.print("[blue]Installing rclone via official script...[/blue]")
try:
run_command(["sh", "-c", "curl https://rclone.org/install.sh | sudo bash"])
console.print("[green]✓ rclone installed via official script[/green]")
except RcloneInstallError:
raise RcloneInstallError(
"Failed to install rclone. Please install manually: brew install rclone"
)
def install_rclone_linux() -> None:
"""Install rclone on Linux using package managers or official script."""
# Try snap first (most universal)
if shutil.which("snap"):
try:
console.print("[blue]Installing rclone via snap...[/blue]")
run_command(["sudo", "snap", "install", "rclone"])
console.print("[green]✓ rclone installed via snap[/green]")
return
except RcloneInstallError:
console.print("[yellow]Snap installation failed, trying apt...[/yellow]")
# Try apt (Debian/Ubuntu)
if shutil.which("apt"):
try:
console.print("[blue]Installing rclone via apt...[/blue]")
run_command(["sudo", "apt", "update"])
run_command(["sudo", "apt", "install", "-y", "rclone"])
console.print("[green]✓ rclone installed via apt[/green]")
return
except RcloneInstallError:
console.print("[yellow]apt installation failed, trying official script...[/yellow]")
# Fallback to official script
console.print("[blue]Installing rclone via official script...[/blue]")
try:
run_command(["sh", "-c", "curl https://rclone.org/install.sh | sudo bash"])
console.print("[green]✓ rclone installed via official script[/green]")
except RcloneInstallError:
raise RcloneInstallError(
"Failed to install rclone. Please install manually: sudo snap install rclone"
)
def install_rclone_windows() -> None:
"""Install rclone on Windows using package managers."""
# Try winget first (built into Windows 10+)
if shutil.which("winget"):
try:
console.print("[blue]Installing rclone via winget...[/blue]")
run_command(["winget", "install", "Rclone.Rclone"])
console.print("[green]✓ rclone installed via winget[/green]")
return
except RcloneInstallError:
console.print("[yellow]winget installation failed, trying chocolatey...[/yellow]")
# Try chocolatey
if shutil.which("choco"):
try:
console.print("[blue]Installing rclone via chocolatey...[/blue]")
run_command(["choco", "install", "rclone", "-y"])
console.print("[green]✓ rclone installed via chocolatey[/green]")
return
except RcloneInstallError:
console.print("[yellow]chocolatey installation failed, trying scoop...[/yellow]")
# Try scoop
if shutil.which("scoop"):
try:
console.print("[blue]Installing rclone via scoop...[/blue]")
run_command(["scoop", "install", "rclone"])
console.print("[green]✓ rclone installed via scoop[/green]")
return
except RcloneInstallError:
console.print("[yellow]scoop installation failed[/yellow]")
# No package manager available
raise RcloneInstallError(
"Could not install rclone automatically. Please install a package manager "
"(winget, chocolatey, or scoop) or install rclone manually from https://rclone.org/downloads/"
)
def install_rclone(platform_override: Optional[str] = None) -> None:
"""Install rclone for the current platform."""
if is_rclone_installed():
console.print("[green]rclone is already installed[/green]")
return
platform_name = platform_override or get_platform()
console.print(f"[blue]Installing rclone for {platform_name}...[/blue]")
try:
if platform_name == "macos":
install_rclone_macos()
elif platform_name == "linux":
install_rclone_linux()
elif platform_name == "windows":
install_rclone_windows()
else:
raise RcloneInstallError(f"Unsupported platform: {platform_name}")
# Verify installation
if not is_rclone_installed():
raise RcloneInstallError("rclone installation completed but command not found in PATH")
console.print("[green]✓ rclone installation completed successfully[/green]")
except RcloneInstallError:
raise
except Exception as e:
raise RcloneInstallError(f"Unexpected error during installation: {e}") from e
def get_rclone_version() -> Optional[str]:
"""Get the installed rclone version."""
if not is_rclone_installed():
return None
try:
result = run_command(["rclone", "version"], check=False)
if result.returncode == 0:
# Parse version from output (format: "rclone v1.64.0")
lines = result.stdout.strip().split("\n")
for line in lines:
if line.startswith("rclone v"):
return line.split()[1]
return "unknown"
except Exception:
return "unknown"
```
--------------------------------------------------------------------------------
/tests/markdown/test_markdown_plugins.py:
--------------------------------------------------------------------------------
```python
"""Tests for markdown plugins."""
from textwrap import dedent
from markdown_it import MarkdownIt
from markdown_it.token import Token
from basic_memory.markdown.plugins import (
observation_plugin,
relation_plugin,
is_observation,
is_explicit_relation,
parse_relation,
parse_inline_relations,
)
def test_observation_plugin():
"""Test observation plugin."""
# Set up markdown-it instance
md = MarkdownIt().use(observation_plugin)
# Test basic observation with all features
content = dedent("""
- [design] Basic observation #tag1 #tag2 (with context)
""")
tokens = md.parse(content)
token = [t for t in tokens if t.type == "inline"][0]
assert "observation" in token.meta
obs = token.meta["observation"]
assert obs["category"] == "design"
assert obs["content"] == "Basic observation #tag1 #tag2"
assert set(obs["tags"]) == {"tag1", "tag2"}
assert obs["context"] == "with context"
# Test without category
content = "- Basic observation #tag1 (context)"
token = [t for t in md.parse(content) if t.type == "inline"][0]
obs = token.meta["observation"]
assert obs["category"] is None
assert obs["content"] == "Basic observation #tag1"
assert obs["tags"] == ["tag1"]
assert obs["context"] == "context"
# Test without tags
content = "- [note] Basic observation (context)"
token = [t for t in md.parse(content) if t.type == "inline"][0]
obs = token.meta["observation"]
assert obs["category"] == "note"
assert obs["content"] == "Basic observation"
assert obs["tags"] is None
assert obs["context"] == "context"
def test_observation_edge_cases():
"""Test observation parsing edge cases."""
# Test non-inline token
token = Token("paragraph", "", 0)
assert not is_observation(token)
# Test empty content
token = Token("inline", "", 0)
assert not is_observation(token)
# Test markdown task
token = Token("inline", "[ ] Task item", 0)
assert not is_observation(token)
# Test completed task
token = Token("inline", "[x] Done task", 0)
assert not is_observation(token)
# Test in-progress task
token = Token("inline", "[-] Ongoing task", 0)
assert not is_observation(token)
def test_observation_excludes_markdown_and_wiki_links():
"""Test that markdown links and wiki links are NOT parsed as observations.
This test validates the fix for issue #247 where:
- [text](url) markdown links were incorrectly parsed as observations
- [[text]] wiki links were incorrectly parsed as observations
"""
# Test markdown links are NOT observations
token = Token("inline", "[Click here](https://example.com)", 0)
assert not is_observation(token), "Markdown links should not be parsed as observations"
token = Token("inline", "[Documentation](./docs/readme.md)", 0)
assert not is_observation(token), "Relative markdown links should not be parsed as observations"
token = Token("inline", "[Empty link]()", 0)
assert not is_observation(token), "Empty markdown links should not be parsed as observations"
# Test wiki links are NOT observations
token = Token("inline", "[[SomeWikiPage]]", 0)
assert not is_observation(token), "Wiki links should not be parsed as observations"
token = Token("inline", "[[Multi Word Page]]", 0)
assert not is_observation(token), "Multi-word wiki links should not be parsed as observations"
# Test nested brackets are NOT observations
token = Token("inline", "[[Nested [[Inner]] Link]]", 0)
assert not is_observation(token), "Nested wiki links should not be parsed as observations"
# Test valid observations still work (should return True)
token = Token("inline", "[category] This is a valid observation", 0)
assert is_observation(token), "Valid observations should still be parsed correctly"
token = Token("inline", "[design] Valid observation #tag", 0)
assert is_observation(token), "Valid observations with tags should still work"
token = Token("inline", "Just some text #tag", 0)
assert is_observation(token), "Tag-only observations should still work"
# Test edge cases that should NOT be observations
token = Token("inline", "[]Empty brackets", 0)
assert not is_observation(token), "Empty category brackets should not be observations"
token = Token("inline", "[category]No space after category", 0)
assert not is_observation(token), "No space after category should not be valid observation"
def test_relation_plugin():
"""Test relation plugin."""
md = MarkdownIt().use(relation_plugin)
# Test explicit relation with all features
content = dedent("""
- implements [[Component]] (with context)
""")
tokens = md.parse(content)
token = [t for t in tokens if t.type == "inline"][0]
assert "relations" in token.meta
rel = token.meta["relations"][0]
assert rel["type"] == "implements"
assert rel["target"] == "Component"
assert rel["context"] == "with context"
# Test implicit relations in text
content = "Some text with a [[Link]] and [[Another Link]]"
token = [t for t in md.parse(content) if t.type == "inline"][0]
rels = token.meta["relations"]
assert len(rels) == 2
assert rels[0]["type"] == "links to"
assert rels[0]["target"] == "Link"
assert rels[1]["target"] == "Another Link"
def test_relation_edge_cases():
"""Test relation parsing edge cases."""
# Test non-inline token
token = Token("paragraph", "", 0)
assert not is_explicit_relation(token)
# Test empty content
token = Token("inline", "", 0)
assert not is_explicit_relation(token)
# Test incomplete relation (missing target)
token = Token("inline", "relates_to [[]]", 0)
result = parse_relation(token)
assert result is None
# Test non-relation content
token = Token("inline", "Just some text", 0)
result = parse_relation(token)
assert result is None
# Test invalid inline link (empty target)
assert not parse_inline_relations("Text with [[]] empty link")
# Test nested links (avoid duplicates)
result = parse_inline_relations("Text with [[Outer [[Inner]] Link]]")
assert len(result) == 1
assert result[0]["target"] == "Outer [[Inner]] Link"
def test_combined_plugins():
"""Test both plugins working together."""
md = MarkdownIt().use(observation_plugin).use(relation_plugin)
content = dedent("""
# Section
- [design] Observation with [[Link]] #tag (context)
- implements [[Component]] (details)
- Just a [[Reference]] in text
Some text with a [[Link]] reference.
""")
tokens = md.parse(content)
inline_tokens = [t for t in tokens if t.type == "inline"]
# First token has both observation and relation
obs_token = inline_tokens[1]
assert "observation" in obs_token.meta
assert "relations" in obs_token.meta
# Second token has explicit relation
rel_token = inline_tokens[2]
assert "relations" in rel_token.meta
rel = rel_token.meta["relations"][0]
assert rel["type"] == "implements"
# Third token has implicit relation
text_token = inline_tokens[4]
assert "relations" in text_token.meta
link = text_token.meta["relations"][0]
assert link["type"] == "links to"
```
--------------------------------------------------------------------------------
/.claude/agents/python-developer.md:
--------------------------------------------------------------------------------
```markdown
---
name: python-developer
description: Python backend developer specializing in FastAPI, DBOS workflows, and API implementation. Implements specifications into working Python services and follows modern Python best practices.
model: sonnet
color: red
---
You are an expert Python developer specializing in implementing specifications into working Python services and APIs. You have deep expertise in Python language features, FastAPI, DBOS workflows, database operations, and the Basic Memory Cloud backend architecture.
**Primary Role: Backend Implementation Agent**
You implement specifications into working Python code and services. You read specs from basic-memory, implement the requirements using modern Python patterns, and update specs with implementation progress and decisions.
**Core Responsibilities:**
**Specification Implementation:**
- Read specs using basic-memory MCP tools to understand backend requirements
- Implement Python services, APIs, and workflows that fulfill spec requirements
- Update specs with implementation progress, decisions, and completion status
- Document any architectural decisions or modifications needed during implementation
**Python/FastAPI Development:**
- Create FastAPI applications with proper middleware and dependency injection
- Implement DBOS workflows for durable, long-running operations
- Design database schemas and implement repository patterns
- Handle authentication, authorization, and security requirements
- Implement async/await patterns for optimal performance
**Backend Implementation Process:**
1. **Read Spec**: Use `mcp__basic-memory__read_note` to get spec requirements
2. **Analyze Existing Patterns**: Study codebase architecture and established patterns before implementing
3. **Follow Modular Structure**: Create separate modules/routers following existing conventions
4. **Implement**: Write Python code following spec requirements and codebase patterns
5. **Test**: Create tests that validate spec success criteria
6. **Update Spec**: Document completion and any implementation decisions
7. **Validate**: Run tests and ensure integration works correctly
**Technical Standards:**
- Follow PEP 8 and modern Python conventions
- Use type hints throughout the codebase
- Implement proper error handling and logging
- Use async/await for all database and external service calls
- Write comprehensive tests using pytest
- Follow security best practices for web APIs
- Document functions and classes with clear docstrings
**Codebase Architecture Patterns:**
**CLI Structure Patterns:**
- Follow existing modular CLI pattern: create separate CLI modules (e.g., `upload_cli.py`) instead of adding commands directly to `main.py`
- Existing examples: `polar_cli.py`, `tenant_cli.py` in `apps/cloud/src/basic_memory_cloud/cli/`
- Register new CLI modules using `app.add_typer(new_cli, name="command", help="description")`
- Maintain consistent command structure and help text patterns
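For example (module and command names are illustrative):
```python
# Minimal illustration of the modular CLI pattern.
# upload_cli.py
import typer

upload_cli = typer.Typer(help="Upload a local project to a cloud tenant.")

@upload_cli.command("run")
def run(path: str) -> None:
    """Upload the project at PATH."""
    typer.echo(f"Uploading {path}...")

# main.py
# from .upload_cli import upload_cli
# app.add_typer(upload_cli, name="upload", help="Project upload commands")
```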
**FastAPI Router Patterns:**
- Create dedicated routers for logical endpoint groups instead of adding routes directly to main app
- Place routers in dedicated files (e.g., `apps/api/src/basic_memory_cloud_api/routers/webdav_router.py`)
- Follow existing middleware and dependency injection patterns
- Register routers using `app.include_router(router, prefix="/api-path")`
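For example (router path and names are illustrative):
```python
# Minimal illustration of the dedicated-router pattern.
# routers/webdav_router.py
from fastapi import APIRouter

router = APIRouter(tags=["webdav"])

@router.get("/health")
async def webdav_health() -> dict[str, str]:
    return {"status": "ok"}

# main.py
# from .routers.webdav_router import router as webdav_router
# app.include_router(webdav_router, prefix="/webdav")
```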
**Modular Organization:**
- Always analyze existing codebase structure before implementing new features
- Follow established file organization and naming conventions
- Create separate modules for distinct functionality areas
- Maintain consistency with existing architectural decisions
- Preserve separation of concerns across service boundaries
**Pattern Analysis Process:**
1. Examine similar existing functionality in the codebase
2. Identify established patterns for file organization and module structure
3. Follow the same architectural approach for consistency
4. Create new modules/routers following existing conventions
5. Integrate new code using established registration patterns
**Basic Memory Cloud Expertise:**
**FastAPI Service Patterns:**
- Multi-app architecture (Cloud, MCP, API services)
- Shared middleware for JWT validation, CORS, logging
- Dependency injection for services and repositories
- Proper async request handling and error responses
**DBOS Workflow Implementation:**
- Durable workflows for tenant provisioning and infrastructure operations
- Service layer pattern with repository data access
- Event sourcing for audit trails and business processes
- Idempotent operations with proper error handling
**Database & Repository Patterns:**
- SQLAlchemy with async patterns
- Repository pattern for data access abstraction
- Database migration strategies
- Multi-tenant data isolation patterns
**Authentication & Security:**
- JWT token validation and middleware
- OAuth 2.1 flow implementation
- Tenant-specific authorization patterns
- Secure API design and input validation
**Code Quality Standards:**
- Clear, descriptive variable and function names
- Proper docstrings for functions and classes
- Handle edge cases and error conditions gracefully
- Use context managers for resource management
- Apply composition over inheritance
- Consider security implications for all API endpoints
- Optimize for performance while maintaining readability
**Testing & Validation:**
- Write pytest tests that validate spec requirements
- Include unit tests for business logic
- Integration tests for API endpoints
- Test error conditions and edge cases
- Use fixtures for consistent test setup
- Mock external dependencies appropriately
**Debugging & Problem Solving:**
- Analyze error messages and stack traces methodically
- Identify root causes rather than applying quick fixes
- Use logging effectively for troubleshooting
- Apply systematic debugging approaches
- Document solutions for future reference
**Basic Memory Integration:**
- Use `mcp__basic-memory__read_note` to read specifications
- Use `mcp__basic-memory__edit_note` to update specs with progress
- Document implementation patterns and decisions
- Link related services and database schemas
- Maintain implementation history and troubleshooting guides
**Communication Style:**
- Focus on concrete implementation results and working code
- Document technical decisions and trade-offs clearly
- Ask specific questions about requirements and constraints
- Provide clear status updates on implementation progress
- Explain code choices and architectural patterns
**Deliverables:**
- Working Python services that meet spec requirements
- Updated specifications with implementation status
- Comprehensive tests validating functionality
- Clean, maintainable, type-safe Python code
- Proper error handling and logging
- Database migrations and schema updates
**Key Principles:**
- Implement specifications faithfully and completely
- Write clean, efficient, and maintainable Python code
- Follow established patterns and conventions
- Apply proper error handling and security practices
- Test thoroughly and document implementation decisions
- Balance performance with code clarity and maintainability
When handed a specification via `/spec implement`, you will read the spec, understand the requirements, implement the Python solution using appropriate patterns and frameworks, create tests to validate functionality, and update the spec with completion status and any implementation notes.
```
--------------------------------------------------------------------------------
/tests/api/test_template_loader_helpers.py:
--------------------------------------------------------------------------------
```python
"""Tests for additional template loader helpers."""
import pytest
from datetime import datetime
from basic_memory.api.template_loader import TemplateLoader
@pytest.fixture
def temp_template_dir(tmpdir):
"""Create a temporary directory for test templates."""
template_dir = tmpdir.mkdir("templates").mkdir("prompts")
return template_dir
@pytest.fixture
def custom_template_loader(temp_template_dir):
"""Return a TemplateLoader instance with a custom template directory."""
return TemplateLoader(str(temp_template_dir))
@pytest.mark.asyncio
async def test_round_helper(custom_template_loader, temp_template_dir):
"""Test the round helper for number formatting."""
# Create template file
round_path = temp_template_dir / "round.hbs"
round_path.write_text(
"{{round number}} {{round number 0}} {{round number 3}}",
encoding="utf-8",
)
# Test with various values
result = await custom_template_loader.render("round.hbs", {"number": 3.14159})
assert result == "3.14 3.0 3.142" or result == "3.14 3 3.142"
# Test with non-numeric value
result = await custom_template_loader.render("round.hbs", {"number": "not-a-number"})
assert "not-a-number" in result
# Test with insufficient args
empty_path = temp_template_dir / "round_empty.hbs"
empty_path.write_text("{{round}}", encoding="utf-8")
result = await custom_template_loader.render("round_empty.hbs", {})
assert result == ""
@pytest.mark.asyncio
async def test_date_helper_edge_cases(custom_template_loader, temp_template_dir):
"""Test edge cases for the date helper."""
# Create template file
date_path = temp_template_dir / "date_edge.hbs"
date_path.write_text(
"{{date timestamp}} {{date timestamp '%Y'}} {{date string_date}} {{date invalid_date}} {{date}}",
encoding="utf-8",
)
# Test with various values
result = await custom_template_loader.render(
"date_edge.hbs",
{
"timestamp": datetime(2023, 1, 1, 12, 30),
"string_date": "2023-01-01T12:30:00",
"invalid_date": "not-a-date",
},
)
assert "2023-01-01" in result
assert "2023" in result # Custom format
assert "not-a-date" in result # Invalid date passed through
assert result.strip() != "" # Empty date case
@pytest.mark.asyncio
async def test_size_helper_edge_cases(custom_template_loader, temp_template_dir):
"""Test edge cases for the size helper."""
# Create template file
size_path = temp_template_dir / "size_edge.hbs"
size_path.write_text(
"{{size list}} {{size string}} {{size dict}} {{size null}} {{size}}",
encoding="utf-8",
)
# Test with various values
result = await custom_template_loader.render(
"size_edge.hbs",
{
"list": [1, 2, 3, 4, 5],
"string": "hello",
"dict": {"a": 1, "b": 2, "c": 3},
"null": None,
},
)
assert "5" in result # List size
assert "hello".find("5") == -1 # String size should be 5
assert "3" in result # Dict size
assert "0" in result # Null size
assert result.count("0") >= 2 # At least two zeros (null and empty args)
@pytest.mark.asyncio
async def test_math_helper(custom_template_loader, temp_template_dir):
"""Test the math helper for basic arithmetic."""
# Create template file
math_path = temp_template_dir / "math.hbs"
math_path.write_text(
"{{math 5 '+' 3}} {{math 10 '-' 4}} {{math 6 '*' 7}} {{math 20 '/' 5}}",
encoding="utf-8",
)
# Test basic operations
result = await custom_template_loader.render("math.hbs", {})
assert "8" in result # Addition
assert "6" in result # Subtraction
assert "42" in result # Multiplication
assert "4" in result # Division
# Test with invalid operator
invalid_op_path = temp_template_dir / "math_invalid_op.hbs"
invalid_op_path.write_text("{{math 5 'invalid' 3}}", encoding="utf-8")
result = await custom_template_loader.render("math_invalid_op.hbs", {})
assert "Unsupported operator" in result
# Test with invalid numeric values
invalid_num_path = temp_template_dir / "math_invalid_num.hbs"
invalid_num_path.write_text("{{math 'not-a-number' '+' 3}}", encoding="utf-8")
result = await custom_template_loader.render("math_invalid_num.hbs", {})
assert "Math error" in result
# Test with insufficient arguments
insufficient_path = temp_template_dir / "math_insufficient.hbs"
insufficient_path.write_text("{{math 5 '+'}}", encoding="utf-8")
result = await custom_template_loader.render("math_insufficient.hbs", {})
assert "Insufficient arguments" in result
@pytest.mark.asyncio
async def test_if_cond_helper(custom_template_loader, temp_template_dir):
"""Test the if_cond helper for conditionals."""
# Create template file with true condition
if_true_path = temp_template_dir / "if_true.hbs"
if_true_path.write_text(
"{{#if_cond (lt 5 10)}}True condition{{else}}False condition{{/if_cond}}",
encoding="utf-8",
)
# Create template file with false condition
if_false_path = temp_template_dir / "if_false.hbs"
if_false_path.write_text(
"{{#if_cond (lt 15 10)}}True condition{{else}}False condition{{/if_cond}}",
encoding="utf-8",
)
# Test true condition
result = await custom_template_loader.render("if_true.hbs", {})
assert result == "True condition"
# Test false condition
result = await custom_template_loader.render("if_false.hbs", {})
assert result == "False condition"
@pytest.mark.asyncio
async def test_lt_helper_edge_cases(custom_template_loader, temp_template_dir):
"""Test edge cases for the lt (less than) helper."""
# Create template file
lt_path = temp_template_dir / "lt_edge.hbs"
lt_path.write_text(
"{{#if_cond (lt 'a' 'b')}}String LT True{{else}}String LT False{{/if_cond}} "
"{{#if_cond (lt 'z' 'a')}}String LT2 True{{else}}String LT2 False{{/if_cond}} "
"{{#if_cond (lt)}}Missing args True{{else}}Missing args False{{/if_cond}}",
encoding="utf-8",
)
# Test with string values and missing args
result = await custom_template_loader.render("lt_edge.hbs", {})
assert "String LT True" in result # 'a' < 'b' is true
assert "String LT2 False" in result # 'z' < 'a' is false
assert "Missing args False" in result # Missing args should return false
@pytest.mark.asyncio
async def test_dedent_helper_edge_case(custom_template_loader, temp_template_dir):
"""Test an edge case for the dedent helper."""
# Create template with empty dedent block
empty_dedent_path = temp_template_dir / "empty_dedent.hbs"
empty_dedent_path.write_text("{{#dedent}}{{/dedent}}", encoding="utf-8")
# Test empty block
result = await custom_template_loader.render("empty_dedent.hbs", {})
assert result == ""
# Test with complex content including lists
complex_dedent_path = temp_template_dir / "complex_dedent.hbs"
complex_dedent_path.write_text(
"{{#dedent}}\n {{#each items}}\n - {{this}}\n {{/each}}\n{{/dedent}}",
encoding="utf-8",
)
result = await custom_template_loader.render("complex_dedent.hbs", {"items": [1, 2, 3]})
assert "- 1" in result
assert "- 2" in result
assert "- 3" in result
```
--------------------------------------------------------------------------------
/src/basic_memory/markdown/plugins.py:
--------------------------------------------------------------------------------
```python
"""Markdown-it plugins for Basic Memory markdown parsing."""
from typing import List, Any, Dict
from markdown_it import MarkdownIt
from markdown_it.token import Token
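# Typical usage (mirrors tests/markdown/test_markdown_plugins.py):
#   md = MarkdownIt().use(observation_plugin).use(relation_plugin)
#   tokens = md.parse(text)  # parsed observations/relations land in token.meta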
# Observation handling functions
def is_observation(token: Token) -> bool:
"""Check if token looks like our observation format."""
import re
if token.type != "inline": # pragma: no cover
return False
# Use token.tag which contains the actual content for test tokens, fallback to content
content = (token.tag or token.content).strip()
if not content: # pragma: no cover
return False
# if it's a markdown_task, return false
if content.startswith("[ ]") or content.startswith("[x]") or content.startswith("[-]"):
return False
# Exclude markdown links: [text](url)
if re.match(r"^\[.*?\]\(.*?\)$", content):
return False
# Exclude wiki links: [[text]]
if re.match(r"^\[\[.*?\]\]$", content):
return False
# Check for proper observation format: [category] content
match = re.match(r"^\[([^\[\]()]+)\]\s+(.+)", content)
has_tags = "#" in content
return bool(match) or has_tags
def parse_observation(token: Token) -> Dict[str, Any]:
"""Extract observation parts from token."""
import re
# Use token.tag which contains the actual content for test tokens, fallback to content
content = (token.tag or token.content).strip()
# Parse [category] with regex
match = re.match(r"^\[([^\[\]()]+)\]\s+(.+)", content)
category = None
if match:
category = match.group(1).strip()
content = match.group(2).strip()
else:
# Handle empty brackets [] followed by content
empty_match = re.match(r"^\[\]\s+(.+)", content)
if empty_match:
content = empty_match.group(1).strip()
# Parse (context)
context = None
if content.endswith(")"):
start = content.rfind("(")
if start != -1:
context = content[start + 1 : -1].strip()
content = content[:start].strip()
# Extract tags and keep original content
tags = []
parts = content.split()
for part in parts:
if part.startswith("#"):
if "#" in part[1:]:
subtags = [t for t in part.split("#") if t]
tags.extend(subtags)
else:
tags.append(part[1:])
return {
"category": category,
"content": content,
"tags": tags if tags else None,
"context": context,
}
# Relation handling functions
def is_explicit_relation(token: Token) -> bool:
"""Check if token looks like our relation format."""
if token.type != "inline": # pragma: no cover
return False
# Use token.tag which contains the actual content for test tokens, fallback to content
content = (token.tag or token.content).strip()
return "[[" in content and "]]" in content
def parse_relation(token: Token) -> Dict[str, Any] | None:
"""Extract relation parts from token."""
# Remove bullet point if present
# Use token.tag which contains the actual content for test tokens, fallback to content
content = (token.tag or token.content).strip()
# Extract [[target]]
target = None
rel_type = "relates_to" # default
context = None
start = content.find("[[")
end = content.find("]]")
if start != -1 and end != -1:
# Get text before link as relation type
before = content[:start].strip()
if before:
rel_type = before
# Get target
target = content[start + 2 : end].strip()
# Look for context after
after = content[end + 2 :].strip()
if after.startswith("(") and after.endswith(")"):
context = after[1:-1].strip() or None
if not target: # pragma: no cover
return None
return {"type": rel_type, "target": target, "context": context}
def parse_inline_relations(content: str) -> List[Dict[str, Any]]:
"""Find wiki-style links in regular content."""
relations = []
start = 0
while True:
# Find next outer-most [[
start = content.find("[[", start)
if start == -1: # pragma: no cover
break
# Find matching ]]
depth = 1
pos = start + 2
end = -1
while pos < len(content):
if content[pos : pos + 2] == "[[":
depth += 1
pos += 2
elif content[pos : pos + 2] == "]]":
depth -= 1
if depth == 0:
end = pos
break
pos += 2
else:
pos += 1
if end == -1:
# No matching ]] found
break
target = content[start + 2 : end].strip()
if target:
relations.append({"type": "links to", "target": target, "context": None})
start = end + 2
return relations
def observation_plugin(md: MarkdownIt) -> None:
"""Plugin for parsing observation format:
- [category] Content text #tag1 #tag2 (context)
- Content text #tag1 (context) # No category is also valid
"""
def observation_rule(state: Any) -> None:
"""Process observations in token stream."""
tokens = state.tokens
for idx in range(len(tokens)):
token = tokens[idx]
# Initialize meta for all tokens
token.meta = token.meta or {}
# Parse observations in list items
if token.type == "inline" and is_observation(token):
obs = parse_observation(token)
if obs["content"]: # Only store if we have content
token.meta["observation"] = obs
# Add the rule after inline processing
md.core.ruler.after("inline", "observations", observation_rule)
def relation_plugin(md: MarkdownIt) -> None:
"""Plugin for parsing relation formats:
Explicit relations:
- relation_type [[target]] (context)
Implicit relations (links in content):
Some text with [[target]] reference
"""
def relation_rule(state: Any) -> None:
"""Process relations in token stream."""
tokens = state.tokens
in_list_item = False
for idx in range(len(tokens)):
token = tokens[idx]
# Track list nesting
if token.type == "list_item_open":
in_list_item = True
elif token.type == "list_item_close":
in_list_item = False
# Initialize meta for all tokens
token.meta = token.meta or {}
# Only process inline tokens
if token.type == "inline":
# Check for explicit relations in list items
if in_list_item and is_explicit_relation(token):
rel = parse_relation(token)
if rel:
token.meta["relations"] = [rel]
# Always check for inline links in any text
else:
content = token.tag or token.content
if "[[" in content:
rels = parse_inline_relations(content)
if rels:
token.meta["relations"] = token.meta.get("relations", []) + rels
# Add the rule after inline processing
md.core.ruler.after("inline", "relations", relation_rule)
```
--------------------------------------------------------------------------------
/specs/SPEC-5 CLI Cloud Upload via WebDAV.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-5: CLI Cloud Upload via WebDAV'
type: spec
permalink: specs/spec-5-cli-cloud-upload-via-webdav
tags:
- cli
- webdav
- upload
- migration
- poc
---
# SPEC-5: CLI Cloud Upload via WebDAV
## Why
Existing basic-memory users need a simple migration path to basic-memory-cloud. The web UI drag-and-drop approach outlined in GitHub issue #59, while user-friendly, introduces significant complexity for a proof-of-concept:
- Complex web UI components for file upload and progress tracking
- Browser file handling limitations and CORS complexity
- Proxy routing overhead for large file transfers
- Authentication integration across multiple services
A CLI-first approach solves these issues by:
- **Leveraging existing infrastructure**: Both cloud CLI and tenant API already exist with WorkOS JWT authentication
- **Familiar user experience**: Basic-memory users are CLI-comfortable and expect command-line tools
- **Direct connection efficiency**: Bypassing the MCP gateway/proxy for bulk file transfers
- **Rapid implementation**: Building on existing `CLIAuth` and FastAPI foundations
The fundamental problem is migration friction - users have local basic-memory projects but no path to cloud tenants. A simple CLI upload command removes this barrier immediately.
## What
This spec defines a CLI-based project upload system using WebDAV for direct tenant connections.
**Affected Areas:**
- `apps/cloud/src/basic_memory_cloud/cli/main.py` - Add upload command to existing CLI
- `apps/api/src/basic_memory_cloud_api/main.py` - Add WebDAV endpoints to tenant FastAPI
- Authentication flow - Reuse existing WorkOS JWT validation
- File transfer protocol - WebDAV for cross-platform compatibility
**Core Components:**
### CLI Upload Command
```bash
basic-memory-cloud upload <project-path> --tenant-url https://basic-memory-{tenant}.fly.dev
```
### WebDAV Server Endpoints
- `GET/PUT/DELETE /webdav/*` - Standard WebDAV operations on tenant file system
- Authentication via existing JWT validation
- File operations preserve timestamps and directory structure
### Authentication Flow
```
1. User runs `basic-memory-cloud login` (existing)
2. CLI stores WorkOS JWT token (existing)
3. Upload command reads JWT from storage
4. WebDAV requests include JWT in Authorization header
5. Tenant API validates JWT using existing middleware
```
## How (High Level)
### Implementation Strategy
**Phase 1: CLI Command**
- Add `upload` command to existing Typer app
- Reuse `CLIAuth` class for token management
- Implement WebDAV client using `webdavclient3` or similar
- Rich progress bars for transfer feedback
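A minimal sketch of what this could look like (command name, token handling, and the `/webdav/` prefix are assumptions for illustration; it uses plain HTTP `PUT`, which is all WebDAV needs for uploads):
```python
# Hypothetical sketch only; not the final CLI implementation.
from pathlib import Path

import httpx
import typer
from rich.progress import track

app = typer.Typer()

@app.command()
def upload(project_path: Path, tenant_url: str, token: str) -> None:
    """Upload a local basic-memory project to a cloud tenant via WebDAV PUT."""
    files = [p for p in project_path.rglob("*") if p.is_file()]
    headers = {"Authorization": f"Bearer {token}"}
    with httpx.Client(base_url=tenant_url, headers=headers) as client:
        for file in track(files, description="Uploading"):
            remote = file.relative_to(project_path).as_posix()
            client.put(f"/webdav/{remote}", content=file.read_bytes()).raise_for_status()

if __name__ == "__main__":
    app()
```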
**Phase 2: WebDAV Server**
- Add WebDAV endpoints to existing tenant FastAPI app
- Leverage existing `get_current_user` dependency for authentication
- Map WebDAV operations to tenant file system
- Preserve file modification times using `os.utime()`
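A rough sketch of the server side under the same assumptions (the mtime header and storage root are illustrative; the real endpoint would also apply the existing `get_current_user` JWT dependency):
```python
# Hypothetical endpoint shape; paths, header name, and storage root are assumptions.
import os
from pathlib import Path

from fastapi import APIRouter, Header, Request

router = APIRouter(prefix="/webdav")
DATA_ROOT = Path("/app/data")  # tenant file system root (assumed)

@router.put("/{file_path:path}")
async def webdav_put(
    file_path: str,
    request: Request,
    mtime: float | None = Header(default=None, alias="X-File-Mtime"),
) -> dict[str, str]:
    target = DATA_ROOT / file_path
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(await request.body())
    if mtime is not None:
        os.utime(target, (mtime, mtime))  # preserve the source modification time
    return {"status": "ok", "path": file_path}
```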
**Phase 3: Integration**
- Direct connection bypasses MCP gateway and proxy
- Simple conflict resolution: overwrite existing files
- Error handling: fail fast with clear error messages
### Technical Architecture
```
basic-memory-cloud CLI → WorkOS JWT → Direct WebDAV → Tenant FastAPI
                                                            ↓
                                                   Tenant File System
```
**Key Libraries:**
- CLI: `webdavclient3` for WebDAV client operations
- API: `wsgidav` or FastAPI-compatible WebDAV server
- Progress: `rich` library (already imported in CLI)
- Auth: Existing WorkOS JWT infrastructure
### WebDAV Protocol Choice
WebDAV provides:
- **Cross-platform clients**: Native support in most operating systems
- **Standardized protocol**: Well-defined for file operations
- **HTTP-based**: Works with existing FastAPI and JWT auth
- **Library support**: Good Python libraries for both client and server
### POC Constraints
**Simplifications for rapid implementation:**
- **Known tenant URLs**: Assume `https://basic-memory-{tenant}.fly.dev` format
- **Upload only**: No download or bidirectional sync
- **Overwrite conflicts**: No merge or conflict resolution prompting
- **No fallbacks**: Fail fast if WebDAV connection issues occur
- **Direct connection only**: No proxy fallback mechanism
## How to Evaluate
### Success Criteria
**Functional Requirements:**
- [ ] Transfer complete basic-memory project (100+ files) in < 30 seconds
- [ ] Preserve directory structure exactly as in source project
- [ ] Preserve file modification timestamps for proper sync behavior
- [ ] Rich progress bars show real-time transfer status (files/MB transferred)
- [ ] WorkOS JWT authentication validates correctly on WebDAV endpoints
- [ ] Direct tenant connection bypasses MCP gateway successfully
**Quality Requirements:**
- [ ] Clear error messages for authentication failures
- [ ] Graceful handling of network interruptions
- [ ] CLI follows existing command patterns and help text standards
- [ ] WebDAV endpoints integrate cleanly with existing FastAPI app
**Performance Requirements:**
- [ ] File transfer speed > 1MB/s on typical connections
- [ ] Memory usage remains reasonable for large projects
- [ ] No timeout issues with 500+ file projects
### Testing Procedure
**Unit Testing:**
1. CLI command parsing and argument validation
2. WebDAV client connection and authentication
3. File timestamp preservation during transfer
4. JWT token validation on WebDAV endpoints
**Integration Testing:**
1. End-to-end upload of test project
2. Direct tenant connection without proxy
3. File integrity verification after upload
4. Progress tracking accuracy during transfer
**User Experience Testing:**
1. Upload existing basic-memory project from local installation
2. Verify uploaded files appear correctly in cloud tenant
3. Confirm basic-memory database rebuilds properly with uploaded files
4. Test CLI help text and error message clarity
### Validation Commands
**Setup:**
```bash
# Login to WorkOS
basic-memory-cloud login
# Upload project
basic-memory-cloud upload ~/my-notes --tenant-url https://basic-memory-test.fly.dev
```
**Verification:**
```bash
# Check tenant health and file count via API
curl -H "Authorization: Bearer $JWT" https://basic-memory-test.fly.dev/health
curl -H "Authorization: Bearer $JWT" https://basic-memory-test.fly.dev/notes/search
```
### Performance Benchmarks
**Target metrics for 100MB basic-memory project:**
- Transfer time: < 30 seconds
- Memory usage: < 100MB during transfer
- Progress updates: Every 1MB or 10 files
- Authentication time: < 2 seconds
## Observations
- [implementation-speed] CLI approach significantly faster than web UI for POC development #rapid-prototyping
- [user-experience] Basic-memory users already comfortable with CLI tools #user-familiarity
- [architecture-benefit] Direct connection eliminates proxy complexity and latency #performance
- [auth-reuse] Existing WorkOS JWT infrastructure handles authentication cleanly #code-reuse
- [webdav-choice] WebDAV protocol provides cross-platform compatibility and standard libraries #protocol-selection
- [poc-scope] Simple conflict handling and error recovery sufficient for proof-of-concept #scope-management
- [migration-value] Removes primary barrier for local users migrating to cloud platform #business-value
## Relations
- depends_on [[SPEC-1: Specification-Driven Development Process]]
- enables [[GitHub Issue #59: Web UI Upload Feature]]
- uses [[WorkOS Authentication Integration]]
- builds_on [[Existing Cloud CLI Infrastructure]]
- builds_on [[Existing Tenant API Architecture]]
```
--------------------------------------------------------------------------------
/tests/mcp/tools/test_chatgpt_tools.py:
--------------------------------------------------------------------------------
```python
"""Tests for ChatGPT-compatible MCP tools."""
import json
import pytest
from unittest.mock import AsyncMock, patch
from basic_memory.schemas.search import SearchResponse, SearchResult, SearchItemType
@pytest.mark.asyncio
async def test_search_successful_results():
"""Test search with successful results returns proper MCP content array format."""
# Mock successful search results
mock_results = SearchResponse(
results=[
SearchResult(
title="Test Document 1",
permalink="docs/test-doc-1",
content="This is test content for document 1",
type=SearchItemType.ENTITY,
score=1.0,
file_path="/test/docs/test-doc-1.md",
),
SearchResult(
title="Test Document 2",
permalink="docs/test-doc-2",
content="This is test content for document 2",
type=SearchItemType.ENTITY,
score=0.9,
file_path="/test/docs/test-doc-2.md",
),
],
current_page=1,
page_size=10,
)
with patch(
"basic_memory.mcp.tools.chatgpt_tools.search_notes.fn", new_callable=AsyncMock
) as mock_search:
mock_search.return_value = mock_results
# Import and call the actual function
from basic_memory.mcp.tools.chatgpt_tools import search
result = await search.fn("test query")
# Verify MCP content array format
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["type"] == "text"
# Parse the JSON content
content = json.loads(result[0]["text"])
assert "results" in content
assert "query" in content
# Verify result structure
assert len(content["results"]) == 2
assert content["query"] == "test query"
# Verify individual result format
result_item = content["results"][0]
assert result_item["id"] == "docs/test-doc-1"
assert result_item["title"] == "Test Document 1"
assert result_item["url"] == "docs/test-doc-1"
@pytest.mark.asyncio
async def test_search_with_error_response():
"""Test search when underlying search_notes returns error string."""
error_message = "# Search Failed - Invalid Syntax\n\nThe search query contains errors..."
with patch(
"basic_memory.mcp.tools.chatgpt_tools.search_notes.fn", new_callable=AsyncMock
) as mock_search:
mock_search.return_value = error_message
from basic_memory.mcp.tools.chatgpt_tools import search
result = await search.fn("invalid query")
# Verify MCP content array format
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["type"] == "text"
# Parse the JSON content
content = json.loads(result[0]["text"])
assert content["results"] == []
assert content["error"] == "Search failed"
assert "error_details" in content
@pytest.mark.asyncio
async def test_fetch_successful_document():
"""Test fetch with successful document retrieval."""
document_content = """# Test Document
This is the content of a test document.
## Section 1
Some content here.
## Observations
- [observation] This is a test observation
## Relations
- relates_to [[Another Document]]
"""
with patch(
"basic_memory.mcp.tools.chatgpt_tools.read_note.fn", new_callable=AsyncMock
) as mock_read:
mock_read.return_value = document_content
from basic_memory.mcp.tools.chatgpt_tools import fetch
result = await fetch.fn("docs/test-document")
# Verify MCP content array format
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["type"] == "text"
# Parse the JSON content
content = json.loads(result[0]["text"])
assert content["id"] == "docs/test-document"
assert content["title"] == "Test Document" # Extracted from markdown
assert content["text"] == document_content
assert content["url"] == "docs/test-document"
assert content["metadata"]["format"] == "markdown"
@pytest.mark.asyncio
async def test_fetch_document_not_found():
"""Test fetch when document is not found."""
error_content = """# Note Not Found: "nonexistent-doc"
I couldn't find any notes matching "nonexistent-doc". Here are some suggestions:
## Check Identifier Type
- If you provided a title, try using the exact permalink instead
"""
with patch(
"basic_memory.mcp.tools.chatgpt_tools.read_note.fn", new_callable=AsyncMock
) as mock_read:
mock_read.return_value = error_content
from basic_memory.mcp.tools.chatgpt_tools import fetch
result = await fetch.fn("nonexistent-doc")
# Verify MCP content array format
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["type"] == "text"
# Parse the JSON content
content = json.loads(result[0]["text"])
assert content["id"] == "nonexistent-doc"
assert content["text"] == error_content
assert content["metadata"]["error"] == "Document not found"
def test_format_search_results_for_chatgpt():
"""Test search results formatting."""
from basic_memory.mcp.tools.chatgpt_tools import _format_search_results_for_chatgpt
mock_results = SearchResponse(
results=[
SearchResult(
title="Document One",
permalink="docs/doc-one",
content="Content for document one",
type=SearchItemType.ENTITY,
score=1.0,
file_path="/test/docs/doc-one.md",
),
SearchResult(
title="", # Test empty title handling
permalink="docs/untitled",
content="Content without title",
type=SearchItemType.ENTITY,
score=0.8,
file_path="/test/docs/untitled.md",
),
],
current_page=1,
page_size=10,
)
formatted = _format_search_results_for_chatgpt(mock_results)
assert len(formatted) == 2
assert formatted[0]["id"] == "docs/doc-one"
assert formatted[0]["title"] == "Document One"
assert formatted[0]["url"] == "docs/doc-one"
# Test empty title handling
assert formatted[1]["title"] == "Untitled"
def test_format_document_for_chatgpt():
"""Test document formatting."""
from basic_memory.mcp.tools.chatgpt_tools import _format_document_for_chatgpt
content = "# Test Document\n\nThis is test content."
result = _format_document_for_chatgpt(content, "docs/test")
assert result["id"] == "docs/test"
assert result["title"] == "Test Document"
assert result["text"] == content
assert result["url"] == "docs/test"
assert result["metadata"]["format"] == "markdown"
def test_format_document_error_handling():
"""Test document formatting with error content."""
from basic_memory.mcp.tools.chatgpt_tools import _format_document_for_chatgpt
error_content = '# Note Not Found: "missing-doc"\n\nDocument not found.'
result = _format_document_for_chatgpt(error_content, "missing-doc", "Missing Doc")
assert result["id"] == "missing-doc"
assert result["title"] == "Missing Doc"
assert result["text"] == error_content
assert result["metadata"]["error"] == "Document not found"
```
--------------------------------------------------------------------------------
/src/basic_memory/importers/chatgpt_importer.py:
--------------------------------------------------------------------------------
```python
"""ChatGPT import service for Basic Memory."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Set
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ChatImportResult
from basic_memory.importers.utils import clean_filename, format_timestamp
logger = logging.getLogger(__name__)
class ChatGPTImporter(Importer[ChatImportResult]):
"""Service for importing ChatGPT conversations."""
async def import_data(
self, source_data, destination_folder: str, **kwargs: Any
) -> ChatImportResult:
"""Import conversations from ChatGPT JSON export.
Args:
source_data: Parsed conversation data from a ChatGPT conversations.json export.
destination_folder: Destination folder within the project.
**kwargs: Additional keyword arguments.
Returns:
ChatImportResult containing statistics and status of the import.
"""
try: # pragma: no cover
# Ensure the destination folder exists
self.ensure_folder_exists(destination_folder)
conversations = source_data
# Process each conversation
messages_imported = 0
chats_imported = 0
for chat in conversations:
# Convert to entity
entity = self._format_chat_content(destination_folder, chat)
# Write file
file_path = self.base_path / f"{entity.frontmatter.metadata['permalink']}.md"
await self.write_entity(entity, file_path)
# Count messages
msg_count = sum(
1
for node in chat["mapping"].values()
if node.get("message")
and not node.get("message", {})
.get("metadata", {})
.get("is_visually_hidden_from_conversation")
)
chats_imported += 1
messages_imported += msg_count
return ChatImportResult(
import_count={"conversations": chats_imported, "messages": messages_imported},
success=True,
conversations=chats_imported,
messages=messages_imported,
)
except Exception as e: # pragma: no cover
logger.exception("Failed to import ChatGPT conversations")
return self.handle_error("Failed to import ChatGPT conversations", e) # pyright: ignore [reportReturnType]
def _format_chat_content(
self, folder: str, conversation: Dict[str, Any]
) -> EntityMarkdown: # pragma: no cover
"""Convert chat conversation to Basic Memory entity.
Args:
folder: Destination folder name.
conversation: ChatGPT conversation data.
Returns:
EntityMarkdown instance representing the conversation.
"""
# Extract timestamps
created_at = conversation["create_time"]
modified_at = conversation["update_time"]
root_id = None
# Find root message
for node_id, node in conversation["mapping"].items():
if node.get("parent") is None:
root_id = node_id
break
# Generate permalink
date_prefix = datetime.fromtimestamp(created_at).astimezone().strftime("%Y%m%d")
clean_title = clean_filename(conversation["title"])
# Format content
content = self._format_chat_markdown(
title=conversation["title"],
mapping=conversation["mapping"],
root_id=root_id,
created_at=created_at,
modified_at=modified_at,
)
# Create entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": "conversation",
"title": conversation["title"],
"created": format_timestamp(created_at),
"modified": format_timestamp(modified_at),
"permalink": f"{folder}/{date_prefix}-{clean_title}",
}
),
content=content,
)
return entity
def _format_chat_markdown(
self,
title: str,
mapping: Dict[str, Any],
root_id: Optional[str],
created_at: float,
modified_at: float,
) -> str: # pragma: no cover
"""Format chat as clean markdown.
Args:
title: Chat title.
mapping: Message mapping.
root_id: Root message ID.
created_at: Creation timestamp.
modified_at: Modification timestamp.
Returns:
Formatted markdown content.
"""
# Start with title
lines = [f"# {title}\n"]
# Traverse message tree
seen_msgs: Set[str] = set()
messages = self._traverse_messages(mapping, root_id, seen_msgs)
# Format each message
for msg in messages:
# Skip hidden messages
if msg.get("metadata", {}).get("is_visually_hidden_from_conversation"):
continue
# Get author and timestamp
author = msg["author"]["role"].title()
ts = format_timestamp(msg["create_time"]) if msg.get("create_time") else ""
# Add message header
lines.append(f"### {author} ({ts})")
# Add message content
content = self._get_message_content(msg)
if content:
lines.append(content)
# Add spacing
lines.append("")
return "\n".join(lines)
def _get_message_content(self, message: Dict[str, Any]) -> str: # pragma: no cover
"""Extract clean message content.
Args:
message: Message data.
Returns:
Cleaned message content.
"""
if not message or "content" not in message:
return ""
content = message["content"]
if content.get("content_type") == "text":
return "\n".join(content.get("parts", []))
elif content.get("content_type") == "code":
return f"```{content.get('language', '')}\n{content.get('text', '')}\n```"
return ""
def _traverse_messages(
self, mapping: Dict[str, Any], root_id: Optional[str], seen: Set[str]
) -> List[Dict[str, Any]]: # pragma: no cover
"""Traverse message tree iteratively to handle deep conversations.
Args:
mapping: Message mapping.
root_id: Root message ID.
seen: Set of seen message IDs.
Returns:
List of message data.
"""
messages = []
if not root_id:
return messages
# Use iterative approach with stack to avoid recursion depth issues
stack = [root_id]
while stack:
node_id = stack.pop()
if not node_id:
continue
node = mapping.get(node_id)
if not node:
continue
# Process current node if it has a message and hasn't been seen
if node["id"] not in seen and node.get("message"):
seen.add(node["id"])
messages.append(node["message"])
# Add children to stack in reverse order to maintain conversation flow
children = node.get("children", [])
for child_id in reversed(children):
stack.append(child_id)
return messages
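# For reference when reading the methods above (derived from the accesses they
# make, not a full schema of the ChatGPT export): each conversation dict has
# "title", "create_time", "update_time", and a "mapping" of node_id -> node.
# A node carries "id", "parent", "children", and an optional "message"; a
# message carries "author" ({"role": ...}), "create_time", optional "metadata"
# (with "is_visually_hidden_from_conversation"), and "content", where
# content_type "text" provides "parts" and content_type "code" provides
# "language" and "text".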
```
--------------------------------------------------------------------------------
/v15-docs/basic-memory-home.md:
--------------------------------------------------------------------------------
```markdown
# BASIC_MEMORY_HOME Environment Variable
**Status**: Existing (clarified in v0.15.0)
**Related**: project-root-env-var.md
## What It Is
`BASIC_MEMORY_HOME` specifies the location of your **default "main" project**. This is the primary directory where Basic Memory stores knowledge files when no other project is specified.
## Quick Reference
```bash
# Default (if not set)
~/basic-memory
# Custom location
export BASIC_MEMORY_HOME=/Users/you/Documents/knowledge-base
```
## How It Works
### Default Project Location
When Basic Memory initializes, it creates a "main" project:
```python
# Without BASIC_MEMORY_HOME
projects = {
"main": "~/basic-memory" # Default
}
# With BASIC_MEMORY_HOME=/Users/you/custom-location set
projects = {
"main": "/Users/you/custom-location" # Uses env var
}
```
### Only Affects "main" Project
**Important:** `BASIC_MEMORY_HOME` ONLY sets the path for the "main" project. Other projects are unaffected.
```bash
export BASIC_MEMORY_HOME=/Users/you/my-knowledge
# config.json will have:
{
"projects": {
"main": "/Users/you/my-knowledge", # ← From BASIC_MEMORY_HOME
"work": "/Users/you/work-notes", # ← Independently configured
"personal": "/Users/you/personal-kb" # ← Independently configured
}
}
```
## Relationship with BASIC_MEMORY_PROJECT_ROOT
These are **separate** environment variables with **different purposes**:
| Variable | Purpose | Scope | Default |
|----------|---------|-------|---------|
| `BASIC_MEMORY_HOME` | Where "main" project lives | Single project | `~/basic-memory` |
| `BASIC_MEMORY_PROJECT_ROOT` | Security boundary for ALL projects | All projects | None (unrestricted) |
### Using Together
```bash
# Common containerized setup
export BASIC_MEMORY_HOME=/app/data/basic-memory # Main project location
export BASIC_MEMORY_PROJECT_ROOT=/app/data # All projects must be under here
```
**Result:**
- Main project created at `/app/data/basic-memory`
- All other projects must be under `/app/data/`
- Provides both convenience and security
### Comparison Table
| Scenario | BASIC_MEMORY_HOME | BASIC_MEMORY_PROJECT_ROOT | Result |
|----------|-------------------|---------------------------|---------|
| **Default** | Not set | Not set | Main at `~/basic-memory`, projects anywhere |
| **Custom main** | `/Users/you/kb` | Not set | Main at `/Users/you/kb`, projects anywhere |
| **Containerized** | `/app/data/main` | `/app/data` | Main at `/app/data/main`, all projects under `/app/data/` |
| **Secure SaaS** | `/app/tenant-123/main` | `/app/tenant-123` | Main at `/app/tenant-123/main`, tenant isolated |
## Use Cases
### Personal Setup (Default)
```bash
# Use default location
# BASIC_MEMORY_HOME not set
# Main project created at:
~/basic-memory/
```
### Custom Location
```bash
# Store in Documents folder
export BASIC_MEMORY_HOME=~/Documents/BasicMemory
# Main project created at:
~/Documents/BasicMemory/
```
### Synchronized Cloud Folder
```bash
# Store in Dropbox/iCloud
export BASIC_MEMORY_HOME=~/Dropbox/BasicMemory
# Main project syncs via Dropbox:
~/Dropbox/BasicMemory/
```
### Docker Deployment
```bash
# Mount volume for persistence
docker run \
-e BASIC_MEMORY_HOME=/app/data/basic-memory \
-v $(pwd)/data:/app/data \
basic-memory:latest
# Main project persists at:
./data/basic-memory/ # (host)
/app/data/basic-memory/ # (container)
```
### Multi-User System
```bash
# Per-user isolation
export BASIC_MEMORY_HOME=/home/$USER/basic-memory
# Alice's main project:
/home/alice/basic-memory/
# Bob's main project:
/home/bob/basic-memory/
```
## Configuration Examples
### Basic Setup
```bash
# .bashrc or .zshrc
export BASIC_MEMORY_HOME=~/Documents/knowledge
```
### Docker Compose
```yaml
services:
basic-memory:
environment:
BASIC_MEMORY_HOME: /app/data/basic-memory
volumes:
- ./data:/app/data
```
### Kubernetes
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: basic-memory-config
data:
BASIC_MEMORY_HOME: "/app/data/basic-memory"
---
apiVersion: v1
kind: Pod
spec:
containers:
- name: basic-memory
envFrom:
- configMapRef:
name: basic-memory-config
```
### systemd Service
```ini
[Service]
Environment="BASIC_MEMORY_HOME=/var/lib/basic-memory"
ExecStart=/usr/local/bin/basic-memory serve
```
## Migration
### Changing BASIC_MEMORY_HOME
If you need to change the location:
**Option 1: Move files**
```bash
# Stop services
bm sync --stop
# Move data
mv ~/basic-memory ~/Documents/knowledge
# Update environment
export BASIC_MEMORY_HOME=~/Documents/knowledge
# Restart
bm sync
```
**Option 2: Copy and sync**
```bash
# Copy to new location
cp -r ~/basic-memory ~/Documents/knowledge
# Update environment
export BASIC_MEMORY_HOME=~/Documents/knowledge
# Verify
bm status
# Remove old location once verified
rm -rf ~/basic-memory
```
### From v0.14.x
No changes needed - `BASIC_MEMORY_HOME` works the same way:
```bash
# v0.14.x and v0.15.0+ both use:
export BASIC_MEMORY_HOME=~/my-knowledge
```
## Common Patterns
### Development vs Production
```bash
# Development (.bashrc)
export BASIC_MEMORY_HOME=~/dev/basic-memory-dev
# Production (systemd/docker)
export BASIC_MEMORY_HOME=/var/lib/basic-memory
```
### Shared Team Setup
```bash
# Shared network drive
export BASIC_MEMORY_HOME=/mnt/shared/team-knowledge
# Note: Use with caution, consider file locking
```
### Backup Strategy
```bash
# Primary location
export BASIC_MEMORY_HOME=~/basic-memory
# Automated backup script
rsync -av ~/basic-memory/ ~/Backups/basic-memory-$(date +%Y%m%d)/
```
## Verification
### Check Current Value
```bash
# View environment variable
echo $BASIC_MEMORY_HOME
# View resolved config
bm project list
# Shows actual path for "main" project
```
### Verify Main Project Location
```python
from basic_memory.config import ConfigManager
config = ConfigManager().config
print(config.projects["main"])
# Shows where "main" project is located
```
## Troubleshooting
### Main Project Not at Expected Location
**Problem:** Files not where you expect
**Check:**
```bash
# What's the environment variable?
echo $BASIC_MEMORY_HOME
# Where is main project actually?
bm project list | grep main
```
**Solution:** Set environment variable and restart
### Permission Errors
**Problem:** Can't write to BASIC_MEMORY_HOME location
```bash
$ bm sync
Error: Permission denied: /var/lib/basic-memory
```
**Solution:**
```bash
# Fix permissions
sudo chown -R $USER:$USER /var/lib/basic-memory
# Or use accessible location
export BASIC_MEMORY_HOME=~/basic-memory
```
### Conflicts with PROJECT_ROOT
**Problem:** BASIC_MEMORY_HOME outside PROJECT_ROOT
```bash
export BASIC_MEMORY_HOME=/Users/you/kb
export BASIC_MEMORY_PROJECT_ROOT=/app/data
# Error: /Users/you/kb not under /app/data
```
**Solution:** Align both variables
```bash
export BASIC_MEMORY_HOME=/app/data/basic-memory
export BASIC_MEMORY_PROJECT_ROOT=/app/data
```
## Best Practices
1. **Use absolute paths:**
```bash
export BASIC_MEMORY_HOME=/Users/you/knowledge # ✓
# not: export BASIC_MEMORY_HOME=~/knowledge # ✗ (may not expand)
```
2. **Document the location:**
- Add comment in shell config
- Document for team if shared
3. **Backup regularly:**
- Main project contains your primary knowledge
- Automate backups of this directory
4. **Consider PROJECT_ROOT for security:**
- Use both together in production/containers
5. **Test changes:**
- Verify with `bm project list` after changing
## See Also
- `project-root-env-var.md` - Security constraints for all projects
- `env-var-overrides.md` - Environment variable precedence
- Project management documentation
```
--------------------------------------------------------------------------------
/src/basic_memory/ignore_utils.py:
--------------------------------------------------------------------------------
```python
"""Utilities for handling .gitignore patterns and file filtering."""
import fnmatch
from pathlib import Path
from typing import Set
# Common directories and patterns to ignore by default
# These are used as fallback if .bmignore doesn't exist
DEFAULT_IGNORE_PATTERNS = {
# Hidden files (files starting with dot)
".*",
# Basic Memory internal files
"*.db",
"*.db-shm",
"*.db-wal",
"config.json",
# Version control
".git",
".svn",
# Python
"__pycache__",
"*.pyc",
"*.pyo",
"*.pyd",
".pytest_cache",
".coverage",
"*.egg-info",
".tox",
".mypy_cache",
".ruff_cache",
# Virtual environments
".venv",
"venv",
"env",
".env",
# Node.js
"node_modules",
# Build artifacts
"build",
"dist",
".cache",
# IDE
".idea",
".vscode",
# OS files
".DS_Store",
"Thumbs.db",
"desktop.ini",
# Obsidian
".obsidian",
# Temporary files
"*.tmp",
"*.swp",
"*.swo",
"*~",
}
def get_bmignore_path() -> Path:
"""Get path to .bmignore file.
Returns:
Path to ~/.basic-memory/.bmignore
"""
return Path.home() / ".basic-memory" / ".bmignore"
def create_default_bmignore() -> None:
"""Create default .bmignore file if it doesn't exist.
This ensures users have a file they can customize for all Basic Memory operations.
"""
bmignore_path = get_bmignore_path()
if bmignore_path.exists():
return
bmignore_path.parent.mkdir(parents=True, exist_ok=True)
bmignore_path.write_text("""# Basic Memory Ignore Patterns
# This file is used by 'bm cloud upload', 'bm cloud bisync', and file sync
# Patterns use standard gitignore-style syntax
# Hidden files (files starting with dot)
.*
# Basic Memory internal files (includes test databases)
*.db
*.db-shm
*.db-wal
config.json
# Version control
.git
.svn
# Python
__pycache__
*.pyc
*.pyo
*.pyd
.pytest_cache
.coverage
*.egg-info
.tox
.mypy_cache
.ruff_cache
# Virtual environments
.venv
venv
env
.env
# Node.js
node_modules
# Build artifacts
build
dist
.cache
# IDE
.idea
.vscode
# OS files
.DS_Store
Thumbs.db
desktop.ini
# Obsidian
.obsidian
# Temporary files
*.tmp
*.swp
*.swo
*~
""")
def load_bmignore_patterns() -> Set[str]:
"""Load patterns from .bmignore file.
Returns:
Set of patterns from .bmignore, or DEFAULT_IGNORE_PATTERNS if file doesn't exist
"""
bmignore_path = get_bmignore_path()
# Create default file if it doesn't exist
if not bmignore_path.exists():
create_default_bmignore()
patterns = set()
try:
with bmignore_path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if line and not line.startswith("#"):
patterns.add(line)
except Exception:
# If we can't read .bmignore, fall back to defaults
return set(DEFAULT_IGNORE_PATTERNS)
# If no patterns were loaded, use defaults
if not patterns:
return set(DEFAULT_IGNORE_PATTERNS)
return patterns
def load_gitignore_patterns(base_path: Path, use_gitignore: bool = True) -> Set[str]:
"""Load gitignore patterns from .gitignore file and .bmignore.
Combines patterns from:
1. ~/.basic-memory/.bmignore (user's global ignore patterns)
2. {base_path}/.gitignore (project-specific patterns, if use_gitignore=True)
Args:
base_path: The base directory to search for .gitignore file
use_gitignore: If False, only load patterns from .bmignore (default: True)
Returns:
Set of patterns to ignore
"""
# Start with patterns from .bmignore
patterns = load_bmignore_patterns()
if use_gitignore:
gitignore_file = base_path / ".gitignore"
if gitignore_file.exists():
try:
with gitignore_file.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# Skip empty lines and comments
if line and not line.startswith("#"):
patterns.add(line)
except Exception:
# If we can't read .gitignore, just use default patterns
pass
return patterns
def should_ignore_path(file_path: Path, base_path: Path, ignore_patterns: Set[str]) -> bool:
"""Check if a file path should be ignored based on gitignore patterns.
Args:
file_path: The file path to check
base_path: The base directory for relative path calculation
ignore_patterns: Set of patterns to match against
Returns:
True if the path should be ignored, False otherwise
"""
# Get the relative path from base
try:
relative_path = file_path.relative_to(base_path)
relative_str = str(relative_path)
relative_posix = relative_path.as_posix() # Use forward slashes for matching
# Check each pattern
for pattern in ignore_patterns:
# Handle patterns starting with / (root relative)
if pattern.startswith("/"):
root_pattern = pattern[1:] # Remove leading /
# For directory patterns ending with /
if root_pattern.endswith("/"):
dir_name = root_pattern[:-1] # Remove trailing /
# Check if the first part of the path matches the directory name
if len(relative_path.parts) > 0 and relative_path.parts[0] == dir_name:
return True
else:
# Regular root-relative pattern
if fnmatch.fnmatch(relative_posix, root_pattern):
return True
continue
# Handle directory patterns (ending with /)
if pattern.endswith("/"):
dir_name = pattern[:-1] # Remove trailing /
# Check if any path part matches the directory name
if dir_name in relative_path.parts:
return True
continue
# Direct name match (e.g., ".git", "node_modules")
if pattern in relative_path.parts:
return True
# Check if any individual path part matches the glob pattern
# This handles cases like ".*" matching ".hidden.md" in "concept/.hidden.md"
for part in relative_path.parts:
if fnmatch.fnmatch(part, pattern):
return True
# Glob pattern match on full path
if fnmatch.fnmatch(relative_posix, pattern) or fnmatch.fnmatch(relative_str, pattern):
return True
return False
except ValueError:
# If we can't get relative path, don't ignore
return False
def filter_files(
files: list[Path], base_path: Path, ignore_patterns: Set[str] | None = None
) -> tuple[list[Path], int]:
"""Filter a list of files based on gitignore patterns.
Args:
files: List of file paths to filter
base_path: The base directory for relative path calculation
ignore_patterns: Set of patterns to ignore. If None, loads from .gitignore
Returns:
Tuple of (filtered_files, ignored_count)
"""
if ignore_patterns is None:
ignore_patterns = load_gitignore_patterns(base_path)
filtered_files = []
ignored_count = 0
for file_path in files:
if should_ignore_path(file_path, base_path, ignore_patterns):
ignored_count += 1
else:
filtered_files.append(file_path)
return filtered_files, ignored_count
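# Illustrative usage (not part of the module's public surface, shown as a
# comment so nothing runs at import time):
#
#     from pathlib import Path
#     from basic_memory.ignore_utils import filter_files, load_gitignore_patterns
#
#     base = Path.home() / "basic-memory"
#     candidates = [p for p in base.rglob("*") if p.is_file()]
#     patterns = load_gitignore_patterns(base)
#     kept, ignored_count = filter_files(candidates, base, patterns)
#     print(f"keeping {len(kept)} files, ignored {ignored_count}")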
```
--------------------------------------------------------------------------------
/v15-docs/cloud-authentication.md:
--------------------------------------------------------------------------------
```markdown
# Cloud Authentication (SPEC-13)
**Status**: New Feature
**PR**: #327
**Requires**: Active Basic Memory subscription
## What's New
v0.15.0 introduces **JWT-based cloud authentication** with automatic subscription validation. This enables secure access to Basic Memory Cloud features including bidirectional sync, cloud storage, and multi-device access.
## Quick Start
### Login to Cloud
```bash
# Authenticate with Basic Memory Cloud
bm cloud login
# Opens browser for OAuth flow
# Validates subscription status
# Stores JWT token locally
```
### Check Authentication Status
```bash
# View current authentication status
bm cloud status
```
### Logout
```bash
# Clear authentication session
bm cloud logout
```
## How It Works
### Authentication Flow
1. **Initiate Login**: `bm cloud login`
2. **Browser Opens**: OAuth 2.1 flow with PKCE
3. **Authorize**: Login with your Basic Memory account
4. **Subscription Check**: Validates active subscription
5. **Token Storage**: JWT stored in `~/.basic-memory/cloud-auth.json`
6. **Auto-Refresh**: Token automatically refreshed when needed (see the sketch below)
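As an illustration of step 6, a minimal sketch of how a client could decide whether a refresh is due, assuming the `expires_at` field from the token file described under Token Storage below (the CLI performs this check internally; this is not its actual code):
```python
import json
import time
from pathlib import Path

AUTH_FILE = Path.home() / ".basic-memory" / "cloud-auth.json"

def needs_refresh(skew_seconds: int = 60) -> bool:
    """Return True if the stored access token is missing or about to expire."""
    if not AUTH_FILE.exists():
        return True
    data = json.loads(AUTH_FILE.read_text())
    # Refresh slightly before the recorded expiry to avoid sending a stale token
    return time.time() >= data.get("expires_at", 0) - skew_seconds
```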
### Subscription Validation
All cloud commands validate your subscription status:
**Active Subscription:**
```bash
$ bm cloud sync
✓ Syncing with cloud...
```
**No Active Subscription:**
```bash
$ bm cloud sync
✗ Active subscription required
Subscribe at: https://basicmemory.com/subscribe
```
## Authentication Commands
### bm cloud login
Authenticate with Basic Memory Cloud.
```bash
# Basic login
bm cloud login
# Login opens browser automatically
# Redirects to: https://eloquent-lotus-05.authkit.app/...
```
**What happens:**
- Opens OAuth authorization in browser
- Handles PKCE challenge/response
- Validates subscription
- Stores JWT token
- Displays success message
**Error cases:**
- No subscription: Shows subscribe URL
- Network error: Retries with exponential backoff (see the sketch below)
- Invalid credentials: Prompts to try again
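The network-error behavior can be pictured as plain exponential backoff. A hedged sketch, not the CLI's actual retry code:
```python
import asyncio

async def with_retries(operation, attempts: int = 4, base_delay: float = 2.0):
    """Retry an async operation with exponential backoff (2s, 4s, 8s, ...)."""
    for attempt in range(attempts):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            delay = base_delay * (2**attempt)
            print(f"Connection error, retrying in {int(delay)}s...")
            await asyncio.sleep(delay)
```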
### bm cloud logout
Clear authentication session.
```bash
bm cloud logout
```
**What happens:**
- Removes `~/.basic-memory/cloud-auth.json`
- Clears cached credentials
- Requires re-authentication for cloud commands
### bm cloud status
View authentication and sync status.
```bash
bm cloud status
```
**Shows:**
- Authentication status (logged in/out)
- Subscription status (active/expired)
- Last sync time
- Cloud project count
- Tenant information
## Token Management
### Automatic Token Refresh
The CLI automatically handles token refresh:
```python
# Internal - happens automatically
async def get_authenticated_headers():
# Checks token expiration
# Refreshes if needed
# Returns valid Bearer token
return {"Authorization": f"Bearer {token}"}
```
### Token Storage
Location: `~/.basic-memory/cloud-auth.json`
```json
{
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGc...",
"refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGc...",
"expires_at": 1234567890,
"tenant_id": "org_abc123"
}
```
**Security:**
- File permissions: 600, user read/write only (see the check below)
- Tokens expire after 1 hour
- Refresh tokens valid for 30 days
- Never commit this file to git
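A quick way to confirm the file really is restricted to your user; the path and the 600 mode come from the list above, the snippet itself is illustrative:
```python
import os
import stat
from pathlib import Path

auth_file = Path.home() / ".basic-memory" / "cloud-auth.json"
mode = stat.S_IMODE(os.stat(auth_file).st_mode)
if mode != 0o600:
    # Tighten permissions to user read/write only
    auth_file.chmod(0o600)
```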
### Manual Token Revocation
To revoke access:
1. `bm cloud logout` (clears local token)
2. Visit account settings to revoke all sessions
## Subscription Management
### Check Subscription Status
```bash
# View current subscription
bm cloud status
# Shows:
# - Subscription tier
# - Expiration date
# - Features enabled
```
### Subscribe
If you don't have a subscription:
```bash
# Displays subscribe URL
bm cloud login
# > Active subscription required
# > Subscribe at: https://basicmemory.com/subscribe
```
### Subscription Tiers
| Feature | Free | Pro | Team |
|---------|------|-----|------|
| Cloud Authentication | ✓ | ✓ | ✓ |
| Cloud Sync | - | ✓ | ✓ |
| Cloud Storage | - | 10GB | 100GB |
| Multi-device | - | ✓ | ✓ |
| API Access | - | ✓ | ✓ |
## Using Authenticated APIs
### In CLI Commands
Authentication is automatic for all cloud commands:
```bash
# These all use stored JWT automatically
bm cloud sync
bm cloud mount
bm cloud check
bm cloud bisync
```
### In Custom Scripts
```python
from basic_memory.cli.auth import CLIAuth
# Get authenticated headers
client_id, domain, _ = get_cloud_config()
auth = CLIAuth(client_id=client_id, authkit_domain=domain)
token = await auth.get_valid_token()
headers = {"Authorization": f"Bearer {token}"}
# Use with httpx or requests
import httpx
async with httpx.AsyncClient() as client:
response = await client.get(
"https://api.basicmemory.cloud/tenant/projects",
headers=headers
)
```
### Error Handling
```python
from basic_memory.cli.commands.cloud.api_client import (
CloudAPIError,
SubscriptionRequiredError
)
try:
response = await make_api_request("GET", url)
except SubscriptionRequiredError as e:
print(f"Subscription required: {e.message}")
print(f"Subscribe at: {e.subscribe_url}")
except CloudAPIError as e:
print(f"API error: {e.status_code} - {e.detail}")
```
## OAuth Configuration
### Default Settings
```python
# From config.py
cloud_client_id = "client_01K6KWQPW6J1M8VV7R3TZP5A6M"
cloud_domain = "https://eloquent-lotus-05.authkit.app"
cloud_host = "https://api.basicmemory.cloud"
```
### Custom Configuration
Override via environment variables:
```bash
export BASIC_MEMORY_CLOUD_CLIENT_ID="your_client_id"
export BASIC_MEMORY_CLOUD_DOMAIN="https://your-authkit.app"
export BASIC_MEMORY_CLOUD_HOST="https://your-api.example.com"
bm cloud login
```
Or in `~/.basic-memory/config.json`:
```json
{
"cloud_client_id": "your_client_id",
"cloud_domain": "https://your-authkit.app",
"cloud_host": "https://your-api.example.com"
}
```
## Troubleshooting
### "Not authenticated" Error
```bash
$ bm cloud sync
[red]Not authenticated. Please run 'bm cloud login' first.[/red]
```
**Solution**: Run `bm cloud login`
### Token Expired
```bash
$ bm cloud status
Token expired, refreshing...
✓ Authenticated
```
**Automatic**: Token refresh happens automatically
### Subscription Expired
```bash
$ bm cloud sync
Active subscription required
Subscribe at: https://basicmemory.com/subscribe
```
**Solution**: Renew subscription at provided URL
### Browser Not Opening
```bash
$ bm cloud login
# If browser doesn't open automatically:
# Visit this URL: https://eloquent-lotus-05.authkit.app/...
```
**Manual**: Copy/paste URL into browser
### Network Issues
```bash
$ bm cloud login
Connection error, retrying in 2s...
Connection error, retrying in 4s...
```
**Automatic**: Exponential backoff with retries
## Security Best Practices
1. **Never share tokens**: Keep `cloud-auth.json` private
2. **Use logout**: Always logout on shared machines
3. **Monitor sessions**: Check `bm cloud status` regularly
4. **Revoke access**: Use account settings to revoke compromised tokens
5. **Use HTTPS only**: Cloud commands enforce HTTPS
## Related Commands
- `bm cloud sync` - Bidirectional cloud sync (see `cloud-bisync.md`)
- `bm cloud mount` - Mount cloud storage (see `cloud-mount.md`)
- `bm cloud check` - Verify cloud integrity
- `bm cloud status` - View authentication and sync status
## Technical Details
### JWT Claims
```json
{
"sub": "user_abc123",
"org_id": "org_xyz789",
"tenant_id": "org_xyz789",
"subscription_status": "active",
"subscription_tier": "pro",
"exp": 1234567890,
"iat": 1234564290
}
```
### API Integration
The cloud API validates JWT on every request:
```python
# Middleware validates JWT and extracts tenant context
@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
token = request.headers.get("Authorization")
claims = verify_jwt(token)
request.state.tenant_id = claims["tenant_id"]
request.state.subscription = claims["subscription_status"]
# ...
```
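`verify_jwt` above is shown as a placeholder. A minimal sketch of what such verification could look like using the PyJWT library; the JWKS URL, signing algorithm, and claim handling here are assumptions based on this document, not the service's actual implementation:
```python
import jwt  # PyJWT
from jwt import PyJWKClient

JWKS_URL = "https://eloquent-lotus-05.authkit.app/oauth2/jwks"  # assumed endpoint

def verify_jwt(authorization_header: str) -> dict:
    token = authorization_header.removeprefix("Bearer ").strip()
    signing_key = PyJWKClient(JWKS_URL).get_signing_key_from_jwt(token)
    claims = jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],  # algorithm assumed
        options={"verify_aud": False},  # audience validation omitted in this sketch
    )
    if claims.get("subscription_status") != "active":
        raise PermissionError("Active subscription required")
    return claims
```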
## See Also
- SPEC-13: CLI Authentication with Subscription Validation
- `cloud-bisync.md` - Using authenticated sync
- `cloud-mode-usage.md` - Working with cloud APIs
```
--------------------------------------------------------------------------------
/test-int/conftest.py:
--------------------------------------------------------------------------------
```python
"""
Shared fixtures for integration tests.
Integration tests verify the complete flow: MCP Client → MCP Server → FastAPI → Database.
Unlike unit tests which use in-memory databases and mocks, integration tests use real SQLite
files and test the full application stack to ensure all components work together correctly.
## Architecture
The integration test setup creates this flow:
```
Test → MCP Client → MCP Server → HTTP Request (ASGITransport) → FastAPI App → Database
↑
Dependency overrides
point to test database
```
## Key Components
1. **Real SQLite Database**: Uses `DatabaseType.FILESYSTEM` with actual SQLite files
in temporary directories instead of in-memory databases.
2. **Shared Database Connection**: Both MCP server and FastAPI app use the same
database via dependency injection overrides.
3. **Project Session Management**: Initializes the MCP project session with test
project configuration so tools know which project to operate on.
4. **Search Index Initialization**: Creates the FTS5 search index tables that
the application requires for search functionality.
5. **Global Configuration Override**: Modifies the global `basic_memory_app_config`
so MCP tools use test project settings instead of user configuration.
## Usage
Integration tests should include both `mcp_server` and `app` fixtures to ensure
the complete stack is wired correctly:
```python
@pytest.mark.asyncio
async def test_my_mcp_tool(mcp_server, app):
async with Client(mcp_server) as client:
result = await client.call_tool("tool_name", {"param": "value"})
# Assert on results...
```
The `app` fixture ensures FastAPI dependency overrides are active, and
`mcp_server` provides the MCP server with proper project session initialization.
"""
from typing import AsyncGenerator
import pytest
import pytest_asyncio
from pathlib import Path
from httpx import AsyncClient, ASGITransport
from basic_memory.config import BasicMemoryConfig, ProjectConfig, ConfigManager
from basic_memory.db import engine_session_factory, DatabaseType
from basic_memory.models import Project
from basic_memory.repository.project_repository import ProjectRepository
from fastapi import FastAPI
from basic_memory.deps import get_project_config, get_engine_factory, get_app_config
# Import MCP tools so they're available for testing
from basic_memory.mcp import tools # noqa: F401
@pytest_asyncio.fixture(scope="function")
async def engine_factory(tmp_path):
"""Create a SQLite file engine factory for integration testing."""
db_path = tmp_path / "test.db"
async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
engine,
session_maker,
):
# Initialize database schema
from basic_memory.models.base import Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield engine, session_maker
@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
"""Create a test project."""
project_data = {
"name": "test-project",
"description": "Project used for integration tests",
"path": str(config_home),
"is_active": True,
"is_default": True,
}
engine, session_maker = engine_factory
project_repository = ProjectRepository(session_maker)
project = await project_repository.create(project_data)
return project
@pytest.fixture
def config_home(tmp_path, monkeypatch) -> Path:
monkeypatch.setenv("HOME", str(tmp_path))
# Set BASIC_MEMORY_HOME to the test directory
monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
return tmp_path
@pytest.fixture(scope="function", autouse=True)
def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
"""Create test app configuration."""
# Disable cloud mode for CLI tests
monkeypatch.setenv("BASIC_MEMORY_CLOUD_MODE", "false")
# Create a basic config with test-project like unit tests do
projects = {"test-project": str(config_home)}
app_config = BasicMemoryConfig(
env="test",
projects=projects,
default_project="test-project",
default_project_mode=False, # Match real-world usage - tools must pass explicit project
update_permalinks_on_move=True,
cloud_mode=False, # Explicitly disable cloud mode
)
return app_config
@pytest.fixture(scope="function", autouse=True)
def config_manager(app_config: BasicMemoryConfig, config_home) -> ConfigManager:
config_manager = ConfigManager()
# Update its paths to use the test directory
config_manager.config_dir = config_home / ".basic-memory"
config_manager.config_file = config_manager.config_dir / "config.json"
config_manager.config_dir.mkdir(parents=True, exist_ok=True)
# Ensure the config file is written to disk
config_manager.save_config(app_config)
return config_manager
@pytest.fixture(scope="function", autouse=True)
def project_config(test_project):
"""Create test project configuration."""
project_config = ProjectConfig(
name=test_project.name,
home=Path(test_project.path),
)
return project_config
@pytest.fixture(scope="function")
def app(app_config, project_config, engine_factory, test_project, config_manager) -> FastAPI:
"""Create test FastAPI application with single project."""
# Import the FastAPI app AFTER the config_manager has written the test config to disk
# This ensures that when the app's lifespan manager runs, it reads the correct test config
from basic_memory.api.app import app as fastapi_app
app = fastapi_app
app.dependency_overrides[get_project_config] = lambda: project_config
app.dependency_overrides[get_engine_factory] = lambda: engine_factory
app.dependency_overrides[get_app_config] = lambda: app_config
return app
@pytest_asyncio.fixture(scope="function")
async def search_service(engine_factory, test_project):
"""Create and initialize search service for integration tests."""
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.services.file_service import FileService
from basic_memory.services.search_service import SearchService
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown import EntityParser
engine, session_maker = engine_factory
# Create repositories
search_repository = SearchRepository(session_maker, project_id=test_project.id)
entity_repository = EntityRepository(session_maker, project_id=test_project.id)
# Create file service
entity_parser = EntityParser(Path(test_project.path))
markdown_processor = MarkdownProcessor(entity_parser)
file_service = FileService(Path(test_project.path), markdown_processor)
# Create and initialize search service
service = SearchService(search_repository, entity_repository, file_service)
await service.init_search_index()
return service
@pytest.fixture(scope="function")
def mcp_server(config_manager, search_service):
# Import mcp instance
from basic_memory.mcp.server import mcp as server
# Import mcp tools to register them
import basic_memory.mcp.tools # noqa: F401
# Import prompts to register them
import basic_memory.mcp.prompts # noqa: F401
return server
@pytest_asyncio.fixture(scope="function")
async def client(app: FastAPI) -> AsyncGenerator[AsyncClient, None]:
"""Create test client that both MCP and tests will use."""
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
yield client
```
--------------------------------------------------------------------------------
/tests/test_deps.py:
--------------------------------------------------------------------------------
```python
"""Tests for dependency injection functions in deps.py."""
from datetime import datetime, timezone
from pathlib import Path
import pytest
import pytest_asyncio
from fastapi import HTTPException
from basic_memory.deps import get_project_config, get_project_id
from basic_memory.models.project import Project
from basic_memory.repository.project_repository import ProjectRepository
@pytest_asyncio.fixture
async def project_with_spaces(project_repository: ProjectRepository) -> Project:
"""Create a project with spaces in the name for testing permalink normalization."""
project_data = {
"name": "My Test Project",
"description": "A project with spaces in the name",
"path": "/my/test/project",
"is_active": True,
"is_default": False,
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
return await project_repository.create(project_data)
@pytest_asyncio.fixture
async def project_with_special_chars(project_repository: ProjectRepository) -> Project:
"""Create a project with special characters for testing permalink normalization."""
project_data = {
"name": "Project: Test & Development!",
"description": "A project with special characters",
"path": "/project/test/dev",
"is_active": True,
"is_default": False,
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
return await project_repository.create(project_data)
@pytest.mark.asyncio
async def test_get_project_config_with_spaces(
project_repository: ProjectRepository, project_with_spaces: Project
):
"""Test that get_project_config normalizes project names with spaces."""
# The project name has spaces: "My Test Project"
# The permalink should be: "my-test-project"
assert project_with_spaces.name == "My Test Project"
assert project_with_spaces.permalink == "my-test-project"
# Call get_project_config with the project name (not permalink)
# This simulates what happens when the project name comes from URL path
config = await get_project_config(
project="My Test Project", project_repository=project_repository
)
# Verify we got the correct project config
assert config.name == "My Test Project"
assert config.home == Path("/my/test/project")
@pytest.mark.asyncio
async def test_get_project_config_with_permalink(
project_repository: ProjectRepository, project_with_spaces: Project
):
"""Test that get_project_config works when already given a permalink."""
# Call with the permalink directly
config = await get_project_config(
project="my-test-project", project_repository=project_repository
)
# Verify we got the correct project config
assert config.name == "My Test Project"
assert config.home == Path("/my/test/project")
@pytest.mark.asyncio
async def test_get_project_config_with_special_chars(
project_repository: ProjectRepository, project_with_special_chars: Project
):
"""Test that get_project_config normalizes project names with special characters."""
# The project name has special chars: "Project: Test & Development!"
# The permalink should be: "project-test-development"
assert project_with_special_chars.name == "Project: Test & Development!"
assert project_with_special_chars.permalink == "project-test-development"
# Call get_project_config with the project name
config = await get_project_config(
project="Project: Test & Development!", project_repository=project_repository
)
# Verify we got the correct project config
assert config.name == "Project: Test & Development!"
assert config.home == Path("/project/test/dev")
@pytest.mark.asyncio
async def test_get_project_config_not_found(project_repository: ProjectRepository):
"""Test that get_project_config raises HTTPException when project not found."""
with pytest.raises(HTTPException) as exc_info:
await get_project_config(
project="Nonexistent Project", project_repository=project_repository
)
assert exc_info.value.status_code == 404
assert "Project 'Nonexistent Project' not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_get_project_id_with_spaces(
project_repository: ProjectRepository, project_with_spaces: Project
):
"""Test that get_project_id normalizes project names with spaces."""
# Call get_project_id with the project name (not permalink)
project_id = await get_project_id(
project_repository=project_repository, project="My Test Project"
)
# Verify we got the correct project ID
assert project_id == project_with_spaces.id
@pytest.mark.asyncio
async def test_get_project_id_with_permalink(
project_repository: ProjectRepository, project_with_spaces: Project
):
"""Test that get_project_id works when already given a permalink."""
# Call with the permalink directly
project_id = await get_project_id(
project_repository=project_repository, project="my-test-project"
)
# Verify we got the correct project ID
assert project_id == project_with_spaces.id
@pytest.mark.asyncio
async def test_get_project_id_with_special_chars(
project_repository: ProjectRepository, project_with_special_chars: Project
):
"""Test that get_project_id normalizes project names with special characters."""
# Call get_project_id with the project name
project_id = await get_project_id(
project_repository=project_repository, project="Project: Test & Development!"
)
# Verify we got the correct project ID
assert project_id == project_with_special_chars.id
@pytest.mark.asyncio
async def test_get_project_id_not_found(project_repository: ProjectRepository):
"""Test that get_project_id raises HTTPException when project not found."""
with pytest.raises(HTTPException) as exc_info:
await get_project_id(project_repository=project_repository, project="Nonexistent Project")
assert exc_info.value.status_code == 404
assert "Project 'Nonexistent Project' not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_get_project_id_fallback_to_name(
project_repository: ProjectRepository, test_project: Project
):
"""Test that get_project_id falls back to name lookup if permalink lookup fails.
This test verifies the fallback behavior in get_project_id where it tries
get_by_name if get_by_permalink returns None.
"""
# The test_project fixture has name "test-project" and permalink "test-project"
# Since both are the same, we can't easily test the fallback with existing fixtures
# So this test just verifies the normal path works with test_project
project_id = await get_project_id(project_repository=project_repository, project="test-project")
assert project_id == test_project.id
@pytest.mark.asyncio
async def test_get_project_config_case_sensitivity(
project_repository: ProjectRepository, project_with_spaces: Project
):
"""Test that get_project_config handles case variations correctly.
Permalink normalization should convert to lowercase, so different case
variations of the same name should resolve to the same project.
"""
# Create project with mixed case: "My Test Project" -> permalink "my-test-project"
# Try with different case variations
config1 = await get_project_config(
project="My Test Project", project_repository=project_repository
)
config2 = await get_project_config(
project="my test project", project_repository=project_repository
)
config3 = await get_project_config(
project="MY TEST PROJECT", project_repository=project_repository
)
# All should resolve to the same project
assert config1.name == config2.name == config3.name == "My Test Project"
assert config1.home == config2.home == config3.home == Path("/my/test/project")
```
--------------------------------------------------------------------------------
/tests/api/test_template_loader.py:
--------------------------------------------------------------------------------
```python
"""Tests for the template loader functionality."""
import datetime
import pytest
from pathlib import Path
from basic_memory.api.template_loader import TemplateLoader
@pytest.fixture
def temp_template_dir(tmpdir):
"""Create a temporary directory for test templates."""
template_dir = tmpdir.mkdir("templates").mkdir("prompts")
return template_dir
@pytest.fixture
def custom_template_loader(temp_template_dir):
"""Return a TemplateLoader instance with a custom template directory."""
return TemplateLoader(str(temp_template_dir))
@pytest.fixture
def simple_template(temp_template_dir):
"""Create a simple test template."""
template_path = temp_template_dir / "simple.hbs"
template_path.write_text("Hello, {{name}}!", encoding="utf-8")
return "simple.hbs"
@pytest.mark.asyncio
async def test_render_simple_template(custom_template_loader, simple_template):
"""Test rendering a simple template."""
context = {"name": "World"}
result = await custom_template_loader.render(simple_template, context)
assert result == "Hello, World!"
@pytest.mark.asyncio
async def test_template_cache(custom_template_loader, simple_template):
"""Test that templates are cached."""
context = {"name": "World"}
# First render, should load template
await custom_template_loader.render(simple_template, context)
# Check that template is in cache
assert simple_template in custom_template_loader.template_cache
# Modify the template file - shouldn't affect the cached version
template_path = Path(custom_template_loader.template_dir) / simple_template
template_path.write_text("Goodbye, {{name}}!", encoding="utf-8")
# Second render, should use cached template
result = await custom_template_loader.render(simple_template, context)
assert result == "Hello, World!"
# Clear cache and render again - should use updated template
custom_template_loader.clear_cache()
assert simple_template not in custom_template_loader.template_cache
result = await custom_template_loader.render(simple_template, context)
assert result == "Goodbye, World!"
@pytest.mark.asyncio
async def test_date_helper(custom_template_loader, temp_template_dir):
# Test date helper
date_path = temp_template_dir / "date.hbs"
date_path.write_text("{{date timestamp}}", encoding="utf-8")
date_result = await custom_template_loader.render(
"date.hbs", {"timestamp": datetime.datetime(2023, 1, 1, 12, 30)}
)
assert "2023-01-01" in date_result
@pytest.mark.asyncio
async def test_default_helper(custom_template_loader, temp_template_dir):
# Test default helper
default_path = temp_template_dir / "default.hbs"
default_path.write_text("{{default null 'default-value'}}", encoding="utf-8")
default_result = await custom_template_loader.render("default.hbs", {"null": None})
assert default_result == "default-value"
@pytest.mark.asyncio
async def test_capitalize_helper(custom_template_loader, temp_template_dir):
# Test capitalize helper
capitalize_path = temp_template_dir / "capitalize.hbs"
capitalize_path.write_text("{{capitalize 'test'}}", encoding="utf-8")
capitalize_result = await custom_template_loader.render("capitalize.hbs", {})
assert capitalize_result == "Test"
@pytest.mark.asyncio
async def test_size_helper(custom_template_loader, temp_template_dir):
# Test size helper
size_path = temp_template_dir / "size.hbs"
size_path.write_text("{{size collection}}", encoding="utf-8")
size_result = await custom_template_loader.render("size.hbs", {"collection": [1, 2, 3]})
assert size_result == "3"
@pytest.mark.asyncio
async def test_json_helper(custom_template_loader, temp_template_dir):
# Test json helper
json_path = temp_template_dir / "json.hbs"
json_path.write_text("{{json data}}", encoding="utf-8")
json_result = await custom_template_loader.render("json.hbs", {"data": {"key": "value"}})
assert json_result == '{"key": "value"}'
@pytest.mark.asyncio
async def test_less_than_helper(custom_template_loader, temp_template_dir):
# Test lt (less than) helper
lt_path = temp_template_dir / "lt.hbs"
lt_path.write_text("{{#if_cond (lt 2 3)}}true{{else}}false{{/if_cond}}", encoding="utf-8")
lt_result = await custom_template_loader.render("lt.hbs", {})
assert lt_result == "true"
@pytest.mark.asyncio
async def test_file_not_found(custom_template_loader):
"""Test that FileNotFoundError is raised when a template doesn't exist."""
with pytest.raises(FileNotFoundError):
await custom_template_loader.render("non_existent_template.hbs", {})
@pytest.mark.asyncio
async def test_extension_handling(custom_template_loader, temp_template_dir):
"""Test that template extensions are handled correctly."""
# Create template with .hbs extension
template_path = temp_template_dir / "test_extension.hbs"
template_path.write_text("Template with extension: {{value}}", encoding="utf-8")
# Test accessing with full extension
result = await custom_template_loader.render("test_extension.hbs", {"value": "works"})
assert result == "Template with extension: works"
# Test accessing without extension
result = await custom_template_loader.render("test_extension", {"value": "also works"})
assert result == "Template with extension: also works"
# Test accessing with wrong extension gets converted
template_path = temp_template_dir / "liquid_template.hbs"
template_path.write_text("Liquid template: {{value}}", encoding="utf-8")
result = await custom_template_loader.render("liquid_template.liquid", {"value": "converted"})
assert result == "Liquid template: converted"
@pytest.mark.asyncio
async def test_dedent_helper(custom_template_loader, temp_template_dir):
"""Test the dedent helper for text blocks."""
dedent_path = temp_template_dir / "dedent.hbs"
# Create a template with indented text blocks
template_content = """Before
{{#dedent}}
This is indented text
with nested indentation
that should be dedented
while preserving relative indentation
{{/dedent}}
After"""
dedent_path.write_text(template_content, encoding="utf-8")
# Render the template
result = await custom_template_loader.render("dedent.hbs", {})
# Print the actual output for debugging
print(f"Dedent helper result: {repr(result)}")
# Check that the indentation is properly removed
assert "This is indented text" in result
assert "with nested indentation" in result
assert "that should be dedented" in result
assert "while preserving relative indentation" in result
assert "Before" in result
assert "After" in result
# Check that relative indentation is preserved
assert result.find("with nested indentation") > result.find("This is indented text")
@pytest.mark.asyncio
async def test_nested_dedent_helper(custom_template_loader, temp_template_dir):
"""Test the dedent helper with nested content."""
dedent_path = temp_template_dir / "nested_dedent.hbs"
# Create a template with nested indented blocks
template_content = """
{{#each items}}
{{#dedent}}
--- Item {{this}}
Details for item {{this}}
- Indented detail 1
- Indented detail 2
{{/dedent}}
{{/each}}"""
dedent_path.write_text(template_content, encoding="utf-8")
# Render the template
result = await custom_template_loader.render("nested_dedent.hbs", {"items": [1, 2]})
# Print the actual output for debugging
print(f"Actual result: {repr(result)}")
# Use a more flexible assertion that checks individual components
# instead of exact string matching
assert "--- Item 1" in result
assert "Details for item 1" in result
assert "- Indented detail 1" in result
assert "--- Item 2" in result
assert "Details for item 2" in result
assert "- Indented detail 2" in result
```
--------------------------------------------------------------------------------
/src/basic_memory/models/knowledge.py:
--------------------------------------------------------------------------------
```python
"""Knowledge graph models."""
from datetime import datetime
from basic_memory.utils import ensure_timezone_aware
from typing import Optional
from sqlalchemy import (
Integer,
String,
Text,
ForeignKey,
UniqueConstraint,
DateTime,
Index,
JSON,
Float,
text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from basic_memory.models.base import Base
from basic_memory.utils import generate_permalink
class Entity(Base):
"""Core entity in the knowledge graph.
Entities represent semantic nodes maintained by the AI layer. Each entity:
- Has a unique numeric ID (database-generated)
- Maps to a file on disk
- Maintains a checksum for change detection
- Tracks both source file and semantic properties
- Belongs to a specific project
"""
__tablename__ = "entity"
__table_args__ = (
# Regular indexes
Index("ix_entity_type", "entity_type"),
Index("ix_entity_title", "title"),
Index("ix_entity_created_at", "created_at"), # For timeline queries
Index("ix_entity_updated_at", "updated_at"), # For timeline queries
Index("ix_entity_project_id", "project_id"), # For project filtering
# Project-specific uniqueness constraints
Index(
"uix_entity_permalink_project",
"permalink",
"project_id",
unique=True,
sqlite_where=text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
),
Index(
"uix_entity_file_path_project",
"file_path",
"project_id",
unique=True,
),
)
# Core identity
id: Mapped[int] = mapped_column(Integer, primary_key=True)
title: Mapped[str] = mapped_column(String)
entity_type: Mapped[str] = mapped_column(String)
entity_metadata: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
content_type: Mapped[str] = mapped_column(String)
# Project reference
project_id: Mapped[int] = mapped_column(Integer, ForeignKey("project.id"), nullable=False)
# Normalized path for URIs - required for markdown files only
permalink: Mapped[Optional[str]] = mapped_column(String, nullable=True, index=True)
# Actual filesystem relative path
file_path: Mapped[str] = mapped_column(String, index=True)
# checksum of file
checksum: Mapped[Optional[str]] = mapped_column(String, nullable=True)
# File metadata for sync
# mtime: file modification timestamp (Unix epoch float) for change detection
mtime: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
# size: file size in bytes for quick change detection
size: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
# Metadata and tracking
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now().astimezone()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now().astimezone(),
onupdate=lambda: datetime.now().astimezone(),
)
# Relationships
project = relationship("Project", back_populates="entities")
observations = relationship(
"Observation", back_populates="entity", cascade="all, delete-orphan"
)
outgoing_relations = relationship(
"Relation",
back_populates="from_entity",
foreign_keys="[Relation.from_id]",
cascade="all, delete-orphan",
)
incoming_relations = relationship(
"Relation",
back_populates="to_entity",
foreign_keys="[Relation.to_id]",
cascade="all, delete-orphan",
)
@property
def relations(self):
"""Get all relations (incoming and outgoing) for this entity."""
return self.incoming_relations + self.outgoing_relations
@property
def is_markdown(self):
"""Check if the entity is a markdown file."""
return self.content_type == "text/markdown"
def __getattribute__(self, name):
"""Override attribute access to ensure datetime fields are timezone-aware."""
value = super().__getattribute__(name)
# Ensure datetime fields are timezone-aware
if name in ("created_at", "updated_at") and isinstance(value, datetime):
return ensure_timezone_aware(value)
return value
def __repr__(self) -> str:
return f"Entity(id={self.id}, name='{self.title}', type='{self.entity_type}'"
class Observation(Base):
"""An observation about an entity.
Observations are atomic facts or notes about an entity.
"""
__tablename__ = "observation"
__table_args__ = (
Index("ix_observation_entity_id", "entity_id"), # Add FK index
Index("ix_observation_category", "category"), # Add category index
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
entity_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE"))
content: Mapped[str] = mapped_column(Text)
category: Mapped[str] = mapped_column(String, nullable=False, default="note")
context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
tags: Mapped[Optional[list[str]]] = mapped_column(
JSON, nullable=True, default=list, server_default="[]"
)
# Relationships
entity = relationship("Entity", back_populates="observations")
@property
def permalink(self) -> str:
"""Create synthetic permalink for the observation.
We can construct these because observations are always defined in
and owned by a single entity.
"""
return generate_permalink(
f"{self.entity.permalink}/observations/{self.category}/{self.content}"
)
def __repr__(self) -> str: # pragma: no cover
return f"Observation(id={self.id}, entity_id={self.entity_id}, content='{self.content}')"
class Relation(Base):
"""A directed relation between two entities."""
__tablename__ = "relation"
__table_args__ = (
UniqueConstraint("from_id", "to_id", "relation_type", name="uix_relation_from_id_to_id"),
UniqueConstraint(
"from_id", "to_name", "relation_type", name="uix_relation_from_id_to_name"
),
Index("ix_relation_type", "relation_type"),
Index("ix_relation_from_id", "from_id"), # Add FK indexes
Index("ix_relation_to_id", "to_id"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True)
from_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE"))
to_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("entity.id", ondelete="CASCADE"), nullable=True
)
to_name: Mapped[str] = mapped_column(String)
relation_type: Mapped[str] = mapped_column(String)
context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Relationships
from_entity = relationship(
"Entity", foreign_keys=[from_id], back_populates="outgoing_relations"
)
to_entity = relationship("Entity", foreign_keys=[to_id], back_populates="incoming_relations")
@property
def permalink(self) -> str:
"""Create relation permalink showing the semantic connection.
Format: source/relation_type/target
Example: "specs/search/implements/features/search-ui"
"""
# Use permalinks when available, falling back to file_path for source or target
from_permalink = self.from_entity.permalink or self.from_entity.file_path
if self.to_entity:
to_permalink = self.to_entity.permalink or self.to_entity.file_path
return generate_permalink(f"{from_permalink}/{self.relation_type}/{to_permalink}")
return generate_permalink(f"{from_permalink}/{self.relation_type}/{self.to_name}")
def __repr__(self) -> str:
return f"Relation(id={self.id}, from_id={self.from_id}, to_id={self.to_id}, to_name={self.to_name}, type='{self.relation_type}')" # pragma: no cover
```
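As a quick orientation to how these models fit together, the sketch below builds transient (unsaved) objects and reads the derived properties. It assumes nothing beyond the classes defined above and never touches a database, so the printed permalink text is only indicative of what `generate_permalink` produces.

```python
from basic_memory.models.knowledge import Entity, Observation, Relation

# Transient objects are enough to see how the synthetic permalinks compose;
# project_id would normally reference a real Project row.
spec = Entity(
    title="Search UI",
    entity_type="spec",
    content_type="text/markdown",
    permalink="specs/search-ui",
    file_path="specs/search-ui.md",
    project_id=1,
)

note = Observation(entity=spec, category="note", content="Needs keyboard shortcuts")
rel = Relation(from_entity=spec, to_name="features/search", relation_type="implements")

print(spec.is_markdown)  # True
print(note.permalink)    # e.g. specs/search-ui/observations/note/needs-keyboard-shortcuts
print(rel.permalink)     # e.g. specs/search-ui/implements/features/search
```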
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/project_management.py:
--------------------------------------------------------------------------------
```python
"""Project management tools for Basic Memory MCP server.
These tools allow users to switch between projects, list available projects,
and manage project context during conversations.
"""
import os
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get, call_post, call_delete
from basic_memory.schemas.project_info import (
ProjectList,
ProjectStatusResponse,
ProjectInfoRequest,
)
from basic_memory.utils import generate_permalink
@mcp.tool("list_memory_projects")
async def list_memory_projects(context: Context | None = None) -> str:
"""List all available projects with their status.
Shows all Basic Memory projects that are available for MCP operations.
Use this tool to discover projects when you need to know which project to use.
Use this tool:
- At conversation start when project is unknown
- When user asks about available projects
- Before any operation requiring a project
After calling:
- Ask user which project to use
- Remember their choice for the session
Returns:
Formatted list of projects with session management guidance
Example:
list_memory_projects()
"""
async with get_client() as client:
if context: # pragma: no cover
await context.info("Listing all available projects")
# Check if server is constrained to a specific project
constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
# Get projects from API
response = await call_get(client, "/projects/projects")
project_list = ProjectList.model_validate(response.json())
if constrained_project:
result = f"Project: {constrained_project}\n\n"
result += "Note: This MCP server is constrained to a single project.\n"
result += "All operations will automatically use this project."
else:
# Show all projects with session guidance
result = "Available projects:\n"
for project in project_list.projects:
result += f"• {project.name}\n"
result += "\n" + "─" * 40 + "\n"
result += "Next: Ask which project to use for this session.\n"
result += "Example: 'Which project should I use for this task?'\n\n"
result += "Session reminder: Track the selected project for all subsequent operations in this conversation.\n"
result += "The user can say 'switch to [project]' to change projects."
return result
@mcp.tool("create_memory_project")
async def create_memory_project(
project_name: str, project_path: str, set_default: bool = False, context: Context | None = None
) -> str:
"""Create a new Basic Memory project.
Creates a new project with the specified name and path. The project directory
will be created if it doesn't exist. Optionally sets the new project as default.
Args:
project_name: Name for the new project (must be unique)
project_path: File system path where the project will be stored
set_default: Whether to set this project as the default (optional, defaults to False)
Returns:
Confirmation message with project details
Example:
create_memory_project("my-research", "~/Documents/research")
create_memory_project("work-notes", "/home/user/work", set_default=True)
"""
async with get_client() as client:
# Check if server is constrained to a specific project
constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
if constrained_project:
return f'# Error\n\nProject creation disabled - MCP server is constrained to project \'{constrained_project}\'.\nUse the CLI to create projects: `basic-memory project add "{project_name}" "{project_path}"`'
if context: # pragma: no cover
await context.info(f"Creating project: {project_name} at {project_path}")
# Create the project request
project_request = ProjectInfoRequest(
name=project_name, path=project_path, set_default=set_default
)
# Call API to create project
response = await call_post(client, "/projects/projects", json=project_request.model_dump())
status_response = ProjectStatusResponse.model_validate(response.json())
result = f"✓ {status_response.message}\n\n"
if status_response.new_project:
result += "Project Details:\n"
result += f"• Name: {status_response.new_project.name}\n"
result += f"• Path: {status_response.new_project.path}\n"
if set_default:
result += "• Set as default project\n"
result += "\nProject is now available for use in tool calls.\n"
result += f"Use '{project_name}' as the project parameter in MCP tool calls.\n"
return result
@mcp.tool()
async def delete_project(project_name: str, context: Context | None = None) -> str:
"""Delete a Basic Memory project.
Removes a project from the configuration and database. This does NOT delete
the actual files on disk - only removes the project from Basic Memory's
configuration and database records.
Args:
project_name: Name of the project to delete
Returns:
Confirmation message about project deletion
Example:
delete_project("old-project")
Warning:
This action cannot be undone. The project will need to be re-added
to access its content through Basic Memory again.
"""
async with get_client() as client:
# Check if server is constrained to a specific project
constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
if constrained_project:
return f"# Error\n\nProject deletion disabled - MCP server is constrained to project '{constrained_project}'.\nUse the CLI to delete projects: `basic-memory project remove \"{project_name}\"`"
if context: # pragma: no cover
await context.info(f"Deleting project: {project_name}")
# Get project info before deletion to validate it exists
response = await call_get(client, "/projects/projects")
project_list = ProjectList.model_validate(response.json())
# Find the project by name (case-insensitive) or permalink - same logic as switch_project
project_permalink = generate_permalink(project_name)
target_project = None
for p in project_list.projects:
# Match by permalink (handles case-insensitive input)
if p.permalink == project_permalink:
target_project = p
break
# Also match by name comparison (case-insensitive)
if p.name.lower() == project_name.lower():
target_project = p
break
if not target_project:
available_projects = [p.name for p in project_list.projects]
raise ValueError(
f"Project '{project_name}' not found. Available projects: {', '.join(available_projects)}"
)
# Call API to delete project using URL encoding for special characters
from urllib.parse import quote
encoded_name = quote(target_project.name, safe="")
response = await call_delete(client, f"/projects/{encoded_name}")
status_response = ProjectStatusResponse.model_validate(response.json())
result = f"✓ {status_response.message}\n\n"
if status_response.old_project:
result += "Removed project details:\n"
result += f"• Name: {status_response.old_project.name}\n"
if hasattr(status_response.old_project, "path"):
result += f"• Path: {status_response.old_project.path}\n"
result += "Files remain on disk but project is no longer tracked by Basic Memory.\n"
result += "Re-add the project to access its content again.\n"
return result
```
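`delete_project` resolves a user-supplied name by permalink first and then by case-insensitive name (the same lookup noted as shared with `switch_project`). A minimal standalone sketch of that matching rule, using hypothetical plain dictionaries in place of `ProjectList.projects`, is:

```python
from basic_memory.utils import generate_permalink


def find_project(name: str, projects: list[dict]) -> dict | None:
    """Resolve a project by permalink or case-insensitive name.

    `projects` is a hypothetical list of {"name": ..., "permalink": ...} dicts
    standing in for ProjectList.projects in the tool above.
    """
    wanted = generate_permalink(name)
    for p in projects:
        if p["permalink"] == wanted or p["name"].lower() == name.lower():
            return p
    return None


projects = [{"name": "Work Notes", "permalink": "work-notes"}]
print(find_project("work notes", projects))  # matches by permalink and by lowercased name
```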
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/resource_router.py:
--------------------------------------------------------------------------------
```python
"""Routes for getting entity content."""
import tempfile
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, HTTPException, BackgroundTasks, Body
from fastapi.responses import FileResponse, JSONResponse
from loguru import logger
from basic_memory.deps import (
ProjectConfigDep,
LinkResolverDep,
SearchServiceDep,
EntityServiceDep,
FileServiceDep,
EntityRepositoryDep,
)
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import normalize_memory_url
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.models.knowledge import Entity as EntityModel
from datetime import datetime
router = APIRouter(prefix="/resource", tags=["resources"])
def get_entity_ids(item: SearchIndexRow) -> set[int]:
match item.type:
case SearchItemType.ENTITY:
return {item.id}
case SearchItemType.OBSERVATION:
return {item.entity_id} # pyright: ignore [reportReturnType]
case SearchItemType.RELATION:
from_entity = item.from_id
to_entity = item.to_id # pyright: ignore [reportReturnType]
return {from_entity, to_entity} if to_entity else {from_entity} # pyright: ignore [reportReturnType]
case _: # pragma: no cover
raise ValueError(f"Unexpected type: {item.type}")
@router.get("/{identifier:path}")
async def get_resource_content(
config: ProjectConfigDep,
link_resolver: LinkResolverDep,
search_service: SearchServiceDep,
entity_service: EntityServiceDep,
file_service: FileServiceDep,
background_tasks: BackgroundTasks,
identifier: str,
page: int = 1,
page_size: int = 10,
) -> FileResponse:
"""Get resource content by identifier: name or permalink."""
logger.debug(f"Getting content for: {identifier}")
# Find single entity by permalink
entity = await link_resolver.resolve_link(identifier)
results = [entity] if entity else []
# pagination for multiple results
limit = page_size
offset = (page - 1) * page_size
# search using the identifier as a permalink
if not results:
# if the identifier contains a wildcard, use GLOB search
query = (
SearchQuery(permalink_match=identifier)
if "*" in identifier
else SearchQuery(permalink=identifier)
)
search_results = await search_service.search(query, limit, offset)
if not search_results:
raise HTTPException(status_code=404, detail=f"Resource not found: {identifier}")
# get the deduplicated entities related to the search results
entity_ids = {id for result in search_results for id in get_entity_ids(result)}
results = await entity_service.get_entities_by_id(list(entity_ids))
# return single response
if len(results) == 1:
entity = results[0]
file_path = Path(f"{config.home}/{entity.file_path}")
if not file_path.exists():
raise HTTPException(
status_code=404,
detail=f"File not found: {file_path}",
)
return FileResponse(path=file_path)
# for multiple files, initialize a temporary file for writing the results
with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".md") as tmp_file:
temp_file_path = tmp_file.name
for result in results:
# Read content for each entity
content = await file_service.read_entity_content(result)
memory_url = normalize_memory_url(result.permalink)
modified_date = result.updated_at.isoformat()
checksum = result.checksum[:8] if result.checksum else ""
# Prepare the delimited content
response_content = f"--- {memory_url} {modified_date} {checksum}\n"
response_content += f"\n{content}\n"
response_content += "\n"
# Write this entity's content to the temporary file
tmp_file.write(response_content)
# Ensure all content is written to disk
tmp_file.flush()
# Schedule the temporary file to be deleted after the response
background_tasks.add_task(cleanup_temp_file, temp_file_path)
# Return the file response
return FileResponse(path=temp_file_path)
def cleanup_temp_file(file_path: str):
"""Delete the temporary file."""
try:
Path(file_path).unlink() # Deletes the file
logger.debug(f"Temporary file deleted: {file_path}")
except Exception as e: # pragma: no cover
logger.error(f"Error deleting temporary file {file_path}: {e}")
@router.put("/{file_path:path}")
async def write_resource(
config: ProjectConfigDep,
file_service: FileServiceDep,
entity_repository: EntityRepositoryDep,
search_service: SearchServiceDep,
file_path: str,
content: Annotated[str, Body()],
) -> JSONResponse:
"""Write content to a file in the project.
This endpoint allows writing content directly to a file in the project.
Also creates an entity record and indexes the file for search.
Args:
file_path: Path to write to, relative to project root
content: The content to write to the file
Returns:
JSON response with file information
"""
try:
# Get content from request body
# Ensure it's UTF-8 string content
if isinstance(content, bytes): # pragma: no cover
content_str = content.decode("utf-8")
else:
content_str = str(content)
# Get full file path
full_path = Path(f"{config.home}/{file_path}")
# Ensure parent directory exists
full_path.parent.mkdir(parents=True, exist_ok=True)
# Write content to file
checksum = await file_service.write_file(full_path, content_str)
# Get file info
file_stats = file_service.file_stats(full_path)
# Determine file details
file_name = Path(file_path).name
content_type = file_service.content_type(full_path)
entity_type = "canvas" if file_path.endswith(".canvas") else "file"
# Check if entity already exists
existing_entity = await entity_repository.get_by_file_path(file_path)
if existing_entity:
# Update existing entity
entity = await entity_repository.update(
existing_entity.id,
{
"title": file_name,
"entity_type": entity_type,
"content_type": content_type,
"file_path": file_path,
"checksum": checksum,
"updated_at": datetime.fromtimestamp(file_stats.st_mtime).astimezone(),
},
)
status_code = 200
else:
# Create a new entity model
entity = EntityModel(
title=file_name,
entity_type=entity_type,
content_type=content_type,
file_path=file_path,
checksum=checksum,
created_at=datetime.fromtimestamp(file_stats.st_ctime).astimezone(),
updated_at=datetime.fromtimestamp(file_stats.st_mtime).astimezone(),
)
entity = await entity_repository.add(entity)
status_code = 201
# Index the file for search
await search_service.index_entity(entity) # pyright: ignore
# Return success response
return JSONResponse(
status_code=status_code,
content={
"file_path": file_path,
"checksum": checksum,
"size": file_stats.st_size,
"created_at": file_stats.st_ctime,
"modified_at": file_stats.st_mtime,
},
)
except Exception as e: # pragma: no cover
logger.error(f"Error writing resource {file_path}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to write resource: {str(e)}")
```
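For reference, the GET endpoint can be exercised directly over HTTP. The sketch below uses a placeholder base URL; the real prefix depends on how the application mounts this router, so adjust it to your deployment.

```python
import asyncio

import httpx

# Placeholder base URL; the actual path prefix depends on how the app mounts this router.
BASE = "http://localhost:8000/resource"


async def main() -> None:
    async with httpx.AsyncClient() as client:
        # A plain identifier resolves to a single file; a "*" wildcard triggers a
        # GLOB permalink search and returns a delimited multi-document response.
        single = await client.get(f"{BASE}/specs/search-ui")
        many = await client.get(f"{BASE}/specs/*", params={"page": 1, "page_size": 10})
        print(single.status_code, many.status_code)


asyncio.run(main())
```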
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_config.py:
--------------------------------------------------------------------------------
```python
"""rclone configuration management for Basic Memory Cloud."""
import configparser
import os
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional
from rich.console import Console
console = Console()
class RcloneConfigError(Exception):
"""Exception raised for rclone configuration errors."""
pass
class RcloneMountProfile:
"""Mount profile with optimized settings."""
def __init__(
self,
name: str,
cache_time: str,
poll_interval: str,
attr_timeout: str,
write_back: str,
description: str,
extra_args: Optional[List[str]] = None,
):
self.name = name
self.cache_time = cache_time
self.poll_interval = poll_interval
self.attr_timeout = attr_timeout
self.write_back = write_back
self.description = description
self.extra_args = extra_args or []
# Mount profiles based on SPEC-7 Phase 4 testing
MOUNT_PROFILES = {
"fast": RcloneMountProfile(
name="fast",
cache_time="5s",
poll_interval="3s",
attr_timeout="3s",
write_back="1s",
description="Ultra-fast development (5s sync, higher bandwidth)",
),
"balanced": RcloneMountProfile(
name="balanced",
cache_time="10s",
poll_interval="5s",
attr_timeout="5s",
write_back="2s",
description="Fast development (10-15s sync, recommended)",
),
"safe": RcloneMountProfile(
name="safe",
cache_time="15s",
poll_interval="10s",
attr_timeout="10s",
write_back="5s",
description="Conflict-aware mount with backup",
extra_args=[
"--conflict-suffix",
".conflict-{DateTimeExt}",
"--backup-dir",
"~/.basic-memory/conflicts",
"--track-renames",
],
),
}
def get_rclone_config_path() -> Path:
"""Get the path to rclone configuration file."""
config_dir = Path.home() / ".config" / "rclone"
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir / "rclone.conf"
def backup_rclone_config() -> Optional[Path]:
"""Create a backup of existing rclone config."""
config_path = get_rclone_config_path()
if not config_path.exists():
return None
backup_path = config_path.with_suffix(f".conf.backup-{os.getpid()}")
shutil.copy2(config_path, backup_path)
console.print(f"[dim]Created backup: {backup_path}[/dim]")
return backup_path
def load_rclone_config() -> configparser.ConfigParser:
"""Load existing rclone configuration."""
config = configparser.ConfigParser()
config_path = get_rclone_config_path()
if config_path.exists():
config.read(config_path)
return config
def save_rclone_config(config: configparser.ConfigParser) -> None:
"""Save rclone configuration to file."""
config_path = get_rclone_config_path()
with open(config_path, "w") as f:
config.write(f)
console.print(f"[dim]Updated rclone config: {config_path}[/dim]")
def add_tenant_to_rclone_config(
tenant_id: str,
bucket_name: str,
access_key: str,
secret_key: str,
endpoint: str = "https://fly.storage.tigris.dev",
region: str = "auto",
) -> str:
"""Add tenant configuration to rclone config file."""
# Backup existing config
backup_rclone_config()
# Load existing config
config = load_rclone_config()
# Create section name
section_name = f"basic-memory-{tenant_id}"
# Add/update the tenant section
if not config.has_section(section_name):
config.add_section(section_name)
config.set(section_name, "type", "s3")
config.set(section_name, "provider", "Other")
config.set(section_name, "access_key_id", access_key)
config.set(section_name, "secret_access_key", secret_key)
config.set(section_name, "endpoint", endpoint)
config.set(section_name, "region", region)
# Save updated config
save_rclone_config(config)
console.print(f"[green]✓ Added tenant {tenant_id} to rclone config[/green]")
return section_name
def remove_tenant_from_rclone_config(tenant_id: str) -> bool:
"""Remove tenant configuration from rclone config."""
config = load_rclone_config()
section_name = f"basic-memory-{tenant_id}"
if config.has_section(section_name):
backup_rclone_config()
config.remove_section(section_name)
save_rclone_config(config)
console.print(f"[green]✓ Removed tenant {tenant_id} from rclone config[/green]")
return True
return False
def get_default_mount_path() -> Path:
"""Get default mount path (fixed location per SPEC-9).
Returns:
Fixed mount path: ~/basic-memory-cloud/
"""
return Path.home() / "basic-memory-cloud"
def build_mount_command(
tenant_id: str, bucket_name: str, mount_path: Path, profile: RcloneMountProfile
) -> List[str]:
"""Build rclone mount command with optimized settings."""
rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"
cmd = [
"rclone",
"nfsmount",
rclone_remote,
str(mount_path),
"--vfs-cache-mode",
"writes",
"--dir-cache-time",
profile.cache_time,
"--vfs-cache-poll-interval",
profile.poll_interval,
"--attr-timeout",
profile.attr_timeout,
"--vfs-write-back",
profile.write_back,
"--daemon",
]
# Add profile-specific extra arguments
cmd.extend(profile.extra_args)
return cmd
def is_path_mounted(mount_path: Path) -> bool:
"""Check if a path is currently mounted."""
if not mount_path.exists():
return False
try:
# Check if mount point is actually mounted by looking for mount table entry
result = subprocess.run(["mount"], capture_output=True, text=True, check=False)
if result.returncode == 0:
# Look for our mount path in mount output
mount_str = str(mount_path.resolve())
return mount_str in result.stdout
return False
except Exception:
return False
def get_rclone_processes() -> List[Dict[str, str]]:
"""Get list of running rclone processes."""
try:
# Use ps to find rclone processes
result = subprocess.run(
["ps", "-eo", "pid,args"], capture_output=True, text=True, check=False
)
processes = []
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "rclone" in line and "basic-memory" in line:
parts = line.strip().split(None, 1)
if len(parts) >= 2:
processes.append({"pid": parts[0], "command": parts[1]})
return processes
except Exception:
return []
def kill_rclone_process(pid: str) -> bool:
"""Kill a specific rclone process."""
try:
subprocess.run(["kill", pid], check=True)
console.print(f"[green]✓ Killed rclone process {pid}[/green]")
return True
except subprocess.CalledProcessError:
console.print(f"[red]✗ Failed to kill rclone process {pid}[/red]")
return False
def unmount_path(mount_path: Path) -> bool:
"""Unmount a mounted path."""
if not is_path_mounted(mount_path):
return True
try:
subprocess.run(["umount", str(mount_path)], check=True)
console.print(f"[green]✓ Unmounted {mount_path}[/green]")
return True
except subprocess.CalledProcessError as e:
console.print(f"[red]✗ Failed to unmount {mount_path}: {e}[/red]")
return False
def cleanup_orphaned_rclone_processes() -> int:
"""Clean up orphaned rclone processes for basic-memory."""
processes = get_rclone_processes()
killed_count = 0
for proc in processes:
console.print(
f"[yellow]Found rclone process: {proc['pid']} - {proc['command'][:80]}...[/yellow]"
)
if kill_rclone_process(proc["pid"]):
killed_count += 1
return killed_count
```
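The helpers above compose into a mount workflow. The sketch below only dry-runs command construction with illustrative tenant and bucket names (nothing is written to the rclone config and nothing is executed), assuming the module path shown in this dump:

```python
from basic_memory.cli.commands.cloud.rclone_config import (
    MOUNT_PROFILES,
    build_mount_command,
    get_default_mount_path,
)

# Illustrative tenant/bucket names; a real flow would obtain these from the cloud API
# and call add_tenant_to_rclone_config() with credentials before mounting.
tenant_id = "tenant-123"
bucket_name = "basic-memory-tenant-123"

mount_path = get_default_mount_path()  # ~/basic-memory-cloud
cmd = build_mount_command(tenant_id, bucket_name, mount_path, MOUNT_PROFILES["balanced"])
print(" ".join(cmd))
# rclone nfsmount basic-memory-tenant-123:basic-memory-tenant-123 <mount_path> ... --daemon
```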