This is page 17 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/oauth/authorization.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OAuth 2.1 Authorization Server implementation for MCP Memory Service.
Implements OAuth 2.1 authorization code flow and token endpoints.
"""
import time
import logging
import base64
from typing import Optional, Tuple
from urllib.parse import urlencode
from fastapi import APIRouter, HTTPException, status, Form, Query, Request
from fastapi.responses import RedirectResponse
from jose import jwt
from ...config import (
OAUTH_ISSUER,
OAUTH_ACCESS_TOKEN_EXPIRE_MINUTES,
OAUTH_AUTHORIZATION_CODE_EXPIRE_MINUTES,
get_jwt_algorithm,
get_jwt_signing_key
)
from .models import TokenResponse
from .storage import oauth_storage
logger = logging.getLogger(__name__)
router = APIRouter()
def parse_basic_auth(authorization_header: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Extract OAuth client credentials from an HTTP Basic authentication header.

    Args:
        authorization_header: Raw ``Authorization`` header value, or None.

    Returns:
        Tuple of (client_id, client_secret), or (None, None) when the header
        is absent, is not Basic auth, or cannot be decoded.
    """
    prefix = 'Basic '
    if not authorization_header or not authorization_header.startswith(prefix):
        return None, None
    try:
        # Strip the 'Basic ' prefix and base64-decode the credential blob.
        decoded_credentials = base64.b64decode(authorization_header[len(prefix):]).decode('utf-8')
    except Exception as e:
        logger.debug(f"Failed to parse Basic auth header: {e}")
        return None, None
    # RFC 7617: credentials are "user-id:password"; only the first colon
    # separates the two, so the secret itself may contain colons.
    client_id, separator, client_secret = decoded_credentials.partition(':')
    if not separator:
        return None, None
    return client_id, client_secret
def create_access_token(client_id: str, scope: Optional[str] = None) -> tuple[str, int]:
    """
    Create a JWT access token for the given client.

    Uses RS256 with RSA key pair if available, otherwise falls back to HS256
    (the algorithm and key both come from the config helpers).

    Args:
        client_id: Subject (``sub``) claim of the token.
        scope: Space-separated scopes; defaults to "read write" when omitted.

    Returns:
        Tuple of (token, expires_in_seconds).
    """
    ttl_seconds = OAUTH_ACCESS_TOKEN_EXPIRE_MINUTES * 60
    claims = {
        "iss": OAUTH_ISSUER,
        "sub": client_id,
        "aud": "mcp-memory-service",
        "exp": time.time() + ttl_seconds,
        "iat": time.time(),
        "scope": scope or "read write",
    }
    # Algorithm/key selection is centralized in config (RS256 or HS256).
    algorithm = get_jwt_algorithm()
    logger.debug(f"Creating JWT token with algorithm: {algorithm}")
    encoded = jwt.encode(claims, get_jwt_signing_key(), algorithm=algorithm)
    return encoded, ttl_seconds
async def validate_redirect_uri(client_id: str, redirect_uri: Optional[str]) -> str:
    """
    Resolve and validate the redirect URI for a registered OAuth client.

    Args:
        client_id: Identifier of the (previously registered) client.
        redirect_uri: URI supplied in the request, or None/empty to fall back
            to the client's first registered URI.

    Returns:
        The validated redirect URI to use.

    Raises:
        HTTPException: 400 when the client is unknown, when no URI can be
            resolved, or when the supplied URI is not registered.
    """
    def _bad_request(error: str, description: str) -> HTTPException:
        # All failures here are client errors with an OAuth-style detail body.
        return HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": error,
                "error_description": description
            }
        )

    client = await oauth_storage.get_client(client_id)
    if not client:
        raise _bad_request("invalid_client", "Invalid client_id")

    # No URI supplied: fall back to the first registered one, if any.
    if not redirect_uri:
        if not client.redirect_uris:
            raise _bad_request(
                "invalid_request",
                "redirect_uri is required when client has no registered redirect URIs"
            )
        return client.redirect_uris[0]

    # A supplied URI must exactly match one of the registered URIs.
    if redirect_uri not in client.redirect_uris:
        raise _bad_request(
            "invalid_redirect_uri",
            "redirect_uri not registered for this client"
        )
    return redirect_uri
@router.get("/authorize")
async def authorize(
    response_type: str = Query(..., description="OAuth response type"),
    client_id: str = Query(..., description="OAuth client identifier"),
    redirect_uri: Optional[str] = Query(None, description="Redirection URI"),
    scope: Optional[str] = Query(None, description="Requested scope"),
    state: Optional[str] = Query(None, description="Opaque value for CSRF protection")
):
    """
    OAuth 2.1 Authorization endpoint.

    Implements the authorization code flow. For MVP, this auto-approves
    all requests without user interaction.

    Security: per RFC 6749 Section 3.1.2.4, the client and redirect_uri are
    validated *before* any error redirect is issued, so the user agent is
    never redirected to an unregistered URI (open-redirect prevention).
    """
    logger.info(f"Authorization request: client_id={client_id}, response_type={response_type}")
    validated_redirect_uri: Optional[str] = None
    try:
        # Validate client and redirect_uri FIRST. If this fails we respond
        # with a 400 directly (raised by validate_redirect_uri) instead of
        # redirecting to an unvalidated URI.
        validated_redirect_uri = await validate_redirect_uri(client_id, redirect_uri)

        # Validate response_type; report the error via the validated URI only.
        if response_type != "code":
            error_params = {
                "error": "unsupported_response_type",
                "error_description": "Only 'code' response type is supported"
            }
            if state:
                error_params["state"] = state
            error_url = f"{validated_redirect_uri}?{urlencode(error_params)}"
            return RedirectResponse(url=error_url)

        # Generate and persist a single-use authorization code.
        auth_code = oauth_storage.generate_authorization_code()
        await oauth_storage.store_authorization_code(
            code=auth_code,
            client_id=client_id,
            redirect_uri=validated_redirect_uri,
            scope=scope,
            expires_in=OAUTH_AUTHORIZATION_CODE_EXPIRE_MINUTES * 60
        )

        # Redirect back to the client with the code (echoing state for CSRF).
        redirect_params = {"code": auth_code}
        if state:
            redirect_params["state"] = state
        redirect_url = f"{validated_redirect_uri}?{urlencode(redirect_params)}"
        logger.info(f"Authorization granted for client_id={client_id}")
        return RedirectResponse(url=redirect_url)
    except HTTPException:
        # Re-raise HTTP exceptions (validation failures already formatted)
        raise
    except Exception as e:
        logger.error(f"Authorization error: {e}")
        error_params = {
            "error": "server_error",
            "error_description": "Internal server error"
        }
        if state:
            error_params["state"] = state
        # Only redirect the error if the redirect URI was already validated;
        # otherwise fall back to a plain 500 response.
        if validated_redirect_uri:
            error_url = f"{validated_redirect_uri}?{urlencode(error_params)}"
            return RedirectResponse(url=error_url)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_params)
async def _handle_authorization_code_grant(
    final_client_id: str,
    final_client_secret: str,
    code: Optional[str],
    redirect_uri: Optional[str]
) -> TokenResponse:
    """
    Handle the OAuth authorization_code grant type.

    Validates the request, authenticates the client, consumes the single-use
    authorization code, and issues a JWT access token.

    Args:
        final_client_id: Client identifier (from Basic auth or form data).
        final_client_secret: Client secret; may be empty for public clients.
        code: Authorization code previously issued by /authorize.
        redirect_uri: Must match the URI used in the authorization request
            when it was supplied there.

    Raises:
        HTTPException: 400 for malformed or invalid grants, 401 when client
            authentication fails (RFC 6749 Section 5.2 error codes).
    """
    if not code:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameter: code"
            }
        )
    if not final_client_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameter: client_id"
            }
        )

    # Authenticate client (an empty secret is passed through for public clients).
    if not await oauth_storage.authenticate_client(final_client_id, final_client_secret or ""):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_client",
                "error_description": "Client authentication failed"
            }
        )

    # Get and consume the authorization code (single-use by design).
    code_data = await oauth_storage.get_authorization_code(code)
    if not code_data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "Invalid or expired authorization code"
            }
        )

    # The code must have been issued to this same client.
    if code_data["client_id"] != final_client_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "Authorization code was issued to a different client"
            }
        )

    # If a redirect_uri is supplied, it must match the one bound to the code.
    if redirect_uri and code_data["redirect_uri"] != redirect_uri:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "redirect_uri does not match the one used in authorization request"
            }
        )

    # Create the access token and persist it so later requests can validate it.
    access_token, expires_in = create_access_token(final_client_id, code_data["scope"])
    await oauth_storage.store_access_token(
        token=access_token,
        client_id=final_client_id,
        scope=code_data["scope"],
        expires_in=expires_in
    )

    logger.info(f"Access token issued for client_id={final_client_id}")
    return TokenResponse(
        access_token=access_token,
        token_type="Bearer",
        expires_in=expires_in,
        scope=code_data["scope"]
    )


async def _handle_client_credentials_grant(
    final_client_id: str,
    final_client_secret: str
) -> TokenResponse:
    """
    Handle the OAuth client_credentials grant type.

    Authenticates the client with its secret and issues an access token with
    the default "read write" scope.

    Raises:
        HTTPException: 400 when credentials are missing, 401 when they are
            rejected by the client store.
    """
    if not final_client_id or not final_client_secret:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameters: client_id and client_secret"
            }
        )

    # Authenticate client
    if not await oauth_storage.authenticate_client(final_client_id, final_client_secret):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_client",
                "error_description": "Client authentication failed"
            }
        )

    # Create and persist the access token.
    access_token, expires_in = create_access_token(final_client_id, "read write")
    await oauth_storage.store_access_token(
        token=access_token,
        client_id=final_client_id,
        scope="read write",
        expires_in=expires_in
    )

    logger.info(f"Client credentials token issued for client_id={final_client_id}")
    return TokenResponse(
        access_token=access_token,
        token_type="Bearer",
        expires_in=expires_in,
        scope="read write"
    )


# NOTE: this decorator was previously attached to _handle_authorization_code_grant,
# which registered the internal helper (whose parameters are not Form fields) at
# POST /token and left the real endpoint unrouted. It belongs on token().
@router.post("/token", response_model=TokenResponse)
async def token(
    request: Request,
    grant_type: str = Form(..., description="OAuth grant type"),
    code: Optional[str] = Form(None, description="Authorization code"),
    redirect_uri: Optional[str] = Form(None, description="Redirection URI"),
    client_id: Optional[str] = Form(None, description="OAuth client identifier"),
    client_secret: Optional[str] = Form(None, description="OAuth client secret")
):
    """
    OAuth 2.1 Token endpoint.

    Exchanges authorization codes for access tokens.
    Supports both authorization_code and client_credentials grant types.
    Supports both client_secret_post (form data) and client_secret_basic
    (HTTP Basic auth); Basic auth credentials take precedence when present.
    """
    # Extract client credentials from either HTTP Basic auth or form data.
    auth_header = request.headers.get('authorization')
    basic_client_id, basic_client_secret = parse_basic_auth(auth_header)

    # Use Basic auth credentials if available, otherwise fall back to form data.
    final_client_id = basic_client_id or client_id
    final_client_secret = basic_client_secret or client_secret
    auth_method = "client_secret_basic" if basic_client_id else "client_secret_post"

    logger.info(f"Token request: grant_type={grant_type}, client_id={final_client_id}, auth_method={auth_method}")
    try:
        if grant_type == "authorization_code":
            return await _handle_authorization_code_grant(
                final_client_id, final_client_secret, code, redirect_uri
            )
        elif grant_type == "client_credentials":
            return await _handle_client_credentials_grant(
                final_client_id, final_client_secret
            )
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "error": "unsupported_grant_type",
                    "error_description": f"Grant type '{grant_type}' is not supported"
                }
            )
    except HTTPException:
        # Re-raise HTTP exceptions (already formatted as OAuth errors).
        raise
    except Exception as e:
        logger.error(f"Token endpoint error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "server_error",
                "error_description": "Internal server error"
            }
        )
