This is page 20 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_memory_service/backup/scheduler.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Automatic backup scheduler for MCP Memory Service.
Provides scheduled database backups with configurable intervals and retention policies.
"""
import asyncio
import shutil
import sqlite3
import logging
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Dict, Any, List, Optional
from ..config import (
BACKUPS_PATH,
BACKUP_ENABLED,
BACKUP_INTERVAL,
BACKUP_RETENTION,
BACKUP_MAX_COUNT,
SQLITE_VEC_PATH
)
logger = logging.getLogger(__name__)
class BackupService:
    """Service for creating and managing database backups.

    Backups are timestamped copies of the SQLite database created with
    SQLite's native online-backup API, stored in ``backups_dir`` and pruned
    by :meth:`cleanup_old_backups` according to the configured retention
    policy (``BACKUP_RETENTION`` days / ``BACKUP_MAX_COUNT`` files).
    """

    def __init__(self, backups_dir: str = None, db_path: str = None):
        """Initialize backup service.

        Args:
            backups_dir: Directory to store backups (defaults to BACKUPS_PATH)
            db_path: Path to database file (defaults to SQLITE_VEC_PATH)
        """
        self.backups_dir = Path(backups_dir or BACKUPS_PATH)
        # Determine database path with clear fallback logic; may end up None
        # if neither the argument nor SQLITE_VEC_PATH is configured.
        db_path_str = db_path or SQLITE_VEC_PATH
        self.db_path = Path(db_path_str) if db_path_str else None
        self.last_backup_time: Optional[float] = None  # epoch seconds of newest backup
        self.backup_count: int = 0
        self._lock = asyncio.Lock()  # Serializes concurrent create_backup() calls

        # Ensure backup directory exists
        self.backups_dir.mkdir(parents=True, exist_ok=True)

        # Load existing backup metadata
        self._load_backup_metadata()

        logger.info(f"BackupService initialized: backups_dir={self.backups_dir}, db_path={self.db_path}")

    def _load_backup_metadata(self):
        """Seed backup_count / last_backup_time from backups already on disk."""
        backups = self.list_backups()
        self.backup_count = len(backups)
        if backups:
            # list_backups() sorts newest-first, so index 0 is the latest.
            latest = backups[0]
            self.last_backup_time = latest.get('created_timestamp', 0)

    def _generate_backup_filename(self) -> str:
        """Generate a timestamped backup filename (UTC, second resolution)."""
        timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
        return f"memory_backup_{timestamp}.db"

    async def create_backup(self, description: str = None) -> Dict[str, Any]:
        """Create a new database backup.

        Args:
            description: Optional description for the backup

        Returns:
            Dict with backup details; 'success' key indicates the outcome.
        """
        if not self.db_path or not self.db_path.exists():
            return {
                'success': False,
                'error': f'Database file not found: {self.db_path}',
                'timestamp': datetime.now(timezone.utc).isoformat()
            }

        async with self._lock:  # Ensure atomic operations
            try:
                start_time = time.time()
                created_at = datetime.now(timezone.utc)

                # Generate backup filename
                backup_filename = self._generate_backup_filename()
                backup_path = self.backups_dir / backup_filename

                # Use SQLite's native backup API for safe, consistent backups.
                # This handles active database connections properly.
                def _do_backup():
                    source = sqlite3.connect(str(self.db_path))
                    dest = sqlite3.connect(str(backup_path))
                    try:
                        source.backup(dest)
                    finally:
                        source.close()
                        dest.close()

                # Run the blocking sqlite work off the event loop.
                await asyncio.to_thread(_do_backup)

                # Calculate backup duration (just the backup operation)
                backup_duration = time.time() - start_time

                # Get backup size
                backup_size = backup_path.stat().st_size

                # Update metadata
                self.last_backup_time = created_at.timestamp()
                self.backup_count += 1

                logger.info(f"Created backup: {backup_filename} ({backup_size} bytes) in {backup_duration:.2f}s")

                # Cleanup old backups (outside of duration calculation)
                await self.cleanup_old_backups()

                return {
                    'success': True,
                    'filename': backup_filename,
                    'path': str(backup_path),
                    'size_bytes': backup_size,
                    'description': description,
                    'created_at': created_at.isoformat(),
                    'duration_seconds': round(backup_duration, 3)
                }
            except Exception as e:
                logger.error(f"Failed to create backup: {e}")
                return {
                    'success': False,
                    'error': str(e),
                    'timestamp': datetime.now(timezone.utc).isoformat()
                }

    def list_backups(self) -> List[Dict[str, Any]]:
        """List all available backups.

        Returns:
            List of backup info dicts, sorted by date (newest first)
        """
        backups = []
        try:
            for backup_file in self.backups_dir.glob('memory_backup_*.db'):
                stat = backup_file.stat()

                # Parse timestamp from filename; fall back to the file's
                # mtime when the name doesn't match the expected pattern.
                try:
                    timestamp_str = backup_file.stem.replace('memory_backup_', '')
                    created_dt = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S')
                    created_dt = created_dt.replace(tzinfo=timezone.utc)
                except ValueError:
                    created_dt = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)

                backups.append({
                    'filename': backup_file.name,
                    'path': str(backup_file),
                    'size_bytes': stat.st_size,
                    'created_at': created_dt.isoformat(),
                    'created_timestamp': created_dt.timestamp(),
                    'age_days': (datetime.now(timezone.utc) - created_dt).days
                })

            # Sort by creation time, newest first
            backups.sort(key=lambda x: x['created_timestamp'], reverse=True)
        except Exception as e:
            logger.error(f"Error listing backups: {e}")
        return backups

    async def cleanup_old_backups(self) -> Dict[str, Any]:
        """Remove old backups based on retention policy.

        Removes backups that are:
        - Older than BACKUP_RETENTION days
        - Exceed BACKUP_MAX_COUNT

        Returns:
            Dict with cleanup results
        """
        removed = []
        errors = []
        try:
            backups = self.list_backups()
            retention_cutoff = datetime.now(timezone.utc) - timedelta(days=BACKUP_RETENTION)

            for i, backup in enumerate(backups):
                should_remove = False
                reason = ""

                # Check if exceeds max count (list is newest-first, so the
                # entries past BACKUP_MAX_COUNT are the oldest).
                if i >= BACKUP_MAX_COUNT:
                    should_remove = True
                    reason = f"exceeds max count ({BACKUP_MAX_COUNT})"

                # Check if older than retention period
                try:
                    created_dt = datetime.fromisoformat(backup['created_at'].replace('Z', '+00:00'))
                    if created_dt < retention_cutoff:
                        should_remove = True
                        reason = f"older than {BACKUP_RETENTION} days"
                except (ValueError, KeyError) as e:
                    logger.warning(f"Could not parse timestamp for backup {backup.get('filename', 'unknown')}: {e}")

                if should_remove:
                    try:
                        # Use asyncio.to_thread to avoid blocking the event loop
                        await asyncio.to_thread(Path(backup['path']).unlink)
                        removed.append({
                            'filename': backup['filename'],
                            'reason': reason
                        })
                        logger.info(f"Removed old backup: {backup['filename']} ({reason})")
                    except Exception as e:
                        errors.append({
                            'filename': backup['filename'],
                            'error': str(e)
                        })
                        logger.error(f"Failed to remove backup {backup['filename']}: {e}")

            # Update count more efficiently by subtracting removed count
            self.backup_count = max(0, self.backup_count - len(removed))
        except Exception as e:
            logger.error(f"Error during backup cleanup: {e}")
            errors.append({'error': str(e)})

        return {
            'removed_count': len(removed),
            'removed': removed,
            'errors': errors
        }

    async def restore_backup(self, filename: str) -> Dict[str, Any]:
        """Restore database from a backup.

        Args:
            filename: Name of backup file to restore

        Returns:
            Dict with restore results
        """
        backup_path = self.backups_dir / filename

        # Guard against path traversal in the (potentially user-supplied)
        # filename: the resolved target must stay inside the backups dir.
        try:
            backup_path.resolve().relative_to(self.backups_dir.resolve())
        except ValueError:
            return {
                'success': False,
                'error': f'Invalid backup filename: {filename}'
            }

        if not backup_path.exists():
            # BUGFIX: message previously omitted the requested filename,
            # making failed restores undiagnosable from logs/API responses.
            return {
                'success': False,
                'error': f'Backup file not found: {filename}'
            }

        if not self.db_path:
            return {
                'success': False,
                'error': 'Database path not configured'
            }

        try:
            # Create a backup of current database first
            if self.db_path.exists():
                current_backup = self.db_path.with_suffix('.db.pre_restore')
                # Use asyncio.to_thread to avoid blocking the event loop
                await asyncio.to_thread(shutil.copy2, str(self.db_path), str(current_backup))
                logger.info(f"Created pre-restore backup: {current_backup}")

            # Restore from backup
            # Use asyncio.to_thread to avoid blocking the event loop
            await asyncio.to_thread(shutil.copy2, str(backup_path), str(self.db_path))
            logger.info(f"Restored database from backup: {filename}")

            return {
                'success': True,
                'filename': filename,
                'restored_at': datetime.now(timezone.utc).isoformat()
            }
        except Exception as e:
            logger.error(f"Failed to restore backup: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_status(self) -> Dict[str, Any]:
        """Get current backup service status.

        Returns:
            Dict with backup service status
        """
        backups = self.list_backups()
        total_size = sum(b['size_bytes'] for b in backups)

        # Calculate time since last backup
        time_since_last = None
        if self.last_backup_time:
            time_since_last = time.time() - self.last_backup_time

        # Calculate next scheduled backup time
        next_backup = self._calculate_next_backup_time()

        return {
            'enabled': BACKUP_ENABLED,
            'interval': BACKUP_INTERVAL,
            'retention_days': BACKUP_RETENTION,
            'max_count': BACKUP_MAX_COUNT,
            'backup_count': len(backups),
            'total_size_bytes': total_size,
            'last_backup_time': self.last_backup_time,
            'time_since_last_seconds': time_since_last,
            'next_backup_at': next_backup.isoformat() if next_backup else None,
            'backups_dir': str(self.backups_dir),
            'db_path': str(self.db_path) if self.db_path else None
        }

    def _calculate_next_backup_time(self) -> Optional[datetime]:
        """Calculate the next scheduled backup time, or None if not applicable."""
        if not BACKUP_ENABLED or not self.last_backup_time:
            return None

        last_backup_dt = datetime.fromtimestamp(self.last_backup_time, tz=timezone.utc)

        if BACKUP_INTERVAL == 'hourly':
            return last_backup_dt + timedelta(hours=1)
        elif BACKUP_INTERVAL == 'daily':
            return last_backup_dt + timedelta(days=1)
        elif BACKUP_INTERVAL == 'weekly':
            return last_backup_dt + timedelta(weeks=1)
        # Unknown interval string: no next-backup prediction.
        return None
class BackupScheduler:
"""Scheduler for automatic database backups."""
def __init__(self, backup_service: BackupService = None):
"""Initialize backup scheduler.
Args:
backup_service: BackupService instance (creates one if not provided)
"""
self.backup_service = backup_service or BackupService()
self.is_running = False
self._task: Optional[asyncio.Task] = None
logger.info("BackupScheduler initialized")
def _get_interval_seconds(self) -> int:
"""Get backup interval in seconds."""
if BACKUP_INTERVAL == 'hourly':
return 3600
elif BACKUP_INTERVAL == 'daily':
return 86400
elif BACKUP_INTERVAL == 'weekly':
return 604800
return 86400 # Default to daily
async def start(self):
"""Start the backup scheduler."""
if self.is_running:
logger.warning("BackupScheduler already running")
return
if not BACKUP_ENABLED:
logger.info("Backups disabled, scheduler not started")
return
self.is_running = True
self._task = asyncio.create_task(self._schedule_loop())
logger.info(f"BackupScheduler started with {BACKUP_INTERVAL} interval")
async def stop(self):
"""Stop the backup scheduler."""
if not self.is_running:
return
self.is_running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("BackupScheduler stopped")
async def _schedule_loop(self):
"""Main scheduling loop."""
interval_seconds = self._get_interval_seconds()
while self.is_running:
try:
# Check if it's time for a backup
should_backup = False
if not self.backup_service.last_backup_time:
# No previous backup, create one
should_backup = True
else:
time_since_last = time.time() - self.backup_service.last_backup_time
if time_since_last >= interval_seconds:
should_backup = True
if should_backup:
logger.info("Scheduled backup triggered")
result = await self.backup_service.create_backup(
description=f"Scheduled {BACKUP_INTERVAL} backup"
)
if result['success']:
logger.info(f"Scheduled backup completed: {result['filename']}")
else:
logger.error(f"Scheduled backup failed: {result.get('error')}")
# Sleep for a check interval (every 5 minutes)
await asyncio.sleep(300)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in backup scheduler loop: {e}")
await asyncio.sleep(60) # Wait before retrying
def get_status(self) -> Dict[str, Any]:
    """Get scheduler status.

    Returns:
        Dict with scheduler status and backup service status; the
        'scheduler_running' key is added on top of the service's own fields.
    """
    status = self.backup_service.get_status()
    status['scheduler_running'] = self.is_running
    return status
# Global backup service instance
_backup_service: Optional[BackupService] = None
_backup_scheduler: Optional[BackupScheduler] = None
def get_backup_service() -> BackupService:
    """Return the process-wide BackupService, creating it on first use."""
    global _backup_service
    if _backup_service is not None:
        return _backup_service
    _backup_service = BackupService()
    return _backup_service
def get_backup_scheduler() -> BackupScheduler:
    """Return the process-wide BackupScheduler, creating it on first use."""
    global _backup_scheduler
    if _backup_scheduler is not None:
        return _backup_scheduler
    # The scheduler shares the singleton backup service.
    _backup_scheduler = BackupScheduler(get_backup_service())
    return _backup_scheduler
```
--------------------------------------------------------------------------------
/scripts/installation/install_linux_service.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Linux systemd service installer for MCP Memory Service.
Creates and manages systemd service files for automatic service startup.
"""
import os
import sys
import json
import argparse
import subprocess
from pathlib import Path
import pwd
import grp
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from scripts.service_utils import (
get_project_root, get_service_paths, get_service_environment,
generate_api_key, save_service_config, load_service_config,
check_dependencies, get_service_command, print_service_info,
require_admin
)
except ImportError as e:
print(f"Error importing service utilities: {e}")
print("Please ensure you're running this from the project directory")
sys.exit(1)
SERVICE_NAME = "mcp-memory"
SERVICE_DISPLAY_NAME = "MCP Memory Service"
SERVICE_DESCRIPTION = "MCP Memory Service with Consolidation and mDNS"
def get_systemd_paths(user_level=True):
    """Return (service_dir, service_file, systemctl_cmd) for the given scope.

    user_level=True targets the per-user systemd instance under
    ~/.config/systemd/user; False targets the system-wide instance,
    whose management commands need sudo.
    """
    if user_level:
        unit_dir = Path.home() / ".config" / "systemd" / "user"
        ctl = "systemctl --user"
    else:
        unit_dir = Path("/etc/systemd/system")
        ctl = "sudo systemctl"
    return unit_dir, unit_dir / f"{SERVICE_NAME}.service", ctl
def create_systemd_service(api_key, user_level=True):
    """Create the systemd service unit file content.

    Renders the full unit-file text (nothing is written to disk here) for
    either a user-level or system-level service.

    Args:
        api_key: API key injected into the unit as the MCP_API_KEY env var.
        user_level: If False, User=/Group= entries are added and, when the
            service is configured for port 443, the capability required to
            bind privileged ports.

    Returns:
        The complete unit-file content as a string.
    """
    paths = get_service_paths()
    command = get_service_command()
    environment = get_service_environment()
    environment['MCP_API_KEY'] = api_key
    # Get current user info
    current_user = pwd.getpwuid(os.getuid())
    username = current_user.pw_name
    groupname = grp.getgrgid(current_user.pw_gid).gr_name
    # Build environment lines
    env_lines = []
    for key, value in environment.items():
        env_lines.append(f'Environment={key}={value}')
    # Create service content
    service_content = f'''[Unit]
Description={SERVICE_DESCRIPTION}
Documentation=https://github.com/doobidoo/mcp-memory-service
After=network.target network-online.target
Wants=network-online.target
[Service]
Type=simple
'''
    # Add user/group for system-level service
    if not user_level:
        service_content += f'''User={username}
Group={groupname}
'''
    # chr(10) is '\n' — used because f-string expressions could not contain
    # backslashes before Python 3.12.
    service_content += f'''WorkingDirectory={paths['project_root']}
ExecStart={' '.join(command)}
{chr(10).join(env_lines)}
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier={SERVICE_NAME}
'''
    # Add capabilities for binding to privileged ports (if using HTTPS on 443)
    if not user_level and environment.get('MCP_HTTP_PORT') == '443':
        service_content += '''AmbientCapabilities=CAP_NET_BIND_SERVICE
CapabilityBoundingSet=CAP_NET_BIND_SERVICE
'''
    service_content += '''
[Install]
WantedBy='''
    # User units attach to default.target; system units to multi-user.target.
    if user_level:
        service_content += 'default.target'
    else:
        service_content += 'multi-user.target'
    return service_content