```
--------------------------------------------------------------------------------
/scripts/maintenance/repair_zero_embeddings.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Enhanced repair script to fix zero vector embeddings in SQLite-vec databases.
This script detects and repairs embeddings that are all zeros (invalid) and
regenerates them with proper sentence transformer embeddings.
"""
import asyncio
import os
import sys
import sqlite3
import logging
import numpy as np
from typing import List, Tuple
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
import sqlite_vec
from sqlite_vec import serialize_float32
SQLITE_VEC_AVAILABLE = True
except ImportError:
SQLITE_VEC_AVAILABLE = False
try:
from sentence_transformers import SentenceTransformer
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
SENTENCE_TRANSFORMERS_AVAILABLE = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ZeroEmbeddingRepair:
    """Repair a SQLite-vec database containing zero-vector embeddings.

    All-zero embeddings are invalid: every vector-search distance degenerates,
    so semantic search returns meaningless results.  This tool detects such
    rows (plus unparseable blobs), regenerates them with a sentence-transformer
    model, and verifies that search produces sane similarity scores again.
    """

    def __init__(self, db_path: str, model_name: str = "all-MiniLM-L6-v2"):
        self.db_path = db_path
        self.model_name = model_name
        self.conn = None   # sqlite3.Connection, set by connect_database()
        self.model = None  # SentenceTransformer, set by load_model()
        self.embedding_dimension = 384  # Default for all-MiniLM-L6-v2; corrected by load_model()

    def check_dependencies(self) -> bool:
        """Return True when sqlite-vec and sentence-transformers are importable."""
        print("Checking dependencies...")
        if not SQLITE_VEC_AVAILABLE:
            print("❌ sqlite-vec not installed. Run: pip install sqlite-vec")
            return False
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            print("❌ sentence-transformers not installed. Run: pip install sentence-transformers torch")
            return False
        print("✅ All dependencies available")
        return True

    def connect_database(self):
        """Open the database and load the sqlite-vec extension.

        Raises:
            FileNotFoundError: if ``self.db_path`` does not exist.
        """
        print(f"\nConnecting to database: {self.db_path}")
        if not os.path.exists(self.db_path):
            raise FileNotFoundError(f"Database not found: {self.db_path}")
        self.conn = sqlite3.connect(self.db_path)
        # Extension loading is enabled only for the duration of the load so
        # later SQL cannot pull in arbitrary extensions.
        self.conn.enable_load_extension(True)
        sqlite_vec.load(self.conn)
        self.conn.enable_load_extension(False)
        print("✅ Connected to database")

    @staticmethod
    def _is_invalid_embedding(embedding_blob) -> bool:
        """Return True when the blob is missing, unparseable, or all zeros."""
        if not embedding_blob:
            return True
        try:
            vector = np.frombuffer(embedding_blob, dtype=np.float32)
        except (ValueError, TypeError):
            return True  # Unparseable blobs are treated as invalid
        return bool(np.allclose(vector, 0))

    def analyze_database(self) -> dict:
        """Analyze the current state of the database including zero embeddings.

        Returns:
            dict with memory/embedding counts, the declared embedding
            dimension (or None if undetected), and a list of human-readable
            issue strings.
        """
        print("\nAnalyzing database...")
        analysis = {
            "memory_count": 0,
            "embedding_count": 0,
            "missing_embeddings": 0,
            "zero_embeddings": 0,
            "valid_embeddings": 0,
            "embedding_dimension": None,
            "issues": []
        }
        # Count memories
        cursor = self.conn.execute("SELECT COUNT(*) FROM memories")
        analysis["memory_count"] = cursor.fetchone()[0]
        # Count embeddings
        cursor = self.conn.execute("SELECT COUNT(*) FROM memory_embeddings")
        analysis["embedding_count"] = cursor.fetchone()[0]
        # Read the declared vector dimension out of the table schema,
        # e.g. "content_embedding FLOAT[384]".
        cursor = self.conn.execute("""
            SELECT sql FROM sqlite_master
            WHERE type='table' AND name='memory_embeddings'
        """)
        schema = cursor.fetchone()
        if schema:
            import re
            # Single backslashes here: the previous doubled form
            # (r'FLOAT\\[...') looked for a literal backslash and never
            # matched the schema text, leaving the dimension as None.
            match = re.search(r'FLOAT\[(\d+)\]', schema[0])
            if match:
                analysis["embedding_dimension"] = int(match.group(1))
        # Find memories without embeddings
        cursor = self.conn.execute("""
            SELECT COUNT(*) FROM memories m
            WHERE NOT EXISTS (
                SELECT 1 FROM memory_embeddings e WHERE e.rowid = m.id
            )
        """)
        analysis["missing_embeddings"] = cursor.fetchone()[0]
        # Check for zero embeddings
        print(" Checking for zero vector embeddings...")
        cursor = self.conn.execute("""
            SELECT e.rowid, e.content_embedding, m.content
            FROM memory_embeddings e
            INNER JOIN memories m ON m.id = e.rowid
        """)
        zero_count = 0
        valid_count = 0
        for rowid, embedding_blob, content in cursor.fetchall():
            if embedding_blob and len(embedding_blob) > 0:
                try:
                    embedding_array = np.frombuffer(embedding_blob, dtype=np.float32)
                    if np.allclose(embedding_array, 0):
                        zero_count += 1
                        logger.debug(f"Zero embedding found for memory {rowid}: {content[:50]}...")
                    else:
                        valid_count += 1
                except Exception as e:
                    logger.warning(f"Failed to parse embedding for rowid {rowid}: {e}")
                    zero_count += 1  # Treat unparseable as invalid
            else:
                zero_count += 1
        analysis["zero_embeddings"] = zero_count
        analysis["valid_embeddings"] = valid_count
        # Identify issues
        if analysis["memory_count"] != analysis["embedding_count"]:
            analysis["issues"].append(
                f"Mismatch: {analysis['memory_count']} memories vs {analysis['embedding_count']} embeddings"
            )
        if analysis["missing_embeddings"] > 0:
            analysis["issues"].append(
                f"Missing embeddings: {analysis['missing_embeddings']} memories have no embeddings"
            )
        if analysis["zero_embeddings"] > 0:
            analysis["issues"].append(
                f"Zero vector embeddings: {analysis['zero_embeddings']} embeddings are all zeros (invalid)"
            )
        print(f" Memories: {analysis['memory_count']}")
        print(f" Embeddings: {analysis['embedding_count']}")
        print(f" Missing embeddings: {analysis['missing_embeddings']}")
        print(f" Zero embeddings: {analysis['zero_embeddings']}")
        print(f" Valid embeddings: {analysis['valid_embeddings']}")
        print(f" Embedding dimension: {analysis['embedding_dimension']}")
        if analysis["issues"]:
            print("\n⚠️ Issues found:")
            for issue in analysis["issues"]:
                print(f" - {issue}")
        else:
            print("\n✅ No issues found")
        return analysis

    def load_model(self):
        """Load the sentence-transformer model and record its true dimension."""
        print(f"\nLoading embedding model: {self.model_name}")
        self.model = SentenceTransformer(self.model_name)
        # Encode a probe sentence to discover the model's actual output
        # dimension instead of trusting the 384 default.
        test_embedding = self.model.encode(["test"], convert_to_numpy=True)
        self.embedding_dimension = test_embedding.shape[1]
        print(f"✅ Model loaded (dimension: {self.embedding_dimension})")

    def regenerate_zero_embeddings(self, analysis: dict) -> int:
        """Regenerate embeddings that are zero vectors.

        Returns:
            Number of embeddings successfully regenerated and written back.
        """
        if analysis["zero_embeddings"] == 0:
            return 0
        print(f"\nRegenerating {analysis['zero_embeddings']} zero vector embeddings...")
        # Collect all rows whose stored embedding is invalid
        cursor = self.conn.execute("""
            SELECT e.rowid, e.content_embedding, m.content
            FROM memory_embeddings e
            INNER JOIN memories m ON m.id = e.rowid
        """)
        zero_embeddings = [
            (rowid, content)
            for rowid, embedding_blob, content in cursor.fetchall()
            if self._is_invalid_embedding(embedding_blob)
        ]
        fixed_count = 0
        for rowid, content in zero_embeddings:
            try:
                # Generate new embedding
                embedding = self.model.encode([content], convert_to_numpy=True)[0]
                # Reject degenerate outputs (all zeros, NaN/Inf) rather than
                # writing another broken vector back.
                if not np.allclose(embedding, 0) and np.isfinite(embedding).all():
                    self.conn.execute(
                        "UPDATE memory_embeddings SET content_embedding = ? WHERE rowid = ?",
                        (serialize_float32(embedding), rowid)
                    )
                    fixed_count += 1
                    # Show progress
                    if fixed_count % 10 == 0:
                        print(f" ... {fixed_count}/{len(zero_embeddings)} embeddings regenerated")
                else:
                    logger.error(f"Generated invalid embedding for memory {rowid}")
            except Exception as e:
                logger.error(f"Failed to regenerate embedding for memory {rowid}: {e}")
        self.conn.commit()
        print(f"✅ Regenerated {fixed_count} embeddings")
        return fixed_count

    def verify_search(self) -> bool:
        """Test if semantic search works with proper similarity scores."""
        print("\nTesting semantic search with similarity scores...")
        try:
            # Generate a test query embedding
            test_query = "test embedding verification"
            query_embedding = self.model.encode([test_query], convert_to_numpy=True)[0]
            # KNN query via the vec0 MATCH operator; the inner query returns
            # the three nearest rows with their distances.
            cursor = self.conn.execute("""
                SELECT m.content, e.distance
                FROM memories m
                INNER JOIN (
                    SELECT rowid, distance
                    FROM memory_embeddings
                    WHERE content_embedding MATCH ?
                    ORDER BY distance
                    LIMIT 3
                ) e ON m.id = e.rowid
                ORDER BY e.distance
            """, (serialize_float32(query_embedding),))
            results = cursor.fetchall()
            if not results:
                print("❌ No results returned")
                return False
            print("✅ Semantic search working with results:")
            for i, (content, distance) in enumerate(results, 1):
                similarity = max(0.0, 1.0 - distance)
                print(f" {i}. Distance: {distance:.6f}, Similarity: {similarity:.6f}")
                print(f" Content: {content[:60]}...")
            # Distances all >= 1.0 mean every similarity clamps to 0.0 —
            # search "works" but the scores are useless.
            distances = [result[1] for result in results]
            if all(d >= 1.0 for d in distances):
                print("⚠️ All distances are >= 1.0, similarities will be 0.0")
                return False
            print("✅ Found reasonable similarity scores")
            return True
        except Exception as e:
            print(f"❌ Semantic search failed: {e}")
            return False

    def run_repair(self):
        """Run the full repair workflow end to end.

        Steps: dependency check, connect, analyze, regenerate invalid
        embeddings, verify search, re-analyze, print a summary.  The
        connection is always closed, even on failure.
        """
        # Previously these banners used "\\n" and printed a literal
        # backslash-n instead of a blank line; fixed to real newlines.
        print("\n" + "="*60)
        print("SQLite-vec Zero Embedding Repair Tool")
        print("="*60)
        try:
            # Check dependencies
            if not self.check_dependencies():
                return
            # Connect to database
            self.connect_database()
            # Analyze current state
            analysis = self.analyze_database()
            if not analysis["issues"]:
                print("\n✅ Database appears healthy, no repair needed")
                return
            # Load model
            self.load_model()
            # Fix zero embeddings
            fixed = self.regenerate_zero_embeddings(analysis)
            # Verify search works
            search_working = self.verify_search()
            # Re-analyze
            print("\nRe-analyzing database after repair...")
            new_analysis = self.analyze_database()
            print("\n" + "="*60)
            print("Repair Summary")
            print("="*60)
            print(f"Fixed {fixed} zero vector embeddings")
            print(f"Search working: {'✅ Yes' if search_working else '❌ No'}")
            if new_analysis["issues"]:
                print("\n⚠️ Some issues remain:")
                for issue in new_analysis["issues"]:
                    print(f" - {issue}")
            else:
                print("\n✅ All issues resolved!")
        except Exception as e:
            print(f"\n❌ Repair failed: {e}")
            logger.exception("Repair failed")
        finally:
            if self.conn:
                self.conn.close()
def main():
    """CLI entry point: parse the database path argument and run the repair.

    Exits with status 1 (after printing usage) when no path is given.
    """
    if len(sys.argv) < 2:
        # Previously these used "\\n" and printed a literal backslash-n
        # instead of a blank line; fixed to real newlines.
        print("Usage: python repair_zero_embeddings.py <database_path>")
        print("\nExample:")
        print(" python repair_zero_embeddings.py ~/.local/share/mcp-memory/sqlite_vec.db")
        print("\nThis tool will:")
        print(" - Check for zero vector embeddings (invalid)")
        print(" - Regenerate proper embeddings using sentence-transformers")
        print(" - Verify semantic search functionality with similarity scores")
        sys.exit(1)
    db_path = sys.argv[1]
    repair = ZeroEmbeddingRepair(db_path)
    repair.run_repair()


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/scripts/validation/diagnose_backend_config.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Diagnostic script to troubleshoot backend configuration issues.
This helps identify why Cloudflare backend might not be working.
"""
import os
import sys
from pathlib import Path
# Add src to path for imports
src_path = Path(__file__).parent.parent.parent / "src"
sys.path.insert(0, str(src_path))
def print_separator(title):
    """Print *title* framed above and below by 60-character '=' rules."""
    rule = "=" * 60
    print("\n" + rule)
    print(f" {title}")
    print(rule)
def print_status(status, message):
    """Print *message* prefixed with a plain-text tag for *status*.

    Known statuses: "success" -> [OK], "warning" -> [WARN],
    "error" -> [ERROR]; anything else falls back to [INFO].
    """
    tags = {"success": "[OK]", "warning": "[WARN]", "error": "[ERROR]"}
    print(f"{tags.get(status, '[INFO]')} {message}")
def check_env_file():
    """Report whether a project-root .env file exists and echo its contents.

    Values on lines whose key mentions TOKEN/PASSWORD/SECRET are masked.

    Returns:
        True when the file exists, False otherwise.
    """
    print_separator("ENVIRONMENT FILE CHECK")
    project_root = Path(__file__).parent.parent.parent
    env_file = project_root / ".env"
    if not env_file.exists():
        print_status("error", f"No .env file found at: {env_file}")
        return False
    print_status("success", f".env file found at: {env_file}")
    print("\n.env file contents:")
    # Read explicitly as UTF-8 so the dump does not depend on the locale.
    with open(env_file, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    for i, line in enumerate(lines, 1):
        # Mask sensitive values; a sensitive-looking line without '=' has no
        # value to hide and is printed as-is.
        if ('TOKEN' in line or 'PASSWORD' in line or 'SECRET' in line) and '=' in line:
            key, _ = line.split('=', 1)
            print(f" {i:2d}: {key}=***MASKED***")
        else:
            print(f" {i:2d}: {line.rstrip()}")
    return True
def check_environment_variables():
    """Check current environment variables.

    Loads the project-root .env file first (when python-dotenv is
    installed), then reports the storage backend setting and each
    Cloudflare variable, masking token values.

    Returns:
        True when every REQUIRED Cloudflare variable is set, else False.
    """
    print_separator("ENVIRONMENT VARIABLES CHECK")
    # Check if dotenv is available and load .env file
    try:
        from dotenv import load_dotenv
        project_root = Path(__file__).parent.parent.parent
        env_file = project_root / ".env"
        if env_file.exists():
            load_dotenv(env_file)
            print_status("success", f"Loaded .env file from: {env_file}")
        else:
            print_status("info", "No .env file to load")
    except ImportError:
        # dotenv is optional; variables may still be set in the environment
        print_status("warning", "dotenv not available, skipping .env file loading")
    # Core configuration
    storage_backend = os.getenv('MCP_MEMORY_STORAGE_BACKEND', 'NOT SET')
    print(f"\nCore Configuration:")
    print(f" MCP_MEMORY_STORAGE_BACKEND: {storage_backend}")
    # Cloudflare variables and whether each is required for the backend
    cloudflare_vars = {
        'CLOUDFLARE_API_TOKEN': 'REQUIRED',
        'CLOUDFLARE_ACCOUNT_ID': 'REQUIRED',
        'CLOUDFLARE_VECTORIZE_INDEX': 'REQUIRED',
        'CLOUDFLARE_D1_DATABASE_ID': 'REQUIRED',
        'CLOUDFLARE_R2_BUCKET': 'OPTIONAL',
        'CLOUDFLARE_EMBEDDING_MODEL': 'OPTIONAL',
        'CLOUDFLARE_LARGE_CONTENT_THRESHOLD': 'OPTIONAL',
        'CLOUDFLARE_MAX_RETRIES': 'OPTIONAL',
        'CLOUDFLARE_BASE_DELAY': 'OPTIONAL'
    }
    print(f"\nCloudflare Configuration:")
    missing_required = []
    for var, requirement in cloudflare_vars.items():
        value = os.getenv(var)
        if value:
            # Show only the first 8 characters of tokens
            if 'TOKEN' in var:
                display_value = f"{value[:8]}***MASKED***"
            else:
                display_value = value
            print_status("success", f"{var}: {display_value} ({requirement})")
        else:
            display_value = "NOT SET"
            if requirement == 'REQUIRED':
                print_status("error", f"{var}: {display_value} ({requirement})")
                missing_required.append(var)
            else:
                print_status("warning", f"{var}: {display_value} ({requirement})")
    if missing_required:
        print_status("error", f"Missing required Cloudflare variables: {', '.join(missing_required)}")
        return False
    else:
        print_status("success", "All required Cloudflare variables are set")
        return True
def test_config_import():
    """Test importing the configuration module.

    Returns:
        The configured backend name (STORAGE_BACKEND) on success, or None
        when the import fails — including when config-time validation
        raises SystemExit because required variables are missing.
    """
    print_separator("CONFIGURATION MODULE TEST")
    try:
        print("Attempting to import config module...")
        from mcp_memory_service.config import (
            STORAGE_BACKEND,
            CLOUDFLARE_API_TOKEN,
            CLOUDFLARE_ACCOUNT_ID,
            CLOUDFLARE_VECTORIZE_INDEX,
            CLOUDFLARE_D1_DATABASE_ID
        )
        print_status("success", "Config import successful")
        print(f" Configured Backend: {STORAGE_BACKEND}")
        print(f" API Token Set: {'YES' if CLOUDFLARE_API_TOKEN else 'NO'}")
        print(f" Account ID: {CLOUDFLARE_ACCOUNT_ID}")
        print(f" Vectorize Index: {CLOUDFLARE_VECTORIZE_INDEX}")
        print(f" D1 Database ID: {CLOUDFLARE_D1_DATABASE_ID}")
        return STORAGE_BACKEND
    except SystemExit as e:
        # The config module can exit the process at import time; catch it so
        # the rest of the diagnostics keep running.
        print_status("error", f"Config import failed with SystemExit: {e}")
        print(" This means required Cloudflare variables are missing")
        return None
    except Exception as e:
        print_status("error", f"Config import failed with error: {e}")
        return None
def test_storage_creation():
    """Test creating the storage backend.

    Instantiates the storage class selected by STORAGE_BACKEND and returns
    the instance, or None when construction fails or the backend name is
    unknown.
    """
    print_separator("STORAGE BACKEND CREATION TEST")
    try:
        from mcp_memory_service.config import STORAGE_BACKEND
        print(f"Attempting to create {STORAGE_BACKEND} storage...")
        if STORAGE_BACKEND == 'cloudflare':
            from mcp_memory_service.storage.cloudflare import CloudflareStorage
            from mcp_memory_service.config import (
                CLOUDFLARE_API_TOKEN,
                CLOUDFLARE_ACCOUNT_ID,
                CLOUDFLARE_VECTORIZE_INDEX,
                CLOUDFLARE_D1_DATABASE_ID,
                CLOUDFLARE_R2_BUCKET,
                CLOUDFLARE_EMBEDDING_MODEL,
                CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
                CLOUDFLARE_MAX_RETRIES,
                CLOUDFLARE_BASE_DELAY
            )
            storage = CloudflareStorage(
                api_token=CLOUDFLARE_API_TOKEN,
                account_id=CLOUDFLARE_ACCOUNT_ID,
                vectorize_index=CLOUDFLARE_VECTORIZE_INDEX,
                d1_database_id=CLOUDFLARE_D1_DATABASE_ID,
                r2_bucket=CLOUDFLARE_R2_BUCKET,
                embedding_model=CLOUDFLARE_EMBEDDING_MODEL,
                large_content_threshold=CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
                max_retries=CLOUDFLARE_MAX_RETRIES,
                base_delay=CLOUDFLARE_BASE_DELAY
            )
            print_status("success", "CloudflareStorage instance created successfully")
            print(f" Storage class: {storage.__class__.__name__}")
            return storage
        elif STORAGE_BACKEND == 'sqlite_vec':
            from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            from mcp_memory_service.config import SQLITE_VEC_PATH
            storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
            print_status("success", "SqliteVecMemoryStorage instance created successfully")
            print(f" Storage class: {storage.__class__.__name__}")
            print(f" Database path: {SQLITE_VEC_PATH}")
            return storage
        else:
            print_status("error", f"Unknown storage backend: {STORAGE_BACKEND}")
            return None
    except Exception as e:
        print_status("error", f"Storage creation failed: {e}")
        # Full traceback helps diagnose credential/connectivity problems
        import traceback
        print(f"Full traceback:")
        traceback.print_exc()
        return None
def _verify_token_endpoint(endpoint_url, endpoint_type, api_token, requests):
    """
    Helper function to verify a Cloudflare API token against a specific endpoint.
    Args:
        endpoint_url: The URL to test the token against
        endpoint_type: Description of the endpoint type (e.g., "Account-scoped", "Generic user")
        api_token: The Cloudflare API token to verify
        requests: The requests module
    Returns:
        tuple: (success: bool, result: dict or None)
    """
    print(f"\nTesting {endpoint_type} token verification...")
    try:
        response = requests.get(
            endpoint_url,
            headers={"Authorization": f"Bearer {api_token}"},
            timeout=10
        )
        # Guard clauses: bail out on HTTP failure, then on API-level failure.
        if response.status_code != 200:
            print_status("error", f"{endpoint_type} verification failed: HTTP {response.status_code}")
            print(f" Response: {response.text}")
            return False, None
        data = response.json()
        if not data.get("success"):
            print_status("error", f"{endpoint_type} verification failed")
            for error in data.get("errors", []):
                print(f" Error {error.get('code')}: {error.get('message')}")
            return False, None
        result = data.get("result", {})
        print_status("success", f"{endpoint_type} verification successful")
        print(f" Token ID: {result.get('id', 'N/A')}")
        print(f" Status: {result.get('status', 'N/A')}")
        print(f" Expires: {result.get('expires_on', 'N/A')}")
        return True, result
    except Exception as e:
        print_status("error", f"{endpoint_type} verification error: {e}")
        return False, None
def test_cloudflare_token():
    """Test Cloudflare API token with both endpoints to help identify token type.

    Tries the account-scoped verification endpoint (when an account ID is
    available) and the generic user endpoint, then prints guidance on which
    endpoint matches which token type.

    Returns:
        True when either endpoint verified the token, else False.
    """
    print_separator("CLOUDFLARE TOKEN VERIFICATION")
    api_token = os.getenv('CLOUDFLARE_API_TOKEN')
    account_id = os.getenv('CLOUDFLARE_ACCOUNT_ID')
    if not api_token:
        print_status("error", "CLOUDFLARE_API_TOKEN not set, skipping token verification")
        return False
    if not account_id:
        print_status("warning", "CLOUDFLARE_ACCOUNT_ID not set, cannot test account-scoped endpoint")
    try:
        # requests is an optional dependency; imported lazily on purpose
        import requests
    except ImportError:
        print_status("warning", "requests not available, skipping token verification")
        return False
    token_verified = False
    # Test 1: Account-scoped endpoint (recommended for scoped tokens)
    if account_id:
        endpoint_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/tokens/verify"
        success, _ = _verify_token_endpoint(endpoint_url, "account-scoped", api_token, requests)
        if success:
            token_verified = True
    # Test 2: Generic user endpoint (works for global tokens)
    endpoint_url = "https://api.cloudflare.com/client/v4/user/tokens/verify"
    success, _ = _verify_token_endpoint(endpoint_url, "generic user", api_token, requests)
    if success:
        token_verified = True
    # Provide guidance
    print("\nTOKEN VERIFICATION GUIDANCE:")
    if account_id:
        print("✅ For account-scoped tokens (recommended), use:")
        print(f" curl \"https://api.cloudflare.com/client/v4/accounts/{account_id}/tokens/verify\" \\")
        print(f" -H \"Authorization: Bearer YOUR_TOKEN\"")
    print("✅ For global tokens (legacy), use:")
    print(" curl \"https://api.cloudflare.com/client/v4/user/tokens/verify\" \\")
    print(" -H \"Authorization: Bearer YOUR_TOKEN\"")
    print("❌ Common mistake: Using wrong endpoint for token type")
    print("📖 See docs/troubleshooting/cloudflare-authentication.md for details")
    return token_verified
def main():
    """Run all diagnostic tests.

    Runs the checks in dependency order (.env file, environment variables,
    token verification, config import, storage creation) and prints a final
    summary with targeted troubleshooting steps when something failed.
    """
    print("MCP Memory Service Backend Configuration Diagnostics")
    print("=" * 60)
    # Step 1: Check .env file
    check_env_file()
    # Step 2: Check environment variables
    cloudflare_ready = check_environment_variables()
    # Step 3: Test Cloudflare token verification
    token_valid = test_cloudflare_token()
    # Step 4: Test config import
    configured_backend = test_config_import()
    # Step 5: Test storage creation if config loaded successfully
    if configured_backend:
        storage = test_storage_creation()
    else:
        storage = None
    # Final summary
    print_separator("DIAGNOSTIC SUMMARY")
    if configured_backend == 'cloudflare' and cloudflare_ready and token_valid and storage:
        print_status("success", "Cloudflare backend should be working correctly")
        print(f" Configuration loaded: {configured_backend}")
        print(f" Required variables set: {cloudflare_ready}")
        print(f" Token verification: {'PASSED' if token_valid else 'NOT TESTED'}")
        print(f" Storage instance created: {storage.__class__.__name__}")
    elif configured_backend == 'sqlite_vec' and storage:
        print_status("success", "SQLite-vec backend is working")
        print(f" Configuration loaded: {configured_backend}")
        print(f" Storage instance created: {storage.__class__.__name__}")
        if cloudflare_ready:
            # Cloudflare credentials present but unused — likely a
            # misconfigured backend selection.
            print_status("warning", "Cloudflare variables are set but backend is sqlite_vec")
            print(" Check MCP_MEMORY_STORAGE_BACKEND environment variable")
    else:
        print_status("error", "Backend configuration has issues")
        print(f" Configured backend: {configured_backend or 'FAILED TO LOAD'}")
        print(f" Cloudflare variables ready: {cloudflare_ready}")
        print(f" Storage created: {'YES' if storage else 'NO'}")
        print("\nTROUBLESHOOTING STEPS:")
        if not cloudflare_ready:
            print(" 1. Set missing Cloudflare environment variables")
            print(" 2. Create .env file with Cloudflare credentials")
        if not token_valid:
            print(" 3. Verify Cloudflare API token is valid and has correct permissions")
            print(" 4. Use account-scoped verification endpoint (see docs/troubleshooting/cloudflare-authentication.md)")
        if not configured_backend:
            print(" 5. Fix environment variable loading issues")
        if configured_backend and not storage:
            print(" 6. Check Cloudflare credentials and connectivity")


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/associations.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Creative association discovery engine for memory connections."""
import random
import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Set
from itertools import combinations
from datetime import datetime
from dataclasses import dataclass
import re
from .base import ConsolidationBase, ConsolidationConfig, MemoryAssociation
from ..models.memory import Memory
@dataclass
class AssociationAnalysis:
    """Analysis results for a potential memory association.

    Produced by CreativeAssociationEngine._analyze_association for one
    candidate pair of memories.
    """
    memory1_hash: str                     # content hash of the first memory
    memory2_hash: str                     # content hash of the second memory
    similarity_score: float               # semantic similarity, mapped into [0, 1]
    connection_reasons: List[str]         # e.g. "shared_tags", "temporal_proximity"
    shared_concepts: List[str]            # concepts extracted from both contents
    temporal_relationship: Optional[str]  # e.g. "same_day"; None when timestamps missing
    tag_overlap: List[str]                # tags present on both memories
    confidence_score: float               # combined confidence, capped at 1.0
class CreativeAssociationEngine(ConsolidationBase):
"""
Discovers creative connections between seemingly unrelated memories.
Similar to how dreams create unexpected associations, this engine randomly
pairs memories to discover non-obvious connections in the "sweet spot"
of moderate similarity (0.3-0.7 range).
"""
def __init__(self, config: ConsolidationConfig):
super().__init__(config)
self.min_similarity = config.min_similarity
self.max_similarity = config.max_similarity
self.max_pairs_per_run = config.max_pairs_per_run
# Compile regex patterns for concept extraction
self._concept_patterns = {
'urls': re.compile(r'https?://[^\s]+'),
'emails': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
'dates': re.compile(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b'),
'numbers': re.compile(r'\b\d+\.?\d*\b'),
'camelCase': re.compile(r'\b[a-z]+[A-Z][a-zA-Z]*\b'),
'PascalCase': re.compile(r'\b[A-Z][a-z]*[A-Z][a-zA-Z]*\b'),
'acronyms': re.compile(r'\b[A-Z]{2,}\b')
}
async def process(self, memories: List[Memory], **kwargs) -> List[MemoryAssociation]:
"""Discover creative associations between memories."""
if not self._validate_memories(memories) or len(memories) < 2:
return []
# Get existing associations to avoid duplicates
existing_associations = kwargs.get('existing_associations', set())
# Sample memory pairs for analysis
pairs = self._sample_memory_pairs(memories)
associations = []
for mem1, mem2 in pairs:
# Skip if association already exists
pair_key = tuple(sorted([mem1.content_hash, mem2.content_hash]))
if pair_key in existing_associations:
continue
# Calculate semantic similarity
similarity = await self._calculate_semantic_similarity(mem1, mem2)
# Check if similarity is in the "sweet spot" for creative connections
if self.min_similarity <= similarity <= self.max_similarity:
analysis = await self._analyze_association(mem1, mem2, similarity)
if analysis.confidence_score > 0.3: # Minimum confidence threshold
association = await self._create_association_memory(analysis)
associations.append(association)
self.logger.info(f"Discovered {len(associations)} creative associations from {len(pairs)} pairs")
return associations
def _sample_memory_pairs(self, memories: List[Memory]) -> List[Tuple[Memory, Memory]]:
"""Sample random pairs of memories for association discovery."""
# Calculate maximum possible pairs
total_possible = len(memories) * (len(memories) - 1) // 2
max_pairs = min(self.max_pairs_per_run, total_possible)
if total_possible <= max_pairs:
# Return all possible pairs if total is manageable
return list(combinations(memories, 2))
else:
# Randomly sample pairs to prevent combinatorial explosion
all_pairs = list(combinations(memories, 2))
return random.sample(all_pairs, max_pairs)
async def _calculate_semantic_similarity(self, mem1: Memory, mem2: Memory) -> float:
"""Calculate semantic similarity between two memories using embeddings."""
if not mem1.embedding or not mem2.embedding:
# Fallback to text-based similarity if embeddings unavailable
return self._calculate_text_similarity(mem1.content, mem2.content)
# Use cosine similarity for embeddings
embedding1 = np.array(mem1.embedding)
embedding2 = np.array(mem2.embedding)
# Normalize embeddings
norm1 = np.linalg.norm(embedding1)
norm2 = np.linalg.norm(embedding2)
if norm1 == 0 or norm2 == 0:
return 0.0
# Calculate cosine similarity
similarity = np.dot(embedding1, embedding2) / (norm1 * norm2)
# Convert to 0-1 range (cosine similarity can be -1 to 1)
return (similarity + 1) / 2
def _calculate_text_similarity(self, text1: str, text2: str) -> float:
"""Fallback text similarity using word overlap."""
words1 = set(text1.lower().split())
words2 = set(text2.lower().split())
if not words1 or not words2:
return 0.0
intersection = len(words1.intersection(words2))
union = len(words1.union(words2))
return intersection / union if union > 0 else 0.0
async def _analyze_association(
self,
mem1: Memory,
mem2: Memory,
similarity: float
) -> AssociationAnalysis:
"""Analyze why two memories might be associated."""
connection_reasons = []
shared_concepts = []
tag_overlap = []
temporal_relationship = None
# Analyze tag overlap
tags1 = set(mem1.tags)
tags2 = set(mem2.tags)
tag_overlap = list(tags1.intersection(tags2))
if tag_overlap:
connection_reasons.append("shared_tags")
# Analyze temporal relationship
temporal_relationship = self._analyze_temporal_relationship(mem1, mem2)
if temporal_relationship:
connection_reasons.append("temporal_proximity")
# Extract and compare concepts
concepts1 = self._extract_concepts(mem1.content)
concepts2 = self._extract_concepts(mem2.content)
shared_concepts = list(concepts1.intersection(concepts2))
if shared_concepts:
connection_reasons.append("shared_concepts")
# Analyze content patterns
if self._has_similar_structure(mem1.content, mem2.content):
connection_reasons.append("similar_structure")
if self._has_complementary_content(mem1.content, mem2.content):
connection_reasons.append("complementary_content")
# Calculate confidence score based on multiple factors
confidence_score = self._calculate_confidence_score(
similarity, len(connection_reasons), len(shared_concepts), len(tag_overlap)
)
return AssociationAnalysis(
memory1_hash=mem1.content_hash,
memory2_hash=mem2.content_hash,
similarity_score=similarity,
connection_reasons=connection_reasons,
shared_concepts=shared_concepts,
temporal_relationship=temporal_relationship,
tag_overlap=tag_overlap,
confidence_score=confidence_score
)
def _extract_concepts(self, text: str) -> Set[str]:
"""Extract key concepts from text using various patterns."""
concepts = set()
# Extract different types of concepts
for concept_type, pattern in self._concept_patterns.items():
matches = pattern.findall(text)
concepts.update(matches)
# Extract capitalized words (potential proper nouns)
capitalized_words = re.findall(r'\b[A-Z][a-z]+\b', text)
concepts.update(capitalized_words)
# Extract quoted phrases
quoted_phrases = re.findall(r'"([^"]*)"', text)
concepts.update(quoted_phrases)
# Extract common important words (filter out common stop words)
stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
words = re.findall(r'\b\w{4,}\b', text.lower()) # Words with 4+ characters
important_words = [word for word in words if word not in stop_words]
concepts.update(important_words[:10]) # Limit to top 10 words
return concepts
def _analyze_temporal_relationship(self, mem1: Memory, mem2: Memory) -> Optional[str]:
"""Analyze temporal relationship between memories."""
if not (mem1.created_at and mem2.created_at):
return None
time_diff = abs(mem1.created_at - mem2.created_at)
days_diff = time_diff / (24 * 3600) # Convert to days
if days_diff < 1:
return "same_day"
elif days_diff < 7:
return "same_week"
elif days_diff < 30:
return "same_month"
elif days_diff < 365:
return "same_year"
else:
return "different_years"
def _has_similar_structure(self, text1: str, text2: str) -> bool:
"""Check if texts have similar structural patterns."""
# Check for similar formatting patterns
patterns = [
r'\n\s*[-*+]\s+', # List items
r'\n\s*\d+\.\s+', # Numbered lists
r'\n#{1,6}\s+', # Headers
r'```[\s\S]*?```', # Code blocks
r'\[.*?\]\(.*?\)', # Links
]
for pattern in patterns:
matches1 = len(re.findall(pattern, text1))
matches2 = len(re.findall(pattern, text2))
if matches1 > 0 and matches2 > 0:
return True
return False
def _has_complementary_content(self, text1: str, text2: str) -> bool:
"""Check if texts contain complementary information."""
# Look for question-answer patterns
has_question1 = bool(re.search(r'\?', text1))
has_question2 = bool(re.search(r'\?', text2))
# If one has questions and the other doesn't, they might be complementary
if has_question1 != has_question2:
return True
# Look for problem-solution patterns
problem_words = ['problem', 'issue', 'error', 'bug', 'fail', 'wrong']
solution_words = ['solution', 'fix', 'resolve', 'answer', 'correct', 'solve']
has_problem1 = any(word in text1.lower() for word in problem_words)
has_solution1 = any(word in text1.lower() for word in solution_words)
has_problem2 = any(word in text2.lower() for word in problem_words)
has_solution2 = any(word in text2.lower() for word in solution_words)
# Complementary if one focuses on problems, other on solutions
if (has_problem1 and has_solution2) or (has_solution1 and has_problem2):
return True
return False
def _calculate_confidence_score(
self,
similarity: float,
num_reasons: int,
num_shared_concepts: int,
num_shared_tags: int
) -> float:
"""Calculate confidence score for the association."""
base_score = similarity
# Boost for multiple connection reasons
reason_boost = min(0.3, num_reasons * 0.1)
# Boost for shared concepts
concept_boost = min(0.2, num_shared_concepts * 0.05)
# Boost for shared tags
tag_boost = min(0.2, num_shared_tags * 0.1)
total_score = base_score + reason_boost + concept_boost + tag_boost
return min(1.0, total_score)
async def _create_association_memory(self, analysis: AssociationAnalysis) -> MemoryAssociation:
    """Create a memory association from analysis results."""
    # Carry the analysis details along as association metadata.
    association_metadata = {
        "shared_concepts": analysis.shared_concepts,
        "temporal_relationship": analysis.temporal_relationship,
        "tag_overlap": analysis.tag_overlap,
        "confidence_score": analysis.confidence_score,
        "analysis_version": "1.0"
    }
    return MemoryAssociation(
        source_memory_hashes=[analysis.memory1_hash, analysis.memory2_hash],
        similarity_score=analysis.similarity_score,
        connection_type=', '.join(analysis.connection_reasons),
        discovery_method="creative_association",
        discovery_date=datetime.now(),
        metadata=association_metadata
    )
async def filter_high_confidence_associations(
    self,
    associations: List[MemoryAssociation],
    min_confidence: float = 0.5
) -> List[MemoryAssociation]:
    """Filter associations by confidence score."""
    high_confidence = []
    for association in associations:
        # Associations without a recorded score default to 0 and are dropped.
        score = association.metadata.get('confidence_score', 0)
        if score >= min_confidence:
            high_confidence.append(association)
    return high_confidence
async def group_associations_by_type(
    self,
    associations: List[MemoryAssociation]
) -> Dict[str, List[MemoryAssociation]]:
    """Group associations by their connection type."""
    grouped: Dict[str, List[MemoryAssociation]] = {}
    for association in associations:
        # setdefault creates the bucket on first sight of a connection type.
        grouped.setdefault(association.connection_type, []).append(association)
    return grouped
```
--------------------------------------------------------------------------------
/scripts/archive/check_missing_timestamps.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Check for memories without timestamps in the MCP memory service database.
This script analyzes the storage backend for entries missing timestamp data.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
import asyncio
import json
from typing import List, Dict, Any
from datetime import datetime
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory
class TimestampAnalyzer:
    """Analyze a memory database for entries with missing or invalid timestamps.

    Drives the whole check: opens the sqlite-vec database, loads every memory
    (direct SQL first, semantic search as a fallback), tallies missing/invalid
    timestamp fields, prints a human-readable report, and writes a JSON report
    to disk.
    """

    def __init__(self, storage_backend: str = "sqlite_vec", db_path: str = None):
        # db_path defaults to the conventional sqlite-vec storage location.
        self.storage_backend = storage_backend
        self.db_path = db_path or os.path.expanduser("~/.mcp_memory_service/storage.db")
        self.storage = None

    async def setup(self):
        """Initialize the storage backend; return True on success.

        When the configured database file is missing, probes a few common
        locations to help the user find the right path.
        """
        print(f"=== Analyzing {self.storage_backend} database for timestamp issues ===")
        print(f"Database path: {self.db_path}")
        if not os.path.exists(self.db_path):
            print(f"❌ Database file not found: {self.db_path}")
            print("Possible locations:")
            common_paths = [
                os.path.expanduser("~/.mcp_memory_service/storage.db"),
                os.path.expanduser("~/.mcp_memory_service/memory.db"),
                "storage.db",
                "memory.db"
            ]
            for path in common_paths:
                if os.path.exists(path):
                    print(f" ✅ Found: {path}")
                else:
                    print(f" ❌ Not found: {path}")
            return False
        try:
            self.storage = SqliteVecMemoryStorage(
                db_path=self.db_path,
                embedding_model="all-MiniLM-L6-v2"
            )
            await self.storage.initialize()
            print("✅ Storage initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Failed to initialize storage: {e}")
            return False

    async def get_all_memories(self) -> List[Memory]:
        """Retrieve all memories, preferring direct SQL over semantic search."""
        try:
            # SQLite-Vec has a limit of 4096 for k in knn queries, so use direct database access
            return await self.get_memories_direct_query()
        except Exception as e:
            print(f"❌ Error with direct query, trying search approach: {e}")
            return await self.get_memories_via_search()

    async def get_memories_direct_query(self) -> List[Memory]:
        """Get all memories by reading the `memories` table directly.

        Bypasses the vector index entirely, so it is not subject to the knn
        result limit. Malformed rows are skipped rather than aborting the scan.
        """
        import sqlite3
        memories = []
        try:
            # Connect directly to SQLite database
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            # Get all memory records
            cursor = conn.execute("""
                SELECT id, content, content_hash, tags, memory_type,
                       created_at, created_at_iso, updated_at, updated_at_iso, metadata
                FROM memories
                ORDER BY created_at DESC
            """)
            rows = cursor.fetchall()
            print(f"📊 Found {len(rows)} memory records in database")
            for i, row in enumerate(rows):
                try:
                    # Safely parse JSON fields; fall back to empty values on corruption.
                    tags = []
                    if row['tags']:
                        try:
                            tags = json.loads(row['tags'])
                        except (json.JSONDecodeError, TypeError):
                            tags = []
                    metadata = {}
                    if row['metadata']:
                        try:
                            metadata = json.loads(row['metadata'])
                        except (json.JSONDecodeError, TypeError):
                            metadata = {}
                    # Reconstruct Memory object from database row
                    memory_dict = {
                        'content': row['content'] or '',
                        'content_hash': row['content_hash'] or '',
                        'tags': tags,
                        'memory_type': row['memory_type'] or 'unknown',
                        'created_at': row['created_at'],
                        'created_at_iso': row['created_at_iso'],
                        'updated_at': row['updated_at'],
                        'updated_at_iso': row['updated_at_iso'],
                        'metadata': metadata
                    }
                    memory = Memory.from_dict(memory_dict)
                    memories.append(memory)
                except Exception as e:
                    print(f"⚠️ Error processing memory {i+1}: {e}")
                    # Continue processing other memories
                    continue
            conn.close()
            return memories
        except Exception as e:
            print(f"❌ Direct query failed: {e}")
            # Close the connection if it was opened before the failure.
            if 'conn' in locals():
                conn.close()
            return []

    async def get_memories_via_search(self) -> List[Memory]:
        """Fallback: collect memories via several semantic searches.

        Each query stays well below the 4096 knn limit; results are
        deduplicated by content_hash across queries.
        """
        memories = []
        try:
            # Try different search approaches with smaller limits
            search_queries = ["", "memory", "note", "session"]
            for query in search_queries:
                try:
                    results = await self.storage.retrieve(query, n_results=1000)  # Well under 4096 limit
                    batch_memories = [result.memory for result in results]
                    # Deduplicate based on content_hash
                    existing_hashes = {m.content_hash for m in memories}
                    new_memories = [m for m in batch_memories if m.content_hash not in existing_hashes]
                    memories.extend(new_memories)
                    print(f"📊 Query '{query}': {len(batch_memories)} results, {len(new_memories)} new")
                except Exception as e:
                    print(f"⚠️ Query '{query}' failed: {e}")
                    continue
            print(f"📊 Total unique memories retrieved: {len(memories)}")
            return memories
        except Exception as e:
            print(f"❌ All search approaches failed: {e}")
            return []

    def analyze_timestamp_fields(self, memories: List[Memory]) -> Dict[str, Any]:
        """Tally missing/invalid timestamp fields across all memories.

        Returns a report dict with counts, the observed created_at range,
        detected iso-field types, and details of problematic memories.
        """
        analysis = {
            "total_memories": len(memories),
            "missing_created_at": 0,
            "missing_created_at_iso": 0,
            "missing_both_timestamps": 0,
            "invalid_timestamps": 0,
            "problematic_memories": [],
            "timestamp_formats": set(),
            "timestamp_range": {"earliest": None, "latest": None}
        }
        for memory in memories:
            has_created_at = memory.created_at is not None
            has_created_at_iso = memory.created_at_iso is not None
            # Track missing timestamp fields
            if not has_created_at:
                analysis["missing_created_at"] += 1
            if not has_created_at_iso:
                analysis["missing_created_at_iso"] += 1
            if not has_created_at and not has_created_at_iso:
                analysis["missing_both_timestamps"] += 1
                analysis["problematic_memories"].append({
                    "content_hash": memory.content_hash,
                    "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                    "tags": memory.tags,
                    "memory_type": memory.memory_type,
                    "issue": "missing_both_timestamps"
                })
            # Track timestamp formats and ranges
            if has_created_at_iso:
                analysis["timestamp_formats"].add(type(memory.created_at_iso).__name__)
            if has_created_at:
                try:
                    if analysis["timestamp_range"]["earliest"] is None or memory.created_at < analysis["timestamp_range"]["earliest"]:
                        analysis["timestamp_range"]["earliest"] = memory.created_at
                    if analysis["timestamp_range"]["latest"] is None or memory.created_at > analysis["timestamp_range"]["latest"]:
                        analysis["timestamp_range"]["latest"] = memory.created_at
                except (TypeError, ValueError):
                    # Comparison fails when created_at holds a non-numeric /
                    # mixed-type value; count it as invalid instead of
                    # swallowing every exception with a bare except.
                    analysis["invalid_timestamps"] += 1
                    analysis["problematic_memories"].append({
                        "content_hash": memory.content_hash,
                        "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                        "created_at": str(memory.created_at),
                        "issue": "invalid_timestamp"
                    })
        # Convert set to list for JSON serialization
        analysis["timestamp_formats"] = list(analysis["timestamp_formats"])
        return analysis

    def print_analysis_report(self, analysis: Dict[str, Any]):
        """Print a detailed, human-readable analysis report."""
        print("\n" + "="*70)
        print("TIMESTAMP ANALYSIS REPORT")
        print("="*70)
        total = analysis["total_memories"]
        print(f"\n📊 OVERVIEW:")
        print(f" Total memories analyzed: {total}")
        print(f" Missing created_at (float): {analysis['missing_created_at']}")
        print(f" Missing created_at_iso (ISO string): {analysis['missing_created_at_iso']}")
        print(f" Missing both timestamps: {analysis['missing_both_timestamps']}")
        print(f" Invalid timestamp values: {analysis['invalid_timestamps']}")
        if total > 0:
            print(f"\n📈 PERCENTAGES:")
            print(f" Missing created_at: {analysis['missing_created_at']/total*100:.1f}%")
            print(f" Missing created_at_iso: {analysis['missing_created_at_iso']/total*100:.1f}%")
            print(f" Missing both: {analysis['missing_both_timestamps']/total*100:.1f}%")
            print(f" Invalid timestamps: {analysis['invalid_timestamps']/total*100:.1f}%")
        print(f"\n🕐 TIMESTAMP RANGE:")
        if analysis["timestamp_range"]["earliest"] and analysis["timestamp_range"]["latest"]:
            # NOTE(review): fromtimestamp uses the local timezone — assumed
            # acceptable for a human-facing report; confirm if UTC is expected.
            earliest = datetime.fromtimestamp(analysis["timestamp_range"]["earliest"])
            latest = datetime.fromtimestamp(analysis["timestamp_range"]["latest"])
            print(f" Earliest: {earliest} ({analysis['timestamp_range']['earliest']})")
            print(f" Latest: {latest} ({analysis['timestamp_range']['latest']})")
        else:
            print(" No valid timestamps found")
        print(f"\n📝 TIMESTAMP FORMATS DETECTED:")
        for fmt in analysis["timestamp_formats"]:
            print(f" - {fmt}")
        if analysis["problematic_memories"]:
            print(f"\n⚠️ PROBLEMATIC MEMORIES ({len(analysis['problematic_memories'])}):")
            for i, memory in enumerate(analysis["problematic_memories"][:10]):  # Show first 10
                print(f" {i+1}. Issue: {memory['issue']}")
                print(f" Content: {memory['content_preview']}")
                print(f" Hash: {memory['content_hash']}")
                if 'tags' in memory:
                    print(f" Tags: {memory.get('tags', [])}")
                print()
            if len(analysis["problematic_memories"]) > 10:
                print(f" ... and {len(analysis['problematic_memories']) - 10} more")
        # Health assessment: thresholds at 10% and 50% of total memories.
        print(f"\n🏥 DATABASE HEALTH ASSESSMENT:")
        if analysis["missing_both_timestamps"] == 0:
            print(" ✅ EXCELLENT: All memories have at least one timestamp field")
        elif analysis["missing_both_timestamps"] < total * 0.1:
            print(f" ⚠️ GOOD: Only {analysis['missing_both_timestamps']} memories missing all timestamps")
        elif analysis["missing_both_timestamps"] < total * 0.5:
            print(f" ⚠️ CONCERNING: {analysis['missing_both_timestamps']} memories missing all timestamps")
        else:
            print(f" ❌ CRITICAL: {analysis['missing_both_timestamps']} memories missing all timestamps")
        if analysis["missing_created_at"] > 0 or analysis["missing_created_at_iso"] > 0:
            print(" 💡 RECOMMENDATION: Run timestamp migration script to fix missing fields")

    async def run_analysis(self):
        """Run the complete analysis; return True when no memory lacks all timestamps."""
        if not await self.setup():
            return False
        memories = await self.get_all_memories()
        if not memories:
            print("⚠️ No memories found in database")
            return False
        analysis = self.analyze_timestamp_fields(memories)
        self.print_analysis_report(analysis)
        # Save detailed report to file
        report_file = "timestamp_analysis_report.json"
        with open(report_file, 'w') as f:
            # Convert any datetime objects to strings for JSON serialization
            json_analysis = analysis.copy()
            if json_analysis["timestamp_range"]["earliest"]:
                json_analysis["timestamp_range"]["earliest_iso"] = datetime.fromtimestamp(json_analysis["timestamp_range"]["earliest"]).isoformat()
            if json_analysis["timestamp_range"]["latest"]:
                json_analysis["timestamp_range"]["latest_iso"] = datetime.fromtimestamp(json_analysis["timestamp_range"]["latest"]).isoformat()
            # default=str stringifies anything json can't encode natively.
            json.dump(json_analysis, f, indent=2, default=str)
        print(f"\n📄 Detailed report saved to: {report_file}")
        return analysis["missing_both_timestamps"] == 0
async def main():
    """Main analysis execution."""
    import argparse
    parser = argparse.ArgumentParser(description="Check for memories without timestamps")
    parser.add_argument("--db-path", help="Path to database file")
    parser.add_argument(
        "--storage", default="sqlite_vec", choices=["sqlite_vec"],
        help="Storage backend to analyze"
    )
    options = parser.parse_args()
    analyzer = TimestampAnalyzer(
        storage_backend=options.storage,
        db_path=options.db_path
    )
    # Exit code 0 only when every memory carries at least one timestamp.
    return 0 if await analyzer.run_analysis() else 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/scripts/installation/install_windows_service.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Windows Service installer for MCP Memory Service.
Installs the service using Python's Windows service capabilities.
"""
import os
import sys
import json
import argparse
import subprocess
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from scripts.service_utils import (
get_project_root, get_service_paths, get_service_environment,
generate_api_key, save_service_config, load_service_config,
check_dependencies, get_service_command, print_service_info,
require_admin
)
except ImportError as e:
print(f"Error importing service utilities: {e}")
print("Please ensure you're running this from the project directory")
sys.exit(1)
SERVICE_NAME = "MCPMemoryService"
SERVICE_DISPLAY_NAME = "MCP Memory Service"
SERVICE_DESCRIPTION = "Semantic memory and persistent storage service for Claude Desktop"
def create_windows_service_script():
    """Create the Windows service wrapper script.

    Writes a standalone pywin32 service module into the scripts directory and
    returns its path. The wrapper runs as the actual Windows service process
    and supervises (starts, monitors, restarts) the MCP Memory server
    subprocess based on the saved service_config.json.
    """
    paths = get_service_paths()
    service_script = paths['scripts_dir'] / 'mcp_memory_windows_service.py'
    # The wrapper is emitted as a plain string so the generated file is fully
    # self-contained and does not import anything from this installer.
    script_content = '''#!/usr/bin/env python3
"""
Windows Service wrapper for MCP Memory Service.
This script runs as a Windows service and manages the MCP Memory server process.
"""
import os
import sys
import time
import subprocess
import win32serviceutil
import win32service
import win32event
import servicemanager
import socket
import json
from pathlib import Path
class MCPMemoryService(win32serviceutil.ServiceFramework):
    _svc_name_ = "MCPMemoryService"
    _svc_display_name_ = "MCP Memory Service"
    _svc_description_ = "Semantic memory and persistent storage service for Claude Desktop"
    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        socket.setdefaulttimeout(60)
        self.is_running = True
        self.process = None
    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        self.is_running = False
        # Stop the subprocess
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.process.kill()
    def SvcDoRun(self):
        servicemanager.LogMsg(
            servicemanager.EVENTLOG_INFORMATION_TYPE,
            servicemanager.PYS_SERVICE_STARTED,
            (self._svc_name_, '')
        )
        self.main()
    def main(self):
        # Load service configuration
        config_dir = Path.home() / '.mcp_memory_service'
        config_file = config_dir / 'service_config.json'
        if not config_file.exists():
            servicemanager.LogErrorMsg("Service configuration not found")
            return
        with open(config_file, 'r') as f:
            config = json.load(f)
        # Set up environment
        env = os.environ.copy()
        env.update(config['environment'])
        # Start the service process
        try:
            self.process = subprocess.Popen(
                config['command'],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            servicemanager.LogMsg(
                servicemanager.EVENTLOG_INFORMATION_TYPE,
                0,
                "MCP Memory Service process started"
            )
            # Monitor the process
            while self.is_running:
                if self.process.poll() is not None:
                    # Process died, log error and restart
                    stdout, stderr = self.process.communicate()
                    servicemanager.LogErrorMsg(
                        f"Service process died unexpectedly. stderr: {stderr}"
                    )
                    # Wait a bit before restarting
                    time.sleep(5)
                    # Restart the process
                    self.process = subprocess.Popen(
                        config['command'],
                        env=env,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True
                    )
                # Check if we should stop
                if win32event.WaitForSingleObject(self.hWaitStop, 1000) == win32event.WAIT_OBJECT_0:
                    break
        except Exception as e:
            servicemanager.LogErrorMsg(f"Error in service: {str(e)}")
if __name__ == '__main__':
    if len(sys.argv) == 1:
        servicemanager.Initialize()
        servicemanager.PrepareToHostSingle(MCPMemoryService)
        servicemanager.StartServiceCtrlDispatcher()
    else:
        win32serviceutil.HandleCommandLine(MCPMemoryService)
'''
    # Overwrite any previous wrapper so re-installation picks up changes.
    with open(service_script, 'w') as f:
        f.write(script_content)
    return service_script
def create_batch_scripts():
    """Create convenient batch scripts for service management.

    Generates start/stop/status/uninstall .bat helpers under
    scripts/windows and returns that directory. The batch contents embed the
    service name via f-strings at generation time.
    """
    paths = get_service_paths()
    scripts_dir = paths['scripts_dir'] / 'windows'
    scripts_dir.mkdir(exist_ok=True)
    # Start service batch file
    start_script = scripts_dir / 'start_service.bat'
    with open(start_script, 'w') as f:
        f.write(f'''@echo off
echo Starting {SERVICE_DISPLAY_NAME}...
net start {SERVICE_NAME}
if %ERRORLEVEL% == 0 (
echo Service started successfully!
) else (
echo Failed to start service. Run as Administrator if needed.
)
pause
''')
    # Stop service batch file
    stop_script = scripts_dir / 'stop_service.bat'
    with open(stop_script, 'w') as f:
        f.write(f'''@echo off
echo Stopping {SERVICE_DISPLAY_NAME}...
net stop {SERVICE_NAME}
if %ERRORLEVEL% == 0 (
echo Service stopped successfully!
) else (
echo Failed to stop service. Run as Administrator if needed.
)
pause
''')
    # Status batch file
    status_script = scripts_dir / 'service_status.bat'
    with open(status_script, 'w') as f:
        f.write(f'''@echo off
echo Checking {SERVICE_DISPLAY_NAME} status...
sc query {SERVICE_NAME}
pause
''')
    # Uninstall batch file: confirms, stops the service, then delegates to
    # this installer's --uninstall mode.
    uninstall_script = scripts_dir / 'uninstall_service.bat'
    with open(uninstall_script, 'w') as f:
        f.write(f'''@echo off
echo This will uninstall {SERVICE_DISPLAY_NAME}.
echo.
set /p confirm="Are you sure? (Y/N): "
if /i "%confirm%" neq "Y" exit /b
echo Stopping service...
net stop {SERVICE_NAME} 2>nul
echo Uninstalling service...
python "{paths['scripts_dir'] / 'install_windows_service.py'}" --uninstall
pause
''')
    return scripts_dir
def install_service():
    """Install the Windows service.

    Flow: verify pywin32 and admin rights, check project dependencies,
    generate an API key, persist the service configuration, write the pywin32
    wrapper script, (re)install the service through that wrapper, then create
    helper .bat scripts. Exits the process with status 1 on any failure.
    """
    # Check if pywin32 is installed
    try:
        import win32serviceutil
        import win32service
    except ImportError:
        print("\n❌ ERROR: pywin32 is required for Windows service installation")
        print("Please install it with: pip install pywin32")
        sys.exit(1)
    # Require administrator privileges
    require_admin("Administrator privileges are required to install Windows services")
    print("\n🔍 Checking dependencies...")
    deps_ok, deps_msg = check_dependencies()
    if not deps_ok:
        print(f"❌ {deps_msg}")
        sys.exit(1)
    print(f"✅ {deps_msg}")
    # Generate API key
    api_key = generate_api_key()
    print(f"\n🔑 Generated API key: {api_key}")
    # Create service configuration
    config = {
        'service_name': SERVICE_NAME,
        'api_key': api_key,
        'command': get_service_command(),
        'environment': get_service_environment()
    }
    config['environment']['MCP_API_KEY'] = api_key
    # Save configuration
    config_file = save_service_config(config)
    print(f"💾 Saved configuration to: {config_file}")
    # Create service wrapper script
    print("\n📝 Creating service wrapper...")
    service_script = create_windows_service_script()
    # Install the service using the wrapper
    print(f"\n🚀 Installing {SERVICE_DISPLAY_NAME}...")
    try:
        # First, try to stop and remove existing service
        # (failures here are expected when no previous install exists)
        subprocess.run([
            sys.executable, str(service_script), 'stop'
        ], capture_output=True)
        subprocess.run([
            sys.executable, str(service_script), 'remove'
        ], capture_output=True)
        # Install the service
        result = subprocess.run([
            sys.executable, str(service_script), 'install'
        ], capture_output=True, text=True)
        if result.returncode != 0:
            print(f"❌ Failed to install service: {result.stderr}")
            sys.exit(1)
        # Configure service for automatic startup
        # (the trailing '=' in 'start=' is required by sc.exe syntax)
        subprocess.run([
            'sc', 'config', SERVICE_NAME, 'start=', 'auto'
        ], capture_output=True)
        # Set service description
        subprocess.run([
            'sc', 'description', SERVICE_NAME, SERVICE_DESCRIPTION
        ], capture_output=True)
        print(f"✅ Service installed successfully!")
    except Exception as e:
        print(f"❌ Error installing service: {e}")
        sys.exit(1)
    # Create batch scripts
    scripts_dir = create_batch_scripts()
    print(f"\n📁 Created management scripts in: {scripts_dir}")
    # Print service information
    platform_info = {
        'Start Service': f'net start {SERVICE_NAME}',
        'Stop Service': f'net stop {SERVICE_NAME}',
        'Service Status': f'sc query {SERVICE_NAME}',
        'Uninstall': f'python "{Path(__file__)}" --uninstall'
    }
    print_service_info(api_key, platform_info)
    return True
def uninstall_service():
    """Uninstall the Windows service.

    Prefers removal via the generated pywin32 wrapper; falls back to a raw
    `sc delete` when the wrapper script is no longer on disk.
    """
    require_admin("Administrator privileges are required to uninstall Windows services")
    print(f"\n🗑️ Uninstalling {SERVICE_DISPLAY_NAME}...")
    wrapper = get_service_paths()['scripts_dir'] / 'mcp_memory_windows_service.py'
    if wrapper.exists():
        # Stop the service first, then remove it through the wrapper.
        subprocess.run([sys.executable, str(wrapper), 'stop'], capture_output=True)
        removal = subprocess.run(
            [sys.executable, str(wrapper), 'remove'],
            capture_output=True, text=True
        )
    else:
        # No wrapper available; delete the service registration directly.
        removal = subprocess.run(
            ['sc', 'delete', SERVICE_NAME],
            capture_output=True, text=True
        )
    if removal.returncode == 0:
        print("✅ Service uninstalled successfully!")
    else:
        print(f"❌ Failed to uninstall service: {removal.stderr}")
def start_service():
    """Start the Windows service via `net start`."""
    print(f"\n▶️ Starting {SERVICE_DISPLAY_NAME}...")
    result = subprocess.run(['net', 'start', SERVICE_NAME], capture_output=True, text=True)
    if result.returncode == 0:
        print("✅ Service started successfully!")
        return
    # Non-zero exit: distinguish "already running" from a real failure.
    if "already been started" in result.stderr:
        print("ℹ️ Service is already running")
    else:
        print(f"❌ Failed to start service: {result.stderr}")
        print("\n💡 Try running as Administrator if you see access denied errors")
def stop_service():
    """Stop the Windows service via `net stop`."""
    print(f"\n⏹️ Stopping {SERVICE_DISPLAY_NAME}...")
    result = subprocess.run(['net', 'stop', SERVICE_NAME], capture_output=True, text=True)
    if result.returncode == 0:
        print("✅ Service stopped successfully!")
        return
    # Treat "not started" as informational rather than an error.
    if "is not started" in result.stderr:
        print("ℹ️ Service is not running")
    else:
        print(f"❌ Failed to stop service: {result.stderr}")
def service_status():
    """Check and print the Windows service status plus saved configuration."""
    print(f"\n📊 {SERVICE_DISPLAY_NAME} Status:")
    print("-" * 40)
    query = subprocess.run(['sc', 'query', SERVICE_NAME], capture_output=True, text=True)
    if query.returncode != 0:
        # sc query fails when the service has never been installed.
        print("❌ Service is not installed")
    else:
        # Scan sc output for the STATE and SERVICE_NAME lines.
        for line in query.stdout.splitlines():
            if "STATE" in line:
                if "RUNNING" in line:
                    print("✅ Service is RUNNING")
                elif "STOPPED" in line:
                    print("⏹️ Service is STOPPED")
                else:
                    print(f"ℹ️ {line.strip()}")
            elif "SERVICE_NAME:" in line:
                print(f"Service Name: {SERVICE_NAME}")
    # Show configuration if available
    config = load_service_config()
    if config:
        print(f"\n📋 Configuration:")
        print(f" API Key: {config.get('api_key', 'Not set')}")
        print(f" Config File: {get_service_paths()['config_dir'] / 'service_config.json'}")
def main():
    """Main entry point: parse CLI flags and dispatch to the matching action."""
    parser = argparse.ArgumentParser(
        description="Windows Service installer for MCP Memory Service"
    )
    # All actions are mutually exclusive boolean flags; first match wins below.
    for flag, help_text in (
        ('--uninstall', 'Uninstall the service'),
        ('--start', 'Start the service'),
        ('--stop', 'Stop the service'),
        ('--status', 'Check service status'),
        ('--restart', 'Restart the service'),
    ):
        parser.add_argument(flag, action='store_true', help=help_text)
    args = parser.parse_args()
    if args.uninstall:
        uninstall_service()
    elif args.start:
        start_service()
    elif args.stop:
        stop_service()
    elif args.status:
        service_status()
    elif args.restart:
        # Restart is simply a stop followed by a start.
        stop_service()
        start_service()
    else:
        # Default action is to install
        install_service()


if __name__ == '__main__':
    main()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/mcp.py:
--------------------------------------------------------------------------------
```python
"""
MCP (Model Context Protocol) endpoints for Claude Code integration.
This module provides MCP protocol endpoints that allow Claude Code clients
to directly access memory operations using the MCP standard.
"""
import asyncio
import json
import logging
from typing import Dict, List, Any, Optional, Union, TYPE_CHECKING
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict
from ..dependencies import get_storage
from ...utils.hashing import generate_content_hash
from ...config import OAUTH_ENABLED
# Import OAuth dependencies only when needed
if OAUTH_ENABLED or TYPE_CHECKING:
from ..oauth.middleware import require_read_access, require_write_access, AuthenticationResult
else:
# Provide type stubs when OAuth is disabled
AuthenticationResult = None
require_read_access = None
require_write_access = None
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/mcp", tags=["mcp"])
class MCPRequest(BaseModel):
    """MCP protocol request structure.

    Mirrors a JSON-RPC 2.0 request: ``id`` may be absent (notification),
    ``method`` names the operation, and ``params`` carries method-specific
    arguments.
    """
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    method: str
    params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
    """MCP protocol response structure.

    Note: JSON-RPC 2.0 spec requires that successful responses EXCLUDE the 'error'
    field entirely (not include it as null), and error responses EXCLUDE 'result'.
    The exclude_none config ensures proper compliance.
    """
    # NOTE(review): `exclude_none` is not a documented ConfigDict option in
    # pydantic v2 — compliance appears to come from call sites passing
    # exclude_none=True to model_dump(); confirm whether this entry has effect.
    model_config = ConfigDict(exclude_none=True)
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None
class MCPTool(BaseModel):
    """MCP tool definition.

    ``inputSchema`` is a JSON Schema object describing the tool's arguments,
    as required by the MCP `tools/list` response format.
    """
    name: str
    description: str
    inputSchema: Dict[str, Any]
# Define MCP tools available.
# This static registry is returned verbatim by the `tools/list` method;
# tool names must match the dispatch in handle_tool_call().
MCP_TOOLS = [
    # Write path: create a new memory record.
    MCPTool(
        name="store_memory",
        description="Store a new memory with optional tags, metadata, and client information",
        inputSchema={
            "type": "object",
            "properties": {
                "content": {"type": "string", "description": "The memory content to store"},
                "tags": {"type": "array", "items": {"type": "string"}, "description": "Optional tags for the memory"},
                "memory_type": {"type": "string", "description": "Optional memory type (e.g., 'note', 'reminder', 'fact')"},
                "metadata": {"type": "object", "description": "Additional metadata for the memory"},
                "client_hostname": {"type": "string", "description": "Client machine hostname for source tracking"}
            },
            "required": ["content"]
        }
    ),
    # Semantic similarity search with an optional score cutoff.
    MCPTool(
        name="retrieve_memory",
        description="Search and retrieve memories using semantic similarity",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query for finding relevant memories"},
                "limit": {"type": "integer", "description": "Maximum number of memories to return", "default": 10},
                "similarity_threshold": {"type": "number", "description": "Minimum similarity score threshold (0.0-1.0)", "default": 0.7, "minimum": 0.0, "maximum": 1.0}
            },
            "required": ["query"]
        }
    ),
    # Time-expression-aware recall ("yesterday", "last week", ...).
    MCPTool(
        name="recall_memory",
        description="Retrieve memories using natural language time expressions and optional semantic search",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural language query specifying the time frame or content to recall"},
                "n_results": {"type": "integer", "description": "Maximum number of results to return", "default": 5}
            },
            "required": ["query"]
        }
    ),
    # Exact-tag lookup with AND/OR combination.
    MCPTool(
        name="search_by_tag",
        description="Search memories by specific tags",
        inputSchema={
            "type": "object",
            "properties": {
                "tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to search for"},
                "operation": {"type": "string", "enum": ["AND", "OR"], "description": "Tag search operation", "default": "AND"}
            },
            "required": ["tags"]
        }
    ),
    # Deletion is addressed by content hash, not by id.
    MCPTool(
        name="delete_memory",
        description="Delete a specific memory by content hash",
        inputSchema={
            "type": "object",
            "properties": {
                "content_hash": {"type": "string", "description": "Hash of the memory to delete"}
            },
            "required": ["content_hash"]
        }
    ),
    # Diagnostics: no arguments.
    MCPTool(
        name="check_database_health",
        description="Check the health and status of the memory database",
        inputSchema={
            "type": "object",
            "properties": {}
        }
    ),
    # Paginated listing with optional tag / type filters.
    MCPTool(
        name="list_memories",
        description="List memories with pagination and optional filtering",
        inputSchema={
            "type": "object",
            "properties": {
                "page": {"type": "integer", "description": "Page number (1-based)", "default": 1, "minimum": 1},
                "page_size": {"type": "integer", "description": "Number of memories per page", "default": 10, "minimum": 1, "maximum": 100},
                "tag": {"type": "string", "description": "Filter by specific tag"},
                "memory_type": {"type": "string", "description": "Filter by memory type"}
            }
        }
    ),
]
@router.post("/")
@router.post("")
async def mcp_endpoint(
request: MCPRequest,
user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
"""Main MCP protocol endpoint for processing MCP requests."""
try:
storage = get_storage()
if request.method == "initialize":
response = MCPResponse(
id=request.id,
result={
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {}
},
"serverInfo": {
"name": "mcp-memory-service",
"version": "4.1.1"
}
}
)
return JSONResponse(content=response.model_dump(exclude_none=True))
elif request.method == "tools/list":
response = MCPResponse(
id=request.id,
result={
"tools": [tool.model_dump() for tool in MCP_TOOLS]
}
)
return JSONResponse(content=response.model_dump(exclude_none=True))
elif request.method == "tools/call":
tool_name = request.params.get("name") if request.params else None
arguments = request.params.get("arguments", {}) if request.params else {}
result = await handle_tool_call(storage, tool_name, arguments)
response = MCPResponse(
id=request.id,
result={
"content": [
{
"type": "text",
"text": json.dumps(result)
}
]
}
)
return JSONResponse(content=response.model_dump(exclude_none=True))
else:
response = MCPResponse(
id=request.id,
error={
"code": -32601,
"message": f"Method not found: {request.method}"
}
)
return JSONResponse(content=response.model_dump(exclude_none=True))
except Exception as e:
logger.error(f"MCP endpoint error: {e}")
response = MCPResponse(
id=request.id,
error={
"code": -32603,
"message": f"Internal error: {str(e)}"
}
)
return JSONResponse(content=response.model_dump(exclude_none=True))
async def _tool_store_memory(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Store a new memory; returns success flag, message, and content hash."""
    from mcp_memory_service.models.memory import Memory
    content = arguments.get("content")
    tags = arguments.get("tags", [])
    memory_type = arguments.get("memory_type")
    metadata = arguments.get("metadata", {})
    client_hostname = arguments.get("client_hostname")
    # Ensure metadata is a dict (clients may send it JSON-encoded).
    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            # Malformed metadata is ignored rather than failing the store.
            metadata = {}
    elif not isinstance(metadata, dict):
        metadata = {}
    # Add client_hostname to metadata if provided
    if client_hostname:
        metadata["client_hostname"] = client_hostname
    content_hash = generate_content_hash(content, metadata)
    memory = Memory(
        content=content,
        content_hash=content_hash,
        tags=tags,
        memory_type=memory_type,
        metadata=metadata
    )
    success, message = await storage.store(memory)
    return {
        "success": success,
        "message": message,
        "content_hash": memory.content_hash if success else None
    }