def create_shell_scripts():
    """Create convenient shell scripts for service management.

    Writes start/stop/status/logs/uninstall helper scripts into
    scripts/linux, targeting whichever systemd scope (user or system)
    is currently installed.

    Returns:
        Path to the directory containing the generated scripts.
    """
    paths = get_service_paths()
    scripts_dir = paths['scripts_dir'] / 'linux'
    scripts_dir.mkdir(exist_ok=True)
    # Determine if user or system service based on existing installation
    user_service_file = Path.home() / ".config" / "systemd" / "user" / f"{SERVICE_NAME}.service"
    system_service_file = Path(f"/etc/systemd/system/{SERVICE_NAME}.service")
    if user_service_file.exists():
        systemctl = "systemctl --user"
        sudo = ""
    elif system_service_file.exists():
        systemctl = "systemctl"
        sudo = "sudo "
    else:
        # Default to user
        systemctl = "systemctl --user"
        sudo = ""
    # Start script
    start_script = scripts_dir / 'start_service.sh'
    with open(start_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "Starting {SERVICE_DISPLAY_NAME}..."
{sudo}{systemctl} start {SERVICE_NAME}
if [ $? -eq 0 ]; then
    echo "✅ Service started successfully!"
else
    echo "❌ Failed to start service"
fi
''')
    start_script.chmod(0o755)
    # Stop script
    stop_script = scripts_dir / 'stop_service.sh'
    with open(stop_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "Stopping {SERVICE_DISPLAY_NAME}..."
{sudo}{systemctl} stop {SERVICE_NAME}
if [ $? -eq 0 ]; then
    echo "✅ Service stopped successfully!"
else
    echo "❌ Failed to stop service"
fi
''')
    stop_script.chmod(0o755)
    # Status script
    # Fix: the previous version piped "-" through tr, which printed a single
    # "=" rather than a divider line; emit a full-width divider instead.
    status_script = scripts_dir / 'service_status.sh'
    with open(status_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "{SERVICE_DISPLAY_NAME} Status:"
echo "============================================================"
{sudo}{systemctl} status {SERVICE_NAME}
''')
    status_script.chmod(0o755)
    # Logs script
    logs_script = scripts_dir / 'view_logs.sh'
    with open(logs_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "Viewing {SERVICE_DISPLAY_NAME} logs (press Ctrl+C to exit)..."
{sudo}journalctl -u {SERVICE_NAME} -f
''')
    logs_script.chmod(0o755)
    # Uninstall script
    uninstall_script = scripts_dir / 'uninstall_service.sh'
    with open(uninstall_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "This will uninstall {SERVICE_DISPLAY_NAME}."
read -p "Are you sure? (y/N): " confirm
if [[ ! "$confirm" =~ ^[Yy]$ ]]; then
    exit 0
fi
echo "Stopping service..."
{sudo}{systemctl} stop {SERVICE_NAME} 2>/dev/null
{sudo}{systemctl} disable {SERVICE_NAME} 2>/dev/null
echo "Removing service files..."
if [ -f "$HOME/.config/systemd/user/{SERVICE_NAME}.service" ]; then
    rm -f "$HOME/.config/systemd/user/{SERVICE_NAME}.service"
    systemctl --user daemon-reload
else
    sudo rm -f /etc/systemd/system/{SERVICE_NAME}.service
    sudo systemctl daemon-reload
fi
echo "✅ Service uninstalled"
''')
    uninstall_script.chmod(0o755)
    return scripts_dir
def install_service(user_level=True) -> bool:
    """Install the Linux systemd service.

    Generates an API key, saves the service configuration, writes the unit
    file (via sudo for system scope), reloads systemd, enables the service,
    and creates management helper scripts. Exits the process on dependency
    or enable failures.

    Args:
        user_level: True for a per-user unit, False for a system-wide unit
            (requires root).

    Returns:
        True on successful installation.
    """
    service_type = "user service" if user_level else "system service"
    # Check for root if system-level
    if not user_level:
        require_admin(f"System-level service installation requires root privileges")
    print(f"\n🔍 Checking dependencies...")
    deps_ok, deps_msg = check_dependencies()
    if not deps_ok:
        print(f"❌ {deps_msg}")
        sys.exit(1)
    print(f"✅ {deps_msg}")
    # Generate API key
    api_key = generate_api_key()
    print(f"\n🔑 Generated API key: {api_key}")
    # Create service configuration
    config = {
        'service_name': SERVICE_NAME,
        'api_key': api_key,
        'command': get_service_command(),
        'environment': get_service_environment(),
        'user_level': user_level
    }
    # Save configuration
    config_file = save_service_config(config)
    print(f"💾 Saved configuration to: {config_file}")
    # Get systemd paths
    service_dir, service_file, systemctl_cmd = get_systemd_paths(user_level)
    # Create service directory if it doesn't exist
    service_dir.mkdir(parents=True, exist_ok=True)
    # Create service file
    print(f"\n📝 Creating systemd {service_type} file...")
    service_content = create_systemd_service(api_key, user_level)
    # Write service file
    if user_level:
        with open(service_file, 'w') as f:
            f.write(service_content)
        os.chmod(service_file, 0o644)
    else:
        # Use sudo to write system service file: write to a temp file first,
        # then copy it into /etc with elevated privileges.
        import tempfile
        with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp:
            tmp.write(service_content)
            tmp_path = tmp.name
        subprocess.run(['sudo', 'cp', tmp_path, str(service_file)], check=True)
        subprocess.run(['sudo', 'chmod', '644', str(service_file)], check=True)
        os.unlink(tmp_path)
    print(f"✅ Created service file at: {service_file}")
    # Reload systemd
    print("\n🔄 Reloading systemd daemon...")
    if user_level:
        subprocess.run(['systemctl', '--user', 'daemon-reload'], check=True)
    else:
        subprocess.run(['sudo', 'systemctl', 'daemon-reload'], check=True)
    # Enable the service
    print(f"\n🚀 Enabling {service_type}...")
    cmd = systemctl_cmd.split() + ['enable', SERVICE_NAME]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print(f"❌ Failed to enable service: {result.stderr}")
        sys.exit(1)
    print(f"✅ Service enabled for automatic startup!")
    # Create convenience scripts
    scripts_dir = create_shell_scripts()
    print(f"\n📁 Created management scripts in: {scripts_dir}")
    # Print service information
    platform_info = {
        'Start Service': f'{systemctl_cmd} start {SERVICE_NAME}',
        'Stop Service': f'{systemctl_cmd} stop {SERVICE_NAME}',
        'Service Status': f'{systemctl_cmd} status {SERVICE_NAME}',
        'View Logs': f'{"sudo " if not user_level else ""}journalctl {"--user " if user_level else ""}-u {SERVICE_NAME} -f',
        'Uninstall': f'python "{Path(__file__)}" --uninstall'
    }
    print_service_info(api_key, platform_info)
    # Additional Linux-specific tips
    print("\n📌 Linux Tips:")
    print(f" • Service will start automatically on {'login' if user_level else 'boot'}")
    print(f" • Use journalctl to view detailed logs")
    print(f" • {'User services require you to be logged in' if user_level else 'System service runs independently'}")
    # Offer to start the service
    print(f"\n▶️ To start the service now, run:")
    print(f" {systemctl_cmd} start {SERVICE_NAME}")
    return True
def uninstall_service(user_level=None):
    """Uninstall the Linux systemd service.

    Stops and disables the unit, removes its file, reloads systemd, and
    deletes the saved service configuration if it belongs to this service.

    Args:
        user_level: True for the user unit, False for the system unit, or
            None to auto-detect whichever variant is installed.
    """
    # Auto-detect installation type if not specified
    if user_level is None:
        user_service_file = Path.home() / ".config" / "systemd" / "user" / f"{SERVICE_NAME}.service"
        system_service_file = Path(f"/etc/systemd/system/{SERVICE_NAME}.service")
        if user_service_file.exists():
            user_level = True
        elif system_service_file.exists():
            user_level = False
        else:
            print("❌ Service is not installed")
            return
    service_type = "user service" if user_level else "system service"
    # Check for root if system-level
    if not user_level:
        require_admin(f"System-level service removal requires root privileges")
    print(f"\n🗑️ Uninstalling {SERVICE_DISPLAY_NAME} {service_type}...")
    # Get systemd paths
    service_dir, service_file, systemctl_cmd = get_systemd_paths(user_level)
    if service_file.exists() or (not user_level and Path(f"/etc/systemd/system/{SERVICE_NAME}.service").exists()):
        # Stop the service (failures ignored — it may not be running)
        print("⏹️ Stopping service...")
        cmd = systemctl_cmd.split() + ['stop', SERVICE_NAME]
        subprocess.run(cmd, capture_output=True)
        # Disable the service
        print("🔌 Disabling service...")
        cmd = systemctl_cmd.split() + ['disable', SERVICE_NAME]
        subprocess.run(cmd, capture_output=True)
        # Remove service file (sudo needed for the system-wide path)
        print("🗑️ Removing service file...")
        if user_level:
            service_file.unlink()
        else:
            subprocess.run(['sudo', 'rm', '-f', str(service_file)], check=True)
        # Reload systemd
        print("🔄 Reloading systemd daemon...")
        if user_level:
            subprocess.run(['systemctl', '--user', 'daemon-reload'], check=True)
        else:
            subprocess.run(['sudo', 'systemctl', 'daemon-reload'], check=True)
        print(f"✅ {service_type} uninstalled successfully!")
    else:
        print(f"ℹ️ {service_type} is not installed")
    # Clean up configuration
    config = load_service_config()
    if config and config.get('service_name') == SERVICE_NAME:
        print("🧹 Cleaning up configuration...")
        config_file = get_service_paths()['config_dir'] / 'service_config.json'
        # NOTE(review): unlink() raises if the file vanished between the
        # load above and here — consider unlink(missing_ok=True).
        config_file.unlink()
def start_service(user_level=None):
    """Start the systemd service, auto-detecting user vs. system scope."""
    if user_level is None:
        # A user-level unit file, if present, takes precedence.
        user_unit = Path.home() / ".config" / "systemd" / "user" / f"{SERVICE_NAME}.service"
        user_level = user_unit.exists()
    _, _, systemctl_cmd = get_systemd_paths(user_level)
    print(f"\n▶️ Starting {SERVICE_DISPLAY_NAME}...")
    result = subprocess.run(
        [*systemctl_cmd.split(), 'start', SERVICE_NAME],
        capture_output=True, text=True
    )
    if result.returncode == 0:
        print("✅ Service started successfully!")
    else:
        print(f"❌ Failed to start service: {result.stderr}")
        print(f"\n💡 Check logs with: {systemctl_cmd} status {SERVICE_NAME}")
def stop_service(user_level=None):
    """Stop the systemd service, auto-detecting user vs. system scope."""
    if user_level is None:
        # A user-level unit file, if present, takes precedence.
        user_unit = Path.home() / ".config" / "systemd" / "user" / f"{SERVICE_NAME}.service"
        user_level = user_unit.exists()
    _, _, systemctl_cmd = get_systemd_paths(user_level)
    print(f"\n⏹️ Stopping {SERVICE_DISPLAY_NAME}...")
    result = subprocess.run(
        [*systemctl_cmd.split(), 'stop', SERVICE_NAME],
        capture_output=True, text=True
    )
    if result.returncode == 0:
        print("✅ Service stopped successfully!")
    else:
        print(f"ℹ️ Service may not be running: {result.stderr}")
def service_status(user_level=None):
    """Show systemd status plus the saved configuration for the service."""
    if user_level is None:
        # Auto-detect which variant is installed; bail out if neither exists.
        user_unit = Path.home() / ".config" / "systemd" / "user" / f"{SERVICE_NAME}.service"
        system_unit = Path(f"/etc/systemd/system/{SERVICE_NAME}.service")
        if user_unit.exists():
            user_level = True
        elif system_unit.exists():
            user_level = False
        else:
            print(f"\n❌ {SERVICE_DISPLAY_NAME} is not installed")
            return
    _, service_file, systemctl_cmd = get_systemd_paths(user_level)
    print(f"\n📊 {SERVICE_DISPLAY_NAME} Status:")
    print("-" * 60)
    # Detailed systemd status; --no-pager keeps output inline.
    subprocess.run([*systemctl_cmd.split(), 'status', SERVICE_NAME, '--no-pager'])
    # Show the saved configuration, if any.
    config = load_service_config()
    if config:
        print(f"\n📋 Configuration:")
        print(f" Service Name: {SERVICE_NAME}")
        print(f" API Key: {config.get('api_key', 'Not set')}")
        print(f" Type: {'User Service' if user_level else 'System Service'}")
        print(f" Service File: {service_file}")
def main():
    """Parse CLI arguments and dispatch the requested service action."""
    parser = argparse.ArgumentParser(
        description="Linux systemd service installer for MCP Memory Service"
    )
    # Service scope
    parser.add_argument('--user', action='store_true',
                        help='Install as user service (default)')
    parser.add_argument('--system', action='store_true',
                        help='Install as system service (requires sudo)')
    # Actions
    parser.add_argument('--uninstall', action='store_true', help='Uninstall the service')
    parser.add_argument('--start', action='store_true', help='Start the service')
    parser.add_argument('--stop', action='store_true', help='Stop the service')
    parser.add_argument('--status', action='store_true', help='Check service status')
    parser.add_argument('--restart', action='store_true', help='Restart the service')
    args = parser.parse_args()
    # Scope flags are mutually exclusive.
    if args.system and args.user:
        print("❌ Cannot specify both --user and --system")
        sys.exit(1)
    # None means "auto-detect" for management actions; installation
    # (no action flag) defaults to a user-level service.
    has_action = any([args.uninstall, args.start, args.stop, args.status, args.restart])
    user_level = None
    if args.system:
        user_level = False
    elif args.user or not has_action:
        user_level = True
    if args.uninstall:
        uninstall_service(user_level)
    elif args.start:
        start_service(user_level)
    elif args.stop:
        stop_service(user_level)
    elif args.status:
        service_status(user_level)
    elif args.restart:
        stop_service(user_level)
        start_service(user_level)
    else:
        # Default action is to install
        install_service(user_level)
if __name__ == '__main__':
main()
```
--------------------------------------------------------------------------------
/docs/sqlite-vec-backend.md:
--------------------------------------------------------------------------------
```markdown
# SQLite-vec Backend Guide
## Overview
The MCP Memory Service now supports SQLite-vec as an alternative storage backend. SQLite-vec provides a lightweight, high-performance vector database solution that offers several advantages over ChromaDB:
- **Lightweight**: Single file database with no external dependencies
- **Fast**: Optimized vector operations with efficient indexing
- **Portable**: Easy to backup, copy, and share memory databases
- **Reliable**: Built on SQLite's proven reliability and ACID compliance
- **Memory Efficient**: Lower memory footprint for smaller memory collections
## Installation
### Prerequisites
The sqlite-vec backend requires the `sqlite-vec` Python package:
```bash
# Install sqlite-vec
pip install sqlite-vec
# Or with UV (recommended)
uv add sqlite-vec
```
### Verification
You can verify sqlite-vec is available by running:
```python
try:
import sqlite_vec
print("✅ sqlite-vec is available")
except ImportError:
print("❌ sqlite-vec is not installed")
```
## Configuration
### Environment Variables
To use the sqlite-vec backend, set the storage backend environment variable:
```bash
# Primary configuration
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
# Optional: Custom database path
export MCP_MEMORY_SQLITE_PATH=/path/to/your/memory.db
```
### Platform-Specific Setup
#### macOS (Bash/Zsh)
```bash
# Add to ~/.bashrc or ~/.zshrc
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_MEMORY_SQLITE_PATH="$HOME/Library/Application Support/mcp-memory/sqlite_vec.db"
```
#### Windows (PowerShell)
```powershell
# Add to PowerShell profile
$env:MCP_MEMORY_STORAGE_BACKEND = "sqlite_vec"
$env:MCP_MEMORY_SQLITE_PATH = "$env:LOCALAPPDATA\mcp-memory\sqlite_vec.db"
```
#### Windows (Command Prompt)
```cmd
set MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
set MCP_MEMORY_SQLITE_PATH=%LOCALAPPDATA%\mcp-memory\sqlite_vec.db
```
#### Linux
```bash
# Add to ~/.bashrc
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_MEMORY_SQLITE_PATH="$HOME/.local/share/mcp-memory/sqlite_vec.db"
```
### Claude Desktop Configuration
Update your Claude Desktop MCP configuration:
```json
{
"mcpServers": {
"memory": {
"command": "uv",
"args": ["--directory", "/path/to/mcp-memory-service", "run", "memory"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec"
}
}
}
}
```
## Migration from ChromaDB
### Automatic Migration
Use the provided migration script for easy migration:
```bash
# Simple migration with default paths
python migrate_to_sqlite_vec.py
# Custom migration
python scripts/migrate_storage.py \
--from chroma \
--to sqlite_vec \
--source-path /path/to/chroma_db \
--target-path /path/to/sqlite_vec.db \
--backup
```
### Manual Migration Steps
1. **Stop the MCP Memory Service**
```bash
# Stop Claude Desktop or any running instances
```
2. **Create a backup** (recommended)
```bash
python scripts/migrate_storage.py \
--from chroma \
--to sqlite_vec \
--source-path ~/.local/share/mcp-memory/chroma_db \
--target-path ~/.local/share/mcp-memory/sqlite_vec.db \
--backup \
--backup-path memory_backup.json
```
3. **Set environment variables**
```bash
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
```
4. **Restart Claude Desktop**
### Migration Verification
After migration, verify your memories are accessible:
```bash
# Test the new backend
python scripts/verify_environment.py
# Check database statistics
python -c "
import asyncio
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
async def check_stats():
storage = SqliteVecMemoryStorage('path/to/your/db')
await storage.initialize()
stats = storage.get_stats()
print(f'Total memories: {stats[\"total_memories\"]}')
print(f'Database size: {stats[\"database_size_mb\"]} MB')
storage.close()
asyncio.run(check_stats())
"
```
## Performance Characteristics
### Memory Usage
| Collection Size | ChromaDB RAM | SQLite-vec RAM | Difference |
|----------------|--------------|----------------|------------|
| 1,000 memories | ~200 MB | ~50 MB | -75% |
| 10,000 memories | ~800 MB | ~200 MB | -75% |
| 100,000 memories | ~4 GB | ~1 GB | -75% |
### Query Performance
- **Semantic Search**: Similar performance to ChromaDB for most use cases
- **Tag Search**: Faster due to SQL indexing
- **Metadata Queries**: Significantly faster with SQL WHERE clauses
- **Startup Time**: 2-3x faster initialization
### Storage Characteristics
- **Database File**: Single `.db` file (easy backup/restore)
- **Disk Usage**: ~30% smaller than ChromaDB for same data
- **Concurrent Access**: SQLite-level locking (single writer, multiple readers)
## Advanced Configuration
### Custom Embedding Models
```python
# Initialize with custom model
storage = SqliteVecMemoryStorage(
db_path="memory.db",
embedding_model="all-mpnet-base-v2" # Higher quality, slower
)
```
### Multi-Client Access Configuration
SQLite-vec supports advanced multi-client access through **two complementary approaches**:
1. **Phase 1: WAL Mode** - Direct SQLite access with Write-Ahead Logging
2. **Phase 2: HTTP Coordination** - Automatic HTTP server coordination for seamless multi-client access
#### Phase 1: WAL Mode (Default)
The backend automatically enables WAL mode with these default settings:
- **WAL Mode**: Enables multiple readers + single writer
- **Busy Timeout**: 5 seconds (prevents immediate lock errors)
- **Synchronous**: NORMAL (balanced performance/safety)
#### Phase 2: HTTP Server Auto-Detection (Advanced)
The system automatically detects the optimal coordination mode:
**Auto-Detection Modes:**
- **`http_client`**: Existing HTTP server detected → Connect as client
- **`http_server`**: No server found, port available → Start HTTP server
- **`direct`**: Port in use by other service → Fall back to WAL mode
**Coordination Flow:**
1. Check if MCP Memory Service HTTP server is running
2. If found → Use HTTP client to connect to existing server
3. If not found and port available → Auto-start HTTP server (optional)
4. If port busy → Fall back to direct SQLite with WAL mode
#### Custom SQLite Pragmas
You can customize SQLite behavior using environment variables:
```bash
# Recommended configuration (v8.9.0+) - For concurrent HTTP + MCP access
export MCP_MEMORY_SQLITE_PRAGMAS="busy_timeout=15000,cache_size=20000"
# Example configurations for different scenarios:
# High concurrency setup (longer timeout)
export MCP_MEMORY_SQLITE_PRAGMAS="busy_timeout=30000,cache_size=20000,wal_autocheckpoint=1000"
# Performance optimized (use with caution - trades safety for speed)
export MCP_MEMORY_SQLITE_PRAGMAS="synchronous=NORMAL,temp_store=MEMORY,cache_size=50000,busy_timeout=15000"
# Conservative/safe mode (maximum data safety)
export MCP_MEMORY_SQLITE_PRAGMAS="synchronous=FULL,busy_timeout=60000,cache_size=20000"
```
#### HTTP Coordination Configuration
Enable automatic HTTP server coordination for optimal multi-client access:
```bash
# Enable HTTP server auto-start
export MCP_HTTP_ENABLED=true
# Configure HTTP server settings (optional)
export MCP_HTTP_PORT=8000
export MCP_HTTP_HOST=localhost
# Combine with SQLite-vec backend
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
```
**Coordination Modes Explained:**
1. **Automatic Mode (Recommended)**
```bash
# No configuration needed - auto-detects best mode
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
```
2. **Forced HTTP Client Mode**
```bash
# Always connect to existing server (fails if none running)
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_HTTP_ENABLED=false
# Requires running: python scripts/run_http_server.py
```
3. **Direct WAL Mode Only**
```bash
# Disable HTTP coordination entirely
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_HTTP_ENABLED=false
```
#### Multi-Client Claude Desktop Configuration
**Option 1: Automatic Coordination (Recommended)**
```json
{
"mcpServers": {
"memory": {
"command": "uv",
"args": ["--directory", "/path/to/mcp-memory-service", "run", "memory"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec",
"MCP_HTTP_ENABLED": "true"
}
}
}
}
```
**Option 2: Manual HTTP Server + Client Mode**
```json
{
"mcpServers": {
"memory": {
"command": "uv",
"args": ["--directory", "/path/to/mcp-memory-service", "run", "memory"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec",
"MCP_HTTP_ENABLED": "false"
}
}
}
}
```
*Note: Requires manually running `python scripts/run_http_server.py` first*
**Option 3: WAL Mode Only (Simple)**
```json
{
"mcpServers": {
"memory": {
"command": "uv",
"args": ["--directory", "/path/to/mcp-memory-service", "run", "memory"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec",
"MCP_MEMORY_SQLITE_PRAGMAS": "busy_timeout=10000"
}
}
}
}
```
### Database Optimization
```bash
# Optimize database periodically
python -c "
import asyncio
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
async def optimize():
storage = SqliteVecMemoryStorage('path/to/db')
await storage.initialize()
# Clean up duplicates
count, msg = await storage.cleanup_duplicates()
print(f'Cleaned up {count} duplicates')
# Vacuum database
storage.conn.execute('VACUUM')
print('Database vacuumed')
storage.close()
asyncio.run(optimize())
"
```
### Backup and Restore
```bash
# Create backup
python scripts/migrate_storage.py \
--from sqlite_vec \
--to sqlite_vec \
--source-path memory.db \
--target-path backup.db
# Or simple file copy
cp memory.db memory_backup.db
# Restore from JSON backup
python scripts/migrate_storage.py \
--restore backup.json \
--to sqlite_vec \
--target-path restored_memory.db
```
## Troubleshooting
### Common Issues
#### 1. sqlite-vec Not Found
```
ImportError: No module named 'sqlite_vec'
```
**Solution**: Install sqlite-vec package
```bash
pip install sqlite-vec
# or
uv add sqlite-vec
```
#### 2. Database Lock Errors
```
sqlite3.OperationalError: database is locked
```
**✅ Fixed in v8.9.0** - Proper SQLite pragmas now automatically configured by installer
**For Single Client Issues:**
```bash
# Kill existing processes
pkill -f "mcp-memory-service"
# Restart Claude Desktop
```
**For Multi-Client Setup (Claude Desktop + Claude Code + HTTP Server):**
```bash
# v8.9.0+ Solution: Configure recommended pragma values
export MCP_MEMORY_SQLITE_PRAGMAS="busy_timeout=15000,cache_size=20000"
# Restart all services to apply changes
# Note: Installer automatically sets these for hybrid/sqlite_vec backends
# If issues persist, try longer timeout:
export MCP_MEMORY_SQLITE_PRAGMAS="busy_timeout=30000,cache_size=20000"
# Check for stale lock files (rare)
ls -la /path/to/your/database-wal
ls -la /path/to/your/database-shm
# If stale locks exist (no active processes), remove them
rm /path/to/your/database-wal
rm /path/to/your/database-shm
# Finally, restart all MCP clients
```
**Prevention Tips:**
- Always use WAL mode (enabled by default)
- Configure appropriate busy timeouts for your use case
- Ensure proper shutdown of MCP clients
- Use connection retry logic (built-in)
#### 3. HTTP Coordination Issues
```
Failed to initialize HTTP client storage: Connection refused
```
**Solutions:**
**Auto-Detection Problems:**
```bash
# Check if HTTP server auto-start is working
export LOG_LEVEL=DEBUG
export MCP_HTTP_ENABLED=true
# Check coordination mode detection
python -c "
import asyncio
from src.mcp_memory_service.utils.port_detection import detect_server_coordination_mode
print(asyncio.run(detect_server_coordination_mode()))
"
```
**Manual HTTP Server Setup:**
```bash
# Start HTTP server manually in separate terminal
python scripts/run_http_server.py
# Then start MCP clients (they'll auto-detect the running server)
```
**Port Conflicts:**
```bash
# Check what's using the port
netstat -an | grep :8000 # Linux/macOS
netstat -an | findstr :8000 # Windows
# Use different port
export MCP_HTTP_PORT=8001
```
**Fallback to WAL Mode:**
```bash
# Force WAL mode if HTTP coordination fails
export MCP_HTTP_ENABLED=false
```
#### 4. Permission Errors
```
PermissionError: [Errno 13] Permission denied
```
**Solution**: Check database file permissions
```bash
# Fix permissions
chmod 644 /path/to/sqlite_vec.db
chmod 755 /path/to/directory
```
#### 5. Migration Failures
```
Migration failed: No memories found
```
**Solution**: Verify source path and initialize if needed
```bash
# Check source exists
ls -la /path/to/chroma_db
# Use absolute paths in migration
```
### Debug Mode
Enable debug logging for troubleshooting:
```bash
export LOG_LEVEL=DEBUG
export DEBUG_MODE=1
# Run your MCP client
```
### Health Checks
```python
# Check backend health
import asyncio
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
async def health_check():
storage = SqliteVecMemoryStorage('path/to/db')
await storage.initialize()
stats = storage.get_stats()
print(f"Backend: {stats['backend']}")
print(f"Total memories: {stats['total_memories']}")
print(f"Database size: {stats['database_size_mb']} MB")
print(f"Embedding model: {stats['embedding_model']}")
storage.close()
asyncio.run(health_check())
```
## Comparison: ChromaDB vs SQLite-vec
| Feature | ChromaDB | SQLite-vec | Winner |
|---------|----------|------------|--------|
| Setup Complexity | Medium | Low | SQLite-vec |
| Memory Usage | High | Low | SQLite-vec |
| Query Performance | Excellent | Very Good | ChromaDB |
| Portability | Poor | Excellent | SQLite-vec |
| Backup/Restore | Complex | Simple | SQLite-vec |
| Concurrent Access | Good | Excellent (HTTP + WAL) | SQLite-vec |
| Multi-Client Support | Good | Excellent (HTTP + WAL) | SQLite-vec |
| Ecosystem | Rich | Growing | ChromaDB |
| Reliability | Good | Excellent | SQLite-vec |
## Best Practices
### When to Use SQLite-vec
✅ **Use SQLite-vec when:**
- Memory collections < 100,000 entries
- Multi-client access needed (Claude Desktop + Claude Code + others)
- Seamless setup and coordination required (auto-detection)
- Portability and backup simplicity are important
- Limited system resources
- Simple deployment requirements
- Want both HTTP and direct access capabilities
### When to Use ChromaDB
✅ **Use ChromaDB when:**
- Memory collections > 100,000 entries
- Heavy concurrent usage
- Maximum query performance is critical
- Rich ecosystem features needed
- Distributed setups
### Multi-Client Coordination Tips
1. **Automatic Mode (Recommended)**
```bash
# Let the system choose the best coordination method
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_HTTP_ENABLED=true
```
2. **Monitoring Coordination Mode**
```bash
# Check which mode is being used
export LOG_LEVEL=INFO
# Look for "Detected coordination mode: ..." in logs
```
3. **HTTP Server Management**
```bash
# Manual server control
python scripts/run_http_server.py # Start manually
# Check server health
curl http://localhost:8000/health
```
4. **Fallback Strategy**
```bash
# If HTTP coordination fails, system falls back to WAL mode
# No manual intervention needed - fully automatic
```
### Performance Tips
1. **Regular Optimization**
```bash
# Run monthly
python scripts/optimize_sqlite_vec.py
```
2. **Batch Operations**
```python
# Store memories in batches for better performance
for batch in chunk_memories(all_memories, 100):
for memory in batch:
await storage.store(memory)
```
3. **Index Maintenance**
```sql
-- Rebuild indexes periodically
REINDEX;
VACUUM;
```
## API Reference
The sqlite-vec backend implements the same `MemoryStorage` interface as ChromaDB:
```python
# All standard operations work identically
await storage.store(memory)
results = await storage.retrieve(query, n_results=5)
memories = await storage.search_by_tag(["tag1", "tag2"])
success, msg = await storage.delete(content_hash)
success, msg = await storage.update_memory_metadata(hash, updates)
```
See the main API documentation for complete method signatures.
## Contributing
To contribute to sqlite-vec backend development:
1. Run tests: `pytest tests/test_sqlite_vec_storage.py`
2. Check performance: `python tests/performance/test_sqlite_vec_perf.py`
3. Add features following the `MemoryStorage` interface
4. Update this documentation
## Support
For sqlite-vec backend issues:
1. Check [sqlite-vec documentation](https://github.com/asg017/sqlite-vec)
2. Review this guide's troubleshooting section
3. Open an issue on the [MCP Memory Service repository](https://github.com/doobidoo/mcp-memory-service/issues)
```
--------------------------------------------------------------------------------
/scripts/migration/legacy/migrate_chroma_to_sqlite.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Migration script to move data from ChromaDB to SQLite-vec.
This script reads all memories from your existing ChromaDB installation
and migrates them to the new SQLite-vec backend, preserving all metadata,
tags, embeddings, and timestamps.
"""
import asyncio
import os
import sys
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
from datetime import datetime
import re
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from mcp_memory_service.storage.chroma import ChromaMemoryStorage
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.config import CHROMA_PATH, EMBEDDING_MODEL_NAME
from mcp_memory_service.utils.hashing import generate_content_hash
logger = logging.getLogger(__name__)
def safe_timestamp_convert(timestamp: Union[str, float, int, None]) -> float:
    """Convert a timestamp in any of several formats to epoch seconds.

    Accepts numeric epoch values, ISO-8601 strings (with or without a trailing
    'Z'), and plain numeric strings. Never raises: unparseable values fall back
    to the current time with a warning.

    Args:
        timestamp: Epoch number, ISO-format string, numeric string, or None.

    Returns:
        Epoch seconds as a float.
    """
    if timestamp is None:
        return datetime.now().timestamp()
    if isinstance(timestamp, (int, float)):
        return float(timestamp)
    if isinstance(timestamp, str):
        # ISO-8601 candidates contain 'T' or '-' (e.g. "2024-01-01T12:00:00Z").
        if 'T' in timestamp or '-' in timestamp:
            try:
                # datetime.fromisoformat() cannot parse a trailing 'Z', so strip
                # it once here (the original stripped it twice redundantly).
                return datetime.fromisoformat(timestamp.rstrip('Z')).timestamp()
            except ValueError:
                pass  # Not ISO format; try plain numeric parsing below.
        # Plain numeric string, e.g. "1700000000.5".
        try:
            return float(timestamp)
        except ValueError:
            pass
    # Unrecognized format (or unexpected type): warn and use current time.
    logger.warning(f"Could not parse timestamp '{timestamp}', using current time")
    return datetime.now().timestamp()
def extract_memory_data_directly(chroma_storage) -> List[Dict[str, Any]]:
    """Pull raw memory records straight out of a ChromaDB collection.

    Bypasses Memory-object construction so that individually malformed
    records can be skipped instead of aborting the whole export.
    """
    def _normalize_tags(raw_tags, index):
        # Tags may be stored as a comma-separated string, a single string,
        # or a list; anything else is logged and treated as empty.
        if isinstance(raw_tags, str):
            if ',' in raw_tags:
                return [tag.strip() for tag in raw_tags.split(',') if tag.strip()]
            return [raw_tags.strip()] if raw_tags.strip() else []
        if isinstance(raw_tags, list):
            return [str(tag).strip() for tag in raw_tags if tag and str(tag).strip()]
        logger.warning(f"Unknown tag format for memory {index}: {type(raw_tags)}")
        return []

    try:
        # Read everything from the underlying collection in one call.
        results = chroma_storage.collection.get(
            include=['documents', 'metadatas']
        )
        memories = []
        for index, _doc_id in enumerate(results['ids']):
            try:
                content = results['documents'][index] if index < len(results['documents']) else ""
                metadata = results['metadatas'][index] if index < len(results['metadatas']) else {}
                tags = _normalize_tags(metadata.get('tags', metadata.get('tags_str', [])), index)
                # Timestamps arrive in mixed formats; convert defensively.
                created_at = safe_timestamp_convert(metadata.get('created_at'))
                updated_at = safe_timestamp_convert(metadata.get('updated_at', created_at))
                reserved = ('tags', 'created_at', 'updated_at', 'memory_type')
                memories.append({
                    'content': content,
                    'tags': tags,
                    'memory_type': metadata.get('memory_type', 'imported'),
                    # Keep only caller-defined metadata; reserved keys are lifted out above.
                    'metadata': {k: v for k, v in metadata.items() if k not in reserved},
                    'created_at': created_at,
                    'updated_at': updated_at,
                    # Recompute a proper SHA256 hash rather than trusting the ChromaDB ID.
                    'content_hash': generate_content_hash(content)
                })
            except Exception as e:
                logger.warning(f"Failed to extract memory {index}: {e}")
                continue
        logger.info(f"Successfully extracted {len(memories)} memories from ChromaDB")
        return memories
    except Exception as e:
        logger.error(f"Failed to extract data from ChromaDB: {e}")
        return []
class MigrationStats:
    """Accumulate counters and error messages for a migration run."""

    def __init__(self):
        self.total_memories = 0          # memories discovered in the source
        self.migrated_successfully = 0   # stored without error
        self.failed_migrations = 0       # recorded via add_error()
        self.duplicates_skipped = 0      # already present in the destination
        self.start_time = datetime.now()
        self.errors: List[str] = []

    def add_error(self, error: str):
        """Record an error message and bump the failure counter."""
        self.errors.append(error)
        self.failed_migrations += 1

    def print_summary(self):
        """Print a human-readable summary of the migration outcome."""
        duration = datetime.now() - self.start_time
        divider = "=" * 60
        print("\n" + divider)
        print("MIGRATION SUMMARY")
        print(divider)
        print(f"Total memories found: {self.total_memories}")
        print(f"Successfully migrated: {self.migrated_successfully}")
        print(f"Duplicates skipped: {self.duplicates_skipped}")
        print(f"Failed migrations: {self.failed_migrations}")
        print(f"Migration duration: {duration.total_seconds():.2f} seconds")
        if not self.errors:
            print("\nMigration completed without errors!")
            return
        print(f"\nErrors encountered ({len(self.errors)}):")
        for number, error in enumerate(self.errors[:5], 1):  # show first 5 errors
            print(f" {number}. {error}")
        hidden = len(self.errors) - 5
        if hidden > 0:
            print(f" ... and {hidden} more errors")
async def check_chroma_data(chroma_path: str) -> int:
    """Return the number of memories stored at chroma_path, or -1 on error."""
    print(f"Checking ChromaDB data at: {chroma_path}")
    try:
        storage = ChromaMemoryStorage(path=chroma_path)
        # Pull records directly so corrupt entries don't abort the count.
        count = len(extract_memory_data_directly(storage))
        print(f"Found {count} memories in ChromaDB")
        return count
    except Exception as e:
        print(f"Error accessing ChromaDB: {e}")
        print("Make sure ChromaDB data exists and is accessible")
        return -1
async def migrate_memories(
    chroma_path: str,
    sqlite_path: str,
    stats: MigrationStats,
    batch_size: int = 50,
    skip_duplicates: bool = True
) -> bool:
    """Migrate all memories from ChromaDB to SQLite-vec.

    Args:
        chroma_path: Path to the source ChromaDB data directory.
        sqlite_path: Path of the destination SQLite-vec database file.
        stats: MigrationStats instance mutated in place as memories are processed.
        batch_size: Number of memories per progress-reporting batch.
        skip_duplicates: When True, memories whose content_hash already exists
            in the destination are counted in ``stats.duplicates_skipped`` and skipped.

    Returns:
        True when the migration loop ran to completion (individual per-memory
        failures are recorded in ``stats`` but do not abort the run);
        False on a critical error that stopped the migration.
    """
    chroma_storage = None
    sqlite_storage = None
    try:
        # Initialize ChromaDB storage (source)
        print("Connecting to ChromaDB...")
        chroma_storage = ChromaMemoryStorage(
            path=chroma_path
        )
        # Initialize SQLite-vec storage (destination)
        print("Connecting to SQLite-vec...")
        sqlite_storage = SqliteVecMemoryStorage(
            db_path=sqlite_path,
            embedding_model=EMBEDDING_MODEL_NAME
        )
        await sqlite_storage.initialize()
        # Extract all memories directly from ChromaDB (raw dicts, not Memory objects)
        print("Extracting all memories from ChromaDB...")
        all_memories = extract_memory_data_directly(chroma_storage)
        stats.total_memories = len(all_memories)
        if stats.total_memories == 0:
            print("No memories found in ChromaDB")
            return True
        print(f"Found {stats.total_memories} memories to migrate")
        # Migrate in batches purely for progress reporting; storage is per-memory.
        for i in range(0, stats.total_memories, batch_size):
            batch = all_memories[i:i + batch_size]
            batch_num = (i // batch_size) + 1
            # Ceiling division to count partial final batch.
            total_batches = (stats.total_memories + batch_size - 1) // batch_size
            print(f"Processing batch {batch_num}/{total_batches} ({len(batch)} memories)...")
            for memory_data in batch:
                try:
                    # Check if memory already exists in SQLite-vec (if skipping duplicates)
                    if skip_duplicates:
                        try:
                            # Use a more efficient duplicate check: direct SQL on the
                            # backend's connection avoids a semantic-search round trip.
                            cursor = sqlite_storage.conn.execute(
                                "SELECT 1 FROM memories WHERE content_hash = ? LIMIT 1",
                                (memory_data['content_hash'],)
                            )
                            if cursor.fetchone():
                                stats.duplicates_skipped += 1
                                continue
                        except Exception:
                            # Fallback to retrieve method if direct query fails
                            # (e.g. schema differences); compares hashes of top hit.
                            existing = await sqlite_storage.retrieve(memory_data['content'], n_results=1)
                            if existing and any(m.memory.content_hash == memory_data['content_hash'] for m in existing):
                                stats.duplicates_skipped += 1
                                continue
                    # Create Memory object for SQLite-vec storage
                    memory_obj = Memory(
                        content=memory_data['content'],
                        tags=memory_data['tags'],
                        metadata=memory_data['metadata'],
                        created_at=memory_data['created_at'],
                        updated_at=memory_data['updated_at'],
                        content_hash=memory_data['content_hash']
                    )
                    # Store memory in SQLite-vec; store() reports failure via its
                    # (success, message) return rather than raising, so re-raise here.
                    success, message = await sqlite_storage.store(memory_obj)
                    if not success:
                        raise Exception(f"Storage failed: {message}")
                    stats.migrated_successfully += 1
                except Exception as e:
                    error_msg = f"Failed to migrate memory {memory_data['content_hash'][:12]}...: {str(e)}"
                    stats.add_error(error_msg)
                    logger.error(error_msg)
            # Progress update with percentage after each batch
            migrated_so_far = stats.migrated_successfully + stats.duplicates_skipped + stats.failed_migrations
            percentage = (migrated_so_far / stats.total_memories * 100) if stats.total_memories > 0 else 0
            print(f"Batch {batch_num}/{total_batches} complete. Progress: {migrated_so_far}/{stats.total_memories} ({percentage:.1f}%)")
        return True
    except Exception as e:
        error_msg = f"Critical migration error: {str(e)}"
        stats.add_error(error_msg)
        logger.error(error_msg)
        return False
    finally:
        # Clean up connections.
        # NOTE(review): only the SQLite side is closed; chroma_storage is never
        # explicitly released -- confirm ChromaMemoryStorage needs no close().
        if sqlite_storage:
            sqlite_storage.close()
async def verify_migration(sqlite_path: str, expected_count: int) -> bool:
    """Check that the destination database holds at least expected_count memories."""
    print("Verifying migration results...")
    try:
        storage = SqliteVecMemoryStorage(
            db_path=sqlite_path,
            embedding_model=EMBEDDING_MODEL_NAME
        )
        await storage.initialize()
        # An empty query with a large n_results acts as a crude "count all".
        found = len(await storage.retrieve("", n_results=10000))
        storage.close()
        print(f"Verification: Expected {expected_count}, Found {found}")
        if found >= expected_count:
            print("Migration verification passed!")
            return True
        print("Migration verification failed - some memories may be missing")
        return False
    except Exception as e:
        print(f"Verification error: {e}")
        return False
def print_banner():
    """Display the migration tool banner."""
    border = "=" * 60
    for line in (
        border,
        "MCP Memory Service - ChromaDB to SQLite-vec Migration",
        border,
        "This script migrates all your memories from ChromaDB to SQLite-vec.",
        "Your original ChromaDB data will not be modified.",
        "",  # trailing blank line, same as the original's bare print()
    ):
        print(line)
async def main():
    """Main migration function.

    Parses CLI arguments, resolves source/destination paths from arguments,
    environment variables, or defaults, confirms with the user (auto-proceeding
    when stdin is closed), runs the migration, and returns a process exit code
    (0 on success, 1 on failure or cancellation).
    """
    print_banner()
    # Parse command-line arguments
    import argparse
    parser = argparse.ArgumentParser(description='Migrate ChromaDB to SQLite-vec')
    parser.add_argument('--chroma-path', help='Path to ChromaDB data directory')
    parser.add_argument('--sqlite-path', help='Path for SQLite-vec database')
    parser.add_argument('--batch-size', type=int, default=50, help='Batch size for migration')
    parser.add_argument('--verbose', action='store_true', help='Enable verbose logging')
    args = parser.parse_args()
    # Setup logging
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    # Configuration precedence: CLI argument > environment variable > config default
    chroma_path = args.chroma_path or os.environ.get('MCP_MEMORY_CHROMA_PATH', CHROMA_PATH)
    # Allow custom SQLite path via argument or environment variable
    sqlite_path = args.sqlite_path or os.environ.get('MCP_MEMORY_SQLITE_PATH')
    if not sqlite_path:
        # Default to same directory as ChromaDB
        chroma_dir = os.path.dirname(chroma_path) if os.path.dirname(chroma_path) else os.getcwd()
        sqlite_path = os.path.join(chroma_dir, 'sqlite_vec_migrated.db')
    # Use batch size from arguments
    batch_size = args.batch_size
    print(f"ChromaDB source: {chroma_path}")
    print(f"SQLite-vec destination: {sqlite_path}")
    print()
    # Check if ChromaDB data exists; negative return means access error
    memory_count = await check_chroma_data(chroma_path)
    if memory_count < 0:
        return 1
    if memory_count == 0:
        print("No memories to migrate. Migration complete!")
        return 0
    # Confirm migration
    print(f"About to migrate {memory_count} memories from ChromaDB to SQLite-vec")
    print(f"Destination file: {sqlite_path}")
    # NOTE(review): the '\\n' sequences below emit a literal backslash+n rather
    # than a newline -- possibly an escaping artifact introduced when this file
    # was exported; confirm the intended output before changing.
    try:
        response = input("\\nProceed with migration? (y/N): ").strip().lower()
        if response != 'y':
            print("Migration cancelled by user")
            return 1
    except EOFError:
        # Auto-proceed in non-interactive environment (stdin closed)
        print("\\nAuto-proceeding with migration in non-interactive environment...")
        response = 'y'
    # Perform migration
    stats = MigrationStats()
    success = await migrate_memories(chroma_path, sqlite_path, stats, batch_size=batch_size)
    if success:
        # Verify migration (count check only; result is informational)
        await verify_migration(sqlite_path, stats.migrated_successfully)
    # Print summary
    stats.print_summary()
    if success and stats.failed_migrations == 0:
        print("\\nMigration completed successfully!")
        print("\\nNext steps:")
        print(f" 1. Update your environment: export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec")
        print(f" 2. Update database path: export MCP_MEMORY_SQLITE_PATH={sqlite_path}")
        print(f" 3. Restart MCP Memory Service")
        print(f" 4. Test that your memories are accessible")
        print(f" 5. (Optional) Backup your old ChromaDB data: {chroma_path}")
        return 0
    else:
        print("\\nMigration completed with errors. Please review the summary above.")
        return 1
# Script entry point: run the async main() and exit with its returned code.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/claude-hooks/utilities/tiered-conversation-monitor.js:
--------------------------------------------------------------------------------
```javascript
/**
* Tiered Conversation Monitor
* Performance-aware semantic analysis with multiple processing levels
*/
const { PerformanceManager } = require('./performance-manager');
class TieredConversationMonitor {
  /**
   * @param {object} config - Monitor configuration (contextWindow, performance, ...).
   * @param {PerformanceManager|null} performanceManager - Shared manager; when
   *   omitted, a new one is built from config.performance.
   */
  constructor(config = {}, performanceManager = null) {
    this.config = config;
    this.performanceManager = performanceManager || new PerformanceManager(config.performance);
    // Conversation state
    this.conversationHistory = [];
    this.currentTopics = new Set();
    this.contextWindow = config.contextWindow || 10; // Number of recent messages to analyze
    // Topic tracking
    this.topicWeights = new Map();
    this.semanticCache = new Map();
    // Performance-based configuration: three analysis tiers, each with a
    // latency budget in milliseconds and the analysis methods it may run.
    this.tierConfig = {
      instant: {
        enabled: true,
        methods: ['simplePatternMatch', 'cacheCheck'],
        maxLatency: 50
      },
      fast: {
        enabled: true,
        methods: ['topicExtraction', 'lightweightSemantic'],
        maxLatency: 150
      },
      intensive: {
        enabled: false, // Default off for performance
        methods: ['deepSemanticAnalysis', 'fullContextAnalysis'],
        maxLatency: 500
      }
    };
    this.updateTierConfiguration();
  }

  /**
   * Update tier configuration based on performance manager.
   * Each tier is enabled only if the active performance budget lists it.
   */
  updateTierConfiguration() {
    if (!this.performanceManager) return;
    const profile = this.performanceManager.performanceBudget;
    this.tierConfig.instant.enabled = profile.enabledTiers.includes('instant');
    this.tierConfig.fast.enabled = profile.enabledTiers.includes('fast');
    this.tierConfig.intensive.enabled = profile.enabledTiers.includes('intensive');
  }

  /**
   * Analyze user message with tiered approach.
   * Escalates instant -> fast -> intensive, returning early when a cheaper
   * tier is confident enough; each tier's failure is logged, not fatal.
   * @param {string} message - The user's message text.
   * @param {object} context - Optional context (e.g. projectContext).
   * @returns {Promise<{topics: string[], semanticShift: number,
   *   triggerProbability: number, processingTier: string, confidence: number}>}
   */
  async analyzeMessage(message, context = {}) {
    const analysis = {
      topics: [],
      semanticShift: 0,
      triggerProbability: 0,
      processingTier: 'none',
      confidence: 0
    };
    // Tier 1: Instant processing (< 50ms)
    if (this.tierConfig.instant.enabled) {
      const timing = this.performanceManager.startTiming('instant_analysis', 'instant');
      try {
        const instantResults = await this.instantAnalysis(message, context);
        analysis.topics.push(...instantResults.topics);
        analysis.triggerProbability = Math.max(analysis.triggerProbability, instantResults.triggerProbability);
        analysis.processingTier = 'instant';
        const result = this.performanceManager.endTiming(timing); // NOTE(review): result unused -- endTiming called for its side effect
        // If instant analysis is confident enough, return early
        if (instantResults.confidence > 0.8 || !this.tierConfig.fast.enabled) {
          analysis.confidence = instantResults.confidence;
          return analysis;
        }
      } catch (error) {
        console.warn('[Monitor] Instant analysis failed:', error.message);
      }
    }
    // Tier 2: Fast processing (< 150ms)
    if (this.tierConfig.fast.enabled && this.performanceManager.shouldRunHook('fast_analysis', 'fast')) {
      const timing = this.performanceManager.startTiming('fast_analysis', 'fast');
      try {
        const fastResults = await this.fastAnalysis(message, context);
        // Merge results with priority to fast analysis
        analysis.topics = this.mergeTopics(analysis.topics, fastResults.topics);
        analysis.semanticShift = fastResults.semanticShift;
        analysis.triggerProbability = Math.max(analysis.triggerProbability, fastResults.triggerProbability);
        analysis.processingTier = 'fast';
        analysis.confidence = fastResults.confidence;
        this.performanceManager.endTiming(timing);
        // If fast analysis is confident, return
        if (fastResults.confidence > 0.7 || !this.tierConfig.intensive.enabled) {
          return analysis;
        }
      } catch (error) {
        console.warn('[Monitor] Fast analysis failed:', error.message);
      }
    }
    // Tier 3: Intensive processing (< 500ms) - only when needed, i.e. when the
    // cheaper tiers already saw a nontrivial trigger probability (> 0.3)
    if (this.tierConfig.intensive.enabled &&
        this.performanceManager.shouldRunHook('intensive_analysis', 'intensive') &&
        analysis.triggerProbability > 0.3) {
      const timing = this.performanceManager.startTiming('intensive_analysis', 'intensive');
      try {
        const intensiveResults = await this.intensiveAnalysis(message, context);
        // Use intensive results as authoritative
        analysis.topics = intensiveResults.topics;
        analysis.semanticShift = intensiveResults.semanticShift;
        analysis.triggerProbability = intensiveResults.triggerProbability;
        analysis.confidence = intensiveResults.confidence;
        analysis.processingTier = 'intensive';
        this.performanceManager.endTiming(timing);
      } catch (error) {
        console.warn('[Monitor] Intensive analysis failed:', error.message);
      }
    }
    // Update conversation history
    this.updateConversationHistory(message, analysis);
    return analysis;
  }

  /**
   * Instant analysis: Pattern matching and cache checks.
   * Returns cached results when available, otherwise runs cheap regex
   * matching for memory-trigger phrases and technology terms.
   */
  async instantAnalysis(message, context) {
    const cacheKey = this.generateCacheKey(message);
    // Check cache first
    if (this.semanticCache.has(cacheKey)) {
      const cached = this.semanticCache.get(cacheKey);
      // Update last used timestamp (feeds the LRU eviction in cleanCache)
      cached.lastUsed = Date.now();
      this.semanticCache.set(cacheKey, cached);
      return { ...cached, confidence: 0.9 }; // High confidence for cached results
    }
    // Simple pattern matching for common triggers
    const triggerPatterns = [
      /what (did|do) we (decide|choose|do)/i,
      /remind me (about|how|what)/i,
      /similar to (what|how) we/i,
      /like we (discussed|did|decided)/i,
      /according to (our|previous)/i,
      /remember when we/i,
      /last time we/i
    ];
    let triggerProbability = 0;
    const topics = [];
    // Check for explicit memory trigger patterns
    for (const pattern of triggerPatterns) {
      if (pattern.test(message)) {
        triggerProbability = Math.max(triggerProbability, 0.8);
        topics.push('memory-request');
        break;
      }
    }
    // Extract obvious topics (technology names, frameworks)
    const techPatterns = [
      /\b(react|vue|angular|node|python|java|docker|kubernetes)\b/i,
      /\b(api|database|frontend|backend|ui|ux)\b/i,
      /\b(authentication|oauth|security|performance)\b/i
    ];
    for (const pattern of techPatterns) {
      const matches = message.match(pattern);
      if (matches) {
        topics.push(...matches.map(m => m.toLowerCase()));
        triggerProbability = Math.max(triggerProbability, 0.4);
      }
    }
    const result = {
      topics: [...new Set(topics)], // Remove duplicates
      triggerProbability,
      confidence: triggerProbability > 0.5 ? 0.8 : 0.4,
      lastUsed: Date.now()
    };
    // Cache result
    this.semanticCache.set(cacheKey, result);
    this.cleanCache();
    return result;
  }

  /**
   * Fast analysis: Lightweight semantic processing.
   * Builds trigger probability additively from question shape, references
   * to past work, topic count, and semantic shift (capped at 1.0).
   */
  async fastAnalysis(message, context) {
    // Tokenize and extract key phrases
    const tokens = this.tokenizeMessage(message);
    const keyPhrases = this.extractKeyPhrases(tokens);
    // Analyze topic shift from recent history
    const semanticShift = this.calculateSemanticShift(keyPhrases);
    // Calculate trigger probability based on context and content
    let triggerProbability = 0;
    // Check for question patterns that suggest memory need
    if (this.isQuestionPattern(message)) {
      triggerProbability += 0.3;
    }
    // Check for reference to past work
    if (this.referencesPastWork(message)) {
      triggerProbability += 0.4;
    }
    // Check for topic complexity
    if (keyPhrases.length > 3) {
      triggerProbability += 0.2;
    }
    // Semantic shift indicates topic change
    if (semanticShift > 0.5) {
      triggerProbability += 0.3;
    }
    return {
      topics: keyPhrases,
      semanticShift,
      triggerProbability: Math.min(triggerProbability, 1.0),
      confidence: 0.7
    };
  }

  /**
   * Intensive analysis: Deep semantic understanding.
   * Builds on fastAnalysis, then weights in conversation-history relevance
   * and project-context relevance.
   */
  async intensiveAnalysis(message, context) {
    // This would integrate with more sophisticated NLP if available
    // For now, enhance the fast analysis with deeper processing
    const fastResult = await this.fastAnalysis(message, context);
    // Analyze conversation context for better topic understanding
    const contextTopics = this.analyzeConversationContext();
    const mergedTopics = this.mergeTopics(fastResult.topics, contextTopics);
    // More sophisticated semantic shift calculation
    const enhancedSemanticShift = this.calculateEnhancedSemanticShift(message, context);
    // Advanced trigger probability with context weighting
    let enhancedTriggerProbability = fastResult.triggerProbability;
    // Weight based on conversation history
    if (this.conversationHistory.length > 5) {
      const historyWeight = this.calculateHistoryRelevance(message);
      enhancedTriggerProbability += historyWeight * 0.2;
    }
    // Project context relevance
    if (context.projectContext) {
      const projectRelevance = this.calculateProjectRelevance(message, context.projectContext);
      enhancedTriggerProbability += projectRelevance * 0.3;
    }
    return {
      topics: mergedTopics,
      semanticShift: enhancedSemanticShift,
      triggerProbability: Math.min(enhancedTriggerProbability, 1.0),
      confidence: 0.9
    };
  }

  /**
   * Helper methods for analysis
   */

  // Lowercase, strip punctuation, and keep tokens longer than 2 characters.
  tokenizeMessage(message) {
    return message.toLowerCase()
      .replace(/[^\w\s]/g, ' ')
      .split(/\s+/)
      .filter(token => token.length > 2);
  }

  // Keep only tokens that appear in a fixed technical-term whitelist.
  extractKeyPhrases(tokens) {
    // Simple key phrase extraction
    const technicalTerms = new Set([
      'react', 'vue', 'angular', 'node', 'python', 'java', 'javascript',
      'api', 'database', 'frontend', 'backend', 'authentication', 'oauth',
      'docker', 'kubernetes', 'security', 'performance', 'architecture',
      'component', 'service', 'endpoint', 'middleware', 'framework'
    ]);
    return tokens.filter(token => technicalTerms.has(token));
  }

  // Jaccard-distance style shift between the new topic set and the previous
  // one (1 - |intersection| / |union|); also updates this.currentTopics.
  calculateSemanticShift(currentTopics) {
    if (this.currentTopics.size === 0) {
      this.currentTopics = new Set(currentTopics);
      return 0;
    }
    const intersection = new Set([...currentTopics].filter(x => this.currentTopics.has(x)));
    const union = new Set([...currentTopics, ...this.currentTopics]);
    // Prevent division by zero when both sets are empty
    if (union.size === 0) {
      this.currentTopics = new Set(currentTopics);
      return 0;
    }
    const similarity = intersection.size / union.size;
    const shift = 1 - similarity;
    // Update current topics
    this.currentTopics = new Set(currentTopics);
    return shift;
  }

  // True when the message looks like a question (interrogative opener,
  // trailing '?', or auxiliary-verb opener).
  isQuestionPattern(message) {
    const questionPatterns = [
      /^(what|how|why|when|where|which|who)/i,
      /\?$/,
      /^(can|could|would|should|do|does|did|is|are|was|were)/i
    ];
    return questionPatterns.some(pattern => pattern.test(message.trim()));
  }

  // True when the message references earlier work or decisions.
  referencesPastWork(message) {
    const pastWorkPatterns = [
      /\b(previous|earlier|before|last time|remember|recall)\b/i,
      /\b(we (did|used|chose|decided|implemented))\b/i,
      /\b(our (approach|solution|decision|choice))\b/i
    ];
    return pastWorkPatterns.some(pattern => pattern.test(message));
  }

  // Union of two topic arrays with duplicates removed.
  mergeTopics(topics1, topics2) {
    return [...new Set([...topics1, ...topics2])];
  }

  // Topics that recur (count > 1) in the recent conversation window.
  analyzeConversationContext() {
    // Analyze recent conversation for recurring topics
    const recentMessages = this.conversationHistory.slice(-this.contextWindow);
    const allTopics = recentMessages.flatMap(msg => msg.analysis?.topics || []);
    // Count topic frequency
    const topicCounts = {};
    allTopics.forEach(topic => {
      topicCounts[topic] = (topicCounts[topic] || 0) + 1;
    });
    // Return topics mentioned more than once
    return Object.entries(topicCounts)
      .filter(([topic, count]) => count > 1)
      .map(([topic]) => topic);
  }

  // Basic semantic shift boosted by message length and "design-word" density.
  calculateEnhancedSemanticShift(message, context) {
    // Enhanced semantic shift with context weighting
    const basicShift = this.calculateSemanticShift(this.extractKeyPhrases(this.tokenizeMessage(message)));
    // Weight by message length and complexity
    const lengthWeight = Math.min(message.length / 500, 1.0);
    const complexityWeight = (message.match(/\b(implement|architecture|design|strategy|approach)\b/gi) || []).length * 0.1;
    return Math.min(basicShift + lengthWeight * 0.2 + complexityWeight, 1.0);
  }

  // Fraction of the message's key phrases that also appear in history topics.
  calculateHistoryRelevance(message) {
    // Calculate how relevant current message is to conversation history
    if (this.conversationHistory.length === 0) return 0;
    const messageTopics = new Set(this.extractKeyPhrases(this.tokenizeMessage(message)));
    const historyTopics = new Set(
      this.conversationHistory
        .flatMap(msg => msg.analysis?.topics || [])
    );
    const intersection = new Set([...messageTopics].filter(x => historyTopics.has(x)));
    return intersection.size / Math.max(messageTopics.size, 1);
  }

  // Fraction of message tokens that (substring-)match project name,
  // language, or framework names from the supplied project context.
  calculateProjectRelevance(message, projectContext) {
    if (!projectContext) return 0;
    const messageTokens = this.tokenizeMessage(message);
    const projectTerms = [
      projectContext.name?.toLowerCase(),
      projectContext.language?.toLowerCase(),
      ...(projectContext.frameworks || []).map(f => f.toLowerCase())
    ].filter(Boolean);
    const relevantTerms = messageTokens.filter(token =>
      projectTerms.some(term => term.includes(token) || token.includes(term))
    );
    return relevantTerms.length / Math.max(messageTokens.length, 1);
  }

  // Append the message + its analysis, trimming old entries beyond 2x window.
  updateConversationHistory(message, analysis) {
    this.conversationHistory.push({
      message,
      analysis,
      timestamp: Date.now()
    });
    // Keep only recent history
    if (this.conversationHistory.length > this.contextWindow * 2) {
      this.conversationHistory.splice(0, this.conversationHistory.length - this.contextWindow);
    }
  }

  // Cache key = first 50 word characters of the lowercased message.
  generateCacheKey(message) {
    // Generate cache key from message content
    return message.toLowerCase().replace(/[^\w]/g, '').substring(0, 50);
  }

  // LRU-style eviction: above 100 entries, keep the 50 most recently used.
  cleanCache() {
    // Clean cache if it gets too large
    if (this.semanticCache.size > 100) {
      const entries = Array.from(this.semanticCache.entries());
      entries.sort((a, b) => (b[1].lastUsed || 0) - (a[1].lastUsed || 0));
      // Keep only the 50 most recently used entries
      this.semanticCache.clear();
      entries.slice(0, 50).forEach(([key, value]) => {
        this.semanticCache.set(key, value);
      });
    }
  }

  /**
   * Get current performance status
   */
  getPerformanceStatus() {
    return {
      tierConfig: this.tierConfig,
      cacheSize: this.semanticCache.size,
      historyLength: this.conversationHistory.length,
      currentTopics: Array.from(this.currentTopics),
      performanceReport: this.performanceManager.getPerformanceReport()
    };
  }

  /**
   * Update performance profile and re-derive which tiers are enabled.
   */
  updatePerformanceProfile(profileName) {
    this.performanceManager.switchProfile(profileName);
    this.updateTierConfiguration();
  }
}
module.exports = { TieredConversationMonitor };
```
--------------------------------------------------------------------------------
/scripts/maintenance/assign_memory_types.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Intelligent Memory Type Assignment Script
Assigns appropriate types to untyped memories using:
1. Tag-based inference (highest confidence)
2. Content pattern matching (medium confidence)
3. Metadata analysis (context hints)
4. Fallback to "note" (lowest confidence)
Usage:
python assign_memory_types.py --dry-run # Preview assignments
python assign_memory_types.py --verbose # Detailed logging
python assign_memory_types.py --show-reasoning # Show inference logic
python assign_memory_types.py # Execute assignments
"""
import sys
import os
import re
import json
import sqlite3
import argparse
import logging
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Set
from collections import defaultdict, Counter
import shutil
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.mcp_memory_service.config import SQLITE_VEC_PATH
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
# ============================================================================
# TYPE INFERENCE RULES
# ============================================================================

# Priority 1: Tag-based inference (highest confidence).
# NOTE: Python dict literals resolve duplicate keys "last wins". The original
# listed "investigation" twice ("troubleshooting" first, "analysis" later), so
# only the "analysis" mapping ever took effect; the shadowed entry has been
# removed to make the effective mapping explicit. Behavior is unchanged.
TAG_TO_TYPE: Dict[str, str] = {
    # Activity indicators
    "session-consolidation": "session",
    "session-summary": "session",
    "session-end": "session",
    "session-start": "session",
    "development-session": "session",
    "work-session": "session",
    # Troubleshooting
    "troubleshooting": "troubleshooting",
    "debug": "troubleshooting",
    "debugging": "troubleshooting",
    "diagnostic": "troubleshooting",
    # Fixes
    "bug-fix": "fix",
    "bugfix": "fix",
    "fix": "fix",
    "patch": "fix",
    "hotfix": "fix",
    "correction": "fix",
    # Releases and deployments
    "release": "release",
    "release-notes": "release",
    "version": "release",
    "deployment": "deployment",
    "deploy": "deployment",
    "production": "deployment",
    # Features
    "feature": "feature",
    "enhancement": "feature",
    "improvement": "feature",
    "new-feature": "feature",
    # Configuration
    "configuration": "configuration",
    "config": "configuration",
    "setup": "configuration",
    "settings": "configuration",
    "environment": "configuration",
    # Documentation
    "documentation": "documentation",
    "docs": "documentation",
    "readme": "documentation",
    "changelog": "documentation",
    # Guides
    "guide": "guide",
    "tutorial": "guide",
    "how-to": "guide",
    "walkthrough": "guide",
    "instructions": "guide",
    # Reference
    "reference": "reference",
    "knowledge-base": "reference",
    "cheat-sheet": "reference",
    "quick-reference": "reference",
    # Milestones
    "milestone": "milestone",
    "achievement": "achievement",
    "completion": "milestone",
    "accomplished": "achievement",
    # Analysis
    "analysis": "analysis",
    "research": "analysis",
    "findings": "analysis",
    "investigation": "analysis",
    "report": "analysis",
    # Implementation
    "implementation": "implementation",
    "development": "implementation",
    "coding": "implementation",
    "integration": "implementation",
    # Testing
    "test": "test",
    "testing": "test",
    "validation": "test",
    "qa": "test",
    # Architecture
    "architecture": "architecture",
    "design": "architecture",
    "design-pattern": "architecture",
    "technical-design": "architecture",
    # Infrastructure
    "infrastructure": "infrastructure",
    "devops": "infrastructure",
    "ci-cd": "infrastructure",
    "automation": "infrastructure",
    # Process
    "process": "process",
    "workflow": "process",
    "procedure": "process",
    "best-practices": "process",
    # Security
    "security": "security",
    "auth": "security",
    "authentication": "security",
    "authorization": "security",
    # Status
    "status": "status",
    "update": "status",
    "progress": "status",
}
# Priority 2: Content pattern matching (medium confidence).
# Patterns are tried in dict insertion order and the FIRST matching pattern
# wins (see TypeInferenceEngine.infer_type), so earlier types here take
# precedence over later ones when a memory matches several.
CONTENT_PATTERNS: Dict[str, List[str]] = {
    "fix": [
        r"\bfixed\b.*\bbug\b",
        r"\bresolved\b.*\b(issue|problem)\b",
        r"\brepair(ed|ing)\b",
        r"\bhotfix\b",
        r"\bpatch(ed|ing)?\b.*\b(bug|issue)\b",
    ],
    "troubleshooting": [
        r"\berror\b.*\boccurred\b",
        r"\btroubleshooting\b",
        r"\bdiagnos(is|tic|ing)\b",
        r"\bdebugging\b",
        r"\binvestigat(ed|ing)\b.*\b(issue|problem|error)\b",
        r"\bfail(ed|ure)\b.*\banalys",
    ],
    "implementation": [
        r"\bimplemented\b",
        r"\bcreated\b.*\b(function|class|module|component)\b",
        r"\badded\b.*\b(feature|functionality)\b",
        r"\bdevelop(ed|ing)\b",
        r"\bbuilt\b.*\b(system|service|tool)\b",
    ],
    "guide": [
        # '^' anchors work per-line because infer_type searches with re.MULTILINE.
        r"^(How to|Step-by-step|Guide:|Tutorial:)",
        r"\binstructions?\b.*\b(follow|complete|execute)\b",
        r"\bprocedure\b",
        r"\bstep \d+",
        r"\bwalkthrough\b",
    ],
    "configuration": [
        r"\bconfigur(e|ed|ation|ing)\b",
        r"\bsetup\b",
        r"\.env\b",
        r"\bsettings?\b",
        r"\benvironment variables?\b",
        r"\binstallation\b",
    ],
    "analysis": [
        r"\banalysis\b.*\b(shows?|reveals?|indicates?)\b",
        r"\bfindings?\b",
        r"\bresults?\b.*\b(show|demonstrate|indicate)\b",
        r"\bresearch\b",
        r"\binvestigation\b.*\bresults?\b",
    ],
    "session": [
        r"\bsession\b.*(summary|recap|notes)\b",
        r"\bwork session\b",
        r"\bdevelopment session\b",
        r"\btopics? (discussed|covered)\b",
    ],
    "release": [
        r"\b(version|v)\d+\.\d+",
        r"\breleas(e|ed|ing)\b",
        r"\bchangelog\b",
        r"\brelease notes\b",
    ],
    "documentation": [
        r"\bdocument(ation|ed|ing)\b",
        r"\bREADME\b",
        r"\bAPI documentation\b",
        r"\breference (manual|guide)\b",
    ],
    "milestone": [
        r"\b(completed|finished|accomplished)\b.*\b(project|milestone|phase)\b",
        r"\bmilestone\b.*\breached\b",
        r"\bdeliverable\b",
    ],
}

# Priority 3: Metadata type hints. When a memory's metadata "type" field is one
# of these hyphenated values, infer_type extracts the base word before the
# hyphen (e.g. "session-summary" -> "session") as the inferred type.
METADATA_TYPE_HINTS: Set[str] = {
    "session-summary",
    "troubleshooting-session",
    "feature-summary",
    "code-review",
    "release-notes",
}
# ============================================================================
# TYPE INFERENCE ENGINE
# ============================================================================
class TypeInferenceEngine:
    """Infer memory types based on tags, content, and metadata."""

    def __init__(self, show_reasoning: bool = False):
        # Flag recorded for callers; this class does not print by itself.
        self.show_reasoning = show_reasoning
        # Per-method hit counters (tag_match / pattern_match / metadata_hint / fallback).
        self.inference_stats = Counter()

    def infer_type(self, content: str, tags: List[str], metadata: Optional[Dict]) -> Tuple[str, str, int]:
        """
        Infer memory type.

        Returns:
            (inferred_type, reasoning, confidence_score)
            confidence_score: 3=high, 2=medium, 1=low
        """
        # Priority 1: Tag-based inference (confidence=3)
        for raw_tag in tags:
            normalized = raw_tag.lower().strip()
            if normalized in TAG_TO_TYPE:
                mapped = TAG_TO_TYPE[normalized]
                self.inference_stats["tag_match"] += 1
                return (mapped, f"Tag match: '{raw_tag}' → '{mapped}'", 3)

        # Priority 2: Content pattern matching (confidence=2)
        for candidate_type, patterns in CONTENT_PATTERNS.items():
            matched = next(
                (p for p in patterns if re.search(p, content, re.IGNORECASE | re.MULTILINE)),
                None,
            )
            if matched is not None:
                self.inference_stats["pattern_match"] += 1
                return (candidate_type, f"Content pattern: '{matched[:30]}...' → '{candidate_type}'", 2)

        # Priority 3: Metadata hints (confidence=2)
        if metadata:
            metadata_type = metadata.get("type", "")
            if metadata_type in METADATA_TYPE_HINTS:
                # Extract base type from hyphenated metadata type
                base_type = metadata_type.split("-")[0]
                if base_type in ["session", "troubleshooting", "feature", "release"]:
                    self.inference_stats["metadata_hint"] += 1
                    return (base_type, f"Metadata hint: type='{metadata_type}' → '{base_type}'", 2)

        # Priority 4: Fallback to "note" (confidence=1)
        self.inference_stats["fallback"] += 1
        return ("note", "Fallback: No specific indicators → 'note'", 1)

    def get_stats(self) -> Dict[str, int]:
        """Get inference statistics."""
        return {method: count for method, count in self.inference_stats.items()}
# ============================================================================
# DATABASE OPERATIONS
# ============================================================================
def create_backup(db_path: str) -> str:
    """Copy the database to a timestamped sibling file and return its path."""
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{db_path}.backup-{stamp}"
    # copy2 preserves file metadata (mtime, permissions) alongside contents.
    shutil.copy2(db_path, backup_path)
    logger.info(f"✅ Backup created: {backup_path}")
    return backup_path
def analyze_untyped_memories(db_path: str) -> Tuple[int, int]:
    """Count untyped memories and total memories.

    Args:
        db_path: Path to the SQLite database.

    Returns:
        (untyped_count, total_count)
    """
    conn = sqlite3.connect(db_path)
    try:
        # Single pass instead of two COUNT queries; SUM is NULL on an empty
        # table, hence the `or 0` fallback.
        cursor = conn.cursor()
        cursor.execute(
            """
            SELECT COUNT(*),
                   SUM(CASE WHEN memory_type = '' OR memory_type IS NULL THEN 1 ELSE 0 END)
            FROM memories
            """
        )
        total, untyped = cursor.fetchone()
        return (untyped or 0), total
    finally:
        # Previously the connection leaked if a query raised.
        conn.close()
def get_untyped_memories(db_path: str) -> List[Tuple[str, str, str, str]]:
    """
    Get all untyped memories.

    Returns:
        List of (content_hash, content, tags_str, metadata_str)
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT content_hash, content, tags, metadata
            FROM memories
            WHERE memory_type = '' OR memory_type IS NULL
        """)
        return cursor.fetchall()
    finally:
        # Previously the connection leaked if the query raised.
        conn.close()
def assign_types(db_path: str, assignments: Dict[str, str], dry_run: bool = False) -> int:
    """
    Assign types to memories.

    Args:
        db_path: Database path
        assignments: {content_hash: inferred_type}
        dry_run: If True, don't actually update

    Returns:
        Number of memories updated
    """
    if dry_run:
        logger.info(f"[DRY RUN] Would update {len(assignments)} memories")
        return len(assignments)

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        updated = 0
        for content_hash, memory_type in assignments.items():
            cursor.execute(
                "UPDATE memories SET memory_type = ? WHERE content_hash = ?",
                (memory_type, content_hash)
            )
            # rowcount is 0 when the hash no longer exists, so `updated` can
            # be smaller than len(assignments).
            updated += cursor.rowcount
        # Single commit keeps the whole assignment atomic.
        conn.commit()
    finally:
        # Previously the connection leaked if an UPDATE raised mid-loop.
        conn.close()
    logger.info(f"✅ Updated {updated} memories")
    return updated
# ============================================================================
# MAIN SCRIPT
# ============================================================================
def main():
    """CLI entry point: analyze the DB, infer types, report, and optionally apply.

    Workflow: parse args → count untyped memories → run TypeInferenceEngine on
    each → print statistics → unless --dry-run, back up the DB and write the
    inferred types.
    """
    parser = argparse.ArgumentParser(
        description="Intelligently assign types to untyped memories",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Preview assignments
  python assign_memory_types.py --dry-run

  # Show detailed reasoning
  python assign_memory_types.py --dry-run --show-reasoning

  # Execute assignments
  python assign_memory_types.py

  # Verbose logging
  python assign_memory_types.py --verbose
"""
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview assignments without modifying database'
    )
    parser.add_argument(
        '--show-reasoning',
        action='store_true',
        help='Show inference reasoning for each memory'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '--db-path',
        type=str,
        default=SQLITE_VEC_PATH,
        help=f'Path to SQLite database (default: {SQLITE_VEC_PATH})'
    )
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Check database exists
    if not os.path.exists(args.db_path):
        logger.error(f"❌ Database not found: {args.db_path}")
        sys.exit(1)

    logger.info("=" * 80)
    logger.info("🤖 Intelligent Memory Type Assignment")
    logger.info("=" * 80)
    logger.info(f"Database: {args.db_path}")
    logger.info(f"Mode: {'DRY RUN (preview only)' if args.dry_run else 'EXECUTE (will modify)'}")
    logger.info("")

    # Analyze current state
    logger.info("📊 Analyzing database...")
    untyped_count, total_count = analyze_untyped_memories(args.db_path)
    logger.info(f"Total memories: {total_count}")
    # BUG FIX: an empty database (total_count == 0) previously raised
    # ZeroDivisionError here, before the untyped_count == 0 early return below.
    untyped_pct = (untyped_count / total_count * 100) if total_count else 0.0
    logger.info(f"Untyped memories: {untyped_count} ({untyped_pct:.1f}%)")
    logger.info("")

    if untyped_count == 0:
        logger.info("✅ No untyped memories found! Database is clean.")
        return

    # Initialize inference engine
    engine = TypeInferenceEngine(show_reasoning=args.show_reasoning)

    # Get untyped memories
    logger.info("🔍 Retrieving untyped memories...")
    untyped_memories = get_untyped_memories(args.db_path)
    logger.info(f"Retrieved {len(untyped_memories)} untyped memories")
    logger.info("")

    # Infer types
    logger.info("🧠 Inferring types...")
    assignments = {}
    type_distribution = Counter()
    confidence_distribution = Counter()
    for content_hash, content, tags_str, metadata_str in untyped_memories:
        # Parse tags (comma-separated string) and metadata (JSON string).
        tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()] if tags_str else []
        metadata = None
        if metadata_str:
            try:
                metadata = json.loads(metadata_str)
            except json.JSONDecodeError:
                # Malformed metadata is non-fatal; inference proceeds without it.
                pass

        # Infer type
        inferred_type, reasoning, confidence = engine.infer_type(content, tags, metadata)

        # Store assignment
        assignments[content_hash] = inferred_type
        type_distribution[inferred_type] += 1
        confidence_distribution[confidence] += 1

        # Show reasoning if requested
        if args.show_reasoning:
            logger.info(f"{content_hash[:8]}... → {inferred_type} (conf={confidence})")
            logger.info(f"  Reason: {reasoning}")
            logger.info(f"  Tags: {tags[:3]}{'...' if len(tags) > 3 else ''}")
            logger.info(f"  Preview: {content[:100]}...")
            logger.info("")

    # Display statistics
    logger.info("")
    logger.info("=" * 80)
    logger.info("📈 Inference Statistics")
    logger.info("=" * 80)
    logger.info("\nInference Methods:")
    for method, count in engine.get_stats().items():
        logger.info(f"  {method}: {count}")
    logger.info("\nConfidence Distribution:")
    logger.info(f"  High (tag match): {confidence_distribution[3]}")
    logger.info(f"  Medium (pattern/metadata): {confidence_distribution[2]}")
    logger.info(f"  Low (fallback): {confidence_distribution[1]}")
    logger.info("\nType Distribution:")
    for memory_type, count in type_distribution.most_common():
        logger.info(f"  {memory_type}: {count}")
    logger.info("")
    logger.info("=" * 80)

    # Create backup and execute if not dry-run
    if not args.dry_run:
        logger.info("")
        logger.info("💾 Creating backup...")
        backup_path = create_backup(args.db_path)
        logger.info("")
        logger.info("✍️  Assigning types...")
        updated = assign_types(args.db_path, assignments, dry_run=False)
        logger.info("")
        logger.info("=" * 80)
        logger.info("✅ Type assignment completed successfully!")
        logger.info(f"   Backup saved to: {backup_path}")
        logger.info("=" * 80)
    else:
        logger.info("")
        logger.info("⚠️  This was a DRY RUN - no changes were made")
        logger.info("   Run without --dry-run to apply assignments")
        logger.info("=" * 80)
```
--------------------------------------------------------------------------------
/tests/consolidation/test_clustering.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the semantic clustering engine."""
import pytest
import numpy as np
from datetime import datetime, timedelta
from mcp_memory_service.consolidation.clustering import SemanticClusteringEngine
from mcp_memory_service.consolidation.base import MemoryCluster
from mcp_memory_service.models.memory import Memory
@pytest.mark.unit
class TestSemanticClusteringEngine:
    """Test the semantic clustering system."""

    @pytest.fixture
    def clustering_engine(self, consolidation_config):
        # NOTE(review): `consolidation_config` (and `sample_memories` below)
        # are assumed to be fixtures from conftest.py — confirm.
        return SemanticClusteringEngine(consolidation_config)

    @pytest.mark.asyncio
    async def test_basic_clustering(self, clustering_engine, sample_memories):
        """Test basic clustering functionality."""
        # Use memories with embeddings
        memories_with_embeddings = [m for m in sample_memories if m.embedding]
        clusters = await clustering_engine.process(memories_with_embeddings)
        assert isinstance(clusters, list)
        assert all(isinstance(cluster, MemoryCluster) for cluster in clusters)
        # Every returned cluster must satisfy the MemoryCluster invariants.
        for cluster in clusters:
            assert len(cluster.memory_hashes) >= clustering_engine.min_cluster_size
            assert isinstance(cluster.cluster_id, str)
            assert isinstance(cluster.centroid_embedding, list)
            assert len(cluster.centroid_embedding) > 0
            assert 0 <= cluster.coherence_score <= 1
            assert isinstance(cluster.created_at, datetime)
            assert isinstance(cluster.theme_keywords, list)

    @pytest.mark.asyncio
    async def test_clustering_with_similar_embeddings(self, clustering_engine):
        """Test clustering with known similar embeddings."""
        base_time = datetime.now().timestamp()
        # Create memories with similar embeddings (should cluster together)
        similar_memories = []
        base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7]
        for i in range(6):  # Create enough for min_cluster_size
            # Add small variations to base embedding
            embedding = []
            for val in base_embedding * 64:  # 320-dim
                noise = np.random.normal(0, 0.05)  # Small noise
                # Clamp into [0, 1] so noise cannot push values out of range.
                embedding.append(max(0, min(1, val + noise)))
            memory = Memory(
                content=f"Similar content about programming topic {i}",
                content_hash=f"similar_{i}",
                tags=["programming", "similar"],
                embedding=embedding,
                created_at=base_time - (i * 3600)
            )
            similar_memories.append(memory)
        # Add some different memories
        for i in range(3):
            different_embedding = [0.1, 0.9, 0.2, 0.8, 0.1] * 64
            memory = Memory(
                content=f"Different content about weather {i}",
                content_hash=f"different_{i}",
                tags=["weather", "different"],
                embedding=different_embedding,
                created_at=base_time - (i * 3600)
            )
            similar_memories.append(memory)
        clusters = await clustering_engine.process(similar_memories)
        # Should find at least one cluster
        assert len(clusters) >= 1
        # Check that similar memories are clustered together
        if clusters:
            # Find cluster with similar memories
            similar_cluster = None
            for cluster in clusters:
                if any("similar" in hash_id for hash_id in cluster.memory_hashes):
                    similar_cluster = cluster
                    break
            if similar_cluster:
                # Should contain multiple similar memories
                similar_hashes = [h for h in similar_cluster.memory_hashes if "similar" in h]
                assert len(similar_hashes) >= clustering_engine.min_cluster_size

    @pytest.mark.asyncio
    async def test_insufficient_memories(self, clustering_engine):
        """Test handling of insufficient memories for clustering."""
        # Create too few memories
        few_memories = []
        for i in range(2):  # Less than min_cluster_size
            memory = Memory(
                content=f"Content {i}",
                content_hash=f"hash_{i}",
                tags=["test"],
                embedding=[0.5] * 320,
                created_at=datetime.now().timestamp()
            )
            few_memories.append(memory)
        clusters = await clustering_engine.process(few_memories)
        assert clusters == []

    @pytest.mark.asyncio
    async def test_memories_without_embeddings(self, clustering_engine):
        """Test handling of memories without embeddings."""
        memories_no_embeddings = []
        for i in range(5):
            memory = Memory(
                content=f"Content {i}",
                content_hash=f"hash_{i}",
                tags=["test"],
                embedding=None,  # No embedding
                created_at=datetime.now().timestamp()
            )
            memories_no_embeddings.append(memory)
        clusters = await clustering_engine.process(memories_no_embeddings)
        assert clusters == []

    @pytest.mark.asyncio
    async def test_theme_keyword_extraction(self, clustering_engine):
        """Test extraction of theme keywords from clusters."""
        # Create memories with common themes
        themed_memories = []
        base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64
        for i in range(5):
            memory = Memory(
                content=f"Python programming tutorial {i} about functions and classes",
                content_hash=f"python_{i}",
                tags=["python", "programming", "tutorial"],
                embedding=[val + np.random.normal(0, 0.02) for val in base_embedding],
                created_at=datetime.now().timestamp()
            )
            themed_memories.append(memory)
        clusters = await clustering_engine.process(themed_memories)
        if clusters:
            cluster = clusters[0]
            # Should extract relevant theme keywords
            assert len(cluster.theme_keywords) > 0
            # Should include frequent tags
            common_tags = {"python", "programming", "tutorial"}
            found_tags = set(cluster.theme_keywords).intersection(common_tags)
            assert len(found_tags) > 0

    @pytest.mark.asyncio
    async def test_cluster_metadata(self, clustering_engine, sample_memories):
        """Test that cluster metadata is properly populated."""
        memories_with_embeddings = [m for m in sample_memories if m.embedding]
        if len(memories_with_embeddings) >= clustering_engine.min_cluster_size:
            clusters = await clustering_engine.process(memories_with_embeddings)
            for cluster in clusters:
                assert 'algorithm' in cluster.metadata
                assert 'cluster_size' in cluster.metadata
                assert 'average_memory_age' in cluster.metadata
                assert 'tag_distribution' in cluster.metadata
                assert cluster.metadata['cluster_size'] == len(cluster.memory_hashes)
                assert isinstance(cluster.metadata['average_memory_age'], float)
                assert isinstance(cluster.metadata['tag_distribution'], dict)

    @pytest.mark.asyncio
    async def test_simple_clustering_fallback(self, clustering_engine):
        """Test simple clustering algorithm fallback."""
        # Force simple clustering algorithm
        original_algorithm = clustering_engine.algorithm
        clustering_engine.algorithm = 'simple'
        try:
            # Create memories with known similarity patterns
            similar_memories = []
            # Group 1: High similarity
            base1 = [0.8, 0.7, 0.9, 0.8, 0.7] * 64
            for i in range(4):
                embedding = [val + np.random.normal(0, 0.01) for val in base1]
                memory = Memory(
                    content=f"Group 1 content {i}",
                    content_hash=f"group1_{i}",
                    tags=["group1"],
                    embedding=embedding,
                    created_at=datetime.now().timestamp()
                )
                similar_memories.append(memory)
            # Group 2: Different but internally similar
            base2 = [0.2, 0.3, 0.1, 0.2, 0.3] * 64
            for i in range(4):
                embedding = [val + np.random.normal(0, 0.01) for val in base2]
                memory = Memory(
                    content=f"Group 2 content {i}",
                    content_hash=f"group2_{i}",
                    tags=["group2"],
                    embedding=embedding,
                    created_at=datetime.now().timestamp()
                )
                similar_memories.append(memory)
            clusters = await clustering_engine.process(similar_memories)
            # Simple algorithm should still find clusters
            assert isinstance(clusters, list)
        finally:
            # Restore the engine's configured algorithm even on failure.
            clustering_engine.algorithm = original_algorithm

    @pytest.mark.asyncio
    async def test_merge_similar_clusters(self, clustering_engine):
        """Test merging of similar clusters."""
        # Create two similar clusters
        cluster1 = MemoryCluster(
            cluster_id="cluster1",
            memory_hashes=["hash1", "hash2"],
            centroid_embedding=[0.5, 0.5, 0.5] * 107,  # ~320 dim
            coherence_score=0.8,
            created_at=datetime.now(),
            theme_keywords=["python", "programming"]
        )
        cluster2 = MemoryCluster(
            cluster_id="cluster2",
            memory_hashes=["hash3", "hash4"],
            centroid_embedding=[0.52, 0.48, 0.51] * 107,  # Similar to cluster1
            coherence_score=0.7,
            created_at=datetime.now(),
            theme_keywords=["python", "coding"]
        )
        # Very different cluster
        cluster3 = MemoryCluster(
            cluster_id="cluster3",
            memory_hashes=["hash5", "hash6"],
            centroid_embedding=[0.1, 0.9, 0.1] * 107,  # Very different
            coherence_score=0.6,
            created_at=datetime.now(),
            theme_keywords=["weather", "forecast"]
        )
        clusters = [cluster1, cluster2, cluster3]
        merged_clusters = await clustering_engine.merge_similar_clusters(
            clusters, similarity_threshold=0.9
        )
        # Should merge similar clusters
        assert len(merged_clusters) <= len(clusters)
        # Check that merged cluster contains memories from both original clusters
        if len(merged_clusters) < len(clusters):
            # Find the merged cluster (should have more memories)
            merged = max(merged_clusters, key=lambda c: len(c.memory_hashes))
            assert len(merged.memory_hashes) >= 4  # Combined from cluster1 and cluster2

    @pytest.mark.asyncio
    async def test_coherence_score_calculation(self, clustering_engine):
        """Test coherence score calculation for clusters."""
        # Create tightly clustered memories
        tight_memories = []
        base_embedding = [0.5, 0.5, 0.5, 0.5, 0.5] * 64
        for i in range(5):
            # Very similar embeddings (high coherence)
            embedding = [val + np.random.normal(0, 0.01) for val in base_embedding]
            memory = Memory(
                content=f"Tight cluster content {i}",
                content_hash=f"tight_{i}",
                tags=["tight"],
                embedding=embedding,
                created_at=datetime.now().timestamp()
            )
            tight_memories.append(memory)
        # Create loosely clustered memories
        loose_memories = []
        for i in range(5):
            # More varied embeddings (lower coherence)
            embedding = [val + np.random.normal(0, 0.1) for val in base_embedding]
            memory = Memory(
                content=f"Loose cluster content {i}",
                content_hash=f"loose_{i}",
                tags=["loose"],
                embedding=embedding,
                created_at=datetime.now().timestamp()
            )
            loose_memories.append(memory)
        tight_clusters = await clustering_engine.process(tight_memories)
        loose_clusters = await clustering_engine.process(loose_memories)
        # Tight clusters should have higher coherence scores
        if tight_clusters and loose_clusters:
            tight_coherence = tight_clusters[0].coherence_score
            loose_coherence = loose_clusters[0].coherence_score
            # This may not always be true due to randomness, but generally should be
            # Just check that coherence scores are in valid range
            assert 0 <= tight_coherence <= 1
            assert 0 <= loose_coherence <= 1

    @pytest.mark.asyncio
    async def test_algorithm_fallback_handling(self, clustering_engine):
        """Test handling of different clustering algorithms."""
        memories = []
        base_embedding = [0.5, 0.4, 0.6, 0.3, 0.7] * 64
        for i in range(8):  # Enough for clustering
            embedding = [val + np.random.normal(0, 0.05) for val in base_embedding]
            memory = Memory(
                content=f"Test content {i}",
                content_hash=f"test_{i}",
                tags=["test"],
                embedding=embedding,
                created_at=datetime.now().timestamp()
            )
            memories.append(memory)
        # Test different algorithms
        algorithms = ['simple', 'dbscan', 'hierarchical']
        for algorithm in algorithms:
            original_algorithm = clustering_engine.algorithm
            clustering_engine.algorithm = algorithm
            try:
                clusters = await clustering_engine.process(memories)
                # All algorithms should return valid clusters
                assert isinstance(clusters, list)
                for cluster in clusters:
                    assert isinstance(cluster, MemoryCluster)
                    # A "_merged" suffix indicates the cluster was produced by merging.
                    assert cluster.metadata['algorithm'] in [algorithm, f"{algorithm}_merged"]
            finally:
                clustering_engine.algorithm = original_algorithm

    @pytest.mark.asyncio
    async def test_empty_input_handling(self, clustering_engine):
        """Test handling of empty input."""
        clusters = await clustering_engine.process([])
        assert clusters == []

    @pytest.mark.asyncio
    async def test_average_age_calculation(self, clustering_engine):
        """Test average age calculation in cluster metadata."""
        now = datetime.now()
        memories = []
        # Create memories with known ages
        ages = [1, 3, 5, 7, 9]  # days ago
        for i, age in enumerate(ages):
            memory = Memory(
                content=f"Content {i}",
                content_hash=f"age_test_{i}",
                tags=["age_test"],
                embedding=[0.5 + i*0.01] * 320,  # Slightly different embeddings
                created_at=(now - timedelta(days=age)).timestamp()
            )
            memories.append(memory)
        clusters = await clustering_engine.process(memories)
        if clusters:
            cluster = clusters[0]
            avg_age = cluster.metadata['average_memory_age']
            # Average age should be approximately the mean of our test ages
            expected_avg = sum(ages) / len(ages)
            assert abs(avg_age - expected_avg) < 1  # Within 1 day tolerance

    @pytest.mark.asyncio
    async def test_tag_distribution_analysis(self, clustering_engine):
        """Test tag distribution analysis in clusters."""
        memories = []
        base_embedding = [0.5] * 320
        # Create memories with specific tag patterns
        tag_patterns = [
            ["python", "programming"],
            ["python", "tutorial"],
            ["programming", "guide"],
            ["python", "programming"],  # Duplicate pattern
            ["tutorial", "guide"]
        ]
        for i, tags in enumerate(tag_patterns):
            memory = Memory(
                content=f"Content {i}",
                content_hash=f"tag_test_{i}",
                tags=tags,
                embedding=[val + i*0.01 for val in base_embedding],
                created_at=datetime.now().timestamp()
            )
            memories.append(memory)
        clusters = await clustering_engine.process(memories)
        if clusters:
            cluster = clusters[0]
            tag_dist = cluster.metadata['tag_distribution']
            # Should count tag frequencies correctly
            assert isinstance(tag_dist, dict)
            assert tag_dist.get("python", 0) >= 2  # Appears multiple times
            assert tag_dist.get("programming", 0) >= 2  # Appears multiple times
```
--------------------------------------------------------------------------------
/scripts/migration/migrate_storage.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Storage Migration Tool for MCP Memory Service
This script helps migrate memory data between different storage backends
(ChromaDB and sqlite-vec).
Usage:
python scripts/migrate_storage.py --from chroma --to sqlite-vec
python scripts/migrate_storage.py --from sqlite-vec --to chroma --backup
"""
import argparse
import asyncio
import json
import logging
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.storage.base import MemoryStorage
from mcp_memory_service.storage.chroma import ChromaMemoryStorage
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MigrationTool:
    """Tool for migrating memory data between storage backends."""

    def __init__(self):
        # Set lazily in migrate(); kept as attributes so the finally-block
        # there can close whichever storages were actually opened.
        self.source_storage = None
        self.target_storage = None

    async def export_memories(self, storage: MemoryStorage) -> List[Dict[str, Any]]:
        """Export all memories from a storage backend.

        Duck-types on the backend: a truthy `collection` attribute is treated
        as ChromaDB, a truthy `conn` attribute as SQLite-vec.

        Returns:
            List of plain-dict memory records suitable for import_memories()
            and for JSON backup.
        """
        logger.info("Exporting memories from source storage...")
        exported_memories = []
        try:
            # For ChromaDB, we need to get all memories via a broad search
            if hasattr(storage, 'collection') and storage.collection:
                # Get all memories from ChromaDB
                results = storage.collection.get()
                if results and results.get("ids"):
                    for i, memory_id in enumerate(results["ids"]):
                        try:
                            metadata = results["metadatas"][i] if results.get("metadatas") else {}
                            document = results["documents"][i] if results.get("documents") else ""
                            # NOTE(review): Chroma's collection.get() may not
                            # include embeddings unless requested via
                            # include=["embeddings"] — confirm; None triggers
                            # regeneration on import.
                            embedding = results["embeddings"][i] if results.get("embeddings") else None
                            # Convert metadata to memory format
                            memory_data = {
                                "content": document,
                                "content_hash": metadata.get("content_hash", ""),
                                "tags": metadata.get("tags_str", "").split(",") if metadata.get("tags_str") else [],
                                "memory_type": metadata.get("type"),
                                # Keep only user metadata; storage bookkeeping keys are dropped.
                                "metadata": {k: v for k, v in metadata.items()
                                             if k not in ["content_hash", "tags_str", "type",
                                                          "timestamp", "timestamp_float", "timestamp_str",
                                                          "created_at", "created_at_iso", "updated_at", "updated_at_iso"]},
                                "embedding": embedding,
                                "created_at": metadata.get("created_at"),
                                "created_at_iso": metadata.get("created_at_iso"),
                                "updated_at": metadata.get("updated_at"),
                                "updated_at_iso": metadata.get("updated_at_iso")
                            }
                            exported_memories.append(memory_data)
                        except Exception as e:
                            # Best-effort export: skip a bad record, keep going.
                            logger.warning(f"Failed to export memory {memory_id}: {e}")
                            continue
            elif hasattr(storage, 'conn') and storage.conn:
                # Get all memories from SQLite-vec
                cursor = storage.conn.execute('''
                    SELECT content_hash, content, tags, memory_type, metadata,
                           created_at, updated_at, created_at_iso, updated_at_iso
                    FROM memories
                    ORDER BY created_at
                ''')
                for row in cursor.fetchall():
                    try:
                        content_hash, content, tags_str, memory_type, metadata_str = row[:5]
                        created_at, updated_at, created_at_iso, updated_at_iso = row[5:]
                        # Parse tags and metadata
                        tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()] if tags_str else []
                        metadata = json.loads(metadata_str) if metadata_str else {}
                        memory_data = {
                            "content": content,
                            "content_hash": content_hash,
                            "tags": tags,
                            "memory_type": memory_type,
                            "metadata": metadata,
                            "embedding": None,  # Will be regenerated on import
                            "created_at": created_at,
                            "created_at_iso": created_at_iso,
                            "updated_at": updated_at,
                            "updated_at_iso": updated_at_iso
                        }
                        exported_memories.append(memory_data)
                    except Exception as e:
                        logger.warning(f"Failed to export memory: {e}")
                        continue
            logger.info(f"Exported {len(exported_memories)} memories")
            return exported_memories
        except Exception as e:
            logger.error(f"Failed to export memories: {e}")
            raise

    async def import_memories(self, storage: MemoryStorage, memories: List[Dict[str, Any]]) -> int:
        """Import memory records into a storage backend.

        Returns:
            Number of memories successfully stored (failures are logged and skipped).
        """
        logger.info(f"Importing {len(memories)} memories to target storage...")
        imported_count = 0
        failed_count = 0
        for memory_data in memories:
            try:
                # Create Memory object
                memory = Memory(
                    content=memory_data["content"],
                    content_hash=memory_data["content_hash"],
                    tags=memory_data.get("tags", []),
                    memory_type=memory_data.get("memory_type"),
                    metadata=memory_data.get("metadata", {}),
                    embedding=memory_data.get("embedding"),
                    created_at=memory_data.get("created_at"),
                    created_at_iso=memory_data.get("created_at_iso"),
                    updated_at=memory_data.get("updated_at"),
                    updated_at_iso=memory_data.get("updated_at_iso")
                )
                # Store the memory
                success, message = await storage.store(memory)
                if success:
                    imported_count += 1
                    # Progress heartbeat for large migrations.
                    if imported_count % 100 == 0:
                        logger.info(f"Imported {imported_count} memories...")
                else:
                    failed_count += 1
                    logger.warning(f"Failed to import memory {memory_data['content_hash']}: {message}")
            except Exception as e:
                failed_count += 1
                logger.warning(f"Failed to import memory: {e}")
                continue
        logger.info(f"Import complete: {imported_count} successful, {failed_count} failed")
        return imported_count

    async def create_backup(self, memories: List[Dict[str, Any]], backup_path: str) -> str:
        """Create a JSON backup of exported memories."""
        backup_data = {
            "version": "1.0",
            "timestamp": datetime.now().isoformat(),
            "total_memories": len(memories),
            "memories": memories
        }
        # BUG FIX: os.path.dirname() returns "" for a bare filename (the
        # default backup name generated in migrate()), and os.makedirs("")
        # raises FileNotFoundError — only create directories when a directory
        # component actually exists.
        backup_dir = os.path.dirname(backup_path)
        if backup_dir:
            os.makedirs(backup_dir, exist_ok=True)
        with open(backup_path, 'w') as f:
            json.dump(backup_data, f, indent=2)
        logger.info(f"Created backup at: {backup_path}")
        return backup_path

    async def load_backup(self, backup_path: str) -> List[Dict[str, Any]]:
        """Load memories from a JSON backup file."""
        with open(backup_path, 'r') as f:
            backup_data = json.load(f)
        memories = backup_data.get("memories", [])
        logger.info(f"Loaded {len(memories)} memories from backup: {backup_path}")
        return memories

    async def migrate(self, from_backend: str, to_backend: str,
                      source_path: str, target_path: str,
                      create_backup: bool = False, backup_path: str = None) -> bool:
        """Perform migration between storage backends.

        Args:
            from_backend: 'chroma' or 'sqlite_vec' source backend name.
            to_backend: 'chroma' or 'sqlite_vec' target backend name.
            source_path: Path to the source storage.
            target_path: Path to the target storage.
            create_backup: If True, write a JSON backup before importing.
            backup_path: Optional explicit backup file path (auto-named if None).

        Returns:
            True on success, False on failure or when the source is empty.
        """
        try:
            logger.info(f"Starting migration from {from_backend} to {to_backend}")
            # Initialize source storage
            if from_backend == 'chroma':
                self.source_storage = ChromaMemoryStorage(source_path)
            elif from_backend == 'sqlite_vec':
                self.source_storage = SqliteVecMemoryStorage(source_path)
            else:
                raise ValueError(f"Unsupported source backend: {from_backend}")
            await self.source_storage.initialize()
            logger.info(f"Initialized source storage ({from_backend})")

            # Export memories
            memories = await self.export_memories(self.source_storage)
            if not memories:
                logger.warning("No memories found in source storage")
                return False

            # Create backup if requested (the parameter shadows the method
            # name, but the method is still reachable via self).
            if create_backup:
                if not backup_path:
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    backup_path = f"memory_backup_{from_backend}_to_{to_backend}_{timestamp}.json"
                await self.create_backup(memories, backup_path)

            # Initialize target storage
            if to_backend == 'chroma':
                self.target_storage = ChromaMemoryStorage(target_path)
            elif to_backend == 'sqlite_vec':
                self.target_storage = SqliteVecMemoryStorage(target_path)
            else:
                raise ValueError(f"Unsupported target backend: {to_backend}")
            await self.target_storage.initialize()
            logger.info(f"Initialized target storage ({to_backend})")

            # Import memories
            imported_count = await self.import_memories(self.target_storage, memories)
            logger.info(f"Migration completed successfully: {imported_count} memories migrated")
            return True
        except Exception as e:
            logger.error(f"Migration failed: {e}")
            return False
        finally:
            # Clean up connections
            if self.source_storage and hasattr(self.source_storage, 'close'):
                self.source_storage.close()
            if self.target_storage and hasattr(self.target_storage, 'close'):
                self.target_storage.close()
def _build_parser():
    """Construct the CLI argument parser for the migration tool."""
    parser = argparse.ArgumentParser(
        description="Migrate memory data between storage backends",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Migrate from ChromaDB to sqlite-vec
  python scripts/migrate_storage.py --from chroma --to sqlite_vec \\
    --source-path /path/to/chroma_db --target-path /path/to/sqlite_vec.db

  # Migrate with backup
  python scripts/migrate_storage.py --from chroma --to sqlite_vec \\
    --source-path /path/to/chroma_db --target-path /path/to/sqlite_vec.db \\
    --backup --backup-path backup.json

  # Restore from backup
  python scripts/migrate_storage.py --restore backup.json \\
    --to sqlite_vec --target-path /path/to/sqlite_vec.db
"""
    )
    parser.add_argument('--from', dest='from_backend', choices=['chroma', 'sqlite_vec'],
                        help='Source storage backend')
    parser.add_argument('--to', dest='to_backend', choices=['chroma', 'sqlite_vec'],
                        required=True, help='Target storage backend')
    parser.add_argument('--source-path', help='Path to source storage')
    parser.add_argument('--target-path', required=True, help='Path to target storage')
    parser.add_argument('--backup', action='store_true',
                        help='Create backup before migration')
    parser.add_argument('--backup-path', help='Custom backup file path')
    parser.add_argument('--restore', help='Restore from backup file instead of migrating')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be migrated without actually doing it')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose logging')
    return parser


async def _run_restore(migration_tool, args):
    """Restore memories from a JSON backup file.

    Returns an int exit code for early termination (error or dry run),
    or None to continue to the shared success path in main().
    """
    logger.info(f"Restoring from backup: {args.restore}")
    if not os.path.exists(args.restore):
        logger.error(f"Backup file not found: {args.restore}")
        return 1
    memories = await migration_tool.load_backup(args.restore)
    if args.dry_run:
        logger.info(f"DRY RUN: Would restore {len(memories)} memories to {args.to_backend}")
        return 0
    # Initialize target storage
    if args.to_backend == 'chroma':
        target_storage = ChromaMemoryStorage(args.target_path)
    else:
        target_storage = SqliteVecMemoryStorage(args.target_path)
    await target_storage.initialize()
    imported_count = await migration_tool.import_memories(target_storage, memories)
    if hasattr(target_storage, 'close'):
        target_storage.close()
    logger.info(f"Restoration completed: {imported_count} memories restored")
    return None


async def _run_migration(migration_tool, args):
    """Run (or dry-run) a backend-to-backend migration.

    Returns an int exit code for early termination (dry run or failure),
    or None to continue to the shared success path in main().
    """
    if args.dry_run:
        logger.info(f"DRY RUN: Would migrate from {args.from_backend} to {args.to_backend}")
        # Initialize source storage and count memories
        if args.from_backend == 'chroma':
            source_storage = ChromaMemoryStorage(args.source_path)
        else:
            source_storage = SqliteVecMemoryStorage(args.source_path)
        await source_storage.initialize()
        memories = await migration_tool.export_memories(source_storage)
        if hasattr(source_storage, 'close'):
            source_storage.close()
        logger.info(f"DRY RUN: Found {len(memories)} memories to migrate")
        return 0
    # Perform actual migration
    success = await migration_tool.migrate(
        from_backend=args.from_backend,
        to_backend=args.to_backend,
        source_path=args.source_path,
        target_path=args.target_path,
        create_backup=args.backup,
        backup_path=args.backup_path
    )
    if not success:
        logger.error("Migration failed")
        return 1
    return None


async def main():
    """Main entry point for the migration tool.

    Parses arguments, validates the requested operation, and dispatches
    to either the restore or migration flow. Returns a process exit code
    (0 on success, 1 on failure or interruption).
    """
    parser = _build_parser()
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Validate arguments: --from/--source-path are only optional when
    # restoring from a backup file.
    if not args.restore and not args.from_backend:
        parser.error("--from is required unless using --restore")
    if not args.restore and not args.source_path:
        parser.error("--source-path is required unless using --restore")
    if args.from_backend == args.to_backend:
        parser.error("Source and target backends cannot be the same")

    migration_tool = MigrationTool()
    try:
        if args.restore:
            early_exit = await _run_restore(migration_tool, args)
        else:
            early_exit = await _run_migration(migration_tool, args)
        if early_exit is not None:
            return early_exit
        logger.info("Operation completed successfully")
        return 0
    except KeyboardInterrupt:
        logger.info("Operation cancelled by user")
        return 1
    except Exception as e:
        logger.error(f"Operation failed: {e}")
        return 1
if __name__ == "__main__":
    # Run the async entry point and propagate its integer exit code to the shell.
    sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/docs/cloudflare-setup.md:
--------------------------------------------------------------------------------
```markdown
# Cloudflare Backend Setup Guide
## Overview
The MCP Memory Service supports native Cloudflare integration using Vectorize for vector storage, D1 for metadata, and optional R2 for large content. This provides:
- **Vectorize**: Vector database for semantic search (768-dimensional embeddings)
- **D1**: SQLite database for metadata storage
- **Workers AI**: Embedding generation (@cf/baai/bge-base-en-v1.5)
- **R2** (optional): Object storage for large content
This setup provides global distribution, automatic scaling, and cost-effective pay-per-use pricing.
## 🚀 Quick Start
For users who want to get started immediately:
### Prerequisites
1. **Cloudflare Account**: You need a Cloudflare account with Workers/D1/Vectorize access
2. **API Token**: Create an API token with these permissions:
- **Vectorize Edit** (for creating and managing vector indexes)
- **D1 Edit** (for creating and managing databases)
- **R2 Edit** (optional, for large content storage)
- **Workers AI Read** (for embedding generation)
### Quick Setup Commands
```bash
# 1. Install dependencies
pip install httpx>=0.24.0
# 2. Create Cloudflare resources (requires wrangler CLI)
wrangler vectorize create mcp-memory-index --dimensions=768 --metric=cosine
wrangler d1 create mcp-memory-db
wrangler r2 bucket create mcp-memory-content # Optional
# 3. Configure environment
export MCP_MEMORY_STORAGE_BACKEND=cloudflare
export CLOUDFLARE_API_TOKEN="your-api-token"
export CLOUDFLARE_ACCOUNT_ID="your-account-id"
export CLOUDFLARE_VECTORIZE_INDEX="mcp-memory-index"
export CLOUDFLARE_D1_DATABASE_ID="your-d1-database-id"
export CLOUDFLARE_R2_BUCKET="mcp-memory-content" # Optional
# 4. Test and start
python -m src.mcp_memory_service.server
# Alternative startup methods:
# uv run memory server # Modern CLI (recommended)
# python scripts/run_memory_server.py # Direct script execution
```
> **⚠️ Important**: Cloudflare backend uses Workers AI for embedding generation, so do NOT use `scripts/memory_offline.py` which sets offline mode. Use the standard startup methods above instead.
## Prerequisites
1. **Cloudflare Account**: Sign up at [cloudflare.com](https://www.cloudflare.com/)
2. **Cloudflare Services**: Access to Vectorize, D1, and optionally R2
3. **API Token**: With appropriate permissions
## Step 1: Create Cloudflare Resources
### 1.1 Create Vectorize Index
```bash
# Install Wrangler CLI
npm install -g wrangler
# Login to Cloudflare
wrangler login
# Create Vectorize index (768 dimensions for BGE embeddings)
wrangler vectorize create mcp-memory-index --dimensions=768 --metric=cosine
```
### 1.2 Create D1 Database
```bash
# Create D1 database
wrangler d1 create mcp-memory-db
# Note the database ID from the output
```
### 1.3 Create R2 Bucket (Optional)
```bash
# Create R2 bucket for large content storage
wrangler r2 bucket create mcp-memory-content
```
## Step 2: Configure API Token
### 2.1 Create API Token
1. Go to [Cloudflare Dashboard → My Profile → API Tokens](https://dash.cloudflare.com/profile/api-tokens)
2. Click "Create Token"
3. Use "Custom Token" template
4. Configure permissions:
- **Account**: `Read` (to access account resources)
- **Vectorize**: `Edit` (to manage vector operations)
- **D1**: `Edit` (to manage database operations)
- **R2**: `Edit` (if using R2 for large content)
- **Workers AI**: `Read` (for embedding generation)
### 2.2 Get Account ID
1. Go to [Cloudflare Dashboard](https://dash.cloudflare.com/)
2. Select your domain or go to overview
3. Copy the Account ID from the right sidebar
### 2.3 Manual Resource Creation (Alternative)
If you prefer manual creation via the Cloudflare Dashboard or encounter authentication issues:
**Create Vectorize Index via Dashboard:**
1. Go to [Cloudflare Dashboard → Vectorize](https://dash.cloudflare.com/vectorize)
2. Click "Create Index"
3. Name: `mcp-memory-index`
4. Dimensions: `768`
5. Metric: `cosine`
**Create D1 Database via Dashboard:**
1. Go to [Cloudflare Dashboard → D1](https://dash.cloudflare.com/d1)
2. Click "Create Database"
3. Name: `mcp-memory-db`
4. Copy the Database ID from the overview page
**Create R2 Bucket via Dashboard (Optional):**
1. Go to [Cloudflare Dashboard → R2](https://dash.cloudflare.com/r2)
2. Click "Create Bucket"
3. Name: `mcp-memory-content`
4. Choose region closest to your location
**Alternative API Creation:**
```bash
# Create Vectorize index via API
curl -X POST "https://api.cloudflare.com/client/v4/accounts/YOUR_ACCOUNT_ID/vectorize/indexes" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "mcp-memory-index",
"config": {
"dimensions": 768,
"metric": "cosine"
}
}'
# Create D1 database via API
curl -X POST "https://api.cloudflare.com/client/v4/accounts/YOUR_ACCOUNT_ID/d1/database" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "mcp-memory-db"
}'
# Create R2 bucket via API (optional)
curl -X POST "https://api.cloudflare.com/client/v4/accounts/YOUR_ACCOUNT_ID/r2/buckets" \
-H "Authorization: Bearer YOUR_API_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "mcp-memory-content"
}'
```
## Step 3: Configure Environment Variables
Set the following environment variables:
```bash
# Required Configuration
export MCP_MEMORY_STORAGE_BACKEND=cloudflare
export CLOUDFLARE_API_TOKEN="your-api-token-here"
export CLOUDFLARE_ACCOUNT_ID="your-account-id-here"
export CLOUDFLARE_VECTORIZE_INDEX="mcp-memory-index"
export CLOUDFLARE_D1_DATABASE_ID="your-d1-database-id"
# Optional Configuration
export CLOUDFLARE_R2_BUCKET="mcp-memory-content" # For large content
export CLOUDFLARE_EMBEDDING_MODEL="@cf/baai/bge-base-en-v1.5" # Default
export CLOUDFLARE_LARGE_CONTENT_THRESHOLD="1048576" # 1MB threshold
export CLOUDFLARE_MAX_RETRIES="3" # API retry attempts
export CLOUDFLARE_BASE_DELAY="1.0" # Retry delay in seconds
```
### Configuration File Example
Create a `.env` file in your project root:
```env
# Cloudflare Backend Configuration
MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Required Cloudflare Settings
CLOUDFLARE_API_TOKEN=your-api-token-here
CLOUDFLARE_ACCOUNT_ID=your-account-id-here
CLOUDFLARE_VECTORIZE_INDEX=mcp-memory-index
CLOUDFLARE_D1_DATABASE_ID=your-d1-database-id
# Optional Settings
CLOUDFLARE_R2_BUCKET=mcp-memory-content
CLOUDFLARE_EMBEDDING_MODEL=@cf/baai/bge-base-en-v1.5
CLOUDFLARE_LARGE_CONTENT_THRESHOLD=1048576
CLOUDFLARE_MAX_RETRIES=3
CLOUDFLARE_BASE_DELAY=1.0
# Logging
LOG_LEVEL=INFO
```
## Step 4: Install Dependencies
The Cloudflare backend requires additional dependencies:
```bash
# Install additional requirements
pip install -r requirements-cloudflare.txt
# Or install manually
pip install httpx>=0.24.0
```
## Step 5: Initialize and Test
### 5.1 Start the Service
```bash
# Start MCP Memory Service with Cloudflare backend
python -m src.mcp_memory_service.server
```
### 5.2 Verify Configuration
The service will automatically:
1. Initialize the D1 database schema
2. Verify access to the Vectorize index
3. Check R2 bucket access (if configured)
Look for these success messages in the logs:
```
INFO:mcp_memory_service.config:Using Cloudflare backend with:
INFO:mcp_memory_service.config: Vectorize Index: mcp-memory-index
INFO:mcp_memory_service.config: D1 Database: your-d1-database-id
INFO:mcp_memory_service.server:Created Cloudflare storage with Vectorize index: mcp-memory-index
INFO:mcp_memory_service.storage.cloudflare:Cloudflare storage backend initialized successfully
```
### 5.3 Test Basic Operations
**Option A: Comprehensive Test Suite**
```bash
# Run comprehensive automated tests
python scripts/test_cloudflare_backend.py
```
**Option B: Manual API Testing**
```bash
# Store a test memory
curl -X POST http://localhost:8000/api/memories \
-H "Content-Type: application/json" \
-d '{
"content": "This is a test memory for Cloudflare backend",
"tags": ["test", "cloudflare"]
}'
# Search memories
curl -X POST http://localhost:8000/api/memories/search \
-H "Content-Type: application/json" \
-d '{
"query": "test memory",
"n_results": 5
}'
# Get statistics
curl http://localhost:8000/api/stats
```
**Option C: Automated Resource Setup**
```bash
# Set up Cloudflare resources automatically
python scripts/setup_cloudflare_resources.py
```
## Architecture Details
### Data Flow
1. **Content Storage**:
- Small content (below the configurable 1MB threshold): Stored directly in D1
- Large content (above the threshold): Stored in R2, referenced in D1
2. **Vector Processing**:
- Content → Workers AI → Embedding Vector
- Vector stored in Vectorize with metadata
- Semantic search via Vectorize similarity
3. **Metadata Management**:
- Memory metadata stored in D1 SQLite
- Tags stored in relational tables
- Full ACID compliance for data integrity
### Performance Optimizations
- **Connection Pooling**: Reused HTTP connections
- **Embedding Caching**: 1000-entry LRU cache
- **Batch Operations**: Bulk vector operations
- **Smart Retries**: Exponential backoff for rate limits
- **Async Operations**: Non-blocking I/O throughout
### Security Features
- **API Key Security**: Never logged or exposed
- **Input Validation**: SQL injection prevention
- **Rate Limiting**: Built-in protection
- **Secure Headers**: Proper HTTP security
## Migration from Other Backends
### From SQLite-vec
```bash
# Export existing data
python scripts/export_sqlite_vec.py --output cloudflare_export.json
# Switch to Cloudflare backend
export MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Import data
python scripts/import_to_cloudflare.py --input cloudflare_export.json
```
### From ChromaDB
```bash
# Export ChromaDB data
python scripts/export_chroma.py --output cloudflare_export.json
# Switch to Cloudflare backend
export MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Import data
python scripts/import_to_cloudflare.py --input cloudflare_export.json
```
## Troubleshooting
### Common Issues
#### 1. Authentication Errors (401)
```
ERROR: Missing required environment variables for Cloudflare backend: CLOUDFLARE_API_TOKEN
ERROR: Unauthorized - Invalid API token
```
**Solution**:
- Verify all required environment variables are set
- Check API token has correct permissions (Vectorize:Edit, D1:Edit, Workers AI:Read)
- Ensure token is not expired
- Verify account ID is correct
#### 2. Resource Not Found (404)
```
ValueError: Vectorize index 'mcp-memory-index' not found
ValueError: D1 database not found
```
**Solution**:
- Create the Vectorize index, or verify the index name is correct
- Check that the resources were created in the correct account
- Confirm resource IDs and names match your configuration exactly
#### 3. Vector Storage Errors (400)
```
ValueError: Failed to store vector data
HTTP 400: Invalid vector data format
```
**Solution**:
- Check vector dimensions (must be 768)
- Verify NDJSON format for vector data
- Ensure metadata values are properly serialized
- Validate input data types
#### 4. D1 Database Access Issues
```
ValueError: Failed to initialize D1 schema
HTTP 403: Insufficient permissions
```
**Solution**:
- Verify D1 database ID and API token permissions
- Ensure database exists and is accessible
- Check API token has D1:Edit permissions
#### 5. API Rate Limits (429)
```
Rate limited after 3 retries
HTTP 429: Too Many Requests
```
**Solution**:
- Increase `CLOUDFLARE_MAX_RETRIES` or `CLOUDFLARE_BASE_DELAY` for more conservative retry behavior
- Implement exponential backoff (already included)
- Monitor API usage through Cloudflare dashboard
- Consider implementing request caching for high-volume usage
### Debug Mode
Enable detailed logging:
```bash
export LOG_LEVEL=DEBUG
python -m src.mcp_memory_service.server --debug
```
### Health Check
```bash
# Check backend health
curl http://localhost:8000/api/health
# Get detailed statistics
curl http://localhost:8000/api/stats
```
## Limitations
### Current Limitations
- **Embedding Model**: Fixed to Workers AI BGE model (768 dimensions)
- **Content Size**: R2 storage recommended for content >1MB
- **Rate Limits**: Subject to Cloudflare service limits
- **Region**: Embedding generation uses Cloudflare's global network
### Planned Improvements
- **Local Embedding Fallback**: For offline or restricted environments
- **Custom Embedding Models**: Support for other embedding models
- **Enhanced Caching**: Multi-level caching strategy
- **Batch Import Tools**: Efficient migration utilities
## 🔄 Multi-Machine Bidirectional Sync
**New in v6.13.7**: Cloudflare backend now supports seamless bidirectional sync between multiple machines, making it ideal for distributed teams or as a replacement for failed centralized servers.
### Use Cases
1. **Failed Server Recovery**: Replace a failed narrowbox/central server with Cloudflare
2. **Multi-Machine Development**: Sync memories across multiple development machines
3. **Team Collaboration**: Share memory context across team members
4. **Backup Strategy**: Cloudflare as primary with local sqlite_vec as backup
### Architecture
```
┌─────────────────┐ ┌─────────────────┐
│ Machine A │ │ Machine B │
│ │ │ │
│ Claude Desktop │ │ Claude Desktop │
│ ↕ │ │ ↕ │
│ sqlite_vec │ │ sqlite_vec │
│ (backup) │ │ (backup) │
└─────────┬───────┘ └─────────┬───────┘
│ │
└─────────┬────────────┘
↕
┌─────────────────┐
│ Cloudflare │
│ │
│ D1 Database │
│ Vectorize Index │
│ Workers AI │
└─────────────────┘
```
### Setup Process
#### 1. Initial Migration (from failed server)
```bash
# Export memories from existing machine
memory export /path/to/export.json
# Set up Cloudflare environment
export CLOUDFLARE_API_TOKEN="your-token"
export CLOUDFLARE_ACCOUNT_ID="your-account"
export CLOUDFLARE_D1_DATABASE_ID="your-d1-id"
export CLOUDFLARE_VECTORIZE_INDEX="mcp-memory-index"
export MCP_MEMORY_STORAGE_BACKEND="cloudflare"
# Import to Cloudflare
python scripts/import_to_cloudflare.py /path/to/export.json
```
#### 2. Configure Each Machine
**Claude Desktop Configuration** (`claude_desktop_config.json`):
```json
{
"mcpServers": {
"memory": {
"command": "/path/to/memory",
"args": ["server"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "cloudflare",
"MCP_MEMORY_SQLITE_PATH": "/local/backup/path/sqlite_vec.db",
"CLOUDFLARE_API_TOKEN": "your-token",
"CLOUDFLARE_ACCOUNT_ID": "your-account",
"CLOUDFLARE_D1_DATABASE_ID": "your-d1-id",
"CLOUDFLARE_VECTORIZE_INDEX": "mcp-memory-index"
}
}
}
}
```
#### 3. Verification Testing
Test bidirectional sync by storing and retrieving memories from each machine:
```python
# Test script for verification
import asyncio
from mcp_memory_service.storage.cloudflare import CloudflareStorage
async def test_sync():
storage = CloudflareStorage(...)
await storage.initialize()
# Store test memory
test_memory = Memory(content="Test from Machine A", tags=["sync-test"])
success, message = await storage.store(test_memory)
# Verify from other machine
results = await storage.retrieve("Test from Machine A")
print(f"Sync verified: {len(results)} results found")
```
### Important Notes
- **v6.13.7 Required**: This version fixes the critical Vectorize ID length issue
- **Breaking Change**: Vector IDs changed format from v6.13.6 (removed "mem_" prefix)
- **Backup Strategy**: Local sqlite_vec files are maintained for fallback
- **Migration Time**: Allow extra time for initial memory migration to Cloudflare
### Troubleshooting Sync Issues
#### Vector ID Length Error (Fixed in v6.13.7)
```
Error: "id too long; max is 64 bytes, got 68 bytes"
```
**Solution**: Update to v6.13.7 or later
#### Environment Variable Issues
**Problem**: Memories not syncing between machines
**Solution**:
- Verify identical environment variables on all machines
- Check Claude Desktop configuration matches exactly
- Restart Claude Desktop after config changes
#### Sync Verification
```bash
# Check memory count on each machine
memory status
# Test cross-machine visibility
memory retrieve "test query from other machine"
```
## Support
For issues and questions:
1. **Documentation**: Check this guide and API documentation
2. **GitHub Issues**: Report bugs at the project repository
3. **Cloudflare Support**: For Cloudflare service-specific issues
4. **Community**: Join the project Discord/community channels
## Performance Benchmarks
### Typical Performance
- **Storage**: ~200ms per memory (including embedding generation)
- **Search**: ~100ms for semantic search (5 results)
- **Batch Operations**: ~50ms per memory in batches of 100
- **Global Latency**: <100ms from most global locations
### Optimization Tips
1. **Batch Operations**: Use bulk operations when possible
2. **Content Strategy**: Use R2 for large content
3. **Caching**: Enable embedding caching
4. **Connection Pooling**: Reuse HTTP connections
5. **Regional Deployment**: Deploy close to your users
```
--------------------------------------------------------------------------------
/claude-hooks/core/mid-conversation.js:
--------------------------------------------------------------------------------
```javascript
/**
* Mid-Conversation Memory Hook
* Intelligently triggers memory awareness during conversations based on natural language patterns
*/
const { TieredConversationMonitor } = require('../utilities/tiered-conversation-monitor');
const { AdaptivePatternDetector } = require('../utilities/adaptive-pattern-detector');
const { PerformanceManager } = require('../utilities/performance-manager');
const { MemoryClient } = require('../utilities/memory-client');
const { scoreMemoryRelevance } = require('../utilities/memory-scorer');
const { formatMemoriesForContext } = require('../utilities/context-formatter');
class MidConversationHook {
    constructor(config = {}) {
        this.config = config;
        // Decision weighting constants -- single source of truth for
        // makeTriggerDecision(); the details report below must reference
        // these rather than duplicating the literals.
        this.TRIGGER_WEIGHTS = {
            PATTERN_CONFIDENCE: 0.6,
            CONVERSATION_CONTEXT: 0.4,
            SEMANTIC_SHIFT_BOOST: 0.2,
            QUESTION_PATTERN_BOOST: 0.1,
            PAST_WORK_BOOST: 0.15
        };
        this.THRESHOLD_VALUES = {
            CONVERSATION_PROBABILITY_MIN: 0.3,
            SEMANTIC_SHIFT_MIN: 0.6,
            SPEED_MODE_CONFIDENCE_MIN: 0.8,
            SPEED_MODE_REDUCTION: 0.8
        };
        // Initialize performance management
        this.performanceManager = new PerformanceManager(config.performance);
        // Initialize components with performance awareness
        this.conversationMonitor = new TieredConversationMonitor(
            config.conversationMonitor,
            this.performanceManager
        );
        this.patternDetector = new AdaptivePatternDetector(
            config.patternDetector,
            this.performanceManager
        );
        // Memory client for queries (lazily connected on first trigger)
        this.memoryClient = null;
        // Hook state - read from the nested naturalTriggers config path
        const naturalTriggersConfig = config.naturalTriggers || {};
        this.isEnabled = naturalTriggersConfig.enabled !== false;
        this.lastTriggerTime = 0;
        this.cooldownPeriod = naturalTriggersConfig.cooldownPeriod || 30000; // 30 seconds between triggers
        // Analytics
        this.analytics = {
            totalAnalyses: 0,
            triggersExecuted: 0,
            userAcceptanceRate: 0,
            averageLatency: 0,
            totalFeedback: 0
        };
    }

    /**
     * Analyze user message for memory trigger needs.
     *
     * Returns null when disabled, a non-triggering result during the
     * cooldown window or on error, otherwise a decision object combining
     * conversation monitoring and pattern detection.
     */
    async analyzeMessage(userMessage, context = {}) {
        if (!this.isEnabled) return null;
        const timing = this.performanceManager.startTiming('mid_conversation_analysis', 'fast');
        try {
            this.analytics.totalAnalyses++;
            // Check cooldown period
            if (Date.now() - this.lastTriggerTime < this.cooldownPeriod) {
                return this.createResult('cooldown', 'Cooldown period active', 0);
            }
            // Phase 1: Conversation monitoring
            const conversationAnalysis = await this.conversationMonitor.analyzeMessage(userMessage, context);
            // Phase 2: Pattern detection
            const patternResults = await this.patternDetector.detectPatterns(userMessage, {
                ...context,
                conversationAnalysis
            });
            // Phase 3: Combined decision making
            const triggerDecision = this.makeTriggerDecision(conversationAnalysis, patternResults, context);
            // Update last trigger time if we're recommending a trigger
            if (triggerDecision.shouldTrigger) {
                this.lastTriggerTime = Date.now();
            }
            // Record performance
            const performanceResult = this.performanceManager.endTiming(timing);
            this.analytics.averageLatency = this.updateAverageLatency(performanceResult.latency);
            return {
                shouldTrigger: triggerDecision.shouldTrigger,
                confidence: triggerDecision.confidence,
                reasoning: triggerDecision.reasoning,
                conversationAnalysis,
                patternResults,
                performance: performanceResult,
                timestamp: Date.now()
            };
        } catch (error) {
            console.error('[Mid-Conversation Hook] Analysis failed:', error.message);
            this.performanceManager.endTiming(timing);
            return this.createResult('error', `Analysis failed: ${error.message}`, 0);
        }
    }

    /**
     * Execute memory retrieval and context injection.
     *
     * Connects the memory client on first use, queries/scores/formats
     * memories, and returns either a success payload or a non-triggering
     * result describing why nothing was injected.
     */
    async executeMemoryTrigger(analysisResult, context = {}) {
        if (!analysisResult.shouldTrigger) return null;
        const timing = this.performanceManager.startTiming('memory_trigger_execution', 'intensive');
        try {
            // Initialize memory client if needed
            if (!this.memoryClient) {
                this.memoryClient = new MemoryClient(this.config.memoryService || {});
                await this.memoryClient.connect();
            }
            // Build enhanced query based on analysis
            const memoryQuery = this.buildMemoryQuery(analysisResult, context);
            // Retrieve relevant memories
            const memories = await this.queryMemories(memoryQuery);
            if (memories.length === 0) {
                return this.createResult('no_memories', 'No relevant memories found', analysisResult.confidence);
            }
            // Score and format memories
            const scoredMemories = scoreMemoryRelevance(memories, context.projectContext, {
                verbose: false,
                enhanceRecency: true
            });
            const contextMessage = formatMemoriesForContext(
                scoredMemories.slice(0, this.config.maxMemoriesPerTrigger || 5),
                context.projectContext,
                {
                    includeScore: false,
                    groupByCategory: scoredMemories.length > 3,
                    maxContentLength: 400,
                    includeTimestamp: true
                }
            );
            // Record successful trigger
            this.analytics.triggersExecuted++;
            const performanceResult = this.performanceManager.endTiming(timing);
            return {
                success: true,
                contextMessage,
                memoriesFound: memories.length,
                memoriesUsed: Math.min(scoredMemories.length, this.config.maxMemoriesPerTrigger || 5),
                confidence: analysisResult.confidence,
                performance: performanceResult,
                triggerType: 'mid_conversation'
            };
        } catch (error) {
            console.error('[Mid-Conversation Hook] Memory trigger failed:', error.message);
            this.performanceManager.endTiming(timing);
            return this.createResult('execution_error', `Memory trigger failed: ${error.message}`, analysisResult.confidence);
        }
    }

    /**
     * Make intelligent trigger decision based on all analyses.
     *
     * Combines weighted pattern confidence, conversation probability, and
     * contextual boosts, then compares against the configured threshold.
     */
    makeTriggerDecision(conversationAnalysis, patternResults, context) {
        let confidence = 0;
        const reasons = [];
        // Weight pattern detection heavily for explicit requests
        if (patternResults.triggerRecommendation) {
            confidence += patternResults.confidence * this.TRIGGER_WEIGHTS.PATTERN_CONFIDENCE;
            reasons.push(`Pattern detection: ${patternResults.confidence.toFixed(2)} confidence`);
        }
        // Add conversation context weighting
        if (conversationAnalysis.triggerProbability > this.THRESHOLD_VALUES.CONVERSATION_PROBABILITY_MIN) {
            confidence += conversationAnalysis.triggerProbability * this.TRIGGER_WEIGHTS.CONVERSATION_CONTEXT;
            reasons.push(`Conversation analysis: ${conversationAnalysis.triggerProbability.toFixed(2)} probability`);
        }
        // Boost for semantic shift (topic change)
        if (conversationAnalysis.semanticShift > this.THRESHOLD_VALUES.SEMANTIC_SHIFT_MIN) {
            confidence += this.TRIGGER_WEIGHTS.SEMANTIC_SHIFT_BOOST;
            reasons.push(`Semantic shift detected: ${conversationAnalysis.semanticShift.toFixed(2)}`);
        }
        // Context-specific adjustments
        if (context.isQuestionPattern) {
            confidence += this.TRIGGER_WEIGHTS.QUESTION_PATTERN_BOOST;
            reasons.push('Question pattern detected');
        }
        if (context.mentionsPastWork) {
            confidence += this.TRIGGER_WEIGHTS.PAST_WORK_BOOST;
            reasons.push('References past work');
        }
        // Apply performance profile considerations
        const profile = this.performanceManager.performanceBudget;
        if (profile.maxLatency < 200 && confidence < this.THRESHOLD_VALUES.SPEED_MODE_CONFIDENCE_MIN) {
            // In speed-focused mode, require higher confidence
            confidence *= this.THRESHOLD_VALUES.SPEED_MODE_REDUCTION;
            reasons.push('Speed mode: increased confidence threshold');
        }
        // Final decision threshold
        const threshold = this.config.naturalTriggers?.triggerThreshold || 0.6;
        const shouldTrigger = confidence >= threshold;
        // NOTE: the weight breakdown below intentionally reports the raw
        // (uncapped) contributions; it now references TRIGGER_WEIGHTS so
        // it cannot drift from the values actually used above.
        const conversationWeight = conversationAnalysis.triggerProbability * this.TRIGGER_WEIGHTS.CONVERSATION_CONTEXT;
        const patternWeight = patternResults.confidence * this.TRIGGER_WEIGHTS.PATTERN_CONFIDENCE;
        return {
            shouldTrigger,
            confidence: Math.min(confidence, 1.0),
            reasoning: reasons.join('; '),
            threshold,
            details: {
                conversationWeight,
                patternWeight,
                contextAdjustments: confidence - (conversationWeight + patternWeight)
            }
        };
    }

    /**
     * Build optimized memory query based on analysis.
     */
    buildMemoryQuery(analysisResult, context) {
        const query = {
            semanticQuery: '',
            tags: [],
            limit: this.config.maxMemoriesPerTrigger || 5,
            timeFilter: 'last-month'
        };
        // Extract key topics from conversation analysis
        if (analysisResult.conversationAnalysis.topics.length > 0) {
            query.semanticQuery += analysisResult.conversationAnalysis.topics.join(' ');
        }
        // Add project context
        if (context.projectContext) {
            query.semanticQuery += ` ${context.projectContext.name}`;
            query.tags.push(context.projectContext.name);
            if (context.projectContext.language) {
                query.tags.push(`language:${context.projectContext.language}`);
            }
        }
        // Add pattern-based context
        for (const match of analysisResult.patternResults.matches) {
            if (match.category === 'explicitMemoryRequests') {
                query.timeFilter = 'last-week'; // Recent memories for explicit requests
            } else if (match.category === 'technicalDiscussions') {
                query.tags.push('architecture', 'decisions');
            }
        }
        // Ensure we have a meaningful query
        if (!query.semanticQuery.trim()) {
            query.semanticQuery = 'project context decisions';
        }
        return query;
    }

    /**
     * Query memories using unified memory client.
     *
     * Failures are logged and collapsed to an empty result so a memory
     * service outage never breaks the conversation flow.
     */
    async queryMemories(query) {
        try {
            let memories = [];
            if (query.timeFilter) {
                const timeQuery = `${query.semanticQuery} ${query.timeFilter}`;
                memories = await this.memoryClient.queryMemoriesByTime(timeQuery, query.limit);
            } else {
                memories = await this.memoryClient.queryMemories(query.semanticQuery, query.limit);
            }
            return memories || [];
        } catch (error) {
            console.warn('[Mid-Conversation Hook] Memory query failed:', error.message);
            return [];
        }
    }

    /**
     * Handle user feedback on trigger quality.
     */
    recordUserFeedback(analysisResult, wasHelpful, context = {}) {
        // Update analytics
        this.updateAcceptanceRate(wasHelpful);
        // Pass feedback to components for learning
        this.patternDetector.recordUserFeedback(wasHelpful, analysisResult.patternResults, context);
        this.performanceManager.recordUserFeedback(wasHelpful, {
            latency: analysisResult.performance?.latency || 0
        });
        // Log feedback for analysis
        console.log(`[Mid-Conversation Hook] User feedback: ${wasHelpful ? 'helpful' : 'not helpful'} (confidence: ${analysisResult.confidence?.toFixed(2)})`);
    }

    /**
     * Update performance profile.
     */
    updatePerformanceProfile(profileName) {
        this.performanceManager.switchProfile(profileName);
        this.conversationMonitor.updatePerformanceProfile(profileName);
        console.log(`[Mid-Conversation Hook] Switched to performance profile: ${profileName}`);
    }

    /**
     * Get hook status and analytics.
     */
    getStatus() {
        return {
            enabled: this.isEnabled,
            lastTriggerTime: this.lastTriggerTime,
            cooldownRemaining: Math.max(0, this.cooldownPeriod - (Date.now() - this.lastTriggerTime)),
            analytics: this.analytics,
            performance: this.performanceManager.getPerformanceReport(),
            conversationMonitor: this.conversationMonitor.getPerformanceStatus(),
            patternDetector: this.patternDetector.getStatistics()
        };
    }

    /**
     * Enable or disable the hook.
     */
    setEnabled(enabled) {
        this.isEnabled = enabled;
        console.log(`[Mid-Conversation Hook] ${enabled ? 'Enabled' : 'Disabled'}`);
    }

    /**
     * Helper: build a uniform non-triggering result.
     */
    createResult(type, message, confidence) {
        return {
            shouldTrigger: false,
            confidence,
            reasoning: message,
            type,
            timestamp: Date.now()
        };
    }

    /**
     * Helper: exponential moving average of analysis latency.
     */
    updateAverageLatency(newLatency) {
        const alpha = 0.1; // Exponential moving average factor
        return this.analytics.averageLatency * (1 - alpha) + newLatency * alpha;
    }

    /**
     * Helper: running average of positive feedback ratio.
     */
    updateAcceptanceRate(wasPositive) {
        // Increment feedback counter
        this.analytics.totalFeedback++;
        const totalFeedback = this.analytics.totalFeedback;
        if (totalFeedback === 1) {
            // First feedback sets the initial rate
            this.analytics.userAcceptanceRate = wasPositive ? 1.0 : 0.0;
        } else {
            // Update running average
            const currentRate = this.analytics.userAcceptanceRate;
            this.analytics.userAcceptanceRate = (currentRate * (totalFeedback - 1) + (wasPositive ? 1 : 0)) / totalFeedback;
        }
    }

    /**
     * Cleanup resources.
     */
    async cleanup() {
        if (this.memoryClient) {
            try {
                await this.memoryClient.disconnect();
            } catch (error) {
                // Ignore cleanup errors
            }
            this.memoryClient = null;
        }
    }
}
/**
 * Global hook instance for state management
 */
let globalHookInstance = null;

/**
 * Get or create the hook instance (singleton pattern).
 *
 * NOTE(review): `config` is only honoured on first creation; subsequent
 * calls return the existing instance and ignore the argument.
 */
function getHookInstance(config) {
  if (globalHookInstance === null) {
    globalHookInstance = new MidConversationHook(config || {});
    console.log('[Mid-Conversation Hook] Created new hook instance');
  }
  return globalHookInstance;
}
/**
 * Reset hook instance (for testing or config changes).
 *
 * Kicks off asynchronous cleanup of the current instance (errors are only
 * logged, never thrown) and clears the singleton so the next
 * getHookInstance() call builds a fresh one.
 */
function resetHookInstance() {
  const instance = globalHookInstance;
  if (!instance) {
    return;
  }
  instance.cleanup().catch((error) => {
    // Log cleanup errors during reset but don't throw
    console.debug('[Mid-Conversation Hook] Cleanup error during reset:', error.message);
  });
  globalHookInstance = null;
  console.log('[Mid-Conversation Hook] Reset hook instance');
}
/**
 * Hook function for Claude Code integration.
 *
 * Entry point called by Claude Code during conversation flow: analyzes the
 * current user message and, when analysis says to trigger, retrieves
 * memories and injects them as a system message. All failures are logged
 * and swallowed so the conversation is never interrupted, and hook state is
 * preserved for the next call.
 */
async function onMidConversation(context) {
  const hook = getHookInstance(context.config);
  try {
    const analysis = await hook.analyzeMessage(context.userMessage, context);
    if (!analysis || !analysis.shouldTrigger) {
      return;
    }

    const result = await hook.executeMemoryTrigger(analysis, context);
    if (result && result.success && context.injectSystemMessage) {
      await context.injectSystemMessage(result.contextMessage);
      console.log(`[Mid-Conversation Hook] Injected ${result.memoriesUsed} memories (confidence: ${result.confidence.toFixed(2)})`);
    }
  } catch (error) {
    console.error('[Mid-Conversation Hook] Hook execution failed:', error.message);
    // Don't cleanup on error - preserve state for next call
  }
}
// Public API plus hook-registration metadata consumed by the hook loader.
module.exports = {
  MidConversationHook,
  onMidConversation,
  getHookInstance,
  resetHookInstance,
  // Registration metadata: identifies this hook and when it fires.
  name: 'mid-conversation-memory',
  version: '1.0.0',
  description: 'Intelligent mid-conversation memory awareness with performance optimization',
  trigger: 'mid-conversation',
  handler: onMidConversation,
  config: {
    async: true, // run without blocking the conversation flow
    timeout: 10000, // ms budget before the loader abandons the hook
    priority: 'high'
  }
};
```
--------------------------------------------------------------------------------
/scripts/validation/validate_migration.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Migration Validation Script for MCP Memory Service
This script validates that a migration from ChromaDB to SQLite-vec was successful
by checking data integrity, required fields, and identifying any corruption.
"""
import argparse
import asyncio
import hashlib
import json
import os
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
# Add the repository's "src" directory to the import path so the
# mcp_memory_service package is importable when run from a checkout.
# Fix: this script lives at scripts/validation/, so the repo root is THREE
# parents up; the original `parent.parent` resolved to scripts/src.
project_root = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(project_root / "src"))
try:
    from mcp_memory_service.storage.chroma import ChromaMemoryStorage
    from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
    from mcp_memory_service.utils.hashing import generate_content_hash
    IMPORTS_AVAILABLE = True
except ImportError:
    # Fall back to standalone mode: hash/field checks still work without
    # the project packages; only the ChromaDB comparison is skipped.
    IMPORTS_AVAILABLE = False
    print("Warning: MCP modules not available. Running in standalone mode.")
class ValidationReport:
    """Container for validation results.

    Collects statistics, critical issues, warnings and per-category problem
    lists produced while validating a migrated database, and renders them as
    a human-readable report on stdout.
    """

    def __init__(self):
        self.total_memories = 0
        self.valid_memories = 0
        self.issues = []            # critical problems -> validation fails
        self.warnings = []          # non-fatal problems
        self.tag_issues = []        # malformed tag strings
        self.hash_mismatches = []   # content hash does not match content
        self.missing_fields = []    # required columns empty/NULL
        self.timestamp_issues = []  # timestamps missing or out of range
        self.encoding_issues = []   # content that cannot be UTF-8 encoded

    def add_issue(self, issue: str):
        """Add a critical issue (causes validation to fail)."""
        self.issues.append(issue)

    def add_warning(self, warning: str):
        """Add a non-fatal warning."""
        self.warnings.append(warning)

    def is_valid(self) -> bool:
        """Check if validation passed (no critical issues recorded)."""
        return len(self.issues) == 0

    @staticmethod
    def _print_section(title: str, entries: list, limit: int):
        """Print one numbered problem section, truncated to *limit* entries."""
        if not entries:
            return
        print(f"\n{title} ({len(entries)}):")
        for i, entry in enumerate(entries[:limit], 1):
            print(f"  {i}. {entry}")
        if len(entries) > limit:
            print(f"  ... and {len(entries) - limit} more")

    def print_report(self):
        """Print the validation report."""
        print("\n" + "="*60)
        print("MIGRATION VALIDATION REPORT")
        print("="*60)
        print(f"\n📊 Statistics:")
        print(f"   Total memories: {self.total_memories}")
        print(f"   Valid memories: {self.valid_memories}")
        if self.total_memories > 0:
            rate = self.valid_memories / self.total_memories * 100
            print(f"   Validation rate: {rate:.1f}%")
        else:
            print("N/A")

        self._print_section("❌ Critical Issues", self.issues, 10)
        self._print_section("⚠️  Warnings", self.warnings, 10)
        self._print_section("🏷️  Tag Issues", self.tag_issues, 5)
        self._print_section("🔑 Hash Mismatches", self.hash_mismatches, 5)
        self._print_section("🕐 Timestamp Issues", self.timestamp_issues, 5)
        # Fix: encoding issues were collected during validation but never
        # shown in the report.
        self._print_section("🔤 Encoding Issues", self.encoding_issues, 5)

        # Final verdict
        print("\n" + "="*60)
        if self.is_valid():
            print("✅ VALIDATION PASSED")
            if self.warnings:
                print(f"   (with {len(self.warnings)} warnings)")
        else:
            print("❌ VALIDATION FAILED")
            print(f"   Found {len(self.issues)} critical issues")
        print("="*60)
class MigrationValidator:
    """Tool for validating data migrated from ChromaDB to SQLite-vec.

    Checks schema, per-memory field integrity (content hash, tags,
    timestamps, metadata JSON, UTF-8 encodability) and optionally compares
    row counts against the original ChromaDB.
    """

    # Sanity window for Unix timestamps: 2000-01-01 .. 2100-01-01 (UTC).
    MIN_TIMESTAMP = 946684800
    MAX_TIMESTAMP = 4102444800

    def __init__(self, sqlite_path: str, chroma_path: Optional[str] = None):
        self.sqlite_path = sqlite_path
        self.chroma_path = chroma_path  # optional; used only for comparison
        self.report = ValidationReport()

    def validate_content_hash(self, content: str, stored_hash: str) -> bool:
        """Return True if *stored_hash* is the SHA-256 hex digest of *content*."""
        if not stored_hash:
            return False
        expected_hash = hashlib.sha256(content.encode()).hexdigest()
        return expected_hash == stored_hash

    def validate_tags(self, tags_str: str) -> Tuple[bool, List[str]]:
        """Validate tag format; return (is_valid, parsed_tags)."""
        if not tags_str:
            return True, []
        try:
            # Tags are stored as a comma-separated string.
            tags = [tag.strip() for tag in tags_str.split(',') if tag.strip()]
            # Check for common corruption patterns.
            issues = []
            for tag in tags:
                if '\n' in tag or '\r' in tag:
                    issues.append(f"Newline in tag: {repr(tag)}")
                if len(tag) > 100:
                    issues.append(f"Tag too long: {tag[:50]}...")
                if tag.startswith('[') or tag.endswith(']'):
                    issues.append(f"Array syntax in tag: {tag}")
            return len(issues) == 0, tags
        except Exception:
            # Any unexpected value (e.g. a non-string) counts as malformed.
            return False, []

    def validate_timestamp(self, timestamp: Any, field_name: str) -> bool:
        """Return True if *timestamp* is a plausible Unix time (2000-2100).

        *field_name* is kept for interface compatibility (callers pass the
        column name) although the check itself does not use it.
        """
        # Fix: the original fell off the end of its try-block and implicitly
        # returned None for non-numeric values (e.g. strings); be explicit,
        # and avoid the bare `except:` it used.
        if timestamp is None or not isinstance(timestamp, (int, float)):
            return False
        return self.MIN_TIMESTAMP <= float(timestamp) <= self.MAX_TIMESTAMP

    def validate_metadata(self, metadata_str: str) -> Tuple[bool, Dict]:
        """Validate that metadata is a JSON object; return (is_valid, dict)."""
        if not metadata_str:
            return True, {}
        try:
            metadata = json.loads(metadata_str)
        except json.JSONDecodeError:
            return False, {}
        if not isinstance(metadata, dict):
            return False, {}
        return True, metadata

    def _validate_memory_row(self, row) -> bool:
        """Validate one `memories` row; record problems, return validity."""
        memory_id, content_hash, content, tags, memory_type = row[:5]
        metadata, created_at, updated_at = row[5:]
        memory_valid = True

        # Content and hash.
        if not content:
            self.report.missing_fields.append(f"Memory {memory_id}: missing content")
            memory_valid = False
        elif not content_hash:
            self.report.missing_fields.append(f"Memory {memory_id}: missing content_hash")
            memory_valid = False
        elif not self.validate_content_hash(content, content_hash):
            self.report.hash_mismatches.append(
                f"Memory {memory_id}: hash mismatch (hash: {content_hash[:8]}...)"
            )
            memory_valid = False

        # Tags: malformed tags are recorded but do not invalidate the row.
        if tags:
            valid_tags, _ = self.validate_tags(tags)
            if not valid_tags:
                self.report.tag_issues.append(
                    f"Memory {memory_id}: malformed tags: {tags[:50]}..."
                )

        # Timestamps: a bad created_at invalidates the row; a bad updated_at
        # is recorded only (matches the original policy).
        if not self.validate_timestamp(created_at, 'created_at'):
            self.report.timestamp_issues.append(
                f"Memory {memory_id}: invalid created_at: {created_at}"
            )
            memory_valid = False
        if not self.validate_timestamp(updated_at, 'updated_at'):
            self.report.timestamp_issues.append(
                f"Memory {memory_id}: invalid updated_at: {updated_at}"
            )

        # Metadata JSON validity is a warning only.
        if metadata:
            valid_meta, _ = self.validate_metadata(metadata)
            if not valid_meta:
                self.report.warnings.append(
                    f"Memory {memory_id}: invalid metadata JSON"
                )

        # UTF-8 encodability (lone surrogates from SQLite raise here).
        # Fix: guard the type — the original called .encode on whatever was
        # stored, so a non-string content aborted the entire validation.
        if isinstance(content, str):
            try:
                content.encode('utf-8')
            except UnicodeEncodeError:
                self.report.encoding_issues.append(
                    f"Memory {memory_id}: encoding issues in content"
                )
                memory_valid = False

        return memory_valid

    async def validate_sqlite_database(self) -> bool:
        """Validate the SQLite-vec database.

        Returns False only on fatal problems (missing file/table, DB error);
        per-row problems are accumulated in self.report instead.
        """
        print(f"Validating SQLite database: {self.sqlite_path}")
        if not Path(self.sqlite_path).exists():
            self.report.add_issue(f"Database file not found: {self.sqlite_path}")
            return False

        conn = None
        try:
            conn = sqlite3.connect(self.sqlite_path)

            # Check if tables exist.
            tables = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
            table_names = [t[0] for t in tables]
            if 'memories' not in table_names:
                self.report.add_issue("Missing 'memories' table")
                return False

            # Verify the expected columns are present.
            schema = conn.execute("PRAGMA table_info(memories)").fetchall()
            column_names = [col[1] for col in schema]
            required_columns = [
                'content_hash', 'content', 'tags', 'memory_type',
                'metadata', 'created_at', 'updated_at'
            ]
            for col in required_columns:
                if col not in column_names:
                    self.report.add_issue(f"Missing required column: {col}")

            # Validate each memory row.
            cursor = conn.execute("""
                SELECT id, content_hash, content, tags, memory_type,
                       metadata, created_at, updated_at
                FROM memories
                ORDER BY id
            """)
            for row in cursor:
                self.report.total_memories += 1
                if self._validate_memory_row(row):
                    self.report.valid_memories += 1

            # Aggregate-level checks.
            if self.report.total_memories == 0:
                self.report.add_issue("No memories found in database")
            elif self.report.valid_memories < self.report.total_memories * 0.5:
                self.report.add_issue(
                    f"Less than 50% of memories are valid "
                    f"({self.report.valid_memories}/{self.report.total_memories})"
                )
            return True
        except Exception as e:
            self.report.add_issue(f"Database validation error: {e}")
            return False
        finally:
            # Fix: the original leaked the connection on any exception.
            if conn is not None:
                conn.close()

    async def compare_with_chroma(self) -> bool:
        """Compare migrated row count with the original ChromaDB.

        Only a count comparison is performed; discrepancies are reported as
        warnings, so this always returns True.
        """
        if not self.chroma_path or not IMPORTS_AVAILABLE:
            print("Skipping ChromaDB comparison (not available)")
            return True
        print(f"Comparing with ChromaDB: {self.chroma_path}")
        try:
            chroma_storage = ChromaMemoryStorage(path=self.chroma_path)
            collection = chroma_storage.collection
            if not collection:
                self.report.add_warning("Could not access ChromaDB collection")
                return True

            chroma_results = collection.get()
            chroma_count = len(chroma_results.get('ids', []))
            print(f"  ChromaDB memories: {chroma_count}")
            print(f"  SQLite memories: {self.report.total_memories}")

            if chroma_count > 0:
                migration_rate = self.report.total_memories / chroma_count * 100
                if migration_rate < 95:
                    self.report.add_warning(
                        f"Only {migration_rate:.1f}% of ChromaDB memories were migrated"
                    )
                elif migration_rate > 105:
                    self.report.add_warning(
                        f"SQLite has {migration_rate:.1f}% of ChromaDB count (possible duplicates)"
                    )
            return True
        except Exception as e:
            self.report.add_warning(f"Could not compare with ChromaDB: {e}")
            return True

    async def run(self) -> bool:
        """Run the validation process; return True when validation passed."""
        print("\n" + "="*60)
        print("MCP Memory Service - Migration Validator")
        print("="*60)

        # Validate SQLite database; bail out early on fatal errors.
        if not await self.validate_sqlite_database():
            self.report.print_report()
            return False

        # Compare with ChromaDB if a path was supplied.
        if self.chroma_path:
            await self.compare_with_chroma()

        self.report.print_report()
        return self.report.is_valid()
def find_databases() -> Tuple[Optional[str], Optional[str]]:
    """Locate the SQLite-vec and ChromaDB databases.

    Resolution order: environment variables first (MCP_MEMORY_SQLITE_PATH /
    MCP_MEMORY_CHROMA_PATH), then a list of platform-specific default
    locations. Returns (sqlite_path, chroma_path); either element may be
    None when nothing was found.
    """
    sqlite_path = os.environ.get('MCP_MEMORY_SQLITE_PATH')
    chroma_path = os.environ.get('MCP_MEMORY_CHROMA_PATH')

    # Platform-specific base directory used by the memory service.
    home = Path.home()
    if sys.platform == 'darwin':  # macOS
        default_base = home / 'Library' / 'Application Support' / 'mcp-memory'
    elif sys.platform == 'win32':  # Windows
        default_base = Path(os.getenv('LOCALAPPDATA', '')) / 'mcp-memory'
    else:  # Linux
        default_base = home / '.local' / 'share' / 'mcp-memory'

    if not sqlite_path:
        sqlite_candidates = (
            default_base / 'sqlite_vec.db',
            default_base / 'sqlite_vec_migrated.db',
            default_base / 'memory_migrated.db',
            Path.cwd() / 'sqlite_vec.db',
        )
        found = next((p for p in sqlite_candidates if p.exists()), None)
        if found is not None:
            sqlite_path = str(found)
            print(f"Found SQLite database: {found}")

    if not chroma_path:
        chroma_candidates = (
            home / '.mcp_memory_chroma',
            default_base / 'chroma_db',
            Path.cwd() / 'chroma_db',
        )
        found = next((p for p in chroma_candidates if p.exists()), None)
        if found is not None:
            chroma_path = str(found)
            print(f"Found ChromaDB: {found}")

    return sqlite_path, chroma_path
def main():
    """CLI entry point: parse arguments, locate databases, run validation.

    Returns a process exit code: 0 when validation passed, 1 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Validate ChromaDB to SQLite-vec migration",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        'sqlite_path', nargs='?',
        help='Path to SQLite-vec database (default: auto-detect)'
    )
    parser.add_argument(
        '--chroma-path',
        help='Path to original ChromaDB for comparison'
    )
    # NOTE(review): --compare and --fix are accepted but currently unused by
    # this script; kept as-is for CLI compatibility.
    parser.add_argument(
        '--compare', action='store_true',
        help='Compare with ChromaDB (requires ChromaDB path)'
    )
    parser.add_argument(
        '--fix', action='store_true',
        help='Attempt to fix common issues (experimental)'
    )
    args = parser.parse_args()

    # Resolve database locations: explicit argument wins, else auto-detect.
    sqlite_path = args.sqlite_path
    if not sqlite_path:
        sqlite_path, detected_chroma = find_databases()
        if detected_chroma and not args.chroma_path:
            args.chroma_path = detected_chroma

    if not sqlite_path:
        print("Error: Could not find SQLite database")
        print("Please specify the path or set MCP_MEMORY_SQLITE_PATH")
        return 1

    # Run validation and translate the boolean result to an exit code.
    validator = MigrationValidator(sqlite_path, args.chroma_path)
    return 0 if asyncio.run(validator.run()) else 1


if __name__ == "__main__":
    sys.exit(main())
```