async def _tool_retrieve_memory(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Semantic search with optional client-side similarity-threshold filter."""
    query = arguments.get("query")
    limit = arguments.get("limit", 10)
    similarity_threshold = arguments.get("similarity_threshold", 0.0)
    # Get results from storage (no similarity filtering at storage level)
    results = await storage.retrieve(query=query, n_results=limit)
    # Apply similarity threshold filtering (same as API implementation).
    # Explicit None check so a legitimate 0.0 score is not dropped by truthiness.
    if similarity_threshold is not None:
        results = [
            r for r in results
            if r.relevance_score is not None and r.relevance_score >= similarity_threshold
        ]
    return {
        "results": [
            {
                "content": r.memory.content,
                "content_hash": r.memory.content_hash,
                "tags": r.memory.tags,
                "similarity_score": r.relevance_score,
                "created_at": r.memory.created_at_iso
            }
            for r in results
        ],
        "total_found": len(results)
    }
async def _tool_recall_memory(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Time-expression-aware recall (delegates parsing to the storage layer)."""
    query = arguments.get("query")
    n_results = arguments.get("n_results", 5)
    memories = await storage.recall_memory(query=query, n_results=n_results)
    return {
        "results": [
            {
                "content": m.content,
                "content_hash": m.content_hash,
                "tags": m.tags,
                "created_at": m.created_at_iso
            }
            for m in memories
        ],
        "total_found": len(memories)
    }
async def _tool_search_by_tag(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Tag search with AND/OR semantics handled by the storage layer."""
    tags = arguments.get("tags")
    operation = arguments.get("operation", "AND")
    results = await storage.search_by_tags(tags=tags, operation=operation)
    return {
        "results": [
            {
                "content": memory.content,
                "content_hash": memory.content_hash,
                "tags": memory.tags,
                "created_at": memory.created_at_iso
            }
            for memory in results
        ],
        "total_found": len(results)
    }
async def _tool_delete_memory(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Delete a memory identified by its content hash."""
    content_hash = arguments.get("content_hash")
    success, message = await storage.delete(content_hash)
    return {
        "success": success,
        "message": message
    }
async def _tool_check_database_health(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Report storage statistics; reaching this point implies a live backend."""
    stats = await storage.get_stats()
    return {
        "status": "healthy",
        "statistics": stats
    }
async def _tool_list_memories(storage, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Paginated listing with optional tag/type filters pushed to the database."""
    page = arguments.get("page", 1)
    page_size = arguments.get("page_size", 10)
    tag = arguments.get("tag")
    memory_type = arguments.get("memory_type")
    # Calculate offset from 1-based page number
    offset = (page - 1) * page_size
    # Use database-level filtering for better performance
    tags_list = [tag] if tag else None
    memories = await storage.get_all_memories(
        limit=page_size,
        offset=offset,
        memory_type=memory_type,
        tags=tags_list
    )
    return {
        "memories": [
            {
                "content": memory.content,
                "content_hash": memory.content_hash,
                "tags": memory.tags,
                "memory_type": memory.memory_type,
                "metadata": memory.metadata,
                "created_at": memory.created_at_iso,
                "updated_at": memory.updated_at_iso
            }
            for memory in memories
        ],
        "page": page,
        "page_size": page_size,
        "total_found": len(memories)
    }
# Maps MCP tool names to their async handlers; each takes (storage, arguments).
_TOOL_HANDLERS = {
    "store_memory": _tool_store_memory,
    "retrieve_memory": _tool_retrieve_memory,
    "recall_memory": _tool_recall_memory,
    "search_by_tag": _tool_search_by_tag,
    "delete_memory": _tool_delete_memory,
    "check_database_health": _tool_check_database_health,
    "list_memories": _tool_list_memories,
}
async def handle_tool_call(storage, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle MCP tool calls and route to appropriate memory operations.

    Args:
        storage: Storage backend providing the async operations used by the handlers.
        tool_name: Name of the MCP tool to invoke.
        arguments: Tool-specific argument dict from the MCP request.

    Returns:
        Tool-specific result dict (serialized to JSON by the caller).

    Raises:
        ValueError: If tool_name does not match a known tool.
    """
    handler = _TOOL_HANDLERS.get(tool_name)
    if handler is None:
        raise ValueError(f"Unknown tool: {tool_name}")
    return await handler(storage, arguments)
@router.get("/tools")
async def list_mcp_tools(
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """List available MCP tools for discovery.

    Returns the tool schemas plus protocol metadata so clients can inspect
    capabilities without performing a full MCP handshake.
    """
    return {
        # model_dump() replaces the deprecated Pydantic v1 .dict() API and
        # matches the serialization used by the main MCP endpoint.
        "tools": [tool.model_dump() for tool in MCP_TOOLS],
        "protocol": "mcp",
        "version": "1.0"
    }
@router.get("/health")
async def mcp_health():
    """MCP-specific health check reporting tool count and storage statistics."""
    storage = get_storage()
    statistics = await storage.get_stats()
    payload = {
        "status": "healthy",
        "protocol": "mcp",
        "tools_available": len(MCP_TOOLS),
        "storage_backend": "sqlite-vec",
        "statistics": statistics,
    }
    return payload
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/search.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Search endpoints for the HTTP interface.
Provides semantic search, tag-based search, and time-based recall functionality.
"""
import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, HTTPException, Depends, Query
from pydantic import BaseModel, Field
from ...storage.base import MemoryStorage
from ...models.memory import Memory, MemoryQueryResult
from ...config import OAUTH_ENABLED
from ...utils.time_parser import parse_time_expression
from ..dependencies import get_storage
from .memories import MemoryResponse, memory_to_response
from ..sse import sse_manager, create_search_completed_event
# Constants
_TIME_SEARCH_CANDIDATE_POOL_SIZE = 100 # Number of candidates to retrieve for time filtering (reduced for performance)
# OAuth authentication imports (conditional)
if OAUTH_ENABLED or TYPE_CHECKING:
from ..oauth.middleware import require_read_access, AuthenticationResult
else:
# Provide type stubs when OAuth is disabled
AuthenticationResult = None
require_read_access = None
router = APIRouter()
logger = logging.getLogger(__name__)
# Request Models
class SemanticSearchRequest(BaseModel):
    """Request model for semantic similarity search (POST /search body)."""
    query: str = Field(..., description="The search query for semantic similarity")
    # Result count is clamped server-side to 1..100 by Field constraints.
    n_results: int = Field(default=10, ge=1, le=100, description="Maximum number of results to return")
    # When None, no threshold filtering is applied to the results.
    similarity_threshold: Optional[float] = Field(None, ge=0.0, le=1.0, description="Minimum similarity score")
class TagSearchRequest(BaseModel):
    """Request model for tag-based search (POST /search/by-tag body)."""
    tags: List[str] = Field(..., description="List of tags to search for (ANY match)")
    # match_all=True switches from OR semantics to AND semantics.
    match_all: bool = Field(default=False, description="If true, memory must have ALL tags; if false, ANY tag")
    # Parsed by utils.time_parser; only the start of the range is used.
    time_filter: Optional[str] = Field(None, description="Optional natural language time filter (e.g., 'last week', 'yesterday')")
class TimeSearchRequest(BaseModel):
    """Request model for time-based search (POST /search/by-time body)."""
    query: str = Field(..., description="Natural language time query (e.g., 'last week', 'yesterday')")
    n_results: int = Field(default=10, ge=1, le=100, description="Maximum number of results to return")
    # When provided, results within the time range are ranked by relevance
    # to this query instead of by recency.
    semantic_query: Optional[str] = Field(None, description="Optional semantic query for relevance filtering within time range")
# Response Models
class SearchResult(BaseModel):
    """Individual search result with similarity score."""
    # Full memory payload in the same shape as the /memories endpoints.
    memory: MemoryResponse
    # None for non-semantic searches (tag/time), where no score exists.
    similarity_score: Optional[float] = Field(None, description="Similarity score (0-1, higher is more similar)")
    relevance_reason: Optional[str] = Field(None, description="Why this result was included")
class SearchResponse(BaseModel):
    """Response model for search operations."""
    results: List[SearchResult]
    total_found: int
    # Echo of the query, or a human-readable summary for tag/time searches.
    query: str
    # One of: "semantic", "tag", "time", "similar".
    search_type: str
    processing_time_ms: Optional[float] = None
def memory_query_result_to_search_result(query_result: MemoryQueryResult) -> SearchResult:
    """Convert MemoryQueryResult to SearchResult format.

    Args:
        query_result: Storage-layer result carrying a memory and its score.

    Returns:
        SearchResult with the memory, its relevance score, and a
        human-readable reason whenever a score is available.
    """
    score = query_result.relevance_score
    # Explicit None check so a legitimate 0.0 score still produces a
    # relevance reason (0.0 is falsy but meaningful).
    return SearchResult(
        memory=memory_to_response(query_result.memory),
        similarity_score=score,
        relevance_reason=f"Semantic similarity: {score:.3f}" if score is not None else None
    )
def memory_to_search_result(memory: Memory, reason: Optional[str] = None) -> SearchResult:
    """Convert Memory to SearchResult format.

    Used by non-semantic searches (tag/time), where no similarity score
    exists; *reason* explains why the memory matched.
    """
    return SearchResult(
        memory=memory_to_response(memory),
        similarity_score=None,
        relevance_reason=reason
    )
@router.post("/search", response_model=SearchResponse, tags=["search"])
async def semantic_search(
    request: SemanticSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Perform semantic similarity search on memory content.
    Uses vector embeddings to find memories with similar meaning to the query,
    even if they don't share exact keywords.
    """
    import time
    start_time = time.time()
    try:
        # Perform semantic search using the storage layer
        query_results = await storage.retrieve(
            query=request.query,
            n_results=request.n_results
        )
        # Filter by similarity threshold if specified. Use an explicit None
        # check on the score so a legitimate 0.0 relevance score is not
        # dropped by truthiness (e.g. when similarity_threshold is 0.0).
        if request.similarity_threshold is not None:
            query_results = [
                result for result in query_results
                if result.relevance_score is not None
                and result.relevance_score >= request.similarity_threshold
            ]
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in query_results
        ]
        processing_time = (time.time() - start_time) * 1000
        # Broadcast SSE event for search completion (best-effort: a failed
        # broadcast must not fail the search itself).
        try:
            event = create_search_completed_event(
                query=request.query,
                search_type="semantic",
                results_count=len(search_results),
                processing_time_ms=processing_time
            )
            await sse_manager.broadcast_event(event)
        except Exception as e:
            logger.warning(f"Failed to broadcast search_completed event: {e}")
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=request.query,
            search_type="semantic",
            processing_time_ms=processing_time
        )
    except Exception as e:
        # Log internals; return a sanitized message to the client.
        logger.error(f"Semantic search failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Search operation failed. Please try again.")
@router.post("/search/by-tag", response_model=SearchResponse, tags=["search"])
async def tag_search(
    request: TagSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Search memories by tags with optional time filtering.
    Finds memories that contain any of the specified tags (OR search) or
    all of the specified tags (AND search) based on the match_all parameter.
    Optionally filters by time range using natural language expressions like
    'last week', 'yesterday', 'this month', etc.
    """
    import time
    start_time = time.time()
    try:
        if not request.tags:
            raise HTTPException(status_code=400, detail="At least one tag must be specified")
        # Parse time filter if provided (only the range start is used).
        time_start = None
        if request.time_filter:
            start_ts, _ = parse_time_expression(request.time_filter)
            time_start = start_ts if start_ts else None
        # Use the storage layer's tag search with optional time filtering
        memories = await storage.search_by_tag(request.tags, time_start=time_start)
        # Hoist the requested-tag set; it is reused for AND filtering and
        # for building the per-result match reasons below.
        requested_tags = set(request.tags)
        # If match_all is True, filter to only memories that have ALL tags
        if request.match_all and len(request.tags) > 1:
            memories = [
                memory for memory in memories
                if requested_tags.issubset(set(memory.tags))
            ]
        # Convert to search results
        match_type = "ALL" if request.match_all else "ANY"
        search_results = [
            memory_to_search_result(
                memory,
                reason=f"Tags match ({match_type}): {', '.join(set(memory.tags) & requested_tags)}"
            )
            for memory in memories
        ]
        processing_time = (time.time() - start_time) * 1000
        # Build query string with time filter info if present
        query_string = f"Tags: {', '.join(request.tags)} ({match_type})"
        if request.time_filter:
            query_string += f" | Time: {request.time_filter}"
        # Broadcast SSE event for search completion (best-effort: a failed
        # broadcast must not fail the search itself).
        try:
            event = create_search_completed_event(
                query=query_string,
                search_type="tag",
                results_count=len(search_results),
                processing_time_ms=processing_time
            )
            await sse_manager.broadcast_event(event)
        except Exception as e:
            logger.warning(f"Failed to broadcast search_completed event: {e}")
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=query_string,
            search_type="tag",
            processing_time_ms=processing_time
        )
    except HTTPException:
        raise
    except Exception as e:
        # Log internals but return a sanitized message, consistent with
        # /search (avoid leaking exception details to clients).
        logger.error(f"Tag search failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Tag search failed. Please try again.")
@router.post("/search/by-time", response_model=SearchResponse, tags=["search"])
async def time_search(
    request: TimeSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Search memories by time-based queries.
    Supports natural language time expressions like 'yesterday', 'last week',
    'this month', etc., parsed by utils.time_parser. An optional semantic
    query re-ranks results within the time range by relevance.
    """
    import time
    start_time = time.time()
    try:
        # Parse time query using robust time_parser
        start_ts, end_ts = parse_time_expression(request.query)
        if start_ts is None and end_ts is None:
            raise HTTPException(
                status_code=400,
                detail=f"Could not parse time query: '{request.query}'. Try 'yesterday', 'last week', 'this month', etc."
            )
        # Normalize the optional semantic query once; an empty or
        # whitespace-only string behaves the same as no semantic query.
        semantic_query = request.semantic_query.strip() if request.semantic_query else ""
        # Retrieve memories within time range (with larger candidate pool if semantic query provided)
        candidate_pool_size = _TIME_SEARCH_CANDIDATE_POOL_SIZE if request.semantic_query else request.n_results
        query_results = await storage.recall(
            query=semantic_query or None,
            n_results=candidate_pool_size,
            start_timestamp=start_ts,
            end_timestamp=end_ts
        )
        # If a semantic query was provided, results are already ranked by
        # relevance; otherwise sort by recency (newest first).
        if not semantic_query:
            query_results.sort(key=lambda r: r.memory.created_at or 0.0, reverse=True)
        # Limit results
        filtered_memories = query_results[:request.n_results]
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in filtered_memories
        ]
        # Update relevance reason for time-based results
        for result in search_results:
            result.relevance_reason = f"Time match: {request.query}"
        processing_time = (time.time() - start_time) * 1000
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=request.query,
            search_type="time",
            processing_time_ms=processing_time
        )
    except HTTPException:
        raise
    except Exception as e:
        # Log internals but return a sanitized message, consistent with
        # /search (avoid leaking exception details to clients).
        logger.error(f"Time search failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Time search failed. Please try again.")
@router.get("/search/similar/{content_hash}", response_model=SearchResponse, tags=["search"])
async def find_similar(
    content_hash: str,
    n_results: int = Query(default=10, ge=1, le=100, description="Number of similar memories to find"),
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Find memories similar to a specific memory identified by its content hash.
    Uses the content of the specified memory as a search query to find
    semantically similar memories.
    """
    import time
    start_time = time.time()
    try:
        # First, get the target memory by searching with its hash.
        # NOTE: this is inefficient (semantic lookup by hash string) but is
        # the only option with the current storage interface.
        target_results = await storage.retrieve(content_hash, n_results=1)
        if not target_results or target_results[0].memory.content_hash != content_hash:
            raise HTTPException(status_code=404, detail="Memory not found")
        target_memory = target_results[0].memory
        # Use the target memory's content to find similar memories
        similar_results = await storage.retrieve(
            query=target_memory.content,
            n_results=n_results + 1  # +1 because the original will be included
        )
        # Filter out the original memory
        filtered_results = [
            result for result in similar_results
            if result.memory.content_hash != content_hash
        ][:n_results]
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in filtered_results
        ]
        processing_time = (time.time() - start_time) * 1000
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=f"Similar to: {target_memory.content[:50]}...",
            search_type="similar",
            processing_time_ms=processing_time
        )
    except HTTPException:
        raise
    except Exception as e:
        # Log internals but return a sanitized message, consistent with
        # /search (avoid leaking exception details to clients).
        logger.error(f"Similar search failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Similar search failed. Please try again.")
```
--------------------------------------------------------------------------------
/docs/natural-memory-triggers/cli-reference.md:
--------------------------------------------------------------------------------
```markdown
# Natural Memory Triggers v7.1.3 - CLI Reference
Complete reference for the CLI management system that provides real-time configuration and monitoring of Natural Memory Triggers without requiring file edits or Claude Code restarts.
## Overview
The CLI controller (`memory-mode-controller.js`) is the primary interface for managing Natural Memory Triggers. It provides:
- ✅ **Real-time configuration** changes without restart
- ✅ **Performance monitoring** and metrics
- ✅ **Profile management** for different workflows
- ✅ **Sensitivity tuning** for trigger frequency
- ✅ **System diagnostics** and health checks
## Command Syntax
```bash
node ~/.claude/hooks/memory-mode-controller.js <command> [options] [arguments]
```
## Core Commands
### `status` - System Status and Information
Display current system status, configuration, and performance metrics.
```bash
node memory-mode-controller.js status
```
**Output Example:**
```
📊 Memory Hook Status
Current Profile: balanced
Description: Moderate latency, smart memory triggers
Natural Triggers: enabled
Sensitivity: 0.6
Cooldown Period: 30000ms
Max Memories per Trigger: 5
Performance: 145ms avg latency, 2 degradation events
Cache Size: 12 entries
Conversation History: 8 messages
```
**Options:**
- `--verbose` - Show detailed performance metrics
- `--json` - Output in JSON format for scripting
```bash
node memory-mode-controller.js status --verbose
node memory-mode-controller.js status --json
```
### `profiles` - List Available Performance Profiles
Display all available performance profiles with descriptions and configurations.
```bash
node memory-mode-controller.js profiles
```
**Output:**
```
📋 Available Performance Profiles
🏃 speed_focused
Max Latency: 100ms
Enabled Tiers: instant
Description: Fastest response, minimal memory awareness
⚖️ balanced (current)
Max Latency: 200ms
Enabled Tiers: instant, fast
Description: Moderate latency, smart memory triggers
🧠 memory_aware
Max Latency: 500ms
Enabled Tiers: instant, fast, intensive
Description: Full memory awareness, accept higher latency
🤖 adaptive
Max Latency: auto-adjusting
Enabled Tiers: dynamic
Description: Auto-adjust based on performance and user preferences
```
### `profile` - Switch Performance Profile
Change the active performance profile for different workflow requirements.
```bash
node memory-mode-controller.js profile <profile_name>
```
**Available Profiles:**
#### `speed_focused` - Maximum Speed
```bash
node memory-mode-controller.js profile speed_focused
```
- **Latency**: < 100ms
- **Tiers**: Instant only (pattern matching, cache checks)
- **Use Case**: Quick coding sessions, pair programming
- **Trade-off**: Minimal memory awareness for maximum speed
#### `balanced` - Recommended Default
```bash
node memory-mode-controller.js profile balanced
```
- **Latency**: < 200ms
- **Tiers**: Instant + Fast (semantic analysis)
- **Use Case**: General development work, most productive for daily use
- **Trade-off**: Good balance of speed and context awareness
#### `memory_aware` - Maximum Context
```bash
node memory-mode-controller.js profile memory_aware
```
- **Latency**: < 500ms
- **Tiers**: All tiers (deep semantic understanding)
- **Use Case**: Complex projects, architectural decisions, research
- **Trade-off**: Maximum context awareness, higher latency acceptable
#### `adaptive` - Machine Learning
```bash
node memory-mode-controller.js profile adaptive
```
- **Latency**: Auto-adjusting based on usage patterns
- **Tiers**: Dynamic selection based on user feedback
- **Use Case**: Users who want the system to learn automatically
- **Trade-off**: Requires learning period but becomes highly personalized
### `sensitivity` - Adjust Trigger Sensitivity
Control how often Natural Memory Triggers activate by adjusting the confidence threshold.
```bash
node memory-mode-controller.js sensitivity <value>
```
**Sensitivity Values:**
- `0.0` - Maximum triggers (activates on any potential memory-seeking pattern)
- `0.4` - High sensitivity (more triggers, useful for research/architecture work)
- `0.6` - Balanced (recommended default)
- `0.8` - Low sensitivity (fewer triggers, high-confidence only)
- `1.0` - Minimum triggers (only explicit memory requests)
**Examples:**
```bash
# More triggers for architecture work
node memory-mode-controller.js sensitivity 0.4
# Balanced triggers (recommended)
node memory-mode-controller.js sensitivity 0.6
# Fewer triggers for focused coding
node memory-mode-controller.js sensitivity 0.8
```
## System Management Commands
### `enable` - Enable Natural Memory Triggers
Activate the Natural Memory Triggers system.
```bash
node memory-mode-controller.js enable
```
**Output:**
```
✅ Natural Memory Triggers enabled
Current sensitivity: 0.6
Active profile: balanced
Ready to detect memory-seeking patterns
```
### `disable` - Disable Natural Memory Triggers
Temporarily disable the Natural Memory Triggers system without uninstalling.
```bash
node memory-mode-controller.js disable
```
**Output:**
```
⏸️ Natural Memory Triggers disabled
Manual memory commands still available
Use 'enable' to reactivate triggers
```
### `reset` - Reset to Default Settings
Reset all configuration to default values.
```bash
node memory-mode-controller.js reset
```
**What gets reset:**
- Performance profile → `balanced`
- Sensitivity → `0.6`
- Natural triggers → `enabled`
- Cooldown period → `30000ms`
- Max memories per trigger → `5`
**Confirmation prompt:**
```
⚠️ This will reset all Natural Memory Triggers settings to defaults.
Are you sure? (y/N): y
✅ Settings reset to defaults
```
**Options:**
- `--force` - Skip confirmation prompt
```bash
node memory-mode-controller.js reset --force
```
## Testing and Diagnostics
### `test` - Test Trigger Detection
Test the trigger detection system with a specific query to see how it would be processed.
```bash
node memory-mode-controller.js test "your test query"
```
**Example:**
```bash
node memory-mode-controller.js test "What did we decide about authentication?"
```
**Output:**
```
🧪 Testing Natural Memory Triggers
Query: "What did we decide about authentication?"
Processing tiers: instant → fast → intensive
Tier 1 (Instant): 42ms
- Pattern match: ✅ "what...decide" detected
- Cache check: ❌ No cached result
- Confidence: 0.85
Tier 2 (Fast): 127ms
- Key phrases: ["decide", "authentication"]
- Topic shift: 0.2 (moderate)
- Question pattern: ✅ Detected
- Confidence: 0.78
Memory Query Generated:
- Type: recent-development
- Query: "authentication decision approach implementation"
- Weight: 1.0
Result: Would trigger memory retrieval (confidence 0.85 > threshold 0.6)
```
### `metrics` - Performance Metrics
Display detailed performance metrics and system health information.
```bash
node memory-mode-controller.js metrics
```
**Output:**
```
📊 Natural Memory Triggers Performance Metrics
System Performance:
- Active Profile: balanced
- Average Latency: 145ms
- Degradation Events: 2
- User Tolerance: 0.7
Tier Performance:
- Instant Tier: 47ms avg (120 calls)
- Fast Tier: 142ms avg (89 calls)
- Intensive Tier: 387ms avg (23 calls)
Trigger Statistics:
- Total Triggers: 45
- Success Rate: 89%
- False Positives: 5%
- User Satisfaction: 87%
Cache Performance:
- Cache Size: 15 entries
- Hit Rate: 34%
- Average Hit Time: 3ms
Memory Service:
- Connection Status: ✅ Connected
- Average Response: 89ms
- Error Rate: 0%
```
### `health` - System Health Check
Perform comprehensive health check of all system components.
```bash
node memory-mode-controller.js health
```
**Output:**
```
🏥 Natural Memory Triggers Health Check
Core Components:
✅ TieredConversationMonitor loaded
✅ PerformanceManager initialized
✅ GitAnalyzer functional
✅ MCP Client connected
Configuration:
✅ config.json syntax valid
✅ naturalTriggers section present
✅ performance profiles configured
✅ memory service endpoint accessible
Dependencies:
✅ Node.js version compatible (v18.17.0)
✅ Required packages available
✅ File permissions correct
Memory Service Integration:
✅ Connection established
✅ Authentication valid
✅ API responses normal
⚠️ High response latency (245ms)
Git Integration:
✅ Repository detected
✅ Recent commits available
✅ Changelog found
❌ Branch name unavailable
Recommendations:
- Consider optimizing memory service for faster responses
- Check git configuration for branch detection
```
## Advanced Commands
### `config` - Configuration Management
View and modify configuration settings directly through CLI.
```bash
# View current configuration
node memory-mode-controller.js config show
# Get specific setting
node memory-mode-controller.js config get naturalTriggers.triggerThreshold
# Set specific setting
node memory-mode-controller.js config set naturalTriggers.cooldownPeriod 45000
```
### `cache` - Cache Management
Manage the semantic analysis cache.
```bash
# View cache statistics
node memory-mode-controller.js cache stats
# Clear cache
node memory-mode-controller.js cache clear
# Show cache contents (debug)
node memory-mode-controller.js cache show
```
**Cache Stats Output:**
```
💾 Semantic Cache Statistics
Size: 18/50 entries
Memory Usage: 2.4KB
Hit Rate: 34% (89/260 requests)
Average Hit Time: 2.8ms
Last Cleanup: 15 minutes ago
Most Accessed Patterns:
1. "what did we decide" (12 hits)
2. "how did we implement" (8 hits)
3. "similar to what we" (6 hits)
```
### `export` - Export Configuration and Metrics
Export system configuration and performance data for backup or analysis.
```bash
# Export configuration
node memory-mode-controller.js export config > my-config-backup.json
# Export metrics
node memory-mode-controller.js export metrics > performance-report.json
# Export full system state
node memory-mode-controller.js export all > system-state.json
```
### `import` - Import Configuration
Import previously exported configuration.
```bash
node memory-mode-controller.js import config my-config-backup.json
```
## Scripting and Automation
### JSON Output Mode
Most commands support `--json` flag for machine-readable output:
```bash
# Get status in JSON format
node memory-mode-controller.js status --json
# Example output:
{
"profile": "balanced",
"enabled": true,
"sensitivity": 0.6,
"performance": {
"avgLatency": 145,
"degradationEvents": 2
},
"cache": {
"size": 12,
"hitRate": 0.34
}
}
```
### Batch Operations
Run multiple commands in sequence:
```bash
# Setup for architecture work
node memory-mode-controller.js profile memory_aware
node memory-mode-controller.js sensitivity 0.4
# Daily development setup
node memory-mode-controller.js profile balanced
node memory-mode-controller.js sensitivity 0.6
# Quick coding setup
node memory-mode-controller.js profile speed_focused
node memory-mode-controller.js sensitivity 0.8
```
### Environment Variables
Control CLI behavior with environment variables:
```bash
# Enable debug output
export CLAUDE_HOOKS_DEBUG=true
node memory-mode-controller.js status
# Disable colored output
export NO_COLOR=1
node memory-mode-controller.js status
# Set alternative config path
export CLAUDE_HOOKS_CONFIG=/path/to/config.json
node memory-mode-controller.js status
```
## Error Handling and Debugging
### Common Error Messages
#### `Configuration Error: Cannot read config file`
**Cause**: Missing or corrupted configuration file
**Solution**:
```bash
# Check if config exists
ls ~/.claude/hooks/config.json
# Validate JSON syntax
cat ~/.claude/hooks/config.json | node -e "console.log(JSON.parse(require('fs').readFileSync(0, 'utf8')))"
# Reset to defaults if corrupted
node memory-mode-controller.js reset --force
```
#### `Memory Service Connection Failed`
**Cause**: MCP Memory Service not running or unreachable
**Solution**:
```bash
# Check memory service status
curl -k https://localhost:8443/api/health
# Start memory service
uv run memory server
# Check configuration
node memory-mode-controller.js config get memoryService.endpoint
```
#### `Permission Denied`
**Cause**: Incorrect file permissions
**Solution**:
```bash
# Fix permissions
chmod +x ~/.claude/hooks/memory-mode-controller.js
chmod 644 ~/.claude/hooks/config.json
```
### Debug Mode
Enable verbose debugging:
```bash
export CLAUDE_HOOKS_DEBUG=true
node memory-mode-controller.js status
```
**Debug Output Example:**
```
[DEBUG] Loading configuration from ~/.claude/hooks/config.json
[DEBUG] Configuration loaded successfully
[DEBUG] Initializing TieredConversationMonitor
[DEBUG] PerformanceManager initialized with profile: balanced
[DEBUG] GitAnalyzer detecting repository context
[DEBUG] MCP Client connecting to https://localhost:8443
[DEBUG] Status command executed successfully
```
## Integration Examples
### Shell Aliases
Add to your `.bashrc` or `.zshrc`:
```bash
# Quick aliases for common operations
alias nmt-status='node ~/.claude/hooks/memory-mode-controller.js status'
alias nmt-balanced='node ~/.claude/hooks/memory-mode-controller.js profile balanced'
alias nmt-speed='node ~/.claude/hooks/memory-mode-controller.js profile speed_focused'
alias nmt-memory='node ~/.claude/hooks/memory-mode-controller.js profile memory_aware'
alias nmt-metrics='node ~/.claude/hooks/memory-mode-controller.js metrics'
```
### VS Code Integration
Create VS Code tasks (`.vscode/tasks.json`):
```json
{
"version": "2.0.0",
"tasks": [
{
"label": "NMT: Check Status",
"type": "shell",
"command": "node ~/.claude/hooks/memory-mode-controller.js status",
"group": "build",
"presentation": {
"echo": true,
"reveal": "always",
"focus": false,
"panel": "shared"
}
},
{
"label": "NMT: Switch to Memory Aware",
"type": "shell",
"command": "node ~/.claude/hooks/memory-mode-controller.js profile memory_aware",
"group": "build"
}
]
}
```
### Automated Performance Monitoring
Monitor system performance with cron job:
```bash
# Add to crontab (crontab -e)
# Check metrics every hour and log to file
0 * * * * node ~/.claude/hooks/memory-mode-controller.js metrics --json >> ~/nmt-metrics.log 2>&1
```
---
The CLI controller provides complete control over Natural Memory Triggers v7.1.3, enabling real-time optimization of your intelligent memory awareness system! 🚀
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/base.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MCP Memory Service
Copyright (c) 2024 Heinrich Krupp
Licensed under the MIT License. See LICENSE file in the project root for full license text.
"""
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime, timezone, timedelta
from ..models.memory import Memory, MemoryQueryResult
class MemoryStorage(ABC):
    """Abstract base class for memory storage implementations.

    Abstract methods define the contract every concrete backend must
    implement. The non-abstract coroutines provide naive default
    implementations (sequential or in-memory fallbacks) that backends
    should override when a more efficient native operation exists.
    """

    @property
    @abstractmethod
    def max_content_length(self) -> Optional[int]:
        """
        Maximum content length supported by this storage backend.

        Returns:
            Maximum number of characters allowed in memory content, or None for unlimited.

        This limit is based on the underlying embedding model's token limits.
        """
        pass

    @property
    @abstractmethod
    def supports_chunking(self) -> bool:
        """
        Whether this backend supports automatic content chunking.

        Returns:
            True if the backend can store chunked memories with linking metadata.
        """
        pass

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the storage backend."""
        pass

    @abstractmethod
    async def store(self, memory: Memory) -> Tuple[bool, str]:
        """Store a memory. Returns (success, message)."""
        pass

    async def store_batch(self, memories: List[Memory]) -> List[Tuple[bool, str]]:
        """
        Store multiple memories in a single operation.

        Default implementation calls store() for each memory concurrently using asyncio.gather.
        Override this method in concrete storage backends to provide true batch operations
        for improved performance (e.g., single database transaction, bulk network request).

        Args:
            memories: List of Memory objects to store

        Returns:
            A list of (success, message) tuples, one for each memory in the batch.
        """
        if not memories:
            return []
        results = await asyncio.gather(
            *(self.store(memory) for memory in memories),
            return_exceptions=True
        )
        # Process results to handle potential exceptions from gather
        final_results = []
        for res in results:
            if isinstance(res, Exception):
                # If a store operation failed with an exception, record it as a failure
                final_results.append((False, f"Failed to store memory: {res}"))
            else:
                final_results.append(res)
        return final_results

    @abstractmethod
    async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
        """Retrieve memories by semantic search."""
        pass

    @abstractmethod
    async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
        """Search memories by tags with optional time filtering.

        Args:
            tags: List of tags to search for
            time_start: Optional Unix timestamp (in seconds) to filter memories created after this time

        Returns:
            List of Memory objects matching the tag criteria and time filter
        """
        pass

    @abstractmethod
    async def search_by_tags(
        self,
        tags: List[str],
        operation: str = "AND",
        time_start: Optional[float] = None,
        time_end: Optional[float] = None
    ) -> List[Memory]:
        """Search memories by tags with AND/OR semantics and time range filtering.

        Args:
            tags: List of tag names to search for
            operation: "AND" (all tags must match) or "OR" (any tag matches)
            time_start: Optional Unix timestamp for inclusive range start
            time_end: Optional Unix timestamp for inclusive range end

        Returns:
            List of Memory objects matching the criteria
        """
        pass

    async def search_by_tag_chronological(self, tags: List[str], limit: Optional[int] = None, offset: int = 0) -> List[Memory]:
        """
        Search memories by tags with chronological ordering (newest first).

        Args:
            tags: List of tags to search for
            limit: Maximum number of memories to return (None for all)
            offset: Number of memories to skip (for pagination)

        Returns:
            List of Memory objects ordered by created_at DESC
        """
        # Default implementation: use search_by_tag then sort
        memories = await self.search_by_tag(tags)
        memories.sort(key=lambda m: m.created_at or 0, reverse=True)
        # Apply pagination
        if offset > 0:
            memories = memories[offset:]
        if limit is not None:
            memories = memories[:limit]
        return memories

    @abstractmethod
    async def delete(self, content_hash: str) -> Tuple[bool, str]:
        """Delete a memory by its hash."""
        pass

    @abstractmethod
    async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
        """
        Get a memory by its content hash using direct O(1) lookup.

        Args:
            content_hash: The content hash of the memory to retrieve

        Returns:
            Memory object if found, None otherwise
        """
        pass

    @abstractmethod
    async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
        """Delete memories by tag. Returns (count_deleted, message)."""
        pass

    async def delete_by_tags(self, tags: List[str]) -> Tuple[int, str]:
        """
        Delete memories matching ANY of the given tags.

        Default implementation calls delete_by_tag for each tag sequentially.
        Override in concrete implementations for better performance (e.g., single query with OR).

        Args:
            tags: List of tags - memories matching ANY tag will be deleted

        Returns:
            Tuple of (total_count_deleted, message)
        """
        if not tags:
            return 0, "No tags provided"
        total_count = 0
        errors = []
        for tag in tags:
            try:
                count, message = await self.delete_by_tag(tag)
                total_count += count
                # Backends report failures in the message text rather than raising;
                # scan the message so partial failures are surfaced to the caller.
                if "error" in message.lower() or "failed" in message.lower():
                    errors.append(f"{tag}: {message}")
            except Exception as e:
                errors.append(f"{tag}: {str(e)}")
        if errors:
            error_summary = "; ".join(errors[:3])  # Limit error details
            if len(errors) > 3:
                error_summary += f" (+{len(errors) - 3} more errors)"
            return total_count, f"Deleted {total_count} memories with partial failures: {error_summary}"
        return total_count, f"Deleted {total_count} memories across {len(tags)} tag(s)"

    @abstractmethod
    async def cleanup_duplicates(self) -> Tuple[int, str]:
        """Remove duplicate memories. Returns (count_removed, message)."""
        pass

    @abstractmethod
    async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
        """
        Update memory metadata without recreating the entire memory entry.

        Args:
            content_hash: Hash of the memory to update
            updates: Dictionary of metadata fields to update
            preserve_timestamps: Whether to preserve original created_at timestamp

        Returns:
            Tuple of (success, message)

        Note:
            - Only metadata, tags, and memory_type can be updated
            - Content and content_hash cannot be modified
            - updated_at timestamp is always refreshed
            - created_at is preserved unless preserve_timestamps=False
        """
        pass

    async def update_memory(self, memory: Memory) -> bool:
        """
        Update an existing memory with new metadata, tags, and memory_type.

        Args:
            memory: Memory object with updated fields

        Returns:
            True if update was successful, False otherwise
        """
        # Delegates to update_memory_metadata; content/content_hash are immutable.
        updates = {
            'tags': memory.tags,
            'metadata': memory.metadata,
            'memory_type': memory.memory_type
        }
        success, _ = await self.update_memory_metadata(
            memory.content_hash,
            updates,
            preserve_timestamps=True
        )
        return success

    async def update_memories_batch(self, memories: List[Memory]) -> List[bool]:
        """
        Update multiple memories in a batch operation.

        Default implementation calls update_memory() for each memory concurrently using asyncio.gather.
        Override this method in concrete storage backends to provide true batch operations
        for improved performance (e.g., single database transaction with multiple UPDATEs).

        Args:
            memories: List of Memory objects with updated fields

        Returns:
            List of success booleans, one for each memory in the batch
        """
        if not memories:
            return []
        results = await asyncio.gather(
            *(self.update_memory(memory) for memory in memories),
            return_exceptions=True
        )
        # Process results to handle potential exceptions from gather
        final_results = []
        for res in results:
            if isinstance(res, Exception):
                final_results.append(False)
            else:
                final_results.append(res)
        return final_results

    async def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics. Override for specific implementations."""
        return {
            "total_memories": 0,
            "storage_backend": self.__class__.__name__,
            "status": "operational"
        }

    async def get_all_tags(self) -> List[str]:
        """Get all unique tags in the storage. Override for specific implementations."""
        return []

    async def get_recent_memories(self, n: int = 10) -> List[Memory]:
        """Get n most recent memories. Override for specific implementations."""
        return []

    async def recall_memory(self, query: str, n_results: int = 5) -> List[Memory]:
        """Recall memories based on natural language time expression. Override for specific implementations."""
        # Default implementation just uses regular search
        results = await self.retrieve(query, n_results)
        return [r.memory for r in results]

    async def search(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
        """Search memories. Default implementation uses retrieve."""
        return await self.retrieve(query, n_results)

    async def get_all_memories(self, limit: Optional[int] = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
        """
        Get all memories in storage ordered by creation time (newest first).

        Args:
            limit: Maximum number of memories to return (None for all)
            offset: Number of memories to skip (for pagination)
            memory_type: Optional filter by memory type
            tags: Optional filter by tags (matches ANY of the provided tags)

        Returns:
            List of Memory objects ordered by created_at DESC, optionally filtered by type and tags
        """
        return []

    async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
        """
        Get total count of memories in storage.

        Args:
            memory_type: Optional filter by memory type
            tags: Optional filter by tags (memories matching ANY of the tags)

        Returns:
            Total number of memories, optionally filtered by type and/or tags
        """
        return 0

    async def count_memories_by_tag(self, tags: List[str]) -> int:
        """
        Count memories that match any of the given tags.

        Args:
            tags: List of tags to search for

        Returns:
            Number of memories matching any tag
        """
        # Default implementation: search then count
        memories = await self.search_by_tag(tags)
        return len(memories)

    async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
        """Get memories within a time range. Override for specific implementations."""
        return []

    async def get_memory_connections(self) -> Dict[str, int]:
        """Get memory connection statistics. Override for specific implementations."""
        return {}

    async def get_access_patterns(self) -> Dict[str, datetime]:
        """Get memory access pattern statistics. Override for specific implementations."""
        return {}

    async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
        """
        Get memory creation timestamps only, without loading full memory objects.

        This is an optimized method for analytics that only needs timestamps,
        avoiding the overhead of loading full memory content and embeddings.

        Args:
            days: Optional filter to only get memories from last N days

        Returns:
            List of Unix timestamps (float) in descending order (newest first)
        """
        # Default implementation falls back to get_recent_memories
        # Concrete backends should override with optimized SQL queries
        n = 5000 if days is None else days * 100  # Rough estimate
        memories = await self.get_recent_memories(n=n)
        timestamps = [m.created_at for m in memories if m.created_at]
        # Filter by days if specified
        if days is not None:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            cutoff_timestamp = cutoff.timestamp()
            timestamps = [ts for ts in timestamps if ts >= cutoff_timestamp]
        return sorted(timestamps, reverse=True)
```
--------------------------------------------------------------------------------
/docs/architecture.md:
--------------------------------------------------------------------------------
```markdown
# MCP Memory Service Architecture
## Overview
MCP Memory Service is a Model Context Protocol server that provides semantic memory and persistent storage capabilities for AI assistants. It enables long-term memory storage with semantic search, time-based recall, and tag-based organization across conversations.
## System Architecture
```mermaid
graph TB
subgraph "Client Layer"
CC[Claude Desktop]
LMS[LM Studio]
VSC[VS Code MCP]
GEN[Generic MCP Client]
end
subgraph "Protocol Layer"
MCP[MCP Server Protocol]
HTTP[HTTP API Server]
WEB[Web Dashboard]
end
subgraph "Core Services"
SRV[Memory Service Core]
AUTH[Authentication]
CACHE[Model Cache]
EMB[Embedding Service]
end
subgraph "Storage Abstraction"
ABS[Storage Interface]
HYBRID[Hybrid Backend ⭐]
CLOUDFLARE[Cloudflare Backend]
SQLITE[SQLite-vec Backend]
REMOTE[HTTP Client Backend]
CHROMA[ChromaDB ⚠️ DEPRECATED]
end
subgraph "Infrastructure"
DB[(Vector Database)]
FS[(File System)]
MDNS[mDNS Discovery]
end
CC --> MCP
LMS --> MCP
VSC --> MCP
GEN --> MCP
MCP --> SRV
HTTP --> SRV
WEB --> HTTP
SRV --> AUTH
SRV --> CACHE
SRV --> EMB
SRV --> ABS
ABS --> HYBRID
ABS --> CLOUDFLARE
ABS --> SQLITE
ABS --> REMOTE
ABS --> CHROMA
HYBRID --> SQLITE
HYBRID --> CLOUDFLARE
CLOUDFLARE --> DB
SQLITE --> DB
REMOTE --> HTTP
CHROMA --> DB
DB --> FS
SRV --> MDNS
```
## Core Components
### 1. Server Layer (`src/mcp_memory_service/server.py`)
The main server implementation that handles MCP protocol communication:
- **Protocol Handler**: Implements the MCP protocol specification
- **Request Router**: Routes incoming requests to appropriate handlers
- **Response Builder**: Constructs protocol-compliant responses
- **Client Detection**: Identifies and adapts to different MCP clients (Claude Desktop, LM Studio, etc.)
- **Logging System**: Client-aware logging with JSON compliance for Claude Desktop
Key responsibilities:
- Async request handling with proper error boundaries
- Global model and embedding cache management
- Lazy initialization of storage backends
- Tool registration and invocation
### 2. Storage Abstraction Layer (`src/mcp_memory_service/storage/`)
Abstract interface that allows multiple storage backend implementations:
#### Base Interface (`storage/base.py`)
```python
class MemoryStorage(ABC):
async def initialize(self) -> None:
"""Initialize the storage backend."""
pass
async def store(self, memory: Memory) -> Tuple[bool, str]:
"""Store a memory object."""
pass
async def retrieve(self, query: str, n_results: int) -> List[MemoryQueryResult]:
"""Retrieve memories based on semantic similarity."""
pass
async def search_by_tag(self, tags: List[str]) -> List[Memory]:
"""Search memories by tags."""
pass
async def delete(self, content_hash: str) -> Tuple[bool, str]:
"""Delete a memory by content hash."""
pass
async def recall_memory(self, query: str, n_results: int) -> List[Memory]:
"""Recall memories using natural language time queries."""
pass
```
#### Hybrid Backend (`storage/hybrid.py`) ⭐ **RECOMMENDED**
- **Production default** - Best performance with cloud synchronization
- **Primary storage**: SQLite-vec for ultra-fast local reads (~5ms)
- **Secondary storage**: Cloudflare for multi-device persistence and cloud backup
- **Background sync**: Zero user-facing latency with async operation queue
- **Graceful degradation**: Works offline, automatically syncs when cloud available
- **Capacity monitoring**: Tracks Cloudflare limits and provides warnings
- **Use cases**: Production deployments, multi-device users, cloud-backed local performance
#### Cloudflare Backend (`storage/cloudflare.py`)
- Cloud-native storage using Cloudflare D1 (SQL) + Vectorize (vectors)
- Global edge distribution for low-latency access worldwide
- Serverless architecture with no infrastructure management
- Automatic scaling and high availability
- **Limits**: 10GB D1 database, 5M vectors in Vectorize
- **Use cases**: Cloud-only deployments, serverless environments, no local storage
#### SQLite-vec Backend (`storage/sqlite_vec.py`)
- Lightweight, fast local storage (5ms read latency)
- Native SQLite with vec0 extension for vector similarity
- ONNX Runtime embeddings (no PyTorch dependency)
- Minimal memory footprint and dependencies
- **Use cases**: Development, single-device deployments, or as primary in Hybrid backend
#### HTTP Client Backend (`storage/http_client.py`)
- Remote storage via HTTP API for distributed architectures
- Enables client-server deployments with centralized memory
- Bearer token authentication with API key support
- Automatic retry logic with exponential backoff
- **Use cases**: Multi-client shared memory, remote MCP servers, load balancing
#### ChromaDB Backend (`storage/chroma.py`) ⚠️ **DEPRECATED**
- **Status**: Deprecated since v5.x, removal planned for v6.0.0
- **Migration path**: Switch to Hybrid backend for production
- Original vector database backend with sentence transformer embeddings
- Heavy dependencies (PyTorch, sentence-transformers, ~2GB download)
- Slower performance (15ms vs 5ms for SQLite-vec)
- Higher memory footprint and complexity
- **Why deprecated**: Hybrid backend provides better performance with cloud sync
- **Historical only**: Not recommended for new deployments
### 3. Models Layer (`src/mcp_memory_service/models/`)
Data structures and validation:
```python
@dataclass
class Memory:
id: str
content: str
content_hash: str
memory_type: str
tags: List[str]
metadata: MemoryMetadata
created_at: datetime
updated_at: datetime
@dataclass
class MemoryMetadata:
source: Optional[str]
client_id: Optional[str]
session_id: Optional[str]
parent_memory_id: Optional[str]
child_memory_ids: List[str]
```
### 4. Web Interface (`src/mcp_memory_service/web/`)
Modern web dashboard for memory management:
- **Frontend**: Responsive React-based UI
- **API Routes**: RESTful endpoints for memory operations
- **WebSocket Support**: Real-time updates
- **Authentication**: API key-based authentication
- **Health Monitoring**: System status and metrics
### 5. Configuration Management (`src/mcp_memory_service/config.py`)
Environment-based configuration with sensible defaults:
- Storage backend selection
- Model selection and caching
- Platform-specific optimizations
- Hardware acceleration detection (CUDA, MPS, DirectML, ROCm)
- Network configuration (HTTP, HTTPS, mDNS)
## Key Design Patterns
### Async/Await Pattern
All I/O operations use Python's async/await for non-blocking execution:
```python
async def store_memory(self, content: str) -> Memory:
embedding = await self._generate_embedding(content)
memory = await self.storage.store(content, embedding)
return memory
```
### Lazy Initialization
Resources are initialized only when first needed:
```python
async def _ensure_storage_initialized(self):
if self.storage is None:
self.storage = await create_storage_backend()
return self.storage
```
### Global Caching Strategy
Model and embedding caches are shared globally to reduce memory usage:
```python
_MODEL_CACHE = {}
_EMBEDDING_CACHE = LRUCache(maxsize=1000)
```
### Platform Detection and Optimization
Automatic detection and optimization for different platforms:
- **macOS**: MPS acceleration for Apple Silicon
- **Windows**: CUDA or DirectML
- **Linux**: CUDA, ROCm, or CPU
- **Fallback**: ONNX Runtime for compatibility
## MCP Protocol Operations
### Core Memory Operations
| Operation | Description | Parameters |
|-----------|-------------|------------|
| `store_memory` | Store new memory with tags | content, tags, metadata |
| `retrieve_memory` | Semantic search | query, n_results |
| `recall_memory` | Time-based retrieval | time_expression, n_results |
| `search_by_tag` | Tag-based search | tags[] |
| `delete_memory` | Delete by hash | content_hash |
| `delete_by_tags` | Bulk deletion | tags[] |
### Utility Operations
| Operation | Description | Parameters |
|-----------|-------------|------------|
| `check_database_health` | Health status | - |
| `optimize_db` | Database optimization | - |
| `export_memories` | Export to JSON | output_path |
| `import_memories` | Import from JSON | input_path |
| `get_memory_stats` | Usage statistics | - |
### Debug Operations
| Operation | Description | Parameters |
|-----------|-------------|------------|
| `debug_retrieve` | Detailed similarity scores | query, n_results |
| `exact_match_retrieve` | Exact content matching | query |
## Data Flow
### Memory Storage Flow
```
1. Client sends store_memory request
2. Server validates and enriches metadata
3. Content is hashed for deduplication
4. Text is embedded using sentence transformers
5. Memory is stored in vector database
6. Confirmation returned to client
```
### Memory Retrieval Flow
```
1. Client sends retrieve_memory request
2. Query is embedded to vector representation
3. Vector similarity search performed
4. Results ranked by similarity score
5. Metadata enriched results returned
```
### Time-Based Recall Flow
```
1. Client sends recall_memory with time expression
2. Time parser extracts temporal boundaries
3. Semantic query combined with time filter
4. Filtered results returned chronologically
```
## Performance Optimizations
### Model Caching
- Sentence transformer models cached globally
- Single model instance shared across requests
- Lazy loading on first use
### Embedding Cache
- LRU cache for frequently used embeddings
- Configurable cache size
- Cache hit tracking for optimization
### Query Optimization
- Batch processing for multiple operations
- Connection pooling for database access
- Async I/O for non-blocking operations
### Platform-Specific Optimizations
- Hardware acceleration auto-detection
- Optimized tensor operations per platform
- Fallback strategies for compatibility
## Security Considerations
### Authentication
- API key-based authentication for HTTP endpoints
- Bearer token support
- Per-client authentication in multi-client mode
### Data Privacy
- Content hashing for deduplication
- Optional encryption at rest
- Client isolation in shared deployments
### Network Security
- HTTPS support with SSL/TLS
- CORS configuration for web access
- Rate limiting for API endpoints
## Deployment Architectures
### Production (Hybrid Backend) ⭐ **RECOMMENDED**
- **Local performance**: SQLite-vec for 5ms read latency
- **Cloud persistence**: Cloudflare for multi-device sync and backup
- **Background sync**: Zero user-facing latency, async operation queue
- **Offline capability**: Full functionality without internet, syncs when available
- **Multi-device**: Access same memories across desktop, laptop, mobile
- **Use cases**: Individual users, teams with personal instances, production deployments
- **Setup**: `install.py --storage-backend hybrid` or set `MCP_MEMORY_STORAGE_BACKEND=hybrid`
### Cloud-Only (Cloudflare Backend)
- **Serverless deployment**: No local storage, pure cloud architecture
- **Global edge**: Cloudflare's worldwide network for low latency
- **Automatic scaling**: Handles traffic spikes without configuration
- **Use cases**: Serverless environments, ephemeral containers, CI/CD systems
- **Limits**: 10GB D1 database, 5M vectors in Vectorize
- **Setup**: `install.py --storage-backend cloudflare` or set `MCP_MEMORY_STORAGE_BACKEND=cloudflare`
### Development (SQLite-vec Backend)
- **Lightweight**: Minimal dependencies, fast startup
- **Local-only**: No cloud connectivity required
- **Fast iteration**: 5ms read latency, no sync overhead
- **Use cases**: Development, testing, single-device prototypes
- **Setup**: `install.py --storage-backend sqlite_vec` or set `MCP_MEMORY_STORAGE_BACKEND=sqlite_vec`
### Multi-Client Shared (HTTP Server)
- **Centralized HTTP server** with shared memory pool
- **Multiple clients** connect via API (Claude Desktop, VS Code, custom apps)
- **Authentication**: API key-based access control
- **Use cases**: Team collaboration, shared organizational memory
- **Setup**: Enable HTTP server with `MCP_HTTP_ENABLED=true`, clients use HTTP Client backend
### Legacy (ChromaDB Backend) ⚠️ **NOT RECOMMENDED**
- **Deprecated**: Removal planned for v6.0.0
- **Migration required**: Switch to Hybrid backend
- Heavy dependencies, slower performance (15ms vs 5ms)
- Only for existing deployments with migration path to Hybrid
## Extension Points
### Custom Storage Backends
Implement the `MemoryStorage` abstract base class:
```python
class CustomStorage(MemoryStorage):
async def store(self, memory: Memory) -> Tuple[bool, str]:
# Custom implementation
```
### Custom Embedding Models
Replace the default sentence transformer:
```python
EMBEDDING_MODEL = "your-model/name"
```
### Protocol Extensions
Add new operations via tool registration:
```python
types.Tool(
name="custom_operation",
description="Custom memory operation",
inputSchema={
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "First parameter"
},
"param2": {
"type": "integer",
"description": "Second parameter",
"default": 0
}
},
"required": ["param1"],
"additionalProperties": false
}
)
```
## Future Enhancements
### Planned Features (See Issue #91)
- **WFGY Semantic Firewall** - Enhanced memory reliability with 16 failure mode detection/recovery
- **Ontology Foundation Layer** (Phase 0) - Controlled vocabulary, taxonomy, knowledge graph
- Automatic memory consolidation
- Semantic clustering
- Memory importance scoring
- Cross-conversation threading
### Under Consideration
- **Agentic RAG** for intelligent retrieval (see Discussion #86)
- **Graph-based memory relationships** (ontology pipeline integration)
- Memory compression strategies
- Federated learning from memories
- Real-time collaboration features
- Advanced visualization tools
## References
- [MCP Protocol Specification](https://modelcontextprotocol.io/docs)
- [ChromaDB Documentation](https://docs.trychroma.com/)
- [SQLite Vec Extension](https://github.com/asg017/sqlite-vec)
- [Sentence Transformers](https://www.sbert.net/)
```
--------------------------------------------------------------------------------
/scripts/maintenance/find_duplicates.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Find and remove duplicate memories from the database.
Duplicates can occur when:
1. Same content was ingested multiple times
2. Re-ingestion after encoding fixes created duplicates
3. Manual storage of similar content
"""
import sqlite3
import json
import sys
import hashlib
import urllib.request
import urllib.parse
import ssl
from pathlib import Path
from collections import defaultdict
from datetime import datetime
def load_config():
    """Load the Claude hooks configuration, if present.

    Returns:
        Parsed JSON from ~/.claude/hooks/config.json, or None when the
        config file does not exist.
    """
    config_file = Path.home() / '.claude' / 'hooks' / 'config.json'
    if not config_file.exists():
        return None
    with open(config_file) as handle:
        return json.load(handle)
def get_memories_from_api(endpoint, api_key):
    """Retrieve all memories from the API endpoint using pagination.

    Args:
        endpoint: Base URL of the memory service (e.g. "https://host:8443").
        api_key: Bearer token placed in the Authorization header.

    Returns:
        List of 5-tuples (content_hash, content, tags_json, created_at,
        metadata_json) matching the SQLite row layout used by the rest of
        this script, or [] on any failure (best-effort; errors are printed).
    """
    try:
        # Create SSL context that allows self-signed certificates.
        # SECURITY NOTE: certificate verification is fully disabled here, so
        # this is only appropriate for trusted/local endpoints.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        all_memories = []
        page = 1
        page_size = 100  # Use reasonable page size
        while True:
            # Create request for current page
            url = f"{endpoint}/api/memories?page={page}&page_size={page_size}"
            req = urllib.request.Request(url)
            req.add_header('Authorization', f'Bearer {api_key}')
            # Make request (urlopen raises HTTPError for most non-2xx codes;
            # the status check below is a defensive fallback)
            with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
                if response.status != 200:
                    print(f"❌ API request failed: {response.status}")
                    return []
                data = response.read().decode('utf-8')
                api_response = json.loads(data)
                # Extract memories from this page
                page_memories = api_response.get('memories', [])
                total = api_response.get('total', 0)
                has_more = api_response.get('has_more', False)
                all_memories.extend(page_memories)
                print(f"Retrieved page {page}: {len(page_memories)} memories (total so far: {len(all_memories)}/{total})")
                if not has_more:
                    break
                page += 1
        print(f"✅ Retrieved all {len(all_memories)} memories from API")
        # Convert API format to internal format:
        # (content_hash, content, tags_json, created_at, metadata_json)
        converted_memories = []
        for mem in all_memories:
            converted_memories.append((
                mem.get('content_hash', ''),
                mem.get('content', ''),
                json.dumps(mem.get('tags', [])),
                mem.get('created_at', ''),
                json.dumps(mem.get('metadata', {}))
            ))
        return converted_memories
    except Exception as e:
        # Broad catch is deliberate: this is a best-effort maintenance script,
        # and any failure should degrade to "no memories" with a message.
        print(f"❌ Error retrieving memories from API: {e}")
        return []
def content_similarity_hash(content):
    """Return a short fingerprint used to group near-identical content.

    The text is stripped, lower-cased, and has runs of whitespace collapsed
    to single spaces before hashing, so trivially reformatted copies of the
    same content map to the same 16-hex-digit key.
    """
    collapsed = ' '.join(content.strip().lower().split())
    return hashlib.sha256(collapsed.encode('utf-8')).hexdigest()[:16]
def find_duplicates(memories_source, similarity_threshold=0.95):
    """
    Find duplicate memories from either database or API.

    Args:
        memories_source: Either a database path (str) or a list of memory
            tuples (content_hash, content, tags_json, created_at,
            metadata_json) as produced by get_memories_from_api()
        similarity_threshold: Threshold for considering memories duplicates
            (0.0-1.0). NOTE: currently unused -- grouping is done via exact
            and normalized-content hashing; kept for backward compatibility.

    Returns:
        Dict with keys:
            'exact': groups of byte-identical content (each group size > 1)
            'similar': groups identical after whitespace/case normalization
            'total_memories': total number of memories scanned
    """
    if isinstance(memories_source, str):
        # Database path provided
        conn = sqlite3.connect(memories_source)
        try:
            cursor = conn.cursor()
            print("Scanning for duplicate memories...")
            # Get all memories
            cursor.execute("""
                SELECT content_hash, content, tags, created_at, metadata
                FROM memories
                ORDER BY created_at DESC
            """)
            all_memories = cursor.fetchall()
        finally:
            # Close even if the query fails (previously leaked on error)
            conn.close()
    else:
        # API memories provided
        print("Analyzing memories from API...")
        all_memories = memories_source
    print(f"Found {len(all_memories)} total memories")
    # Group by content similarity
    content_groups = defaultdict(list)
    exact_content_groups = defaultdict(list)
    for memory in all_memories:
        content_hash, content, tags_json, created_at, metadata_json = memory
        # Parse tags and metadata; malformed/odd-typed JSON falls back to
        # empty values instead of aborting the scan (was a bare `except:`,
        # which also swallowed KeyboardInterrupt/SystemExit)
        try:
            tags = json.loads(tags_json) if tags_json else []
        except (json.JSONDecodeError, TypeError):
            tags = []
        try:
            metadata = json.loads(metadata_json) if metadata_json else {}
        except (json.JSONDecodeError, TypeError):
            metadata = {}
        # Exact content match
        exact_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
        exact_content_groups[exact_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
        # Similar content match (normalized whitespace/case)
        similarity_hash = content_similarity_hash(content)
        content_groups[similarity_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
    # Find actual duplicates (groups with > 1 memory)
    exact_duplicates = {k: v for k, v in exact_content_groups.items() if len(v) > 1}
    similar_duplicates = {k: v for k, v in content_groups.items() if len(v) > 1}
    return {
        'exact': exact_duplicates,
        'similar': similar_duplicates,
        'total_memories': len(all_memories)
    }
def analyze_duplicate_group(group):
    """Decide which memory in a duplicate group to keep and which to delete.

    Preference order: a version tagged 'utf8-fixed' (the corrected copy),
    otherwise the most recently created memory.

    Returns:
        None for groups of size <= 1; otherwise a dict with 'group_size',
        'recommended_keep', 'recommended_delete' and human-readable 'reasons'.
    """
    if len(group) <= 1:
        return None
    # Newest first
    by_recency = sorted(group, key=lambda mem: mem['created_at'], reverse=True)
    analysis = {
        'group_size': len(group),
        'recommended_keep': None,
        'recommended_delete': [],
        'reasons': []
    }
    # Prefer memories carrying the utf8-fixed tag (corrected versions)
    fixed_versions = [mem for mem in by_recency if 'utf8-fixed' in mem['tags']]
    if fixed_versions:
        keeper = fixed_versions[0]
        analysis['recommended_keep'] = keeper
        analysis['recommended_delete'] = [mem for mem in by_recency if mem != keeper]
        analysis['reasons'].append('Keeping UTF8-fixed version')
        return analysis
    # Otherwise keep the newest copy and drop the older ones
    keeper, *stale = by_recency
    analysis['recommended_keep'] = keeper
    analysis['recommended_delete'] = stale
    analysis['reasons'].append('Keeping newest version')
    # Record any tag differences between the keeper and each stale copy
    keep_tags = set(keeper['tags'])
    for candidate in stale:
        delete_tags = set(candidate['tags'])
        if delete_tags != keep_tags:
            analysis['reasons'].append(f'Tag differences: {delete_tags - keep_tags}')
    return analysis
def remove_duplicates(db_path, duplicate_groups, dry_run=True):
    """
    Remove duplicate memories from the database.

    Exact-duplicate groups are analyzed and all but the recommended keeper
    are deleted; similar (normalized-content) groups are only reported,
    never auto-deleted.

    Args:
        db_path: Path to the SQLite database, or None for an analysis-only
            run (e.g. when memories were fetched via the API). None is only
            valid together with dry_run=True.
        duplicate_groups: Dict of duplicate groups from find_duplicates()
        dry_run: If True, only show what would be deleted

    Returns:
        Number of exact duplicates selected for deletion.

    Raises:
        ValueError: if dry_run is False but no db_path was provided.
    """
    # Connect lazily: an analysis-only (dry run) pass may have no local
    # database at all — previously sqlite3.connect(None) crashed here.
    conn = sqlite3.connect(db_path) if db_path is not None else None
    try:
        total_to_delete = 0
        deletion_plan = []
        print(f"\n{'DRY RUN - ' if dry_run else ''}Analyzing duplicate groups...")
        # Process exact duplicates first — these are safe to delete.
        print(f"\n=== EXACT DUPLICATES ===")
        for content_hash, group in duplicate_groups['exact'].items():
            analysis = analyze_duplicate_group(group)
            if analysis:
                total_to_delete += len(analysis['recommended_delete'])
                deletion_plan.extend(analysis['recommended_delete'])
                print(f"\nDuplicate group: {len(group)} memories")
                print(f" Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
                print(f" Tags: {', '.join(analysis['recommended_keep']['tags'][:3])}")
                print(f" Delete: {len(analysis['recommended_delete'])} older versions")
                for reason in analysis['reasons']:
                    print(f" Reason: {reason}")
        # Process similar duplicates (but not exact) — report only.
        print(f"\n=== SIMILAR DUPLICATES ===")
        processed_exact_hashes = set()
        for group in duplicate_groups['exact'].values():
            for mem in group:
                processed_exact_hashes.add(mem['hash'])
        for similarity_hash, group in duplicate_groups['similar'].items():
            # Skip if these are exact duplicates we already processed
            group_hashes = {mem['hash'] for mem in group}
            if group_hashes.issubset(processed_exact_hashes):
                continue
            analysis = analyze_duplicate_group(group)
            if analysis:
                print(f"\nSimilar group: {len(group)} memories")
                print(f" Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
                print(f" Content preview: {analysis['recommended_keep']['content'][:100]}...")
                print(f" Would delete: {len(analysis['recommended_delete'])} similar versions")
            # Don't auto-delete similar (only exact) in this version
        print(f"\n{'DRY RUN SUMMARY' if dry_run else 'DELETION SUMMARY'}:")
        print(f"Total exact duplicates to delete: {total_to_delete}")
        print(f"Current total memories: {duplicate_groups['total_memories']}")
        print(f"After cleanup: {duplicate_groups['total_memories'] - total_to_delete}")
        if not dry_run and total_to_delete > 0:
            if conn is None:
                raise ValueError("db_path is required when dry_run is False")
            cursor = conn.cursor()
            print(f"\n{'='*50}")
            print("DELETING DUPLICATE MEMORIES...")
            deleted_count = 0
            for mem_to_delete in deletion_plan:
                try:
                    # Delete from memories table
                    cursor.execute("DELETE FROM memories WHERE content_hash = ?", (mem_to_delete['hash'],))
                    # Also try to delete from embeddings if it exists
                    try:
                        cursor.execute("DELETE FROM memory_embeddings WHERE rowid = ?", (mem_to_delete['hash'],))
                    except sqlite3.Error:
                        pass  # Embeddings table might use different structure
                    deleted_count += 1
                    if deleted_count % 10 == 0:
                        print(f" Deleted {deleted_count}/{total_to_delete}...")
                except Exception as e:
                    # Best-effort: report the failed row and keep going.
                    print(f" Error deleting {mem_to_delete['hash'][:20]}: {e}")
            conn.commit()
            print(f"✅ Successfully deleted {deleted_count} duplicate memories")
            # Verify final count
            cursor.execute("SELECT COUNT(*) FROM memories")
            final_count = cursor.fetchone()[0]
            print(f"📊 Final memory count: {final_count}")
        elif dry_run and total_to_delete > 0:
            print(f"\nTo actually delete these {total_to_delete} duplicates, run with --execute flag")
        return total_to_delete
    finally:
        # Close even when analysis raises; no-op for API-only dry runs.
        if conn is not None:
            conn.close()
def main():
    """Main entry point.

    Finds duplicate memories either via the configured API endpoint or a
    local SQLite database, reports duplicate groups, and (with --execute)
    deletes exact duplicates from a local database.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Find and remove duplicate memories')
    parser.add_argument('--db-path', type=str,
                        help='Path to SQLite database (if not using API)')
    parser.add_argument('--use-api', action='store_true',
                        help='Use API endpoint from config instead of database')
    parser.add_argument('--execute', action='store_true',
                        help='Actually delete the duplicates (default is dry run)')
    parser.add_argument('--similarity-threshold', type=float, default=0.95,
                        help='Similarity threshold for duplicate detection (0.0-1.0)')
    args = parser.parse_args()
    # Try to load config first
    config = load_config()
    if args.use_api or (not args.db_path and config):
        if not config:
            print("❌ No configuration found. Use --db-path for local database or ensure config exists.")
            sys.exit(1)
        endpoint = config.get('memoryService', {}).get('endpoint')
        api_key = config.get('memoryService', {}).get('apiKey')
        if not endpoint or not api_key:
            print("❌ API endpoint or key not found in configuration")
            sys.exit(1)
        print(f"🌐 Using API endpoint: {endpoint}")
        # Get memories from API
        memories = get_memories_from_api(endpoint, api_key)
        if not memories:
            print("❌ Failed to retrieve memories from API")
            sys.exit(1)
        # Find duplicates
        duplicates = find_duplicates(memories, args.similarity_threshold)
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        print(f"\nFound:")
        print(f" - {len(duplicates['exact'])} exact duplicate groups")
        print(f" - {len(duplicates['similar'])} similar content groups")
        if args.execute:
            print("⚠️ API-based deletion not yet implemented. Use database path for deletion.")
        else:
            # Show analysis only
            remove_duplicates(None, duplicates, dry_run=True)
    else:
        # Use database path; default to the standard per-user location
        # (was a hard-coded /home/hkr path, which broke for other users).
        db_path = args.db_path or str(Path.home() / '.local' / 'share' / 'mcp-memory' / 'sqlite_vec.db')
        if not Path(db_path).exists():
            print(f"❌ Database not found: {db_path}")
            print("💡 Try --use-api to use the API endpoint from config instead")
            sys.exit(1)
        # Find duplicates
        duplicates = find_duplicates(db_path, args.similarity_threshold)
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        print(f"\nFound:")
        print(f" - {len(duplicates['exact'])} exact duplicate groups")
        print(f" - {len(duplicates['similar'])} similar content groups")
        # Remove duplicates
        remove_duplicates(db_path, duplicates, dry_run=not args.execute)

if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/utils/system_detection.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
System detection utilities for hardware compatibility.
Provides functions to detect hardware architecture, available accelerators,
and determine optimal configurations for different environments.
"""
import os
import sys
import platform
import logging
import subprocess
from typing import Dict, Any, Tuple, Optional, List
import json
logger = logging.getLogger(__name__)
# Hardware acceleration types
class AcceleratorType:
    """String constants naming the hardware-acceleration backends we detect."""
    NONE = "none"          # no accelerator detected
    CUDA = "cuda"          # NVIDIA CUDA GPUs
    MPS = "mps"            # Apple Metal Performance Shaders
    CPU = "cpu"            # plain CPU execution
    DIRECTML = "directml"  # DirectML for Windows
    ROCm = "rocm"          # AMD ROCm
class Architecture:
    """Normalized CPU architecture labels used across detection results."""
    X86_64 = "x86_64"    # Intel/AMD 64-bit
    ARM64 = "arm64"      # ARM 64-bit (incl. Apple Silicon, aarch64)
    UNKNOWN = "unknown"  # anything platform.machine() didn't match
class SystemInfo:
    """Class to store and provide system information.

    Probes the host once at construction time (OS, architecture, RAM,
    hardware accelerators) and derives tuning recommendations such as
    embedding batch size, model choice, and thread count.
    """
    def __init__(self):
        self.os_name = platform.system().lower()        # "linux", "darwin", or "windows"
        self.os_version = platform.version()
        self.architecture = self._detect_architecture()
        self.python_version = platform.python_version()
        self.cpu_count = os.cpu_count() or 1            # os.cpu_count() may return None
        self.memory_gb = self._get_system_memory()
        self.accelerator = self._detect_accelerator()
        self.is_rosetta = self._detect_rosetta()
        # A virtualenv/venv is active when sys.prefix diverges from the base prefix.
        self.is_virtual_env = sys.prefix != sys.base_prefix
    def _detect_architecture(self) -> str:
        """Detect the system architecture, normalized to Architecture constants."""
        arch = platform.machine().lower()
        if arch in ("x86_64", "amd64", "x64"):
            return Architecture.X86_64
        elif arch in ("arm64", "aarch64"):
            return Architecture.ARM64
        else:
            return Architecture.UNKNOWN
    def _get_system_memory(self) -> float:
        """Get the total system memory in GB.

        Reads /proc/meminfo on Linux, `sysctl hw.memsize` on macOS, and
        GlobalMemoryStatusEx via ctypes on Windows. Falls back to a
        conservative 4.0 GB when detection fails.
        """
        try:
            if self.os_name == "linux":
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            # Extract the memory value (in kB)
                            memory_kb = int(line.split()[1])
                            return round(memory_kb / (1024 * 1024), 2)  # Convert to GB
            elif self.os_name == "darwin":  # macOS
                output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8').strip()
                memory_bytes = int(output)
                return round(memory_bytes / (1024**3), 2)  # Convert to GB
            elif self.os_name == "windows":
                import ctypes
                kernel32 = ctypes.windll.kernel32
                c_ulonglong = ctypes.c_ulonglong
                # Mirrors the Win32 MEMORYSTATUSEX struct layout field-for-field.
                class MEMORYSTATUSEX(ctypes.Structure):
                    _fields_ = [
                        ('dwLength', ctypes.c_ulong),
                        ('dwMemoryLoad', ctypes.c_ulong),
                        ('ullTotalPhys', c_ulonglong),
                        ('ullAvailPhys', c_ulonglong),
                        ('ullTotalPageFile', c_ulonglong),
                        ('ullAvailPageFile', c_ulonglong),
                        ('ullTotalVirtual', c_ulonglong),
                        ('ullAvailVirtual', c_ulonglong),
                        ('ullAvailExtendedVirtual', c_ulonglong),
                    ]
                memoryStatus = MEMORYSTATUSEX()
                # dwLength must be initialized before the call per the Win32 API contract.
                memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
                kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus))
                return round(memoryStatus.ullTotalPhys / (1024**3), 2)  # Convert to GB
        except Exception as e:
            logger.warning(f"Failed to get system memory: {e}")
        # Default fallback
        return 4.0  # Assume 4GB as a conservative default
    def _detect_accelerator(self) -> str:
        """Detect available hardware acceleration.

        Probe order: CUDA, then Apple MPS (ARM macs only), then ROCm
        (Linux), then DirectML (Windows); CPU is the fallback.
        """
        # Try to detect CUDA
        if self._check_cuda_available():
            return AcceleratorType.CUDA
        # Check for Apple MPS (Metal Performance Shaders)
        if self.os_name == "darwin" and self.architecture == Architecture.ARM64:
            if self._check_mps_available():
                return AcceleratorType.MPS
        # Check for ROCm on Linux
        if self.os_name == "linux" and self._check_rocm_available():
            return AcceleratorType.ROCm
        # Check for DirectML on Windows
        if self.os_name == "windows" and self._check_directml_available():
            return AcceleratorType.DIRECTML
        # Default to CPU
        return AcceleratorType.CPU
    def _check_cuda_available(self) -> bool:
        """Check if CUDA is available (via torch, else CUDA env vars)."""
        try:
            # Try to import torch and check for CUDA
            import torch
            # Check if torch is properly installed with CUDA support
            if hasattr(torch, 'cuda'):
                return torch.cuda.is_available()
            else:
                logger.warning("PyTorch installed but appears broken (no cuda attribute)")
                return False
        except (ImportError, AttributeError) as e:
            logger.debug(f"CUDA check failed: {e}")
            # If torch is not installed or broken, try to check for CUDA using environment
            # NOTE: env vars only prove a CUDA toolkit path is configured, not a usable GPU.
            return 'CUDA_HOME' in os.environ or 'CUDA_PATH' in os.environ
    def _check_mps_available(self) -> bool:
        """Check if Apple MPS is available (via torch, else system_profiler)."""
        try:
            import torch
            if hasattr(torch, 'backends') and hasattr(torch.backends, 'mps'):
                return torch.backends.mps.is_available()
            else:
                logger.warning("PyTorch installed but appears broken (no backends attribute)")
                return False
        except (ImportError, AttributeError) as e:
            logger.debug(f"MPS check failed: {e}")
            # Check for Metal support using system profiler
            try:
                output = subprocess.check_output(
                    ['system_profiler', 'SPDisplaysDataType'],
                    stderr=subprocess.DEVNULL
                ).decode('utf-8')
                return 'Metal' in output
            except (subprocess.SubprocessError, FileNotFoundError):
                return False
    def _check_rocm_available(self) -> bool:
        """Check if AMD ROCm is available (env vars, else rocminfo output)."""
        try:
            # Check for ROCm environment
            if 'ROCM_HOME' in os.environ or 'ROCM_PATH' in os.environ:
                return True
            # Check if ROCm libraries are installed
            try:
                output = subprocess.check_output(
                    ['rocminfo'],
                    stderr=subprocess.DEVNULL
                ).decode('utf-8')
                return 'GPU Agent' in output
            except (subprocess.SubprocessError, FileNotFoundError):
                return False
        except Exception:
            return False
    def _check_directml_available(self) -> bool:
        """Check if DirectML is available on Windows (torch-directml installed)."""
        try:
            # Check if DirectML package is installed
            # NOTE(review): pkg_resources is deprecated in modern setuptools;
            # importlib.metadata is the successor — consider migrating.
            import pkg_resources
            pkg_resources.get_distribution('torch-directml')
            return True
        except ImportError:
            # pkg_resources not available
            return False
        except Exception:
            # Any other error (including DistributionNotFound)
            return False
    def _detect_rosetta(self) -> bool:
        """Detect if running under Rosetta 2 on Apple Silicon."""
        # Only meaningful on ARM macs; everything else is trivially False.
        if self.os_name != "darwin" or self.architecture != Architecture.ARM64:
            return False
        try:
            # Check for Rosetta by examining the process
            # sysctl.proc_translated is '1' when this process runs translated.
            output = subprocess.check_output(
                ['sysctl', '-n', 'sysctl.proc_translated'],
                stderr=subprocess.DEVNULL
            ).decode('utf-8').strip()
            return output == '1'
        except (subprocess.SubprocessError, FileNotFoundError):
            return False
    def get_optimal_batch_size(self) -> int:
        """Determine optimal batch size based on hardware.

        Scales with GPU memory for CUDA, is conservative for MPS, and
        otherwise scales with system RAM.
        """
        # Start with a base batch size
        if self.accelerator == AcceleratorType.CUDA:
            # Scale based on available GPU memory (rough estimate)
            try:
                import torch
                gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)  # GB
                if gpu_memory > 10:
                    return 32
                elif gpu_memory > 6:
                    return 16
                else:
                    return 8
            # NOTE(review): bare except swallows everything (even KeyboardInterrupt);
            # consider narrowing to Exception.
            except:
                return 8  # Default for CUDA
        elif self.accelerator == AcceleratorType.MPS:
            return 8  # Conservative for Apple Silicon
        elif self.memory_gb > 16:
            return 8  # Larger batch for systems with more RAM
        elif self.memory_gb > 8:
            return 4
        else:
            return 2  # Conservative for low-memory systems
    def get_optimal_model(self) -> str:
        """Determine the optimal embedding model based on hardware capabilities."""
        # Default model
        default_model = 'all-MiniLM-L6-v2'
        # For very constrained environments, use an even smaller model
        if self.memory_gb < 4:
            return 'paraphrase-MiniLM-L3-v2'
        # For high-performance environments, consider a larger model
        if (self.accelerator in [AcceleratorType.CUDA, AcceleratorType.MPS] and
            self.memory_gb > 8):
            return 'all-mpnet-base-v2'  # Better quality but more resource intensive
        return default_model
    def get_optimal_thread_count(self) -> int:
        """Determine optimal thread count for parallel operations."""
        # Use 75% of available cores, but at least 1
        return max(1, int(self.cpu_count * 0.75))
    def to_dict(self) -> Dict[str, Any]:
        """Convert system info (and derived recommendations) to a dictionary."""
        return {
            "os": self.os_name,
            "os_version": self.os_version,
            "architecture": self.architecture,
            "python_version": self.python_version,
            "cpu_count": self.cpu_count,
            "memory_gb": self.memory_gb,
            "accelerator": self.accelerator,
            "is_rosetta": self.is_rosetta,
            "is_virtual_env": self.is_virtual_env,
            "optimal_batch_size": self.get_optimal_batch_size(),
            "optimal_model": self.get_optimal_model(),
            "optimal_thread_count": self.get_optimal_thread_count()
        }
    def __str__(self) -> str:
        """String representation of system info (pretty-printed JSON)."""
        return json.dumps(self.to_dict(), indent=2)
def get_system_info() -> SystemInfo:
    """Return the process-wide SystemInfo singleton, creating it on first use."""
    # The instance is memoized as an attribute on the function itself.
    cached = getattr(get_system_info, 'instance', None)
    if cached is None:
        cached = SystemInfo()
        get_system_info.instance = cached
    return cached
def get_torch_device() -> str:
    """Return the optimal PyTorch device string: "cuda", "mps", or "cpu"."""
    info = get_system_info()
    try:
        import torch
    except ImportError:
        # Without PyTorch we can only ever run on the CPU.
        return "cpu"
    if info.accelerator == AcceleratorType.CUDA and torch.cuda.is_available():
        return "cuda"
    if (info.accelerator == AcceleratorType.MPS
            and hasattr(torch.backends, 'mps')
            and torch.backends.mps.is_available()):
        return "mps"
    return "cpu"
def get_optimal_embedding_settings() -> Dict[str, Any]:
    """Bundle the recommended model, batch size, device, and thread count."""
    info = get_system_info()
    settings = {
        "model_name": info.get_optimal_model(),
        "batch_size": info.get_optimal_batch_size(),
        "device": get_torch_device(),
        "threads": info.get_optimal_thread_count(),
    }
    return settings
def print_system_diagnostics(client_type: str = 'lm_studio'):
    """Print detailed system diagnostics for troubleshooting, conditionally based on client."""
    # Claude Desktop parses stdout as JSON, so only LM Studio gets diagnostics.
    if client_type != 'lm_studio':
        return
    info = get_system_info()
    print("\n=== System Diagnostics ===")
    print(f"OS: {info.os_name} {info.os_version}")
    print(f"Architecture: {info.architecture}")
    print(f"Python: {info.python_version}")
    print(f"CPU Cores: {info.cpu_count}")
    print(f"Memory: {info.memory_gb:.2f} GB")
    print(f"Accelerator: {info.accelerator}")
    if info.is_rosetta:
        print("⚠️ Running under Rosetta 2 translation")
    print("\n=== Optimal Settings ===")
    print(f"Embedding Model: {info.get_optimal_model()}")
    print(f"Batch Size: {info.get_optimal_batch_size()}")
    print(f"Thread Count: {info.get_optimal_thread_count()}")
    print(f"PyTorch Device: {get_torch_device()}")
    # PyTorch-specific details are best-effort: skipped when torch is absent.
    try:
        import torch
    except ImportError:
        print("\nPyTorch not installed, skipping PyTorch diagnostics")
    else:
        print("\n=== PyTorch Diagnostics ===")
        print(f"PyTorch Version: {torch.__version__}")
        cuda_ok = torch.cuda.is_available()
        print(f"CUDA Available: {cuda_ok}")
        if cuda_ok:
            print(f"CUDA Version: {torch.version.cuda}")
            print(f"GPU Device: {torch.cuda.get_device_name(0)}")
            print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.2f} GB")
        if hasattr(torch.backends, 'mps'):
            print(f"MPS Available: {torch.backends.mps.is_available()}")
    print("\n=== Environment Variables ===")
    for var in ['CUDA_HOME', 'CUDA_PATH', 'ROCM_HOME', 'PYTORCH_ENABLE_MPS_FALLBACK']:
        if var in os.environ:
            print(f"{var}: {os.environ[var]}")
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/oauth/middleware.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OAuth 2.1 authentication middleware for MCP Memory Service.
Provides Bearer token validation with fallback to API key authentication.
"""
import logging
from typing import Optional, Dict, Any
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt, ExpiredSignatureError
from jose.jwt import JWTClaimsError
from ...config import (
OAUTH_ISSUER,
API_KEY,
ALLOW_ANONYMOUS_ACCESS,
OAUTH_ENABLED,
get_jwt_algorithm,
get_jwt_verification_key
)
from .storage import oauth_storage
logger = logging.getLogger(__name__)
# Optional Bearer token security scheme
bearer_scheme = HTTPBearer(auto_error=False)
class AuthenticationResult:
    """Outcome of a single authentication attempt (OAuth, API key, or anonymous)."""
    def __init__(
        self,
        authenticated: bool,
        client_id: Optional[str] = None,
        scope: Optional[str] = None,
        auth_method: Optional[str] = None,
        error: Optional[str] = None
    ):
        self.authenticated = authenticated
        self.client_id = client_id
        self.scope = scope
        # One of "oauth", "api_key", or "none".
        self.auth_method = auth_method
        self.error = error

    def has_scope(self, required_scope: str) -> bool:
        """Return True when authenticated and the space-separated scope list grants required_scope."""
        if not self.authenticated:
            return False
        granted = (self.scope or "").split()
        return required_scope in granted

    def require_scope(self, required_scope: str) -> None:
        """Raise an HTTP 403 unless the required scope was granted."""
        if self.has_scope(required_scope):
            return
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail={
                "error": "insufficient_scope",
                "error_description": f"Required scope '{required_scope}' not granted"
            }
        )
def validate_jwt_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Validate a JWT access token with comprehensive error handling.

    Supports both RS256 and HS256 algorithms based on available keys.
    Provides detailed error logging for debugging purposes. Verifies
    signature, issuer, and audience via jose's jwt.decode, then checks
    that all required claims are present.

    Returns:
        JWT payload if valid, None if invalid
    """
    # Input validation
    if not token or not isinstance(token, str):
        logger.debug("Invalid token: empty or non-string token provided")
        return None
    # Basic token format validation
    token = token.strip()
    if not token:
        logger.debug("Invalid token: empty token after stripping")
        return None
    # JWT tokens should have 3 parts separated by dots (header.payload.signature)
    parts = token.split('.')
    if len(parts) != 3:
        logger.debug(f"Invalid token format: expected 3 parts, got {len(parts)}")
        return None
    try:
        # Algorithm/key come from config so HS256 and RS256 are both supported.
        algorithm = get_jwt_algorithm()
        verification_key = get_jwt_verification_key()
        logger.debug(f"Validating JWT token with algorithm: {algorithm}")
        payload = jwt.decode(
            token,
            verification_key,
            algorithms=[algorithm],
            issuer=OAUTH_ISSUER,
            audience="mcp-memory-service"
        )
        # Additional payload validation: jose enforces exp/iss/aud, but we
        # also require the full standard claim set to be present.
        required_claims = ['sub', 'iss', 'aud', 'exp', 'iat']
        missing_claims = [claim for claim in required_claims if claim not in payload]
        if missing_claims:
            logger.warning(f"JWT token missing required claims: {missing_claims}")
            return None
        logger.debug(f"JWT validation successful for subject: {payload.get('sub')}")
        return payload
    # Handler order matters: the specific jose errors must precede JWTError.
    except ExpiredSignatureError:
        logger.debug("JWT validation failed: token has expired")
        return None
    except JWTClaimsError as e:
        logger.debug(f"JWT validation failed: invalid claims - {e}")
        return None
    except ValueError as e:
        # Raised by the config helpers when keys/algorithm are misconfigured.
        logger.debug(f"JWT validation failed: configuration error - {e}")
        return None
    except JWTError as e:
        # Catch-all for other JWT-related errors
        error_type = type(e).__name__
        logger.debug(f"JWT validation failed: {error_type} - {e}")
        return None
    except Exception as e:
        # Unexpected errors should be logged but not crash the system
        error_type = type(e).__name__
        logger.error(f"Unexpected error during JWT validation: {error_type} - {e}")
        return None
async def authenticate_bearer_token(token: str) -> AuthenticationResult:
    """
    Authenticate using OAuth Bearer token with comprehensive error handling.

    Validation order:
      1. Self-contained JWT validation (signature, issuer, audience, claims).
      2. Fallback lookup of the (opaque) token in OAuth storage.
    Any unexpected exception is reported as a generic "server_error" rather
    than propagating to the caller.

    Returns:
        AuthenticationResult with authentication status and details
    """
    # Input validation
    if not token or not isinstance(token, str):
        logger.debug("Bearer token authentication failed: invalid token input")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="invalid_token"
        )
    token = token.strip()
    if not token:
        logger.debug("Bearer token authentication failed: empty token")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="invalid_token"
        )
    try:
        # First, try JWT validation
        jwt_payload = validate_jwt_token(token)
        if jwt_payload:
            client_id = jwt_payload.get("sub")
            scope = jwt_payload.get("scope", "")
            # Validate client_id is present
            if not client_id:
                logger.warning("JWT authentication failed: missing client_id in token payload")
                return AuthenticationResult(
                    authenticated=False,
                    auth_method="oauth",
                    error="invalid_token"
                )
            logger.debug(f"JWT authentication successful: client_id={client_id}, scope={scope}")
            return AuthenticationResult(
                authenticated=True,
                client_id=client_id,
                scope=scope,
                auth_method="oauth"
            )
        # Fallback: check if token is stored in OAuth storage
        token_data = await oauth_storage.get_access_token(token)
        if token_data:
            client_id = token_data.get("client_id")
            if not client_id:
                logger.warning("OAuth storage authentication failed: missing client_id in stored token")
                return AuthenticationResult(
                    authenticated=False,
                    auth_method="oauth",
                    error="invalid_token"
                )
            logger.debug(f"OAuth storage authentication successful: client_id={client_id}")
            return AuthenticationResult(
                authenticated=True,
                client_id=client_id,
                scope=token_data.get("scope", ""),
                auth_method="oauth"
            )
    except Exception as e:
        # Catch any unexpected errors during authentication
        error_type = type(e).__name__
        logger.error(f"Unexpected error during bearer token authentication: {error_type} - {e}")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="server_error"
        )
    # Fall-through: neither JWT validation nor storage lookup matched.
    logger.debug("Bearer token authentication failed: token not found or invalid")
    return AuthenticationResult(
        authenticated=False,
        auth_method="oauth",
        error="invalid_token"
    )
def authenticate_api_key(api_key: str) -> AuthenticationResult:
    """
    Authenticate using legacy API key with enhanced validation.

    The comparison against the configured key is constant-time
    (hmac.compare_digest) to avoid leaking key contents through timing
    side channels — a plain `==` short-circuits on the first mismatch.

    Args:
        api_key: Candidate key supplied by the client (untrusted input).

    Returns:
        AuthenticationResult with authentication status
    """
    import hmac  # local import: constant-time comparison helper

    # Input validation
    if not api_key or not isinstance(api_key, str):
        logger.debug("API key authentication failed: invalid input")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="invalid_api_key"
        )
    api_key = api_key.strip()
    if not api_key:
        logger.debug("API key authentication failed: empty key")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="invalid_api_key"
        )
    # Check if API key is configured
    if not API_KEY:
        logger.debug("API key authentication failed: no API key configured")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="api_key_not_configured"
        )
    # Validate API key with a constant-time comparison (encode to bytes so
    # non-ASCII keys are handled; compare_digest on str requires ASCII).
    if hmac.compare_digest(api_key.encode('utf-8'), API_KEY.encode('utf-8')):
        logger.debug("API key authentication successful")
        return AuthenticationResult(
            authenticated=True,
            client_id="api_key_client",
            scope="read write admin",  # API key gets full access
            auth_method="api_key"
        )
    logger.debug("API key authentication failed: key mismatch")
    return AuthenticationResult(
        authenticated=False,
        auth_method="api_key",
        error="invalid_api_key"
    )
async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme)
) -> AuthenticationResult:
    """
    Get current authenticated user with fallback authentication methods.

    Tries in order:
    1. OAuth Bearer token (JWT or stored token) - only if OAuth is enabled
    2. Legacy API key authentication (an API key may be sent as a Bearer token)
    3. Anonymous access (if explicitly enabled)

    Raises:
        HTTPException: 401 when a Bearer token was supplied but every
            validation path failed, or when no credentials were supplied
            and anonymous access is disabled.

    Returns:
        AuthenticationResult with authentication details
    """
    # Try OAuth Bearer token authentication first (only if OAuth is enabled)
    if credentials and credentials.scheme.lower() == "bearer":
        # OAuth Bearer token validation only if OAuth is enabled
        if OAUTH_ENABLED:
            auth_result = await authenticate_bearer_token(credentials.credentials)
            if auth_result.authenticated:
                return auth_result
            # OAuth token provided but invalid - log the attempt
            logger.debug(f"OAuth Bearer token validation failed for enabled OAuth system")
        # Try API key authentication as fallback (works regardless of OAuth state)
        if API_KEY:
            # Some clients might send API key as Bearer token
            api_key_result = authenticate_api_key(credentials.credentials)
            if api_key_result.authenticated:
                return api_key_result
        # Determine appropriate error message based on OAuth state
        if OAUTH_ENABLED:
            error_msg = "The access token provided is expired, revoked, malformed, or invalid"
            logger.warning("Invalid Bearer token provided and API key fallback failed")
        else:
            error_msg = "OAuth is disabled. Use API key authentication or enable anonymous access."
            logger.debug("Bearer token provided but OAuth is disabled, API key fallback failed")
        # All Bearer token authentication methods failed
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_token",
                "error_description": error_msg
            },
            headers={"WWW-Authenticate": "Bearer"}
        )
    # Allow anonymous access only if explicitly enabled
    if ALLOW_ANONYMOUS_ACCESS:
        logger.debug("Anonymous access explicitly enabled, granting read-only access")
        return AuthenticationResult(
            authenticated=True,
            client_id="anonymous",
            scope="read",  # Anonymous users get read-only access for security
            auth_method="none"
        )
    # No credentials provided and anonymous access not allowed:
    # tailor the 401 message to what the server is actually configured for.
    if API_KEY or OAUTH_ENABLED:
        logger.debug("No valid authentication provided")
        if OAUTH_ENABLED and API_KEY:
            error_msg = "Authorization required. Provide valid OAuth Bearer token or API key."
        elif OAUTH_ENABLED:
            error_msg = "Authorization required. Provide valid OAuth Bearer token."
        else:
            error_msg = "Authorization required. Provide valid API key."
    else:
        logger.debug("No authentication configured and anonymous access disabled")
        error_msg = "Authentication is required. Set MCP_ALLOW_ANONYMOUS_ACCESS=true to enable anonymous access."
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={
            "error": "authorization_required",
            "error_description": error_msg
        },
        headers={"WWW-Authenticate": "Bearer"}
    )
# Convenience dependency for requiring specific scopes
def require_scope(scope: str):
    """
    Build a FastAPI dependency that rejects callers lacking *scope*.

    Usage:
        @app.get("/admin", dependencies=[Depends(require_scope("admin"))])
    """
    async def scope_dependency(user: AuthenticationResult = Depends(get_current_user)):
        # require_scope raises HTTP 403 when missing; otherwise pass user through.
        user.require_scope(scope)
        return user

    return scope_dependency
# Convenience dependencies for common access patterns
async def require_read_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Dependency: authenticated caller must hold the 'read' scope (403 otherwise)."""
    user.require_scope("read")
    return user

async def require_write_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Dependency: authenticated caller must hold the 'write' scope (403 otherwise)."""
    user.require_scope("write")
    return user

async def require_admin_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Dependency: authenticated caller must hold the 'admin' scope (403 otherwise)."""
    user.require_scope("admin")
    return user
# Optional authentication (for endpoints that work with or without auth)
async def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme)
) -> Optional[AuthenticationResult]:
    """
    Resolve the current user when credentials are valid, otherwise None.

    Unlike get_current_user, authentication failures are swallowed so an
    endpoint can serve both anonymous and authenticated callers.
    """
    try:
        user = await get_current_user(credentials)
    except HTTPException:
        return None
    return user
```