This is page 29 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/hybrid.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hybrid memory storage backend for MCP Memory Service.
This implementation provides the best of both worlds:
- SQLite-vec as primary storage for ultra-fast reads (~5ms)
- Cloudflare as secondary storage for cloud persistence and multi-device sync
- Background synchronization service for seamless integration
- Graceful degradation when cloud services are unavailable
"""
import asyncio
import logging
import time
from typing import List, Dict, Any, Tuple, Optional
from collections import deque
from dataclasses import dataclass
from .base import MemoryStorage
from .sqlite_vec import SqliteVecMemoryStorage
from .cloudflare import CloudflareStorage
from ..models.memory import Memory, MemoryQueryResult
# Import SSE for real-time progress updates
try:
from ..web.sse import sse_manager, create_sync_progress_event, create_sync_completed_event
SSE_AVAILABLE = True
except ImportError:
SSE_AVAILABLE = False
# Import config to check if limit constants are available
from .. import config as app_config
# Use getattr to provide fallbacks if attributes don't exist (prevents duplicate defaults)
CLOUDFLARE_D1_MAX_SIZE_GB = getattr(app_config, 'CLOUDFLARE_D1_MAX_SIZE_GB', 10)
CLOUDFLARE_VECTORIZE_MAX_VECTORS = getattr(app_config, 'CLOUDFLARE_VECTORIZE_MAX_VECTORS', 5_000_000)
CLOUDFLARE_MAX_METADATA_SIZE_KB = getattr(app_config, 'CLOUDFLARE_MAX_METADATA_SIZE_KB', 10)
CLOUDFLARE_WARNING_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_WARNING_THRESHOLD_PERCENT', 80)
CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT', 95)
HYBRID_SYNC_ON_STARTUP = getattr(app_config, 'HYBRID_SYNC_ON_STARTUP', True)
HYBRID_MAX_CONTENT_LENGTH = getattr(app_config, 'HYBRID_MAX_CONTENT_LENGTH', 800)
HYBRID_MAX_EMPTY_BATCHES = getattr(app_config, 'HYBRID_MAX_EMPTY_BATCHES', 20)
HYBRID_MIN_CHECK_COUNT = getattr(app_config, 'HYBRID_MIN_CHECK_COUNT', 1000)
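# The getattr() fallbacks above keep this module importable against a config.py that
# predates a given constant: the second argument is used only when app_config does not
# define the attribute (e.g. HYBRID_MAX_CONTENT_LENGTH resolves to 800 in that case).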
logger = logging.getLogger(__name__)
@dataclass
class SyncOperation:
"""Represents a pending sync operation."""
operation: str # 'store', 'delete', 'update'
memory: Optional[Memory] = None
content_hash: Optional[str] = None
updates: Optional[Dict[str, Any]] = None
timestamp: Optional[float] = None  # populated by __post_init__ when omitted
retries: int = 0
max_retries: int = 3
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
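# Illustrative sketch (hypothetical usage; `memory` stands for a Memory instance created
# elsewhere): pending writes are wrapped in SyncOperation before being handed to the
# background sync service.
#
#     store_op = SyncOperation(operation='store', memory=memory)
#     delete_op = SyncOperation(operation='delete', content_hash=memory.content_hash)
#     # timestamp defaults to time.time() via __post_init__ when not supplied.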
class BackgroundSyncService:
"""
Handles background synchronization between SQLite-vec and Cloudflare.
Features:
- Asynchronous operation queue
- Retry logic with exponential backoff
- Health monitoring and error handling
- Configurable sync intervals and batch sizes
- Graceful degradation when cloud is unavailable
"""
def __init__(self,
primary_storage: SqliteVecMemoryStorage,
secondary_storage: CloudflareStorage,
sync_interval: int = 300, # 5 minutes
batch_size: int = 50,
max_queue_size: int = 1000):
self.primary = primary_storage
self.secondary = secondary_storage
self.sync_interval = sync_interval
self.batch_size = batch_size
self.max_queue_size = max_queue_size
# Sync queues and state
self.operation_queue = asyncio.Queue(maxsize=max_queue_size)
self.failed_operations = deque(maxlen=100) # Keep track of failed operations
self.is_running = False
self.sync_task = None
self.last_sync_time = 0
# Drift detection state (v8.25.0+)
self.last_drift_check_time = 0
self.drift_check_enabled = getattr(app_config, 'HYBRID_SYNC_UPDATES', True)
self.drift_check_interval = getattr(app_config, 'HYBRID_DRIFT_CHECK_INTERVAL', 3600)
self.sync_stats = {
'operations_processed': 0,
'operations_failed': 0,
'last_sync_duration': 0,
'cloudflare_available': True,
'last_drift_check': 0,
'drift_detected_count': 0,
'drift_synced_count': 0
}
# Health monitoring
self.consecutive_failures = 0
self.max_consecutive_failures = 5
self.backoff_time = 60 # Start with 1 minute backoff
# Cloudflare capacity tracking
self.cloudflare_stats = {
'vector_count': 0,
'estimated_d1_size_gb': 0,
'last_capacity_check': 0,
'approaching_limits': False,
'limit_warnings': []
}
async def start(self):
"""Start the background sync service."""
if self.is_running:
logger.warning("Background sync service is already running")
return
self.is_running = True
self.sync_task = asyncio.create_task(self._sync_loop())
logger.info(f"Background sync service started with {self.sync_interval}s interval")
async def stop(self):
"""Stop the background sync service and process remaining operations."""
if not self.is_running:
return
self.is_running = False
# Process remaining operations in queue
remaining_operations = []
while not self.operation_queue.empty():
try:
operation = self.operation_queue.get_nowait()
remaining_operations.append(operation)
except asyncio.QueueEmpty:
break
if remaining_operations:
logger.info(f"Processing {len(remaining_operations)} remaining operations before shutdown")
await self._process_operations_batch(remaining_operations)
# Cancel the sync task
if self.sync_task:
self.sync_task.cancel()
try:
await self.sync_task
except asyncio.CancelledError:
pass
logger.info("Background sync service stopped")
async def enqueue_operation(self, operation: SyncOperation):
"""Enqueue a sync operation for background processing."""
try:
await self.operation_queue.put(operation)
logger.debug(f"Enqueued {operation.operation} operation")
except asyncio.QueueFull:
# If queue is full, process immediately to avoid blocking
logger.warning("Sync queue full, processing operation immediately")
await self._process_single_operation(operation)
async def force_sync(self) -> Dict[str, Any]:
"""Force an immediate full synchronization between backends."""
logger.info("Starting forced sync between primary and secondary storage")
sync_start_time = time.time()
try:
# Get all memories from primary storage
primary_memories = await self.primary.get_all_memories()
# Check Cloudflare availability
try:
await self.secondary.get_stats() # Simple health check
cloudflare_available = True
except Exception as e:
logger.warning(f"Cloudflare not available during force sync: {e}")
cloudflare_available = False
self.sync_stats['cloudflare_available'] = False
return {
'status': 'partial',
'cloudflare_available': False,
'primary_memories': len(primary_memories),
'synced_to_secondary': 0,
'duration': time.time() - sync_start_time
}
# Sync from primary to secondary using concurrent operations
async def sync_memory(memory):
try:
success, message = await self.secondary.store(memory)
if success:
return True, None
else:
logger.debug(f"Failed to sync memory to secondary: {message}")
return False, message
except Exception as e:
logger.debug(f"Exception syncing memory to secondary: {e}")
return False, str(e)
# Process memories concurrently in batches
synced_count = 0
failed_count = 0
# Process in batches to avoid overwhelming the system
batch_size = min(self.batch_size, 10) # Limit concurrent operations
for i in range(0, len(primary_memories), batch_size):
batch = primary_memories[i:i + batch_size]
results = await asyncio.gather(*[sync_memory(m) for m in batch], return_exceptions=True)
for result in results:
if isinstance(result, Exception):
failed_count += 1
logger.debug(f"Exception in batch sync: {result}")
elif isinstance(result, tuple):
success, _ = result
if success:
synced_count += 1
else:
failed_count += 1
sync_duration = time.time() - sync_start_time
self.sync_stats['last_sync_duration'] = sync_duration
self.sync_stats['cloudflare_available'] = cloudflare_available
logger.info(f"Force sync completed: {synced_count} synced, {failed_count} failed in {sync_duration:.2f}s")
return {
'status': 'completed',
'cloudflare_available': cloudflare_available,
'primary_memories': len(primary_memories),
'synced_to_secondary': synced_count,
'failed_operations': failed_count,
'duration': sync_duration
}
except Exception as e:
logger.error(f"Error during force sync: {e}")
return {
'status': 'error',
'error': str(e),
'duration': time.time() - sync_start_time
}
async def get_sync_status(self) -> Dict[str, Any]:
"""Get current sync service status and statistics."""
queue_size = self.operation_queue.qsize()
status = {
'is_running': self.is_running,
'is_paused': not self.is_running,
'pending_operations': queue_size,
'failed_operations': len(self.failed_operations),
'last_sync_time': self.last_sync_time,
'consecutive_failures': self.consecutive_failures,
'stats': self.sync_stats.copy(),
'operations_processed': self.sync_stats.get('operations_processed', 0),
'operations_failed': self.sync_stats.get('operations_failed', 0),
'cloudflare_available': self.sync_stats['cloudflare_available'],
'sync_interval': self.sync_interval,
'next_sync_in': max(0, self.sync_interval - (time.time() - self.last_sync_time)),
'capacity': {
'vector_count': self.cloudflare_stats['vector_count'],
'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
'approaching_limits': self.cloudflare_stats['approaching_limits'],
'warnings': self.cloudflare_stats['limit_warnings']
}
}
return status
async def validate_memory_for_cloudflare(self, memory: Memory) -> Tuple[bool, Optional[str]]:
"""
Validate if a memory can be synced to Cloudflare.
Returns:
Tuple of (is_valid, error_message)
"""
# Check metadata size
if memory.metadata:
import json
metadata_json = json.dumps(memory.metadata)
metadata_size_kb = len(metadata_json.encode('utf-8')) / 1024
if metadata_size_kb > CLOUDFLARE_MAX_METADATA_SIZE_KB:
return False, f"Metadata size {metadata_size_kb:.2f}KB exceeds Cloudflare limit of {CLOUDFLARE_MAX_METADATA_SIZE_KB}KB"
# Check if we're approaching vector count limit
if self.cloudflare_stats['vector_count'] >= CLOUDFLARE_VECTORIZE_MAX_VECTORS:
return False, f"Cloudflare vector limit of {CLOUDFLARE_VECTORIZE_MAX_VECTORS} reached"
return True, None
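# Worked example (illustrative figures): metadata that serializes to 12,500 bytes of JSON
# is 12,500 / 1024 ≈ 12.21 KB, exceeding the default CLOUDFLARE_MAX_METADATA_SIZE_KB of 10,
# so the check above returns (False, "Metadata size 12.21KB exceeds ..."); a memory with
# small metadata and remaining vector capacity returns (True, None).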
async def check_cloudflare_capacity(self) -> Dict[str, Any]:
"""
Check remaining Cloudflare capacity and return status.
"""
try:
# Get current stats from Cloudflare
cf_stats = await self.secondary.get_stats()
# Update our tracking
self.cloudflare_stats['vector_count'] = cf_stats.get('total_memories', 0)
self.cloudflare_stats['last_capacity_check'] = time.time()
# Calculate usage percentages
vector_usage_percent = (self.cloudflare_stats['vector_count'] / CLOUDFLARE_VECTORIZE_MAX_VECTORS) * 100
# Clear previous warnings
self.cloudflare_stats['limit_warnings'] = []
# Check vector count limits
if vector_usage_percent >= CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT:
warning = f"CRITICAL: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
self.cloudflare_stats['limit_warnings'].append(warning)
logger.error(warning)
self.cloudflare_stats['approaching_limits'] = True
elif vector_usage_percent >= CLOUDFLARE_WARNING_THRESHOLD_PERCENT:
warning = f"WARNING: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
self.cloudflare_stats['limit_warnings'].append(warning)
logger.warning(warning)
self.cloudflare_stats['approaching_limits'] = True
else:
self.cloudflare_stats['approaching_limits'] = False
return {
'vector_count': self.cloudflare_stats['vector_count'],
'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
'vector_usage_percent': vector_usage_percent,
'approaching_limits': self.cloudflare_stats['approaching_limits'],
'warnings': self.cloudflare_stats['limit_warnings']
}
except Exception as e:
logger.error(f"Failed to check Cloudflare capacity: {e}")
return {
'error': str(e),
'approaching_limits': False
}
async def _sync_loop(self):
"""Main background sync loop."""
logger.info("Background sync loop started")
while self.is_running:
try:
# Process queued operations
await self._process_operation_queue()
# Periodic full sync if enough time has passed
current_time = time.time()
if current_time - self.last_sync_time >= self.sync_interval:
await self._periodic_sync()
self.last_sync_time = current_time
# Sleep before next iteration
await asyncio.sleep(5) # Check every 5 seconds
except Exception as e:
logger.error(f"Error in sync loop: {e}")
self.consecutive_failures += 1
if self.consecutive_failures >= self.max_consecutive_failures:
logger.warning(f"Too many consecutive sync failures ({self.consecutive_failures}), backing off for {self.backoff_time}s")
await asyncio.sleep(self.backoff_time)
self.backoff_time = min(self.backoff_time * 2, 1800) # Max 30 minutes
else:
await asyncio.sleep(1)
async def _process_operation_queue(self):
"""Process operations from the queue in batches."""
operations = []
# Collect up to batch_size operations
for _ in range(self.batch_size):
try:
operation = self.operation_queue.get_nowait()
operations.append(operation)
except asyncio.QueueEmpty:
break
if operations:
await self._process_operations_batch(operations)
async def _process_operations_batch(self, operations: List[SyncOperation]):
"""Process a batch of sync operations."""
logger.debug(f"Processing batch of {len(operations)} sync operations")
for operation in operations:
try:
await self._process_single_operation(operation)
self.sync_stats['operations_processed'] += 1
except Exception as e:
await self._handle_sync_error(e, operation)
async def _handle_sync_error(self, error: Exception, operation: SyncOperation):
"""
Handle sync operation errors with intelligent retry logic.
Args:
error: The exception that occurred
operation: The failed operation
"""
error_str = str(error).lower()
# Check for specific Cloudflare limit errors
is_limit_error = any(term in error_str for term in [
'limit exceeded', 'quota exceeded', 'maximum', 'too large',
'413', '507', 'insufficient storage', 'capacity'
])
if is_limit_error:
# Don't retry limit errors - they won't succeed
logger.error(f"Cloudflare limit error for {operation.operation}: {error}")
self.sync_stats['operations_failed'] += 1
# Update capacity tracking
self.cloudflare_stats['approaching_limits'] = True
self.cloudflare_stats['limit_warnings'].append(f"Limit error: {error}")
# Check capacity to understand the issue
await self.check_cloudflare_capacity()
return
# Check for temporary/network errors
is_temporary_error = any(term in error_str for term in [
'timeout', 'connection', 'network', '500', '502', '503', '504',
'temporarily unavailable', 'retry'
])
if is_temporary_error or operation.retries < operation.max_retries:
# Retry temporary errors
logger.warning(f"Temporary error for {operation.operation} (retry {operation.retries + 1}/{operation.max_retries}): {error}")
operation.retries += 1
if operation.retries < operation.max_retries:
# Back off exponentially, then stash in failed_operations for retry on the next periodic sync
await asyncio.sleep(min(2 ** operation.retries, 60)) # Max 60 second delay
self.failed_operations.append(operation)
else:
logger.error(f"Max retries reached for {operation.operation}")
self.sync_stats['operations_failed'] += 1
else:
# Permanent error - don't retry
logger.error(f"Permanent error for {operation.operation}: {error}")
self.sync_stats['operations_failed'] += 1
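# Illustrative classification of the substring checks above (matching is effectively
# case-insensitive because error_str is lowercased):
#     "Vectorize quota exceeded (413)"   -> limit error: logged, counted as failed, never retried
#     "connection reset by peer"         -> temporary error: retried with exponential backoff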
async def _process_single_operation(self, operation: SyncOperation):
"""Process a single sync operation to secondary storage."""
try:
if operation.operation == 'store' and operation.memory:
# Validate memory before syncing
is_valid, validation_error = await self.validate_memory_for_cloudflare(operation.memory)
if not is_valid:
logger.warning(f"Memory validation failed for sync: {validation_error}")
# Don't retry if it's a hard limit
if "exceeds Cloudflare limit" in validation_error or "limit of" in validation_error:
self.sync_stats['operations_failed'] += 1
return # Skip this memory permanently
raise Exception(validation_error)
success, message = await self.secondary.store(operation.memory)
if not success:
raise Exception(f"Store operation failed: {message}")
elif operation.operation == 'delete' and operation.content_hash:
success, message = await self.secondary.delete(operation.content_hash)
if not success:
raise Exception(f"Delete operation failed: {message}")
elif operation.operation == 'update' and operation.content_hash and operation.updates:
success, message = await self.secondary.update_memory_metadata(
operation.content_hash, operation.updates
)
if not success:
raise Exception(f"Update operation failed: {message}")
# Reset failure counters on success
self.consecutive_failures = 0
self.backoff_time = 60
self.sync_stats['cloudflare_available'] = True
except Exception as e:
# Mark Cloudflare as potentially unavailable
self.sync_stats['cloudflare_available'] = False
raise
async def _periodic_sync(self):
"""Perform periodic full synchronization."""
logger.debug("Starting periodic sync")
try:
# Retry any failed operations first
if self.failed_operations:
retry_operations = list(self.failed_operations)
self.failed_operations.clear()
logger.info(f"Retrying {len(retry_operations)} failed operations")
await self._process_operations_batch(retry_operations)
# Perform a lightweight health check
try:
stats = await self.secondary.get_stats()
logger.debug(f"Secondary storage health check passed: {stats}")
self.sync_stats['cloudflare_available'] = True
# Check Cloudflare capacity every periodic sync
capacity_status = await self.check_cloudflare_capacity()
if capacity_status.get('approaching_limits'):
logger.warning("Cloudflare approaching capacity limits")
for warning in capacity_status.get('warnings', []):
logger.warning(warning)
# Periodic drift detection (v8.25.0+)
if self.drift_check_enabled:
time_since_last_check = time.time() - self.last_drift_check_time
if time_since_last_check >= self.drift_check_interval:
logger.info(f"Running periodic drift check (interval: {self.drift_check_interval}s)")
drift_stats = await self._detect_and_sync_drift()
logger.info(f"Drift check complete: {drift_stats}")
except Exception as e:
logger.warning(f"Secondary storage health check failed: {e}")
self.sync_stats['cloudflare_available'] = False
except Exception as e:
logger.error(f"Error during periodic sync: {e}")
async def _detect_and_sync_drift(self, dry_run: bool = False) -> Dict[str, int]:
"""
Detect and sync memories with divergent metadata between backends.
Compares updated_at timestamps to identify metadata drift (tags, types, custom fields)
and synchronizes changes using "newer timestamp wins" strategy.
Args:
dry_run: If True, detect drift but don't apply changes (preview mode)
Returns:
Dictionary with drift detection statistics:
- checked: Number of memories examined
- drift_detected: Number of memories with divergent metadata
- synced: Number of memories synchronized
- failed: Number of sync failures
"""
if not self.drift_check_enabled:
return {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
logger.info(f"Starting drift detection scan (dry_run={dry_run})...")
stats = {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
try:
# Get batch of recently updated memories from Cloudflare
batch_size = getattr(app_config, 'HYBRID_DRIFT_BATCH_SIZE', 100)
# Strategy: Check memories updated since last drift check
time_threshold = self.last_drift_check_time or (time.time() - self.drift_check_interval)
# Get recently updated from Cloudflare
if hasattr(self.secondary, 'get_memories_updated_since'):
cf_updated = await self.secondary.get_memories_updated_since(
time_threshold,
limit=batch_size
)
else:
# Fallback: Get recent memories and filter by timestamp
cf_memories = await self.secondary.get_all_memories(limit=batch_size)
cf_updated = [m for m in cf_memories if m.updated_at and m.updated_at >= time_threshold]
logger.info(f"Found {len(cf_updated)} memories updated in Cloudflare since last check")
# Compare with local versions
for cf_memory in cf_updated:
stats['checked'] += 1
try:
local_memory = await self.primary.get_by_hash(cf_memory.content_hash)
if not local_memory:
# Memory missing locally - sync it
stats['drift_detected'] += 1
logger.debug(f"Memory {cf_memory.content_hash[:8]} missing locally, syncing...")
if not dry_run:
success, _ = await self.primary.store(cf_memory)
if success:
stats['synced'] += 1
else:
logger.info(f"[DRY RUN] Would sync missing memory: {cf_memory.content_hash[:8]}")
stats['synced'] += 1
continue
# Compare updated_at timestamps
cf_updated_at = cf_memory.updated_at or 0
local_updated_at = local_memory.updated_at or 0
# Allow 1 second tolerance for timestamp precision
if abs(cf_updated_at - local_updated_at) > 1.0:
stats['drift_detected'] += 1
logger.debug(
f"Drift detected for {cf_memory.content_hash[:8]}: "
f"Cloudflare={cf_updated_at:.2f}, Local={local_updated_at:.2f}"
)
# Use "newer timestamp wins" strategy
if cf_updated_at > local_updated_at:
# Cloudflare is newer - update local
if not dry_run:
success, _ = await self.primary.update_memory_metadata(
cf_memory.content_hash,
{
'tags': cf_memory.tags,
'memory_type': cf_memory.memory_type,
'metadata': cf_memory.metadata,
'created_at': cf_memory.created_at,
'created_at_iso': cf_memory.created_at_iso,
'updated_at': cf_memory.updated_at,
'updated_at_iso': cf_memory.updated_at_iso,
},
preserve_timestamps=False # Use Cloudflare timestamps
)
if success:
stats['synced'] += 1
logger.info(f"Synced metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
else:
stats['failed'] += 1
else:
logger.info(f"[DRY RUN] Would sync metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
stats['synced'] += 1
else:
# Local is newer - update Cloudflare
if not dry_run:
operation = SyncOperation(
operation='update',
content_hash=local_memory.content_hash,
updates={
'tags': local_memory.tags,
'memory_type': local_memory.memory_type,
'metadata': local_memory.metadata,
}
)
await self.enqueue_operation(operation)
stats['synced'] += 1
logger.info(f"Queued metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
else:
logger.info(f"[DRY RUN] Would queue metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
stats['synced'] += 1
except Exception as e:
logger.warning(f"Error checking drift for memory: {e}")
stats['failed'] += 1
continue
# Update tracking
if not dry_run:
self.last_drift_check_time = time.time()
self.sync_stats['last_drift_check'] = self.last_drift_check_time
self.sync_stats['drift_detected_count'] += stats['drift_detected']
self.sync_stats['drift_synced_count'] += stats['synced']
logger.info(
f"Drift detection complete: checked={stats['checked']}, "
f"drift_detected={stats['drift_detected']}, synced={stats['synced']}, failed={stats['failed']}"
)
except Exception as e:
logger.error(f"Error during drift detection: {e}")
return stats
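# Illustrative sketch (assumed usage): drift detection can be previewed without applying
# any changes by passing dry_run=True.
#
#     stats = await sync_service._detect_and_sync_drift(dry_run=True)
#     # e.g. {'checked': 100, 'drift_detected': 3, 'synced': 3, 'failed': 0}
#     # In dry-run mode 'synced' counts memories that *would* have been synchronized.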
class HybridMemoryStorage(MemoryStorage):
"""
Hybrid memory storage using SQLite-vec as primary and Cloudflare as secondary.
This implementation provides:
- Ultra-fast reads and writes (~5ms) via SQLite-vec
- Cloud persistence and multi-device sync via Cloudflare
- Background synchronization with retry logic
- Graceful degradation when cloud services are unavailable
- Full compatibility with the MemoryStorage interface
"""
@property
def max_content_length(self) -> Optional[int]:
"""
Maximum content length constrained by Cloudflare secondary storage.
Uses configured hybrid limit (defaults to Cloudflare limit).
"""
return HYBRID_MAX_CONTENT_LENGTH
@property
def supports_chunking(self) -> bool:
"""Hybrid backend supports content chunking with metadata linking."""
return True
def __init__(self,
sqlite_db_path: str,
embedding_model: str = "all-MiniLM-L6-v2",
cloudflare_config: Dict[str, Any] = None,
sync_interval: int = 300,
batch_size: int = 50):
"""
Initialize hybrid storage with primary SQLite-vec and secondary Cloudflare.
Args:
sqlite_db_path: Path to SQLite-vec database file
embedding_model: Embedding model name for SQLite-vec
cloudflare_config: Cloudflare configuration dict
sync_interval: Background sync interval in seconds (default: 5 minutes)
batch_size: Batch size for sync operations (default: 50)
"""
self.primary = SqliteVecMemoryStorage(
db_path=sqlite_db_path,
embedding_model=embedding_model
)
# Initialize Cloudflare storage if config provided
self.secondary = None
self.sync_service = None
if cloudflare_config and all(key in cloudflare_config for key in
['api_token', 'account_id', 'vectorize_index', 'd1_database_id']):
self.secondary = CloudflareStorage(**cloudflare_config)
else:
logger.warning("Cloudflare config incomplete, running in SQLite-only mode")
self.sync_interval = sync_interval
self.batch_size = batch_size
self.initialized = False
# Initial sync status tracking
self.initial_sync_in_progress = False
self.initial_sync_total = 0
self.initial_sync_completed = 0
self.initial_sync_finished = False
async def initialize(self) -> None:
"""Initialize the hybrid storage system."""
logger.info("Initializing hybrid memory storage...")
# Always initialize primary storage
await self.primary.initialize()
logger.info("Primary storage (SQLite-vec) initialized")
# Initialize secondary storage and sync service if available
if self.secondary:
try:
await self.secondary.initialize()
logger.info("Secondary storage (Cloudflare) initialized")
# Start background sync service
self.sync_service = BackgroundSyncService(
self.primary,
self.secondary,
sync_interval=self.sync_interval,
batch_size=self.batch_size
)
await self.sync_service.start()
logger.info("Background sync service started")
# Schedule initial sync to run after server startup (non-blocking)
if HYBRID_SYNC_ON_STARTUP:
asyncio.create_task(self._perform_initial_sync_after_startup())
logger.info("Initial sync scheduled to run after server startup")
except Exception as e:
logger.warning(f"Failed to initialize secondary storage: {e}")
self.secondary = None
self.initialized = True
logger.info("Hybrid memory storage initialization completed")
async def _perform_initial_sync_after_startup(self) -> None:
"""
Wrapper for initial sync that waits for server startup to complete.
This allows the web server to be accessible during the sync process.
"""
# Wait a bit for server to fully start up
await asyncio.sleep(2)
logger.info("Starting initial sync in background (server is now accessible)")
await self._perform_initial_sync()
async def _sync_memories_from_cloudflare(
self,
sync_type: str = "initial",
broadcast_sse: bool = True,
enable_drift_check: bool = True
) -> Dict[str, Any]:
"""
Shared logic for syncing memories FROM Cloudflare TO local storage.
Args:
sync_type: Type of sync ("initial" or "manual") for logging/SSE events
broadcast_sse: Whether to broadcast SSE progress events
enable_drift_check: Whether to check for metadata drift (only for initial sync)
Returns:
Dict with:
- success: bool
- memories_synced: int
- total_checked: int
- message: str
- time_taken_seconds: float
"""
import time
sync_start_time = time.time()
try:
# Get memory count from both storages to compare
primary_stats = await self.primary.get_stats()
secondary_stats = await self.secondary.get_stats()
primary_count = primary_stats.get('total_memories', 0)
secondary_count = secondary_stats.get('total_memories', 0)
logger.info(f"{sync_type.capitalize()} sync: Local={primary_count}, Cloudflare={secondary_count}")
if secondary_count <= primary_count:
logger.info(f"No new memories to sync from Cloudflare ({sync_type} sync)")
return {
'success': True,
'memories_synced': 0,
'total_checked': 0,
'message': 'No new memories to pull from Cloudflare',
'time_taken_seconds': round(time.time() - sync_start_time, 3)
}
# Pull missing memories from Cloudflare using optimized batch processing
missing_count = secondary_count - primary_count
synced_count = 0
batch_size = min(500, self.batch_size * 5) # 5x larger batches for sync
cursor = None
processed_count = 0
consecutive_empty_batches = 0
# Get all local hashes once for O(1) lookup
local_hashes = await self.primary.get_all_content_hashes()
logger.info(f"Pulling {missing_count} potential memories from Cloudflare...")
while True:
try:
# Get batch from Cloudflare using cursor-based pagination
logger.debug(f"Fetching batch: cursor={cursor}, batch_size={batch_size}")
if hasattr(self.secondary, 'get_all_memories_cursor'):
cloudflare_memories = await self.secondary.get_all_memories_cursor(
limit=batch_size,
cursor=cursor
)
else:
cloudflare_memories = await self.secondary.get_all_memories(
limit=batch_size,
offset=processed_count
)
if not cloudflare_memories:
logger.debug(f"No more memories from Cloudflare at cursor {cursor}")
break
logger.debug(f"Processing batch of {len(cloudflare_memories)} memories")
batch_checked = 0
batch_missing = 0
batch_synced = 0
# Parallel processing with concurrency limit
semaphore = asyncio.Semaphore(15)
async def sync_single_memory(cf_memory):
"""Process a single memory with concurrency control."""
nonlocal batch_checked, batch_missing, batch_synced, synced_count, processed_count
async with semaphore:
batch_checked += 1
processed_count += 1
try:
# Fast O(1) existence check
if cf_memory.content_hash not in local_hashes:
batch_missing += 1
# Memory doesn't exist locally, sync it
success, message = await self.primary.store(cf_memory)
if success:
batch_synced += 1
synced_count += 1
local_hashes.add(cf_memory.content_hash) # Update cache
if sync_type == "initial":
self.initial_sync_completed = synced_count
if synced_count % 10 == 0:
logger.info(f"{sync_type.capitalize()} sync progress: {synced_count}/{missing_count} memories synced")
# Broadcast SSE progress event
if broadcast_sse and SSE_AVAILABLE:
try:
progress_event = create_sync_progress_event(
synced_count=synced_count,
total_count=missing_count,
sync_type=sync_type
)
await sse_manager.broadcast_event(progress_event)
except Exception as e:
logger.debug(f"Failed to broadcast SSE progress: {e}")
return ('synced', cf_memory.content_hash)
else:
logger.warning(f"Failed to sync memory {cf_memory.content_hash}: {message}")
return ('failed', cf_memory.content_hash, message)
elif enable_drift_check and self.sync_service and self.sync_service.drift_check_enabled:
# Memory exists - check for metadata drift
existing = await self.primary.get_by_hash(cf_memory.content_hash)
if existing:
cf_updated = cf_memory.updated_at or 0
local_updated = existing.updated_at or 0
# If Cloudflare version is newer, sync metadata
if cf_updated > local_updated + 1.0:
logger.debug(f"Metadata drift detected: {cf_memory.content_hash[:8]}")
success, _ = await self.primary.update_memory_metadata(
cf_memory.content_hash,
{
'tags': cf_memory.tags,
'memory_type': cf_memory.memory_type,
'metadata': cf_memory.metadata,
'created_at': cf_memory.created_at,
'created_at_iso': cf_memory.created_at_iso,
'updated_at': cf_memory.updated_at,
'updated_at_iso': cf_memory.updated_at_iso,
},
preserve_timestamps=False
)
if success:
batch_synced += 1
synced_count += 1
logger.debug(f"Synced metadata for: {cf_memory.content_hash[:8]}")
return ('drift_synced', cf_memory.content_hash)
return ('skipped', cf_memory.content_hash)
except Exception as e:
logger.warning(f"Error syncing memory {cf_memory.content_hash}: {e}")
return ('error', cf_memory.content_hash, str(e))
# Process batch in parallel
tasks = [sync_single_memory(mem) for mem in cloudflare_memories]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
logger.error(f"Error during {sync_type} sync batch processing: {result}")
logger.debug(f"Batch complete: checked={batch_checked}, missing={batch_missing}, synced={batch_synced}")
# Track consecutive empty batches
if batch_synced == 0:
consecutive_empty_batches += 1
logger.debug(f"Empty batch: consecutive={consecutive_empty_batches}/{HYBRID_MAX_EMPTY_BATCHES}")
else:
consecutive_empty_batches = 0
# Log progress summary
if processed_count > 0 and processed_count % 100 == 0:
logger.info(f"Sync progress: processed={processed_count}, synced={synced_count}/{missing_count}")
# Update cursor for next batch
if cloudflare_memories and hasattr(self.secondary, 'get_all_memories_cursor'):
cursor = min(memory.created_at for memory in cloudflare_memories if memory.created_at)
logger.debug(f"Next cursor: {cursor}")
# Early break conditions
if consecutive_empty_batches >= HYBRID_MAX_EMPTY_BATCHES and synced_count > 0:
logger.info(f"Completed after {consecutive_empty_batches} empty batches - {synced_count}/{missing_count} synced")
break
elif processed_count >= HYBRID_MIN_CHECK_COUNT and synced_count == 0:
logger.info(f"No missing memories after checking {processed_count} memories")
break
await asyncio.sleep(0.01)
except Exception as e:
# Handle Cloudflare D1 errors
if "400" in str(e) and not hasattr(self.secondary, 'get_all_memories_cursor'):
logger.error(f"D1 OFFSET limitation at processed_count={processed_count}: {e}")
logger.warning("Cloudflare D1 OFFSET limits reached - sync incomplete")
break
else:
logger.error(f"Error during {sync_type} sync: {e}")
break
time_taken = time.time() - sync_start_time
logger.info(f"{sync_type.capitalize()} sync completed: {synced_count} memories in {time_taken:.2f}s")
# Broadcast SSE completion event
if broadcast_sse and SSE_AVAILABLE and missing_count > 0:
try:
completion_event = create_sync_completed_event(
synced_count=synced_count,
total_count=missing_count,
time_taken_seconds=time_taken,
sync_type=sync_type
)
await sse_manager.broadcast_event(completion_event)
except Exception as e:
logger.debug(f"Failed to broadcast SSE completion: {e}")
return {
'success': True,
'memories_synced': synced_count,
'total_checked': processed_count,
'message': f'Successfully pulled {synced_count} memories from Cloudflare',
'time_taken_seconds': round(time_taken, 3)
}
except Exception as e:
logger.error(f"{sync_type.capitalize()} sync failed: {e}")
return {
'success': False,
'memories_synced': 0,
'total_checked': 0,
'message': f'Failed to pull from Cloudflare: {str(e)}',
'time_taken_seconds': round(time.time() - sync_start_time, 3)
}
async def _perform_initial_sync(self) -> None:
"""
Perform initial sync from Cloudflare to SQLite if enabled.
This downloads all memories from Cloudflare that are missing in local SQLite,
providing immediate access to existing cloud memories.
"""
if not HYBRID_SYNC_ON_STARTUP or not self.secondary:
return
logger.info("Starting initial sync from Cloudflare to SQLite...")
self.initial_sync_in_progress = True
self.initial_sync_completed = 0
self.initial_sync_finished = False
try:
# Set initial_sync_total before calling the shared helper
primary_stats = await self.primary.get_stats()
secondary_stats = await self.secondary.get_stats()
primary_count = primary_stats.get('total_memories', 0)
secondary_count = secondary_stats.get('total_memories', 0)
if secondary_count > primary_count:
self.initial_sync_total = secondary_count - primary_count
else:
self.initial_sync_total = 0
# Call shared helper method with drift checking enabled
result = await self._sync_memories_from_cloudflare(
sync_type="initial",
broadcast_sse=True,
enable_drift_check=True
)
synced_count = result['memories_synced']
# Update sync tracking to reflect actual sync completion
if synced_count == 0 and self.initial_sync_total > 0:
# All memories were already present - this is a successful "no-op" sync
self.initial_sync_completed = self.initial_sync_total
logger.info(f"Sync completed successfully: All {self.initial_sync_total} memories were already present locally")
self.initial_sync_finished = True
except Exception as e:
logger.error(f"Initial sync failed: {e}")
# Don't fail initialization if initial sync fails
logger.warning("Continuing with hybrid storage despite initial sync failure")
finally:
self.initial_sync_in_progress = False
def get_initial_sync_status(self) -> Dict[str, Any]:
"""Get current initial sync status for monitoring."""
return {
"in_progress": self.initial_sync_in_progress,
"total": self.initial_sync_total,
"completed": self.initial_sync_completed,
"finished": self.initial_sync_finished,
"progress_percentage": round((self.initial_sync_completed / max(self.initial_sync_total, 1)) * 100, 1) if self.initial_sync_total > 0 else 0
}
async def store(self, memory: Memory) -> Tuple[bool, str]:
"""Store a memory in primary storage and queue for secondary sync."""
# Always store in primary first for immediate availability
success, message = await self.primary.store(memory)
if success and self.sync_service:
# Queue for background sync to secondary
operation = SyncOperation(operation='store', memory=memory)
await self.sync_service.enqueue_operation(operation)
return success, message
async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
"""Retrieve memories from primary storage (fast)."""
return await self.primary.retrieve(query, n_results)
async def search(self, query: str, n_results: int = 5, min_similarity: float = 0.0) -> List[MemoryQueryResult]:
"""Search memories in primary storage."""
return await self.primary.search(query, n_results)
async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
"""Search memories by tags in primary storage with optional time filtering.
This method performs an OR search for tags. The `match_all` (AND) logic
is handled at the API layer.
Args:
tags: List of tags to search for
time_start: Optional Unix timestamp (in seconds) to filter memories created after this time
Returns:
List of Memory objects matching the tag criteria and time filter
"""
return await self.primary.search_by_tag(tags, time_start=time_start)
async def search_by_tags(
self,
tags: List[str],
operation: str = "AND",
time_start: Optional[float] = None,
time_end: Optional[float] = None
) -> List[Memory]:
"""Search memories by tags using consistent operation parameter across backends."""
normalized_operation = operation.strip().upper() if isinstance(operation, str) else "AND"
if normalized_operation not in {"AND", "OR"}:
logger.warning("Unsupported tag operation %s; defaulting to AND", operation)
normalized_operation = "AND"
return await self.primary.search_by_tags(
tags,
operation=normalized_operation,
time_start=time_start,
time_end=time_end
)
async def delete(self, content_hash: str) -> Tuple[bool, str]:
"""Delete a memory from primary storage and queue for secondary sync."""
success, message = await self.primary.delete(content_hash)
if success and self.sync_service:
# Queue for background sync to secondary
operation = SyncOperation(operation='delete', content_hash=content_hash)
await self.sync_service.enqueue_operation(operation)
return success, message
async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
"""Delete memories by tag from primary storage and queue for secondary sync."""
# First, get the memories with this tag to get their hashes for sync
memories_to_delete = await self.primary.search_by_tags([tag])
# Delete from primary
count_deleted, message = await self.primary.delete_by_tag(tag)
# Queue individual deletes for secondary sync
if count_deleted > 0 and self.sync_service:
for memory in memories_to_delete:
operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
await self.sync_service.enqueue_operation(operation)
return count_deleted, message
async def delete_by_tags(self, tags: List[str]) -> Tuple[int, str]:
"""
Delete memories matching ANY of the given tags from primary storage and queue for secondary sync.
Optimized to use primary storage's delete_by_tags if available, otherwise falls back to
calling delete_by_tag for each tag.
"""
if not tags:
return 0, "No tags provided"
# First, get all memories with any of these tags for sync queue
memories_to_delete = await self.primary.search_by_tags(tags, operation="OR")
# Remove duplicates based on content_hash
unique_memories = {m.content_hash: m for m in memories_to_delete}.values()
# Delete from primary using optimized method if available
count_deleted, message = await self.primary.delete_by_tags(tags)
# Queue individual deletes for secondary sync
if count_deleted > 0 and self.sync_service:
for memory in unique_memories:
operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
await self.sync_service.enqueue_operation(operation)
return count_deleted, message
async def cleanup_duplicates(self) -> Tuple[int, str]:
"""Clean up duplicates in primary storage."""
# Only cleanup primary, secondary will sync naturally
return await self.primary.cleanup_duplicates()
async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
"""Update memory metadata in primary storage and queue for secondary sync."""
success, message = await self.primary.update_memory_metadata(content_hash, updates, preserve_timestamps)
if success and self.sync_service:
# Queue for background sync to secondary
operation = SyncOperation(
operation='update',
content_hash=content_hash,
updates=updates
)
await self.sync_service.enqueue_operation(operation)
return success, message
async def update_memories_batch(self, memories: List[Memory]) -> List[bool]:
"""
Update multiple memories in a batch operation with optimal performance.
Delegates directly to primary storage's optimized batch update method,
then queues secondary sync operations in background.
Args:
memories: List of Memory objects with updated fields
Returns:
List of success booleans, one for each memory in the batch
"""
# Use primary storage's optimized batch update (single transaction)
results = await self.primary.update_memories_batch(memories)
# Queue successful updates for background sync to secondary
if self.sync_service:
            for memory, success in zip(memories, results):
if success:
operation = SyncOperation(
operation='update',
content_hash=memory.content_hash,
updates={
'tags': memory.tags,
'metadata': memory.metadata,
'memory_type': memory.memory_type
}
)
                    # Enqueue for the background sync worker; enqueueing is quick and the
                    # actual Cloudflare sync happens asynchronously
try:
await self.sync_service.enqueue_operation(operation)
except Exception as e:
logger.warning(f"Failed to queue sync for {memory.content_hash}: {e}")
return results
async def get_stats(self) -> Dict[str, Any]:
"""Get comprehensive statistics from both storage backends."""
# SQLite-vec get_stats is now async
primary_stats = await self.primary.get_stats()
stats = {
"storage_backend": "Hybrid (SQLite-vec + Cloudflare)",
"primary_backend": "SQLite-vec",
"secondary_backend": "Cloudflare" if self.secondary else "None",
"total_memories": primary_stats.get("total_memories", 0),
"unique_tags": primary_stats.get("unique_tags", 0),
"memories_this_week": primary_stats.get("memories_this_week", 0),
"database_size_bytes": primary_stats.get("database_size_bytes", 0),
"database_size_mb": primary_stats.get("database_size_mb", 0),
"primary_stats": primary_stats,
"sync_enabled": self.sync_service is not None
}
# Add sync service statistics if available
if self.sync_service:
sync_status = await self.sync_service.get_sync_status()
stats["sync_status"] = sync_status
# Add secondary stats if available and healthy
if self.secondary and self.sync_service and self.sync_service.sync_stats['cloudflare_available']:
try:
secondary_stats = await self.secondary.get_stats()
stats["secondary_stats"] = secondary_stats
except Exception as e:
stats["secondary_error"] = str(e)
return stats
async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
"""Get all tags with their usage counts from primary storage."""
return await self.primary.get_all_tags_with_counts()
async def get_all_tags(self) -> List[str]:
"""Get all unique tags from primary storage."""
return await self.primary.get_all_tags()
async def get_recent_memories(self, n: int = 10) -> List[Memory]:
"""Get recent memories from primary storage."""
return await self.primary.get_recent_memories(n)
async def get_largest_memories(self, n: int = 10) -> List[Memory]:
"""Get largest memories by content length from primary storage."""
return await self.primary.get_largest_memories(n)
async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
"""
Get memory creation timestamps only, without loading full memory objects.
Delegates to primary storage (SQLite-vec) for optimal performance.
Args:
days: Optional filter to only get memories from last N days
Returns:
List of Unix timestamps (float) in descending order (newest first)
"""
return await self.primary.get_memory_timestamps(days)
async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
"""
Retrieve memories with combined time filtering and optional semantic search.
Args:
query: Optional semantic search query. If None, only time filtering is applied.
n_results: Maximum number of results to return.
start_timestamp: Optional start time for filtering.
end_timestamp: Optional end time for filtering.
Returns:
List of MemoryQueryResult objects.
"""
return await self.primary.recall(query=query, n_results=n_results, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
async def recall_memory(self, query: str, n_results: int = 5) -> List[Memory]:
"""Recall memories using natural language time expressions."""
return await self.primary.recall_memory(query, n_results)
async def get_all_memories(self, limit: int = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
"""Get all memories from primary storage."""
return await self.primary.get_all_memories(limit=limit, offset=offset, memory_type=memory_type, tags=tags)
async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
"""Get a memory by its content hash from primary storage."""
return await self.primary.get_by_hash(content_hash)
async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
"""Get total count of memories from primary storage."""
return await self.primary.count_all_memories(memory_type=memory_type, tags=tags)
async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
"""Get memories within time range from primary storage."""
return await self.primary.get_memories_by_time_range(start_time, end_time)
async def close(self):
"""Clean shutdown of hybrid storage system."""
logger.info("Shutting down hybrid memory storage...")
# Stop sync service first
if self.sync_service:
await self.sync_service.stop()
# Close storage backends
if hasattr(self.primary, 'close') and self.primary.close:
if asyncio.iscoroutinefunction(self.primary.close):
await self.primary.close()
else:
self.primary.close()
if self.secondary and hasattr(self.secondary, 'close') and self.secondary.close:
if asyncio.iscoroutinefunction(self.secondary.close):
await self.secondary.close()
else:
self.secondary.close()
logger.info("Hybrid memory storage shutdown completed")
async def force_sync(self) -> Dict[str, Any]:
"""Force immediate synchronization with secondary storage."""
if not self.sync_service:
return {
'status': 'disabled',
'message': 'Background sync service not available'
}
return await self.sync_service.force_sync()
async def force_pull_sync(self) -> Dict[str, Any]:
"""
Force immediate pull synchronization FROM Cloudflare TO local SQLite.
This triggers the same logic as initial_sync but can be called on demand.
Useful for manually refreshing local storage with memories stored via MCP.
Returns:
Dict with sync results including:
- success: bool
- message: str
- memories_pulled: int
- time_taken_seconds: float
"""
# Call shared helper method without drift checking (manual sync doesn't need it)
result = await self._sync_memories_from_cloudflare(
sync_type="manual",
broadcast_sse=True,
enable_drift_check=False
)
# Map result to expected return format
return {
'success': result['success'],
'message': result['message'],
'memories_pulled': result['memories_synced'],
'time_taken_seconds': result['time_taken_seconds']
}
async def get_sync_status(self) -> Dict[str, Any]:
"""Get current background sync status and statistics."""
if not self.sync_service:
return {
'is_running': False,
'pending_operations': 0,
'operations_processed': 0,
'operations_failed': 0,
'last_sync_time': 0,
'sync_interval': 0
}
return await self.sync_service.get_sync_status()
async def pause_sync(self) -> Dict[str, Any]:
"""Pause background sync operations for safe database operations."""
if not self.sync_service:
return {
'success': False,
'message': 'Sync service not available'
}
try:
if not self.sync_service.is_running:
return {
'success': True,
'message': 'Sync already paused'
}
# Stop the sync service
await self.sync_service.stop()
logger.info("Background sync paused")
return {
'success': True,
'message': 'Sync paused successfully'
}
except Exception as e:
logger.error(f"Failed to pause sync: {e}")
return {
'success': False,
'message': f'Failed to pause sync: {str(e)}'
}
async def resume_sync(self) -> Dict[str, Any]:
"""Resume background sync operations after pause."""
if not self.sync_service:
return {
'success': False,
'message': 'Sync service not available'
}
try:
if self.sync_service.is_running:
return {
'success': True,
'message': 'Sync already running'
}
# Start the sync service
await self.sync_service.start()
logger.info("Background sync resumed")
return {
'success': True,
'message': 'Sync resumed successfully'
}
except Exception as e:
logger.error(f"Failed to resume sync: {e}")
return {
'success': False,
'message': f'Failed to resume sync: {str(e)}'
}
def sanitized(self, tags):
"""Sanitize and normalize tags to a JSON string.
This method provides compatibility with the storage interface.
Delegates to primary storage for consistent tag handling.
"""
return self.primary.sanitized(tags)
```
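The hybrid backend above applies every mutation to the fast local primary first and then enqueues the same operation for the Cloudflare secondary through the background sync service (a write-through pattern). The sketch below is a minimal, self-contained illustration of that pattern, not code from the repository; the `ToyPrimary`, `ToySyncQueue`, and `ToyHybrid` names are invented for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class ToyOp:
    """A queued mutation destined for the secondary backend."""
    operation: str
    key: str
    value: str = ""


class ToyPrimary:
    """Stand-in for the fast local backend (SQLite-vec in the real service)."""

    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    async def store(self, key: str, value: str) -> Tuple[bool, str]:
        self.data[key] = value
        return True, "stored"


class ToySyncQueue:
    """Stand-in for the background sync service feeding the cloud backend."""

    def __init__(self) -> None:
        self.queue = asyncio.Queue()

    async def enqueue_operation(self, op: ToyOp) -> None:
        await self.queue.put(op)


class ToyHybrid:
    """Write-through: apply to the primary first, then queue for secondary sync."""

    def __init__(self, primary: ToyPrimary, sync: ToySyncQueue) -> None:
        self.primary = primary
        self.sync = sync

    async def store(self, key: str, value: str) -> Tuple[bool, str]:
        success, message = await self.primary.store(key, value)
        if success:
            await self.sync.enqueue_operation(ToyOp("store", key, value))
        return success, message


async def main() -> None:
    hybrid = ToyHybrid(ToyPrimary(), ToySyncQueue())
    print(await hybrid.store("hash-1", "example memory"))  # (True, 'stored')


if __name__ == "__main__":
    asyncio.run(main())
```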
--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/cloudflare.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Cloudflare storage backend for MCP Memory Service.
Provides cloud-native storage using Vectorize, D1, and R2.
"""
import json
import logging
import hashlib
import asyncio
import time
from typing import List, Dict, Any, Tuple, Optional
from datetime import datetime, timezone, timedelta
import httpx
from .base import MemoryStorage
from ..models.memory import Memory, MemoryQueryResult
from ..utils.hashing import generate_content_hash
from ..config import CLOUDFLARE_MAX_CONTENT_LENGTH
logger = logging.getLogger(__name__)
def normalize_tags_for_search(tags: List[str]) -> List[str]:
"""Deduplicate and filter empty tag strings.
Args:
tags: List of tag strings (may contain duplicates or empty strings)
Returns:
Deduplicated list of non-empty tags
"""
return list(dict.fromkeys([tag for tag in tags if tag]))
def normalize_operation(operation: Optional[str]) -> str:
"""Normalize tag search operation to AND or OR.
Args:
operation: Raw operation string (case-insensitive)
Returns:
Normalized operation: "AND" or "OR"
"""
if isinstance(operation, str):
normalized = operation.strip().upper() or "AND"
else:
normalized = "AND"
if normalized not in {"AND", "OR"}:
logger.warning(f"Unsupported operation '{operation}'; defaulting to AND")
normalized = "AND"
return normalized
def build_tag_search_query(tags: List[str], operation: str,
time_start: Optional[float] = None,
time_end: Optional[float] = None) -> Tuple[str, List[Any]]:
"""Build SQL query for tag-based search with time filtering.
Args:
tags: List of deduplicated tags
operation: Search operation ("AND" or "OR")
time_start: Optional start timestamp filter
time_end: Optional end timestamp filter
Returns:
Tuple of (sql_query, parameters_list)
"""
placeholders = ",".join(["?"] * len(tags))
params: List[Any] = list(tags)
sql = (
"SELECT m.* FROM memories m "
"JOIN memory_tags mt ON m.id = mt.memory_id "
"JOIN tags t ON mt.tag_id = t.id "
f"WHERE t.name IN ({placeholders})"
)
if time_start is not None:
sql += " AND m.created_at >= ?"
params.append(time_start)
if time_end is not None:
sql += " AND m.created_at <= ?"
params.append(time_end)
sql += " GROUP BY m.id"
if operation == "AND":
sql += " HAVING COUNT(DISTINCT t.name) = ?"
params.append(len(tags))
sql += " ORDER BY m.created_at DESC"
return sql, params
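# Example: build_tag_search_query(["python", "mcp"], "AND") produces roughly
#   SELECT m.* FROM memories m JOIN memory_tags mt ON m.id = mt.memory_id
#   JOIN tags t ON mt.tag_id = t.id WHERE t.name IN (?,?)
#   GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ? ORDER BY m.created_at DESC
# with params ["python", "mcp", 2]; with "OR" the HAVING clause and its count
# parameter are omitted.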
class CloudflareStorage(MemoryStorage):
"""Cloudflare-based storage backend using Vectorize, D1, and R2."""
# Content length limit from configuration
_MAX_CONTENT_LENGTH = CLOUDFLARE_MAX_CONTENT_LENGTH
@property
def max_content_length(self) -> Optional[int]:
"""Maximum content length: 800 chars (BGE model 512 token limit)."""
return self._MAX_CONTENT_LENGTH
@property
def supports_chunking(self) -> bool:
"""Cloudflare backend supports content chunking with metadata linking."""
return True
def __init__(self,
api_token: str,
account_id: str,
vectorize_index: str,
d1_database_id: str,
r2_bucket: Optional[str] = None,
embedding_model: str = "@cf/baai/bge-base-en-v1.5",
large_content_threshold: int = 1024 * 1024, # 1MB
max_retries: int = 3,
base_delay: float = 1.0):
"""
Initialize Cloudflare storage backend.
Args:
api_token: Cloudflare API token
account_id: Cloudflare account ID
vectorize_index: Vectorize index name
d1_database_id: D1 database ID
r2_bucket: Optional R2 bucket for large content
embedding_model: Workers AI embedding model
large_content_threshold: Size threshold for R2 storage
max_retries: Maximum retry attempts for API calls
base_delay: Base delay for exponential backoff
"""
self.api_token = api_token
self.account_id = account_id
self.vectorize_index = vectorize_index
self.d1_database_id = d1_database_id
self.r2_bucket = r2_bucket
self.embedding_model = embedding_model
self.large_content_threshold = large_content_threshold
self.max_retries = max_retries
self.base_delay = base_delay
# API endpoints
self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
self.vectorize_url = f"{self.base_url}/vectorize/v2/indexes/{vectorize_index}"
self.d1_url = f"{self.base_url}/d1/database/{d1_database_id}"
self.ai_url = f"{self.base_url}/ai/run/{embedding_model}"
if r2_bucket:
self.r2_url = f"{self.base_url}/r2/buckets/{r2_bucket}/objects"
# HTTP client with connection pooling
self.client = None
self._initialized = False
# Embedding cache for performance
self._embedding_cache = {}
self._cache_max_size = 1000
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create HTTP client with connection pooling."""
if self.client is None:
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/json"
}
self.client = httpx.AsyncClient(
headers=headers,
timeout=httpx.Timeout(30.0),
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
)
return self.client
async def _retry_request(self, method: str, url: str, **kwargs) -> httpx.Response:
"""Make HTTP request with exponential backoff retry logic."""
client = await self._get_client()
for attempt in range(self.max_retries + 1):
try:
response = await client.request(method, url, **kwargs)
# Handle rate limiting
if response.status_code == 429:
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt)
logger.warning(f"Rate limited, retrying in {delay}s (attempt {attempt + 1}/{self.max_retries + 1})")
await asyncio.sleep(delay)
continue
else:
raise httpx.HTTPError(f"Rate limited after {self.max_retries} retries")
# Handle server errors
if response.status_code >= 500:
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt)
logger.warning(f"Server error {response.status_code}, retrying in {delay}s")
await asyncio.sleep(delay)
continue
response.raise_for_status()
return response
except (httpx.NetworkError, httpx.TimeoutException) as e:
if attempt < self.max_retries:
delay = self.base_delay * (2 ** attempt)
logger.warning(f"Network error: {e}, retrying in {delay}s")
await asyncio.sleep(delay)
continue
raise
raise httpx.HTTPError(f"Failed after {self.max_retries} retries")
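    # With the defaults (max_retries=3, base_delay=1.0), _retry_request waits 1s, 2s and
    # 4s between attempts, retrying on 429 rate limits, 5xx responses, and
    # network/timeout errors before giving up.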
async def _generate_embedding(self, text: str) -> List[float]:
"""Generate embedding using Workers AI or cache."""
# Check cache first
text_hash = hashlib.sha256(text.encode()).hexdigest()
if text_hash in self._embedding_cache:
return self._embedding_cache[text_hash]
try:
# Use Workers AI to generate embedding
payload = {"text": [text]}
response = await self._retry_request("POST", self.ai_url, json=payload)
result = response.json()
if result.get("success") and "result" in result:
embedding = result["result"]["data"][0]
# Cache the embedding (with size limit)
if len(self._embedding_cache) >= self._cache_max_size:
# Remove oldest entry (simple FIFO)
oldest_key = next(iter(self._embedding_cache))
del self._embedding_cache[oldest_key]
self._embedding_cache[text_hash] = embedding
return embedding
else:
raise ValueError(f"Workers AI embedding failed: {result}")
except Exception as e:
logger.error(f"Failed to generate embedding with Workers AI: {e}")
# TODO: Implement fallback to local sentence-transformers
raise ValueError(f"Embedding generation failed: {e}")
async def initialize(self) -> None:
"""Initialize the Cloudflare storage backend."""
if self._initialized:
return
logger.info("Initializing Cloudflare storage backend...")
try:
# Initialize D1 database schema
await self._initialize_d1_schema()
# Verify Vectorize index exists
await self._verify_vectorize_index()
# Verify R2 bucket if configured
if self.r2_bucket:
await self._verify_r2_bucket()
self._initialized = True
logger.info("Cloudflare storage backend initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize Cloudflare storage: {e}")
raise
async def _initialize_d1_schema(self) -> None:
"""Initialize D1 database schema."""
schema_sql = """
-- Memory metadata table
CREATE TABLE IF NOT EXISTS memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content_hash TEXT UNIQUE NOT NULL,
content TEXT NOT NULL,
memory_type TEXT,
created_at REAL NOT NULL,
created_at_iso TEXT NOT NULL,
updated_at REAL,
updated_at_iso TEXT,
metadata_json TEXT,
vector_id TEXT UNIQUE,
content_size INTEGER DEFAULT 0,
r2_key TEXT
);
-- Tags table
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL
);
-- Memory-tag relationships
CREATE TABLE IF NOT EXISTS memory_tags (
memory_id INTEGER,
tag_id INTEGER,
PRIMARY KEY (memory_id, tag_id),
FOREIGN KEY (memory_id) REFERENCES memories(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
);
-- Indexes for performance
CREATE INDEX IF NOT EXISTS idx_memories_content_hash ON memories(content_hash);
CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at);
CREATE INDEX IF NOT EXISTS idx_memories_vector_id ON memories(vector_id);
CREATE INDEX IF NOT EXISTS idx_tags_name ON tags(name);
"""
payload = {"sql": schema_sql}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to initialize D1 schema: {result}")
async def _verify_vectorize_index(self) -> None:
"""Verify Vectorize index exists and is accessible."""
try:
response = await self._retry_request("GET", f"{self.vectorize_url}")
result = response.json()
if not result.get("success"):
raise ValueError(f"Vectorize index not accessible: {result}")
logger.info(f"Vectorize index verified: {self.vectorize_index}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ValueError(f"Vectorize index '{self.vectorize_index}' not found")
raise
async def _verify_r2_bucket(self) -> None:
"""Verify R2 bucket exists and is accessible."""
try:
# Try to list objects (empty list is fine)
response = await self._retry_request("GET", f"{self.r2_url}?max-keys=1")
logger.info(f"R2 bucket verified: {self.r2_bucket}")
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
raise ValueError(f"R2 bucket '{self.r2_bucket}' not found")
raise
async def store(self, memory: Memory) -> Tuple[bool, str]:
"""Store a memory in Cloudflare storage."""
try:
# Generate embedding for the content
embedding = await self._generate_embedding(memory.content)
# Determine storage strategy based on content size
content_size = len(memory.content.encode('utf-8'))
use_r2 = self.r2_bucket and content_size > self.large_content_threshold
# Store large content in R2 if needed
r2_key = None
stored_content = memory.content
if use_r2:
r2_key = f"content/{memory.content_hash}.txt"
await self._store_r2_content(r2_key, memory.content)
stored_content = f"[R2 Content: {r2_key}]" # Placeholder in D1
# Store vector in Vectorize
vector_id = memory.content_hash
vector_metadata = {
"content_hash": memory.content_hash,
"memory_type": memory.memory_type or "standard",
"tags": ",".join(memory.tags) if memory.tags else "",
"created_at": memory.created_at_iso or datetime.now().isoformat()
}
await self._store_vectorize_vector(vector_id, embedding, vector_metadata)
# Store metadata in D1
await self._store_d1_memory(memory, vector_id, content_size, r2_key, stored_content)
logger.info(f"Successfully stored memory: {memory.content_hash}")
return True, f"Memory stored successfully (vector_id: {vector_id})"
except Exception as e:
logger.error(f"Failed to store memory {memory.content_hash}: {e}")
return False, f"Storage failed: {str(e)}"
async def _store_vectorize_vector(self, vector_id: str, embedding: List[float], metadata: Dict[str, Any]) -> None:
"""Store vector in Vectorize."""
        # Vectors are upserted without a Vectorize namespace
vector_data = {
"id": vector_id,
"values": embedding,
"metadata": metadata
}
# Convert to NDJSON format as required by the HTTP API
ndjson_content = json.dumps(vector_data) + "\n"
try:
# Send as raw NDJSON data with correct Content-Type header
client = await self._get_client()
# Override headers for this specific request
headers = {
"Authorization": f"Bearer {self.api_token}",
"Content-Type": "application/x-ndjson"
}
response = await client.post(
f"{self.vectorize_url}/upsert",
content=ndjson_content.encode("utf-8"),
headers=headers
)
# Log response status for debugging (avoid logging headers/body for security)
logger.info(f"Vectorize response status: {response.status_code}")
response_text = response.text
            if response.status_code != 200:
                # Only log response body on errors, and truncate to avoid credential exposure
                truncated_response = response_text[:200] + "..." if len(response_text) > 200 else response_text
                logger.warning(f"Vectorize error response (truncated): {truncated_response}")
                raise ValueError(f"HTTP {response.status_code}: {response_text}")
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to store vector: {result}")
except Exception as e:
# Add more detailed error logging
logger.error(f"Vectorize insert failed: {e}")
logger.error(f"Vector data was: {vector_data}")
logger.error(f"NDJSON content: {ndjson_content.strip()}")
logger.error(f"URL was: {self.vectorize_url}/upsert")
raise ValueError(f"Failed to store vector: {e}")
async def _store_d1_memory(self, memory: Memory, vector_id: str, content_size: int, r2_key: Optional[str], stored_content: str) -> None:
"""Store memory metadata in D1."""
# Insert memory record
insert_sql = """
INSERT INTO memories (
content_hash, content, memory_type, created_at, created_at_iso,
updated_at, updated_at_iso, metadata_json, vector_id, content_size, r2_key
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
now = time.time()
now_iso = datetime.now().isoformat()
params = [
memory.content_hash,
stored_content,
memory.memory_type,
memory.created_at or now,
memory.created_at_iso or now_iso,
memory.updated_at or now,
memory.updated_at_iso or now_iso,
json.dumps(memory.metadata) if memory.metadata else None,
vector_id,
content_size,
r2_key
]
payload = {"sql": insert_sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to store memory in D1: {result}")
# Store tags if present
if memory.tags:
memory_id = result["result"][0]["meta"]["last_row_id"]
await self._store_d1_tags(memory_id, memory.tags)
async def _store_d1_tags(self, memory_id: int, tags: List[str]) -> None:
"""Store tags for a memory in D1."""
for tag in tags:
# Insert tag if not exists
tag_sql = "INSERT OR IGNORE INTO tags (name) VALUES (?)"
payload = {"sql": tag_sql, "params": [tag]}
await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
# Link tag to memory
link_sql = """
INSERT INTO memory_tags (memory_id, tag_id)
SELECT ?, id FROM tags WHERE name = ?
"""
payload = {"sql": link_sql, "params": [memory_id, tag]}
await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
async def _store_r2_content(self, key: str, content: str) -> None:
"""Store content in R2."""
response = await self._retry_request(
"PUT",
f"{self.r2_url}/{key}",
content=content.encode('utf-8'),
headers={"Content-Type": "text/plain"}
)
if response.status_code not in [200, 201]:
raise ValueError(f"Failed to store content in R2: {response.status_code}")
async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
"""Retrieve memories by semantic search."""
try:
# Generate query embedding
query_embedding = await self._generate_embedding(query)
# Search Vectorize (without namespace for now)
search_payload = {
"vector": query_embedding,
"topK": n_results,
"returnMetadata": "all",
"returnValues": False
}
response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Vectorize query failed: {result}")
matches = result.get("result", {}).get("matches", [])
# Convert to MemoryQueryResult objects
results = []
for match in matches:
memory = await self._load_memory_from_match(match)
if memory:
query_result = MemoryQueryResult(
memory=memory,
relevance_score=match.get("score", 0.0)
)
results.append(query_result)
logger.info(f"Retrieved {len(results)} memories for query")
return results
except Exception as e:
logger.error(f"Failed to retrieve memories: {e}")
return []
async def _load_memory_from_match(self, match: Dict[str, Any]) -> Optional[Memory]:
"""Load full memory from Vectorize match."""
try:
vector_id = match.get("id")
metadata = match.get("metadata", {})
content_hash = metadata.get("content_hash")
if not content_hash:
logger.warning(f"No content_hash in vector metadata: {vector_id}")
return None
# Load from D1
sql = "SELECT * FROM memories WHERE content_hash = ?"
payload = {"sql": sql, "params": [content_hash]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success") or not result.get("result", [{}])[0].get("results"):
logger.warning(f"Memory not found in D1: {content_hash}")
return None
row = result["result"][0]["results"][0]
# Load content from R2 if needed
content = row["content"]
if row.get("r2_key") and content.startswith("[R2 Content:"):
content = await self._load_r2_content(row["r2_key"])
# Load tags
tags = await self._load_memory_tags(row["id"])
# Reconstruct Memory object
memory = Memory(
content=content,
content_hash=content_hash,
tags=tags,
memory_type=row.get("memory_type"),
metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
created_at=row.get("created_at"),
created_at_iso=row.get("created_at_iso"),
updated_at=row.get("updated_at"),
updated_at_iso=row.get("updated_at_iso")
)
return memory
except Exception as e:
logger.error(f"Failed to load memory from match: {e}")
return None
async def _load_r2_content(self, r2_key: str) -> str:
"""Load content from R2."""
response = await self._retry_request("GET", f"{self.r2_url}/{r2_key}")
return response.text
async def _load_memory_tags(self, memory_id: int) -> List[str]:
"""Load tags for a memory from D1."""
sql = """
SELECT t.name FROM tags t
JOIN memory_tags mt ON t.id = mt.tag_id
WHERE mt.memory_id = ?
"""
payload = {"sql": sql, "params": [memory_id]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if result.get("success") and result.get("result", [{}])[0].get("results"):
return [row["name"] for row in result["result"][0]["results"]]
return []
async def search_by_tags(
self,
tags: List[str],
operation: str = "AND",
time_start: Optional[float] = None,
time_end: Optional[float] = None
) -> List[Memory]:
"""Search memories by tags with AND/OR semantics and optional time filtering."""
return await self._search_by_tags_internal(
tags=tags,
operation=operation,
time_start=time_start,
time_end=time_end
)
async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
"""Search memories by tags with optional time filtering (legacy OR behavior)."""
return await self._search_by_tags_internal(
tags=tags,
operation="OR",
time_start=time_start,
time_end=None
)
async def _search_by_tags_internal(
self,
tags: List[str],
operation: Optional[str] = None,
time_start: Optional[float] = None,
time_end: Optional[float] = None
) -> List[Memory]:
"""Shared implementation for tag-based queries with optional time filtering."""
try:
if not tags:
return []
deduped_tags = normalize_tags_for_search(tags)
if not deduped_tags:
return []
normalized_operation = normalize_operation(operation)
sql, params = build_tag_search_query(deduped_tags, normalized_operation,
time_start, time_end)
payload = {"sql": sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 tag search failed: {result}")
rows = result.get("result", [{}])[0].get("results") or []
memories: List[Memory] = []
for row in rows:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.info(
"Found %d memories with tags: %s (operation: %s)",
len(memories),
deduped_tags,
normalized_operation
)
return memories
except Exception as e:
logger.error(
"Failed to search memories by tags %s with operation %s: %s",
tags,
operation,
e
)
return []
async def _load_memory_from_row(self, row: Dict[str, Any]) -> Optional[Memory]:
"""Load memory from D1 row data."""
try:
# Load content from R2 if needed
content = row["content"]
if row.get("r2_key") and content.startswith("[R2 Content:"):
content = await self._load_r2_content(row["r2_key"])
# Load tags
tags = await self._load_memory_tags(row["id"])
memory = Memory(
content=content,
content_hash=row["content_hash"],
tags=tags,
memory_type=row.get("memory_type"),
metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
created_at=row.get("created_at"),
created_at_iso=row.get("created_at_iso"),
updated_at=row.get("updated_at"),
updated_at_iso=row.get("updated_at_iso")
)
return memory
except Exception as e:
logger.error(f"Failed to load memory from row: {e}")
return None
async def delete(self, content_hash: str) -> Tuple[bool, str]:
"""Delete a memory by its hash."""
try:
# Find memory in D1
sql = "SELECT * FROM memories WHERE content_hash = ?"
payload = {"sql": sql, "params": [content_hash]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success") or not result.get("result", [{}])[0].get("results"):
return False, f"Memory not found: {content_hash}"
row = result["result"][0]["results"][0]
memory_id = row["id"]
vector_id = row.get("vector_id")
r2_key = row.get("r2_key")
# Delete from Vectorize
if vector_id:
await self._delete_vectorize_vector(vector_id)
# Delete from R2 if present
if r2_key:
await self._delete_r2_content(r2_key)
# Delete from D1 (tags will be cascade deleted)
delete_sql = "DELETE FROM memories WHERE id = ?"
payload = {"sql": delete_sql, "params": [memory_id]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to delete from D1: {result}")
logger.info(f"Successfully deleted memory: {content_hash}")
return True, "Memory deleted successfully"
except Exception as e:
logger.error(f"Failed to delete memory {content_hash}: {e}")
return False, f"Deletion failed: {str(e)}"
async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
"""Get a memory by its content hash using direct O(1) D1 lookup."""
try:
# Query D1 for the memory
sql = "SELECT * FROM memories WHERE content_hash = ?"
payload = {"sql": sql, "params": [content_hash]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success") or not result.get("result", [{}])[0].get("results"):
return None
row = result["result"][0]["results"][0]
# Load content from R2 if needed
content = row["content"]
if row.get("r2_key") and content.startswith("[R2 Content:"):
content = await self._load_r2_content(row["r2_key"])
# Load tags
tags = await self._load_memory_tags(row["id"])
# Construct Memory object
memory = Memory(
content=content,
content_hash=content_hash,
tags=tags,
memory_type=row.get("memory_type"),
metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
created_at=row.get("created_at"),
created_at_iso=row.get("created_at_iso"),
updated_at=row.get("updated_at"),
updated_at_iso=row.get("updated_at_iso")
)
return memory
except Exception as e:
logger.error(f"Failed to get memory by hash {content_hash}: {e}")
return None
async def _delete_vectorize_vector(self, vector_id: str) -> None:
"""Delete vector from Vectorize."""
# Correct endpoint uses underscores, not hyphens
payload = {"ids": [vector_id]}
response = await self._retry_request("POST", f"{self.vectorize_url}/delete_by_ids", json=payload)
result = response.json()
if not result.get("success"):
logger.warning(f"Failed to delete vector from Vectorize: {result}")
async def delete_vectors_by_ids(self, vector_ids: List[str]) -> Dict[str, Any]:
"""Delete multiple vectors from Vectorize by their IDs."""
payload = {"ids": vector_ids}
response = await self._retry_request(
"POST",
f"{self.vectorize_url}/delete_by_ids",
json=payload
)
return response.json()
async def _delete_r2_content(self, r2_key: str) -> None:
"""Delete content from R2."""
try:
response = await self._retry_request("DELETE", f"{self.r2_url}/{r2_key}")
if response.status_code not in [200, 204, 404]: # 404 is fine if already deleted
logger.warning(f"Failed to delete R2 content: {response.status_code}")
except Exception as e:
logger.warning(f"Failed to delete R2 content {r2_key}: {e}")
async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
"""Delete memories by tag."""
try:
# Find memories with the tag
memories = await self.search_by_tag([tag])
deleted_count = 0
for memory in memories:
success, _ = await self.delete(memory.content_hash)
if success:
deleted_count += 1
logger.info(f"Deleted {deleted_count} memories with tag: {tag}")
return deleted_count, f"Deleted {deleted_count} memories"
except Exception as e:
logger.error(f"Failed to delete by tag {tag}: {e}")
return 0, f"Deletion failed: {str(e)}"
async def cleanup_duplicates(self) -> Tuple[int, str]:
"""Remove duplicate memories based on content hash."""
try:
# Find duplicates in D1
sql = """
SELECT content_hash, COUNT(*) as count, MIN(id) as keep_id
FROM memories
GROUP BY content_hash
HAVING COUNT(*) > 1
"""
payload = {"sql": sql}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to find duplicates: {result}")
duplicate_groups = result.get("result", [{}])[0].get("results", [])
total_deleted = 0
for group in duplicate_groups:
content_hash = group["content_hash"]
keep_id = group["keep_id"]
# Delete all except the first one
delete_sql = "DELETE FROM memories WHERE content_hash = ? AND id != ?"
payload = {"sql": delete_sql, "params": [content_hash, keep_id]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if result.get("success") and result.get("result", [{}])[0].get("meta"):
deleted = result["result"][0]["meta"].get("changes", 0)
total_deleted += deleted
logger.info(f"Cleaned up {total_deleted} duplicate memories")
return total_deleted, f"Removed {total_deleted} duplicates"
except Exception as e:
logger.error(f"Failed to cleanup duplicates: {e}")
return 0, f"Cleanup failed: {str(e)}"
async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
"""Update memory metadata without recreating the entry."""
try:
# Build update SQL
update_fields = []
params = []
if "metadata" in updates:
update_fields.append("metadata_json = ?")
params.append(json.dumps(updates["metadata"]))
if "memory_type" in updates:
update_fields.append("memory_type = ?")
params.append(updates["memory_type"])
if "tags" in updates:
# Handle tags separately - they require relational updates
pass
# Handle timestamp updates based on preserve_timestamps flag
now = time.time()
now_iso = datetime.now().isoformat()
if not preserve_timestamps:
# When preserve_timestamps=False, use timestamps from updates dict if provided
# This allows syncing timestamps from source (e.g., SQLite → Cloudflare)
# Always preserve created_at (never reset to current time!)
if "created_at" in updates:
update_fields.append("created_at = ?")
params.append(updates["created_at"])
if "created_at_iso" in updates:
update_fields.append("created_at_iso = ?")
params.append(updates["created_at_iso"])
# Use updated_at from updates or current time
if "updated_at" in updates:
update_fields.append("updated_at = ?")
params.append(updates["updated_at"])
else:
update_fields.append("updated_at = ?")
params.append(now)
if "updated_at_iso" in updates:
update_fields.append("updated_at_iso = ?")
params.append(updates["updated_at_iso"])
else:
update_fields.append("updated_at_iso = ?")
params.append(now_iso)
else:
# preserve_timestamps=True: only update updated_at to current time
update_fields.append("updated_at = ?")
update_fields.append("updated_at_iso = ?")
params.extend([now, now_iso])
if not update_fields:
return True, "No updates needed"
# Update memory record
sql = f"UPDATE memories SET {', '.join(update_fields)} WHERE content_hash = ?"
params.append(content_hash)
payload = {"sql": sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Failed to update memory: {result}")
# Handle tag updates if provided
if "tags" in updates:
await self._update_memory_tags(content_hash, updates["tags"])
logger.info(f"Successfully updated memory metadata: {content_hash}")
return True, "Memory metadata updated successfully"
except Exception as e:
logger.error(f"Failed to update memory metadata {content_hash}: {e}")
return False, f"Update failed: {str(e)}"
async def _update_memory_tags(self, content_hash: str, new_tags: List[str]) -> None:
"""Update tags for a memory."""
# Get memory ID
sql = "SELECT id FROM memories WHERE content_hash = ?"
payload = {"sql": sql, "params": [content_hash]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success") or not result.get("result", [{}])[0].get("results"):
raise ValueError(f"Memory not found: {content_hash}")
memory_id = result["result"][0]["results"][0]["id"]
# Delete existing tag relationships
delete_sql = "DELETE FROM memory_tags WHERE memory_id = ?"
payload = {"sql": delete_sql, "params": [memory_id]}
await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
# Add new tags
if new_tags:
await self._store_d1_tags(memory_id, new_tags)
async def get_stats(self) -> Dict[str, Any]:
"""Get storage statistics."""
try:
# Calculate timestamp for memories from last 7 days
week_ago = time.time() - (7 * 24 * 60 * 60)
# Get memory count and size from D1
sql = f"""
SELECT
COUNT(*) as total_memories,
SUM(content_size) as total_content_size,
COUNT(DISTINCT vector_id) as total_vectors,
COUNT(r2_key) as r2_stored_count,
(SELECT COUNT(*) FROM tags) as unique_tags,
(SELECT COUNT(*) FROM memories WHERE created_at >= {week_ago}) as memories_this_week
FROM memories
"""
payload = {"sql": sql}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if result.get("success") and result.get("result", [{}])[0].get("results"):
stats = result["result"][0]["results"][0]
return {
"total_memories": stats.get("total_memories", 0),
"unique_tags": stats.get("unique_tags", 0),
"memories_this_week": stats.get("memories_this_week", 0),
"total_content_size_bytes": stats.get("total_content_size", 0),
"total_vectors": stats.get("total_vectors", 0),
"r2_stored_count": stats.get("r2_stored_count", 0),
"storage_backend": "cloudflare",
"vectorize_index": self.vectorize_index,
"d1_database": self.d1_database_id,
"r2_bucket": self.r2_bucket,
"status": "operational"
}
return {
"total_memories": 0,
"unique_tags": 0,
"memories_this_week": 0,
"storage_backend": "cloudflare",
"status": "operational"
}
except Exception as e:
logger.error(f"Failed to get stats: {e}")
return {
"total_memories": 0,
"unique_tags": 0,
"memories_this_week": 0,
"storage_backend": "cloudflare",
"status": "error",
"error": str(e)
}
async def get_all_tags(self) -> List[str]:
"""Get all unique tags in the storage."""
try:
sql = "SELECT name FROM tags ORDER BY name"
payload = {"sql": sql}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if result.get("success") and result.get("result", [{}])[0].get("results"):
return [row["name"] for row in result["result"][0]["results"]]
return []
except Exception as e:
logger.error(f"Failed to get all tags: {e}")
return []
async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
"""Get all tags with their usage counts."""
try:
sql = """
SELECT t.name as tag, COUNT(mt.memory_id) as count
FROM tags t
LEFT JOIN memory_tags mt ON t.id = mt.tag_id
GROUP BY t.id
ORDER BY count DESC, t.name ASC
"""
payload = {"sql": sql}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if result.get("success") and result.get("result", [{}])[0].get("results"):
return [row for row in result["result"][0]["results"]]
return []
except Exception as e:
logger.error(f"Failed to get tags with counts: {e}")
return []
async def get_recent_memories(self, n: int = 10) -> List[Memory]:
"""Get n most recent memories."""
try:
sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
payload = {"sql": sql, "params": [n]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
memories = []
if result.get("success") and result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.info(f"Retrieved {len(memories)} recent memories")
return memories
except Exception as e:
logger.error(f"Failed to get recent memories: {e}")
return []
async def get_largest_memories(self, n: int = 10) -> List[Memory]:
"""Get n largest memories by content length."""
try:
sql = "SELECT * FROM memories ORDER BY LENGTH(content) DESC LIMIT ?"
payload = {"sql": sql, "params": [n]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
memories = []
if result.get("success") and result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.info(f"Retrieved {len(memories)} largest memories")
return memories
except Exception as e:
logger.error(f"Failed to get largest memories: {e}")
return []
async def _fetch_d1_timestamps(self, cutoff_timestamp: Optional[float] = None) -> List[float]:
"""Fetch timestamps from D1 database with optional time filter.
Args:
cutoff_timestamp: Optional Unix timestamp to filter memories (>=)
Returns:
List of Unix timestamps (float) in descending order
Raises:
Exception: If D1 query fails
"""
if cutoff_timestamp is not None:
sql = "SELECT created_at FROM memories WHERE created_at >= ? ORDER BY created_at DESC"
payload = {"sql": sql, "params": [cutoff_timestamp]}
else:
sql = "SELECT created_at FROM memories ORDER BY created_at DESC"
payload = {"sql": sql, "params": []}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
timestamps = []
if result.get("success") and result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
if row.get("created_at") is not None:
timestamps.append(float(row["created_at"]))
return timestamps
async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
"""
Get memory creation timestamps only, without loading full memory objects.
This is an optimized method for analytics that only needs timestamps,
avoiding the overhead of loading full memory content and embeddings.
Args:
days: Optional filter to only get memories from last N days
Returns:
List of Unix timestamps (float) in descending order (newest first)
"""
try:
cutoff_timestamp = None
if days is not None:
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
cutoff_timestamp = cutoff.timestamp()
timestamps = await self._fetch_d1_timestamps(cutoff_timestamp)
logger.info(f"Retrieved {len(timestamps)} memory timestamps")
return timestamps
except Exception as e:
logger.error(f"Failed to get memory timestamps: {e}")
return []
def sanitized(self, tags):
"""Sanitize and normalize tags to a JSON string.
This method provides compatibility with the storage backend interface.
"""
if tags is None:
return json.dumps([])
# If we get a string, split it into an array
if isinstance(tags, str):
tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
# If we get an array, use it directly
elif isinstance(tags, list):
tags = [str(tag).strip() for tag in tags if str(tag).strip()]
else:
return json.dumps([])
# Return JSON string representation of the array
return json.dumps(tags)
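    # Example: sanitized("alpha, beta") -> '["alpha", "beta"]', sanitized(["a", 1]) ->
    # '["a", "1"]', and sanitized(None) -> '[]'.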
async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
"""
Retrieve memories with combined time filtering and optional semantic search.
Args:
query: Optional semantic search query. If None, only time filtering is applied.
n_results: Maximum number of results to return.
start_timestamp: Optional start time for filtering.
end_timestamp: Optional end time for filtering.
Returns:
List of MemoryQueryResult objects.
"""
try:
# Build time filtering WHERE clause for D1
time_conditions = []
params = []
if start_timestamp is not None:
time_conditions.append("created_at >= ?")
params.append(float(start_timestamp))
if end_timestamp is not None:
time_conditions.append("created_at <= ?")
params.append(float(end_timestamp))
time_where = " AND ".join(time_conditions) if time_conditions else ""
logger.info(f"Recall - Time filtering conditions: {time_where}, params: {params}")
# Determine search strategy
if query and query.strip():
# Combined semantic search with time filtering
logger.info(f"Recall - Using semantic search with query: '{query}'")
try:
# Generate query embedding
query_embedding = await self._generate_embedding(query)
# Search Vectorize with semantic query
search_payload = {
"vector": query_embedding,
"topK": n_results,
"returnMetadata": "all",
"returnValues": False
}
# Add time filtering to vectorize metadata if specified
if time_conditions:
# Note: Vectorize metadata filtering capabilities may be limited
# We'll filter after retrieval for now
logger.info("Recall - Time filtering will be applied post-retrieval from Vectorize")
response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"Vectorize query failed: {result}")
matches = result.get("result", {}).get("matches", [])
# Convert matches to MemoryQueryResult objects with time filtering
results = []
for match in matches:
memory = await self._load_memory_from_match(match)
if memory:
# Apply time filtering if needed
if start_timestamp is not None and memory.created_at and memory.created_at < start_timestamp:
continue
if end_timestamp is not None and memory.created_at and memory.created_at > end_timestamp:
continue
query_result = MemoryQueryResult(
memory=memory,
relevance_score=match.get("score", 0.0)
)
results.append(query_result)
logger.info(f"Recall - Retrieved {len(results)} memories with semantic search and time filtering")
return results[:n_results] # Ensure we don't exceed n_results
except Exception as e:
logger.error(f"Recall - Semantic search failed, falling back to time-based search: {e}")
# Fall through to time-based search
# Time-based search only (or fallback)
logger.info(f"Recall - Using time-based search only")
# Build D1 query for time-based retrieval
if time_where:
sql = f"SELECT * FROM memories WHERE {time_where} ORDER BY created_at DESC LIMIT ?"
params.append(n_results)
else:
# No time filters, get most recent
sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
params = [n_results]
payload = {"sql": sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 query failed: {result}")
# Convert D1 results to MemoryQueryResult objects
results = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
# For time-based search without semantic query, use timestamp as relevance
relevance_score = memory.created_at or 0.0
query_result = MemoryQueryResult(
memory=memory,
relevance_score=relevance_score
)
results.append(query_result)
logger.info(f"Recall - Retrieved {len(results)} memories with time-based search")
return results
except Exception as e:
logger.error(f"Recall failed: {e}")
return []
async def get_all_memories(self, limit: int = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
"""
Get all memories in storage ordered by creation time (newest first).
Args:
limit: Maximum number of memories to return (None for all)
offset: Number of memories to skip (for pagination)
memory_type: Optional filter by memory type
            tags: Optional filter by tags (memories must contain ALL of the provided tags)
Returns:
List of Memory objects ordered by created_at DESC, optionally filtered by type and tags
"""
try:
# Build SQL query with optional memory_type and tags filters
sql = "SELECT m.* FROM memories m"
joins = ""
where_conditions = []
params: List[Any] = []
if memory_type is not None:
where_conditions.append("m.memory_type = ?")
params.append(memory_type)
tag_count = 0
if tags:
tag_count = len(tags)
placeholders = ",".join(["?"] * tag_count)
                joins += " INNER JOIN memory_tags mt ON m.id = mt.memory_id INNER JOIN tags t ON mt.tag_id = t.id"
where_conditions.append(f"t.name IN ({placeholders})")
params.extend(tags)
if joins:
sql += joins
if where_conditions:
sql += " WHERE " + " AND ".join(where_conditions)
if tags:
sql += " GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?"
sql += " ORDER BY m.created_at DESC"
query_params = params.copy()
if tags:
query_params.append(tag_count)
if limit is not None:
sql += " LIMIT ?"
query_params.append(limit)
if offset > 0:
sql += " OFFSET ?"
query_params.append(offset)
payload = {"sql": sql, "params": query_params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 query failed: {result}")
memories = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.debug(f"Retrieved {len(memories)} memories from D1")
return memories
except Exception as e:
logger.error(f"Error getting all memories: {str(e)}")
return []
def _row_to_memory(self, row: Dict[str, Any]) -> Memory:
"""Convert D1 row to Memory object without loading tags (for bulk operations)."""
# Load content from R2 if needed
content = row["content"]
if row.get("r2_key") and content.startswith("[R2 Content:"):
# For bulk operations, we don't load R2 content to avoid additional requests
# Just keep the placeholder
pass
return Memory(
content=content,
content_hash=row["content_hash"],
tags=[], # Skip tag loading for bulk operations
memory_type=row.get("memory_type"),
metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
created_at=row.get("created_at"),
created_at_iso=row.get("created_at_iso"),
updated_at=row.get("updated_at"),
updated_at_iso=row.get("updated_at_iso")
)
async def get_all_memories_bulk(
self,
include_tags: bool = False
) -> List[Memory]:
"""
Efficiently load all memories from D1.
If include_tags=False, skips N+1 tag queries for better performance.
Useful for maintenance scripts that don't need tag information.
Args:
include_tags: Whether to load tags for each memory (slower but complete)
Returns:
List of Memory objects ordered by created_at DESC
"""
query = "SELECT * FROM memories ORDER BY created_at DESC"
payload = {"sql": query}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 query failed: {result}")
memories = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
if include_tags:
memory = await self._load_memory_from_row(row)
else:
memory = self._row_to_memory(row)
if memory:
memories.append(memory)
logger.debug(f"Bulk loaded {len(memories)} memories from D1")
return memories
async def get_all_memories_cursor(self, limit: int = None, cursor: float = None, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
"""
Get all memories using cursor-based pagination to avoid D1 OFFSET limitations.
This method uses timestamp-based cursors instead of OFFSET, which is more efficient
and avoids Cloudflare D1's OFFSET limitations that cause 400 Bad Request errors.
Args:
limit: Maximum number of memories to return (None for all)
cursor: Timestamp cursor for pagination (created_at value from last result)
memory_type: Optional filter by memory type
tags: Optional filter by tags (matches ANY of the provided tags)
Returns:
List of Memory objects ordered by created_at DESC, starting after cursor
"""
try:
# Build SQL query with cursor-based pagination
sql = "SELECT * FROM memories"
params = []
where_conditions = []
# Add cursor condition (timestamp-based pagination)
if cursor is not None:
where_conditions.append("created_at < ?")
params.append(cursor)
# Add memory_type filter if specified
if memory_type is not None:
where_conditions.append("memory_type = ?")
params.append(memory_type)
# Add tags filter if specified (using LIKE for tag matching)
if tags and len(tags) > 0:
tag_conditions = " OR ".join(["tags LIKE ?" for _ in tags])
where_conditions.append(f"({tag_conditions})")
params.extend([f"%{tag}%" for tag in tags])
# Apply WHERE clause if we have any conditions
if where_conditions:
sql += " WHERE " + " AND ".join(where_conditions)
sql += " ORDER BY created_at DESC"
if limit is not None:
sql += " LIMIT ?"
params.append(limit)
payload = {"sql": sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 query failed: {result}")
memories = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.debug(f"Retrieved {len(memories)} memories from D1 with cursor-based pagination")
return memories
except Exception as e:
logger.error(f"Error getting memories with cursor: {str(e)}")
return []
async def get_memories_updated_since(self, timestamp: float, limit: int = 100) -> List[Memory]:
"""
Get memories updated since a specific timestamp (v8.25.0+).
Used by hybrid backend drift detection to find memories with metadata
changes that need to be synchronized.
Args:
timestamp: Unix timestamp (seconds since epoch)
limit: Maximum number of memories to return (default: 100)
Returns:
List of Memory objects updated after the timestamp, ordered by updated_at DESC
"""
try:
# Convert timestamp to ISO format for D1 query
from datetime import datetime
dt = datetime.utcfromtimestamp(timestamp)
iso_timestamp = dt.isoformat()
query = """
SELECT content, content_hash, tags, memory_type, metadata,
created_at, created_at_iso, updated_at, updated_at_iso
FROM memories
WHERE updated_at_iso > ?
ORDER BY updated_at DESC
LIMIT ?
"""
payload = {"sql": query, "params": [iso_timestamp, limit]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
logger.warning(f"Failed to get updated memories: {result.get('error')}")
return []
memories = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.debug(f"Retrieved {len(memories)} memories updated since {iso_timestamp}")
return memories
except Exception as e:
logger.error(f"Error getting updated memories: {e}")
return []
async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
"""
Get memories within a specific time range (v8.31.0+).
Pushes date-range filtering to D1 database layer for better performance.
This optimization reduces network transfer and enables efficient queries
on databases with >10,000 memories.
Benefits:
- Reduces network transfer (only relevant memories)
- Leverages D1 indexes on created_at
- Scales to databases with >10,000 memories
- 10x performance improvement over application-layer filtering
Args:
start_time: Start timestamp (Unix seconds, inclusive)
end_time: End timestamp (Unix seconds, inclusive)
Returns:
List of Memory objects within the time range, ordered by created_at DESC
"""
try:
# Build SQL query with BETWEEN clause for efficient filtering
# Use SELECT m.* to get all columns (tags are loaded separately via _load_memory_tags)
sql = """
SELECT m.*
FROM memories m
WHERE m.created_at BETWEEN ? AND ?
ORDER BY m.created_at DESC
"""
payload = {"sql": sql, "params": [start_time, end_time]}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
logger.error(f"D1 query failed: {result}")
return []
memories = []
if result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
memory = await self._load_memory_from_row(row)
if memory:
memories.append(memory)
logger.info(f"Retrieved {len(memories)} memories in time range {start_time}-{end_time}")
return memories
except Exception as e:
logger.error(f"Error getting memories by time range: {str(e)}")
return []
async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
"""
Get total count of memories in storage.
Args:
memory_type: Optional filter by memory type
tags: Optional filter by tags (memories matching ALL of the provided tags)
Returns:
Total number of memories, optionally filtered by type and/or tags
"""
try:
# Build query with filters
base_sql = "SELECT m.id FROM memories m"
joins = ""
conditions = []
params: List[Any] = []
if memory_type is not None:
conditions.append('m.memory_type = ?')
params.append(memory_type)
tag_count = 0
if tags:
tag_count = len(tags)
placeholders = ','.join(['?'] * tag_count)
joins += " " + """
INNER JOIN memory_tags mt ON m.id = mt.memory_id
INNER JOIN tags t ON mt.tag_id = t.id
""".strip().replace("\n", " ")
conditions.append(f't.name IN ({placeholders})')
params.extend(tags)
sql = base_sql + joins
if conditions:
sql += ' WHERE ' + ' AND '.join(conditions)
if tags:
sql += ' GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?'
count_sql = f'SELECT COUNT(*) as count FROM ({sql}) AS subquery'
count_params = params.copy()
if tags:
count_params.append(tag_count)
payload = {"sql": count_sql, "params": count_params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
if not result.get("success"):
raise ValueError(f"D1 query failed: {result}")
if result.get("result", [{}])[0].get("results"):
count = result["result"][0]["results"][0].get("count", 0)
return int(count)
return 0
except Exception as e:
logger.error(f"Error counting memories: {str(e)}")
return 0
async def close(self) -> None:
"""Close the storage backend and cleanup resources."""
if self.client:
await self.client.aclose()
self.client = None
# Clear embedding cache
self._embedding_cache.clear()
logger.info("Cloudflare storage backend closed")
```
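Note on tag-filter semantics in the methods above: `get_all_memories` and `count_all_memories` join through `memory_tags`/`tags` and use `GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?`, so a memory must carry every listed tag, while `get_all_memories_cursor` as written filters with `tags LIKE ?` and therefore matches memories carrying any of the listed tags. A minimal sketch, assuming a hypothetical already-initialized backend instance named `storage` and illustrative tag names:

```python
import asyncio

async def show_tag_filtering(storage):
    # JOIN + GROUP BY/HAVING path: only memories tagged with BOTH "work" AND "python".
    both = await storage.get_all_memories(limit=20, tags=["work", "python"])

    # The count query applies the same ALL-tags semantics.
    total = await storage.count_all_memories(tags=["work", "python"])

    # Cursor variant uses `tags LIKE ?`, so it matches memories carrying ANY listed tag.
    any_match = await storage.get_all_memories_cursor(limit=20, tags=["work", "python"])

    print(len(both), total, len(any_match))

# asyncio.run(show_tag_filtering(storage))  # requires a configured storage instance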
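Because `get_all_memories_cursor` pages on `created_at` timestamps rather than `OFFSET`, callers can walk an arbitrarily large store without hitting D1's OFFSET limits. A minimal paging sketch, again assuming a hypothetical initialized `storage` instance:

```python
async def iterate_all_memories(storage, page_size: int = 500):
    """Yield every memory, newest first, paging on created_at cursors."""
    cursor = None
    while True:
        page = await storage.get_all_memories_cursor(limit=page_size, cursor=cursor)
        if not page:
            break
        for memory in page:
            yield memory
        # Results are ordered by created_at DESC, so the oldest row in this page
        # becomes the cursor for the next (older) page.
        cursor = page[-1].created_at
        if len(page) < page_size:
            break
```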
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/static/index.html:
--------------------------------------------------------------------------------
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>MCP Memory Service - Dashboard</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="icon" href="data:image/svg+xml,<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 100 100'><text y='.9em' font-size='90'>🧠</text></svg>">
</head>
<body>
<!-- Main Application Container -->
<div id="app" class="app-container">
<!-- Header with Navigation -->
<header class="app-header">
<div class="header-content">
<div class="logo-section">
<h1 class="app-title">🧠 MCP Memory</h1>
<span id="versionBadge" class="version-badge">Loading...</span>
</div>
<!-- SVG Icon Definitions -->
<svg style="display: none;" aria-hidden="true">
<defs>
<symbol id="info-icon" viewBox="0 0 24 24">
<circle cx="12" cy="12" r="10" stroke="currentColor" stroke-width="2" fill="none"/>
<path d="M12 8v.01M12 12v4" stroke="currentColor" stroke-width="2" stroke-linecap="round"/>
</symbol>
</defs>
</svg>
<!-- Quick Search Bar -->
<div class="search-section">
<div class="search-container">
<input type="text"
id="quickSearch"
class="search-input"
placeholder="🔍 Search your memories..."
autocomplete="off">
<button class="search-btn" type="button" aria-label="Search">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M21.71 20.29L18 16.61A9 9 0 1 0 16.61 18l3.68 3.68a1 1 0 0 0 1.42-1.42zM11 18a7 7 0 1 1 7-7 7 7 0 0 1-7 7z"/>
</svg>
</button>
</div>
</div>
<!-- Action Buttons -->
<div class="action-buttons">
<button id="addMemoryBtn" class="btn btn-primary" title="Add New Memory">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
</svg>
<span class="btn-text">Add Memory</span>
</button>
<button id="themeToggleBtn" class="btn btn-secondary" title="Toggle Dark Mode">
<svg id="sunIcon" width="20" height="20" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" viewBox="0 0 24 24">
<circle cx="12" cy="12" r="5"/>
<line x1="12" y1="1" x2="12" y2="3"/>
<line x1="12" y1="21" x2="12" y2="23"/>
<line x1="4.22" y1="4.22" x2="5.64" y2="5.64"/>
<line x1="18.36" y1="18.36" x2="19.78" y2="19.78"/>
<line x1="1" y1="12" x2="3" y2="12"/>
<line x1="21" y1="12" x2="23" y2="12"/>
<line x1="4.22" y1="19.78" x2="5.64" y2="18.36"/>
<line x1="18.36" y1="5.64" x2="19.78" y2="4.22"/>
</svg>
<svg id="moonIcon" class="hidden" width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M21 12.79A9 9 0 1 1 11.21 3 7 7 0 0 0 21 12.79z"/>
</svg>
</button>
<button id="settingsBtn" class="btn btn-secondary" title="Settings">
<svg width="20" height="20" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" viewBox="0 0 24 24">
<circle cx="12" cy="12" r="3"/>
<path d="M12 1v6m0 6v10M1 12h6m6 0h10"/>
<path d="M4.22 4.22l4.24 4.24m7.08 7.08l4.24 4.24M4.22 19.78l4.24-4.24m7.08-7.08l4.24-4.24"/>
</svg>
</button>
</div>
</div>
</header>
<!-- Main Navigation -->
<nav class="main-nav">
<div class="nav-container">
<button class="nav-item active" data-view="dashboard">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M3 13h8V3H3v10zm0 8h8v-6H3v6zm10 0h8V11h-8v10zm0-18v6h8V3h-8z"/>
</svg>
Dashboard
</button>
<button class="nav-item" data-view="search">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M21.71 20.29L18 16.61A9 9 0 1 0 16.61 18l3.68 3.68a1 1 0 0 0 1.42-1.42zM11 18a7 7 0 1 1 7-7 7 7 0 0 1-7 7z"/>
</svg>
Search
</button>
<button class="nav-item" data-view="browse">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M4 6H2v14c0 1.1.9 2 2 2h14v-2H4V6zm16-4H8c-1.1 0-2 .9-2 2v12c0 1.1.9 2 2 2h12c1.1 0 2-.9 2-2V4c0-1.1-.9-2-2-2zm-1 9H9V9h10v2zm-4 4H9v-2h6v2zm4-8H9V5h10v2z"/>
</svg>
Browse
</button>
<button class="nav-item" data-view="documents">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,2H6A2,2 0 0,0 4,4V20A2,2 0 0,0 6,22H18A2,2 0 0,0 20,20V8L14,2M18,20H6V4H13V9H18V20Z"/>
</svg>
Documents
</button>
<button class="nav-item" data-view="manage">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 2l3.09 6.26L22 9.27l-5 4.87 1.18 6.88L12 17.77l-6.18 3.25L7 14.14 2 9.27l6.91-1.01L12 2z"/>
</svg>
Manage
</button>
<button class="nav-item" data-view="analytics">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M5 9.2h3V19H5zM10.6 5h2.8v14h-2.8zm5.6 8H19v6h-2.8z"/>
</svg>
Analytics
</button>
<button class="nav-item" data-view="apiDocs">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,17H7V15H14M17,13H7V11H17M17,9H7V7H17M19,3H5C3.89,3 3,3.89 3,5V19A2,2 0 0,0 5,21H19A2,2 0 0,0 21,19V5C21,3.89 20.1,3 19,3Z"/>
</svg>
API Docs
</button>
</div>
</nav>
<!-- Main Content Area -->
<main class="main-content">
<!-- Dashboard View -->
<div id="dashboardView" class="view-container active">
<div class="dashboard-grid">
<!-- Welcome Section -->
<section class="welcome-card">
<h2>Welcome to your Memory Dashboard</h2>
<p>Manage your AI memories with semantic search, real-time updates, and intelligent organization.</p>
<div class="quick-stats">
<div class="stat-item">
<span class="stat-number" id="totalMemories">—</span>
<span class="stat-label">Total Memories</span>
</div>
<div class="stat-item">
<span class="stat-number" id="recentMemories">—</span>
<span class="stat-label">This Week</span>
</div>
<div class="stat-item">
<span class="stat-number" id="uniqueTags">—</span>
<span class="stat-label">Tags</span>
</div>
</div>
</section>
<!-- Recent Memories -->
<section class="recent-memories">
<h3>Recent Memories</h3>
<div id="recentMemoriesList" class="memory-list">
<!-- Dynamic content will be loaded here -->
</div>
</section>
<!-- Quick Actions -->
<section class="quick-actions">
<h3>Quick Actions</h3>
<div class="action-grid">
<button class="action-card" data-action="quick-search">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M21.71 20.29L18 16.61A9 9 0 1 0 16.61 18l3.68 3.68a1 1 0 0 0 1.42-1.42zM11 18a7 7 0 1 1 7-7 7 7 0 0 1-7 7z"/>
</svg>
<span>Advanced Search</span>
</button>
<button class="action-card" data-action="add-memory">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 13h-6v6h-2v-6H5v-2h6V5h2v6h6v2z"/>
</svg>
<span>Add Memory</span>
</button>
<button class="action-card" data-action="browse-tags">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M17.63 5.84C17.27 5.33 16.67 5 16 5L5 5.01C3.9 5.01 3 5.9 3 7v10c0 1.1.9 1.99 2 1.99L16 19c.67 0 1.27-.33 1.63-.84L22 12l-4.37-6.16z"/>
</svg>
<span>Browse Tags</span>
</button>
<button class="action-card" data-action="export-data">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,2H6A2,2 0 0,0 4,4V20A2,2 0 0,0 6,22H18A2,2 0 0,0 20,20V8L14,2M18,20H6V4H13V9H18V20Z"/>
</svg>
<span>Export Data</span>
</button>
</div>
<!-- Compact Sync Control (Hybrid Mode Only) -->
<div id="syncControl" class="sync-control-compact">
<div class="sync-row">
<div class="sync-status">
<span id="syncStatusDot" class="sync-dot"></span>
<span id="syncStatusText" class="sync-text-sm">Checking...</span>
</div>
<span id="syncProgress" class="sync-progress-sm"></span>
<div class="sync-buttons-sm">
<button type="button" id="pauseSyncButton" class="btn-icon-sm" title="Pause sync">
<svg width="14" height="14" fill="currentColor" viewBox="0 0 24 24">
<path d="M6 19h4V5H6v14zm8-14v14h4V5h-4z"/>
</svg>
</button>
<button type="button" id="resumeSyncButton" class="btn-icon-sm" title="Resume sync">
<svg width="14" height="14" fill="currentColor" viewBox="0 0 24 24">
<path d="M8 5v14l11-7z"/>
</svg>
</button>
<button type="button" id="forceSyncButton" class="btn-icon-sm btn-icon-primary" title="Sync now">
<svg width="14" height="14" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 4V1L8 5l4 4V6c3.31 0 6 2.69 6 6 0 1.01-.25 1.97-.7 2.8l1.46 1.46C19.54 15.03 20 13.57 20 12c0-4.42-3.58-8-8-8zm0 14c-3.31 0-6-2.69-6-6 0-1.01.25-1.97.7-2.8L5.24 7.74C4.46 8.97 4 10.43 4 12c0 4.42 3.58 8 8 8v3l4-4-4-4v3z"/>
</svg>
</button>
</div>
</div>
</div>
</section>
</div>
</div>
<!-- Search View -->
<div id="searchView" class="view-container">
<div class="search-layout">
<!-- Advanced Search Panel -->
<aside class="search-filters">
<div class="filters-header">
<h3>Search Filters
<span class="tooltip" title="Filters work together: combine tags, dates, and types to narrow your search">
<svg width="16" height="16" fill="currentColor"><use href="#info-icon"/></svg>
</span>
</h3>
<div class="mode-toggle-compact">
<label class="toggle-switch">
<input type="checkbox" id="liveSearchToggle" checked>
<span class="slider"></span>
</label>
<span class="tooltip" title="Toggle between live search (updates as you type) and manual search mode">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M13 2L3 14h8l-1 8 10-12h-8l1-8z" fill="currentColor"/>
</svg>
</span>
</div>
</div>
<!-- Search Mode Indicator -->
<div class="search-mode-indicator">
<span class="mode-status">
<span id="searchModeText">Live Search</span> mode
</span>
</div>
<div class="filter-section">
<label for="tagFilter">Tags
<span class="tooltip" title="Enter tags separated by commas (e.g., work, coding, important)">
<svg width="16" height="16" fill="currentColor"><use href="#info-icon"/></svg>
</span>
</label>
<input type="text" id="tagFilter" placeholder="e.g., work, coding, important">
<small class="help-text">Separate multiple tags with commas</small>
</div>
<div class="filter-section">
<label for="dateFilter">Date Range
<span class="tooltip" title="Filter memories by when they were created">
<svg width="16" height="16" fill="currentColor"><use href="#info-icon"/></svg>
</span>
</label>
<select id="dateFilter">
<option value="">All time</option>
<option value="today">Today</option>
<option value="yesterday">Yesterday</option>
<option value="week">This week</option>
<option value="month">This month</option>
<option value="quarter">This quarter</option>
<option value="year">This year</option>
</select>
<small class="help-text">Select a time period to filter memories</small>
</div>
<div class="filter-section">
<label for="typeFilter">Content Type
<span class="tooltip" title="Filter by the type of content stored">
<svg width="16" height="16" fill="currentColor"><use href="#info-icon"/></svg>
</span>
</label>
<select id="typeFilter">
<option value="">All types</option>
<option value="note">Notes</option>
<option value="code">Code</option>
<option value="reference">References</option>
<option value="idea">Ideas</option>
</select>
<small class="help-text">Choose the type of memories to show</small>
</div>
<!-- Filter Actions -->
<div class="filter-actions">
<button type="button" id="applyFiltersBtn" class="btn btn-primary">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M21.71 20.29L18 16.61A9 9 0 1 0 16.61 18l3.68 3.68a1 1 0 0 0 1.42-1.42zM11 18a7 7 0 1 1 7-7 7 7 0 0 1-7 7z"/>
</svg>
Search
</button>
<button type="button" id="clearFiltersBtn" class="btn btn-secondary">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 6.41L17.59 5 12 10.59 6.41 5 5 6.41 10.59 12 5 17.59 6.41 19 12 13.41 17.59 19 19 17.59 13.41 12z"/>
</svg>
Clear All
</button>
</div>
<!-- Active Filters Display -->
<div id="activeFilters" class="active-filters" style="display: none;">
<h4>Active Filters:</h4>
<div id="activeFiltersList" class="filter-pills">
<!-- Dynamic filter pills will be added here -->
</div>
</div>
</aside>
<!-- Search Results -->
<div class="search-results">
<div class="search-header">
<h2>Search Results</h2>
<div class="results-meta">
<span id="resultsCount">0 results</span>
<div class="view-options">
<button class="view-btn active" data-view="grid">Grid</button>
<button class="view-btn" data-view="list">List</button>
</div>
</div>
</div>
<div id="searchResultsList" class="memory-grid">
<!-- Dynamic search results will be loaded here -->
</div>
</div>
</div>
</div>
<!-- Documents View -->
<div id="documentsView" class="view-container">
<div class="documents-layout">
<!-- Document Upload Section -->
<section class="documents-section">
<h2>📄 Document Ingestion</h2>
<div class="upload-area">
<div id="dropZone" class="drop-zone">
<div class="drop-zone-content">
<svg width="48" height="48" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,2H6A2,2 0 0,0 4,4V20A2,2 0 0,0 6,22H18A2,2 0 0,0 20,20V8L14,2M18,20H6V4H13V9H18V20Z"/>
</svg>
<h3>Drag & drop files here</h3>
<p>or <button id="fileSelectBtn" class="link-button">browse to select files</button></p>
<p class="supported-formats">Supported formats: PDF, TXT, MD, JSON</p>
<input type="file" id="fileInput" multiple accept=".pdf,.txt,.md,.json" style="display: none;">
</div>
</div>
</div>
<!-- Upload Configuration -->
<div class="upload-config">
<div class="config-section">
<label for="docTags">Tags (comma-separated)</label>
<input type="text" id="docTags" placeholder="e.g., documentation, reference, manual" class="form-control">
<small class="form-help">Tags will be applied to all files. Use spaces or commas as separators.</small>
</div>
<!-- Processing Mode Toggle (shown when multiple files selected) -->
<div class="config-section" id="processingModeSection" style="display: none;">
<label>Processing Mode <span class="info-icon info-icon-processing" title="Click for processing mode explanation">ℹ️</span></label>
<div class="processing-mode-toggle">
<button type="button" id="batchModeBtn" class="mode-btn active" data-mode="batch">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M4 6H2v14c0 1.1.9 2 2 2h14v-2H4V6zm16-4H8c-1.1 0-2 .9-2 2v12c0 1.1.9 2 2 2h12c1.1 0 2-.9 2-2V4c0-1.1-.9-2-2-2zm-1 9H9V9h10v2zm-4 4H9v-2h6v2zm4-8H9V5h10v2z"/>
</svg>
Batch Processing
</button>
<button type="button" id="individualModeBtn" class="mode-btn" data-mode="individual">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,2H6A2,2 0 0,0 4,4V20A2,2 0 0,0 6,22H18A2,2 0 0,0 20,20V8L14,2M18,20H6V4H13V9H18V20Z"/>
</svg>
Individual Processing
</button>
</div>
<div class="mode-description" id="modeDescription">
<small>All selected files will be processed together with the same tags.</small>
</div>
</div>
<div class="config-row">
<div class="config-section">
<label for="chunkSize">
Chunk Size: <span id="chunkSizeValue">1000</span> chars
<span class="info-icon" title="Click for chunking recommendations">ℹ️</span>
</label>
<input type="range" id="chunkSize" min="500" max="2000" value="1000" step="100" class="form-control">
</div>
<div class="config-section">
<label for="chunkOverlap">
Overlap: <span id="chunkOverlapValue">200</span> chars
<span class="info-icon info-icon-overlap" title="Click for overlap explanation">ℹ️</span>
</label>
<input type="range" id="chunkOverlap" min="0" max="500" value="200" step="50" class="form-control">
</div>
</div>
<!-- Chunking Help Section (collapsible) -->
<div id="chunkingHelpSection" class="chunking-help" style="display: none;">
<div class="help-header">
<strong>📚 Chunking Configuration Guide</strong>
<button class="close-help" data-action="hideChunkingHelp">×</button>
</div>
<div class="help-content">
<p class="help-note">
<strong>Note:</strong> Actual chunk sizes may vary as the system respects paragraph boundaries
to maintain semantic coherence.
</p>
<div class="help-recommendations">
<div class="help-option">
<div class="help-option-header">
<strong>✅ Default (1000 chars, 200 overlap)</strong>
<span class="help-tag">Recommended</span>
</div>
<p><strong>Best for:</strong> Technical documentation, reference manuals, knowledge bases</p>
<p><strong>Why:</strong> Paragraph-aware chunking preserves complete thoughts and context</p>
</div>
<div class="help-option">
<div class="help-option-header">
<strong>🔍 Smaller Chunks (500 chars, 100 overlap)</strong>
</div>
<p><strong>Best for:</strong> Dense technical docs, code documentation, API references</p>
<p><strong>Trade-off:</strong> More granular retrieval but may split paragraphs more aggressively</p>
</div>
<div class="help-option">
<div class="help-option-header">
<strong>📖 Larger Chunks (2000 chars, 400 overlap)</strong>
</div>
<p><strong>Best for:</strong> Narrative documents, articles, blogs, long-form content</p>
<p><strong>Trade-off:</strong> Better context but less precise retrieval</p>
</div>
</div>
<div class="help-tips">
<strong>💡 Tips:</strong>
<ul>
<li><strong>Overlap</strong> helps maintain context across chunk boundaries</li>
<li><strong>Higher overlap</strong> = better continuity but more redundancy</li>
<li>Test different settings on a sample document to find optimal configuration</li>
<li>Chunks preserve complete sentences and paragraphs when possible</li>
</ul>
</div>
</div>
</div>
<!-- Overlap Help Section (collapsible) -->
<div id="overlapHelpSection" class="chunking-help overlap-help" style="display: none;">
<div class="help-header">
<strong>🔗 Chunk Overlap Explained</strong>
<button class="close-help" data-action="hideOverlapHelp">×</button>
</div>
<div class="help-content">
<p class="help-note">
<strong>What is overlap?</strong> Overlap is the number of characters that are duplicated
between consecutive chunks. This helps maintain context across chunk boundaries.
</p>
<div class="help-example">
<strong>Visual Example:</strong>
<div class="overlap-diagram">
<div class="chunk-demo">
<span class="chunk-part unique">Chunk 1 unique content...</span>
<span class="chunk-part overlap">shared overlap region</span>
</div>
<div class="chunk-demo">
<span class="chunk-part overlap">shared overlap region</span>
<span class="chunk-part unique">Chunk 2 unique content...</span>
</div>
</div>
</div>
<div class="help-recommendations">
<div class="help-option">
<div class="help-option-header">
<strong>🎯 No Overlap (0 chars)</strong>
</div>
<p><strong>Best for:</strong> Maximum storage efficiency, no redundancy needed</p>
<p><strong>Trade-off:</strong> Context may be lost at chunk boundaries</p>
</div>
<div class="help-option">
<div class="help-option-header">
<strong>✅ Medium Overlap (200 chars)</strong>
<span class="help-tag">Recommended</span>
</div>
<p><strong>Best for:</strong> Most documents - balances context and efficiency</p>
<p><strong>Why:</strong> Preserves 1-2 sentences of context across boundaries</p>
</div>
<div class="help-option">
<div class="help-option-header">
<strong>🔄 High Overlap (400+ chars)</strong>
</div>
<p><strong>Best for:</strong> Complex technical content requiring maximum context</p>
<p><strong>Trade-off:</strong> More storage and processing, higher redundancy</p>
</div>
</div>
<div class="help-tips">
<strong>💡 Guidelines:</strong>
<ul>
<li><strong>Rule of thumb:</strong> Overlap should be 15-25% of chunk size</li>
<li><strong>Small chunks (500)</strong> → Use 100-150 overlap</li>
<li><strong>Medium chunks (1000)</strong> → Use 200-250 overlap</li>
<li><strong>Large chunks (2000)</strong> → Use 400-500 overlap</li>
<li>Higher overlap helps with search accuracy but increases storage</li>
<li>Zero overlap is fine for well-structured documents with clear sections</li>
</ul>
</div>
</div>
</div>
<!-- Processing Mode Help Section (collapsible) -->
<div id="processingModeHelpSection" class="chunking-help processing-mode-help" style="display: none;">
<div class="help-header">
<strong>⚙️ Processing Mode Options</strong>
<button class="close-help" data-action="hideProcessingModeHelp">×</button>
</div>
<div class="help-content">
<p class="help-note">
<strong>When uploading multiple files,</strong> choose how they should be processed.
Both modes apply the same tags to all files.
</p>
<div class="help-recommendations">
<div class="help-option">
<div class="help-option-header">
<strong>📦 Batch Processing</strong>
<span class="help-tag">Default</span>
</div>
<p><strong>What it does:</strong> Uploads all files together as one operation</p>
<p><strong>Best for:</strong> Similar files, fast bulk processing, when you want files grouped together</p>
<p><strong>Pros:</strong> Faster, simpler, single progress indicator</p>
<p><strong>Cons:</strong> One file failure can affect the whole batch</p>
</div>
<div class="help-option">
<div class="help-option-header">
<strong>🔄 Individual Processing</strong>
</div>
<p><strong>What it does:</strong> Uploads each file separately with individual API calls</p>
<p><strong>Best for:</strong> Mixed file types, when you want error isolation, or need individual progress tracking</p>
<p><strong>Pros:</strong> Better error handling, individual progress, more robust</p>
<p><strong>Cons:</strong> Slightly slower due to sequential processing</p>
</div>
</div>
<div class="help-tips">
<strong>💡 When to choose each mode:</strong>
<ul>
<li><strong>Batch mode:</strong> When files are similar and you want them processed quickly together</li>
<li><strong>Individual mode:</strong> When files might have different processing requirements or you want to ensure all files get processed even if some fail</li>
<li><strong>Single files:</strong> Always processed individually (no choice needed)</li>
<li>Both modes apply the same tags to all files</li>
</ul>
</div>
</div>
</div>
<div class="config-section">
<label for="memoryType">Memory Type</label>
<select id="memoryType" class="form-control">
<option value="document">Document</option>
<option value="reference">Reference</option>
<option value="knowledge">Knowledge</option>
<option value="note">Note</option>
</select>
</div>
</div>
<!-- Upload Button -->
<div class="upload-actions">
<button id="uploadBtn" class="btn btn-primary" disabled>
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,2H6A2,2 0 0,0 4,4V20A2,2 0 0,0 6,22H18A2,2 0 0,0 20,20V8L14,2M18,20H6V4H13V9H18V20Z"/>
</svg>
Upload & Ingest
</button>
</div>
</section>
<!-- Upload Queue/History Section -->
<section class="documents-section">
<h2>📊 Upload History</h2>
<div id="uploadHistory" class="upload-history">
<div class="loading-spinner"></div>
<p>Loading upload history...</p>
</div>
</section>
<!-- Document Content Search Section -->
<section class="documents-section">
<h2>🔍 Search Ingested Content</h2>
<p class="section-description">Search within your uploaded documents to verify content is indexed</p>
<div class="doc-search-container">
<input type="text"
id="docSearchInput"
class="search-input"
placeholder="Search for content within ingested documents...">
<button id="docSearchBtn" class="btn btn-primary">Search Documents</button>
</div>
<div id="docSearchResults" class="doc-search-results" style="display: none;">
<div class="search-results-header">
<h3>Search Results</h3>
<span id="docSearchCount" class="results-count">0 results</span>
</div>
<div id="docSearchResultsList" class="search-results-list">
<!-- Results will be populated here -->
</div>
</div>
</section>
</div>
</div>
<!-- Browse View -->
<div id="browseView" class="view-container">
<div class="view-header">
<h2>Browse by Tags</h2>
<p>Explore your memories organized by tags</p>
</div>
<div class="browse-content">
<div id="tagsCloudContainer" class="tags-cloud">
<!-- Dynamic tags will be loaded here -->
</div>
<div id="taggedMemoriesContainer" class="tagged-memories" style="display: none;">
<div class="section-header">
<h3 id="selectedTagTitle">Memories tagged with: <span id="selectedTagName"></span></h3>
<button id="clearTagFilter" class="btn btn-secondary">Show All Tags</button>
</div>
<div id="taggedMemoriesList" class="memory-grid">
<!-- Filtered memories will be shown here -->
</div>
</div>
</div>
</div>
<!-- Manage View -->
<div id="manageView" class="view-container">
<div class="manage-layout">
<!-- Bulk Operations Section -->
<section class="manage-section">
<h2>🧹 Bulk Operations</h2>
<div class="bulk-ops-grid">
<div class="bulk-op-card">
<h3>Delete by Tag</h3>
<p>Remove all memories with a specific tag</p>
<div class="op-controls">
<select id="deleteTagSelect" class="form-control">
<option value="">Select tag...</option>
</select>
<button id="deleteByTagBtn" class="btn btn-danger">Delete</button>
</div>
</div>
<div class="bulk-op-card">
<h3>Cleanup Duplicates</h3>
<p>Remove duplicate memories based on content</p>
<div class="op-controls">
<button id="cleanupDuplicatesBtn" class="btn btn-warning">Run Cleanup</button>
</div>
</div>
<div class="bulk-op-card">
<h3>Delete by Date</h3>
<p>Remove memories older than a specific date</p>
<div class="op-controls">
<input type="date" id="deleteDateInput" class="form-control">
<button id="deleteByDateBtn" class="btn btn-danger">Delete</button>
</div>
</div>
</div>
</section>
<!-- Tag Management Section -->
<section class="manage-section">
<h2>🏷️ Tag Management</h2>
<div id="tagManagementContainer">
<div class="loading-spinner"></div>
<p>Loading tag statistics...</p>
</div>
</section>
<!-- System Operations Section -->
<section class="manage-section">
<h2>⚙️ System Maintenance</h2>
<div class="system-ops-grid">
<div class="system-op-card">
<h3>Database Optimization</h3>
<p>Optimize database performance</p>
<button id="optimizeDbBtn" class="btn btn-secondary" disabled>Optimize DB</button>
</div>
<div class="system-op-card">
<h3>Rebuild Search Index</h3>
<p>Rebuild search indexes for better performance</p>
<button id="rebuildIndexBtn" class="btn btn-secondary" disabled>Rebuild Index</button>
</div>
</div>
</section>
</div>
</div>
<!-- Analytics View -->
<div id="analyticsView" class="view-container">
<div class="analytics-layout">
<!-- Key Metrics Cards -->
<section class="analytics-section">
<h2>📊 Key Metrics</h2>
<div class="metrics-grid">
<div class="metric-card">
<div class="metric-value" id="analyticsTotalMemories">-</div>
<div class="metric-label">Total Memories</div>
</div>
<div class="metric-card">
<div class="metric-value" id="analyticsThisWeek">-</div>
<div class="metric-label">This Week</div>
</div>
<div class="metric-card">
<div class="metric-value" id="analyticsUniqueTags">-</div>
<div class="metric-label">Unique Tags</div>
</div>
<div class="metric-card">
<div class="metric-value" id="analyticsDbSize">-</div>
<div class="metric-label">Database Size</div>
</div>
</div>
</section>
<!-- Charts Section -->
<section class="analytics-section">
<h2>📈 Trends & Charts</h2>
<div class="charts-grid">
<div class="chart-card">
<h3>Memory Growth Over Time</h3>
<div class="chart-controls">
<select id="growthPeriodSelect" class="form-control">
<option value="week">Last Week</option>
<option value="month" selected>Last Month</option>
<option value="quarter">Last Quarter</option>
<option value="year">Last Year</option>
</select>
</div>
<div id="memoryGrowthChart" class="chart-container">
<div class="loading-spinner"></div>
<p>Loading chart...</p>
</div>
</div>
<div class="chart-card">
<h3>Tag Usage Distribution</h3>
<div id="tagUsageChart" class="chart-container">
<div class="loading-spinner"></div>
<p>Loading chart...</p>
</div>
</div>
<div class="chart-card">
<h3>Memory Types Distribution</h3>
<div id="memoryTypesChart" class="chart-container">
<div class="loading-spinner"></div>
<p>Loading chart...</p>
</div>
</div>
<div class="chart-card">
<h3>Activity Heatmap</h3>
<div class="chart-controls">
<select id="heatmapPeriodSelect" class="form-control">
<option value="90" selected>Last 90 Days</option>
<option value="180">Last 6 Months</option>
<option value="365">Last Year</option>
</select>
</div>
<div id="activityHeatmapChart" class="chart-container">
<div class="loading-spinner"></div>
<p>Loading heatmap...</p>
</div>
</div>
</div>
</section>
<!-- Detailed Analytics -->
<section class="analytics-section">
<h2>📋 Detailed Reports</h2>
<div class="reports-grid">
<div class="report-card">
<h3>Top Tags</h3>
<div class="chart-controls">
<select id="topTagsPeriodSelect" class="form-control">
<option value="7d">Last 7 Days</option>
<option value="30d" selected>Last 30 Days</option>
<option value="90d">Last 90 Days</option>
<option value="all">All Time</option>
</select>
</div>
<div id="topTagsList" class="report-content">
<div class="loading-spinner"></div>
<p>Loading...</p>
</div>
</div>
<div class="report-card">
<h3>Recent Activity</h3>
<div class="chart-controls">
<select id="activityGranularitySelect" class="form-control">
<option value="hourly">By Hour</option>
<option value="daily" selected>By Day</option>
<option value="weekly">By Week</option>
</select>
</div>
<div id="recentActivityList" class="report-content">
<div class="loading-spinner"></div>
<p>Loading...</p>
</div>
</div>
<div class="report-card">
<h3>Storage Report</h3>
<div id="storageReport" class="report-content">
<div class="loading-spinner"></div>
<p>Loading...</p>
</div>
</div>
</div>
</section>
</div>
</div>
<!-- API Documentation View -->
<div id="apiDocsView" class="view-container">
<div class="api-docs-header">
<h2>🔗 API Documentation</h2>
<p>Comprehensive REST API endpoints for MCP Memory Service</p>
<div class="api-docs-links">
<a href="/api/docs" target="_blank" class="btn btn-primary">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,17H7V15H14M17,13H7V11H17M17,9H7V7H17M19,3H5C3.89,3 3,3.89 3,5V19A2,2 0 0,0 5,21H19A2,2 0 0,0 21,19V5C21,3.89 20.1,3 19,3Z"/>
</svg>
Interactive Swagger UI
</a>
<a href="/api/redoc" target="_blank" class="btn btn-secondary">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,17H7V15H14M17,13H7V11H17M17,9H7V7H17M19,3H5C3.89,3 3,3.89 3,5V19A2,2 0 0,0 5,21H19A2,2 0 0,0 21,19V5C21,3.89 20.1,3 19,3Z"/>
</svg>
ReDoc Documentation
</a>
<a href="/api-overview" target="_blank" class="btn btn-secondary">
<svg width="20" height="20" fill="currentColor" viewBox="0 0 24 24">
<path d="M12,2A10,10 0 0,0 2,12A10,10 0 0,0 12,22A10,10 0 0,0 22,12A10,10 0 0,0 12,2M12,4A8,8 0 0,1 20,12A8,8 0 0,1 12,20A8,8 0 0,1 4,12A8,8 0 0,1 12,4M11,16.5L18,9.5L16.59,8.09L11,13.67L7.91,10.59L6.5,12L11,16.5Z"/>
</svg>
API Overview Page
</a>
</div>
</div>
<div class="api-endpoints-grid">
<!-- Memory Management -->
<div class="endpoint-section">
<h3>💾 Memory Management</h3>
<div class="endpoint-list">
<div class="endpoint-item">
<span class="method post">POST</span>
<span class="path">/api/memories</span>
<span class="description">Store a new memory with automatic embedding generation</span>
</div>
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/memories</span>
<span class="description">List all memories with pagination support</span>
</div>
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/memories/{content_hash}</span>
<span class="description">Retrieve a specific memory by content hash</span>
</div>
<div class="endpoint-item">
<span class="method delete">DELETE</span>
<span class="path">/api/memories/{content_hash}</span>
<span class="description">Delete a memory and its embeddings</span>
</div>
</div>
</div>
<!-- Search Operations -->
<div class="endpoint-section">
<h3>🔍 Search Operations</h3>
<div class="endpoint-list">
<div class="endpoint-item">
<span class="method post">POST</span>
<span class="path">/api/search</span>
<span class="description">Semantic similarity search using embeddings</span>
</div>
<div class="endpoint-item">
<span class="method post">POST</span>
<span class="path">/api/search/by-tag</span>
<span class="description">Search memories by tags (AND/OR logic)</span>
</div>
<div class="endpoint-item">
<span class="method post">POST</span>
<span class="path">/api/search/by-time</span>
<span class="description">Natural language time-based queries</span>
</div>
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/search/similar/{content_hash}</span>
<span class="description">Find memories similar to a specific one</span>
</div>
</div>
</div>
<!-- Real-time Events -->
<div class="endpoint-section">
<h3>📡 Real-time Events</h3>
<div class="endpoint-list">
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/events</span>
<span class="description">Subscribe to real-time memory events stream</span>
</div>
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/events/stats</span>
<span class="description">View SSE connection statistics</span>
</div>
</div>
</div>
<!-- Health & Status -->
<div class="endpoint-section">
<h3>🏥 Health & Status</h3>
<div class="endpoint-list">
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/health</span>
<span class="description">Quick health check endpoint</span>
</div>
<div class="endpoint-item">
<span class="method get">GET</span>
<span class="path">/api/health/detailed</span>
<span class="description">Detailed health with database statistics</span>
</div>
</div>
</div>
</div>
</div>
</main>
<!-- Footer -->
<footer class="app-footer">
<div class="footer-content">
<div class="footer-section">
<h4>Documentation</h4>
<ul class="footer-links">
<li><a href="https://github.com/doobidoo/mcp-memory-service/wiki" target="_blank" rel="noopener">
📚 Wiki Home
</a></li>
<li><a href="https://github.com/doobidoo/mcp-memory-service/wiki/07-TROUBLESHOOTING" target="_blank" rel="noopener">
🔧 Troubleshooting Guide
</a></li>
<li><a href="https://github.com/doobidoo/mcp-memory-service/wiki/07-TROUBLESHOOTING#backend-configuration-issues" target="_blank" rel="noopener">
⚙️ Configuration Issues
</a></li>
</ul>
</div>
<div class="footer-section">
<h4>Resources</h4>
<ul class="footer-links">
<li><a href="https://github.com/doobidoo/mcp-memory-service" target="_blank" rel="noopener">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M12 0c-6.626 0-12 5.373-12 12 0 5.302 3.438 9.8 8.207 11.387.599.111.793-.261.793-.577v-2.234c-3.338.726-4.033-1.416-4.033-1.416-.546-1.387-1.333-1.756-1.333-1.756-1.089-.745.083-.729.083-.729 1.205.084 1.839 1.237 1.839 1.237 1.07 1.834 2.807 1.304 3.492.997.107-.775.418-1.305.762-1.604-2.665-.305-5.467-1.334-5.467-5.931 0-1.311.469-2.381 1.236-3.221-.124-.303-.535-1.524.117-3.176 0 0 1.008-.322 3.301 1.23.957-.266 1.983-.399 3.003-.404 1.02.005 2.047.138 3.006.404 2.291-1.552 3.297-1.23 3.297-1.23.653 1.653.242 2.874.118 3.176.77.84 1.235 1.911 1.235 3.221 0 4.609-2.807 5.624-5.479 5.921.43.372.823 1.102.823 2.222v3.293c0 .319.192.694.801.576 4.765-1.589 8.199-6.086 8.199-11.386 0-6.627-5.373-12-12-12z"/>
</svg>
GitHub Repository
</a></li>
<li><a href="https://doobidoo.github.io" target="_blank" rel="noopener">
🌐 Portfolio
</a></li>
<li><a href="/api/docs" target="_blank">
📖 API Documentation
</a></li>
</ul>
</div>
<div class="footer-section">
<h4>About</h4>
<p class="footer-description">
MCP Memory Service - Semantic memory management for AI assistants
</p>
<div class="footer-license">
<svg width="16" height="16" fill="currentColor" viewBox="0 0 24 24">
<path d="M14,17H7V15H14M17,13H7V11H17M17,9H7V7H17M19,3H5C3.89,3 3,3.89 3,5V19A2,2 0 0,0 5,21H19A2,2 0 0,0 21,19V5C21,3.89 20.1,3 19,3Z"/>
</svg>
<a href="https://github.com/doobidoo/mcp-memory-service/blob/main/LICENSE" target="_blank" rel="noopener">
Licensed under Apache 2.0
</a>
</div>
<div class="footer-copyright">
© 2024 Heinrich Krupp
</div>
</div>
</div>
</footer>
<!-- Memory Detail Modal -->
<div id="memoryModal" class="modal-overlay">
<div class="modal-content">
<div class="modal-header">
<h3 id="modalTitle">Memory Details</h3>
<button class="modal-close" aria-label="Close">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 6.41L17.59 5 12 10.59 6.41 5 5 6.41 10.59 12 5 17.59 6.41 19 12 13.41 17.59 19 19 17.59 13.41 12z"/>
</svg>
</button>
</div>
<div class="modal-body">
<div id="modalContent">
<!-- Dynamic memory content -->
</div>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" id="editMemoryBtn">Edit</button>
<button class="btn btn-danger" id="deleteMemoryBtn">Delete</button>
<button class="btn btn-primary" id="shareMemoryBtn">Share</button>
</div>
</div>
</div>
<!-- Add Memory Modal -->
<div id="addMemoryModal" class="modal-overlay">
<div class="modal-content">
<div class="modal-header">
<h3>Add New Memory</h3>
<button class="modal-close" aria-label="Close">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 6.41L17.59 5 12 10.59 6.41 5 5 6.41 10.59 12 5 17.59 6.41 19 12 13.41 17.59 19 19 17.59 13.41 12z"/>
</svg>
</button>
</div>
<div class="modal-body">
<form id="addMemoryForm">
<div class="form-group">
<label for="memoryContent">Content</label>
<textarea id="memoryContent" rows="6" placeholder="Enter your memory content..."></textarea>
</div>
<div class="form-group">
<label for="memoryTags">Tags (comma-separated)</label>
<input type="text" id="memoryTags" placeholder="e.g., coding, javascript, api">
</div>
<div class="form-group">
<label for="memoryType">Type</label>
<select id="memoryType">
<option value="note">Note</option>
<option value="code">Code</option>
<option value="reference">Reference</option>
<option value="idea">Idea</option>
</select>
</div>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" id="cancelAddBtn">Cancel</button>
<button class="btn btn-primary" id="saveMemoryBtn">Save Memory</button>
</div>
</div>
</div>
<!-- Settings Modal -->
<div id="settingsModal" class="modal-overlay">
<div class="modal-content">
<div class="modal-header">
<h3>Settings</h3>
<button class="modal-close" aria-label="Close">
<svg width="24" height="24" fill="currentColor" viewBox="0 0 24 24">
<path d="M19 6.41L17.59 5 12 10.59 6.41 5 5 6.41 10.59 12 5 17.59 6.41 19 12 13.41 17.59 19 19 17.59 13.41 12z"/>
</svg>
</button>
</div>
<div class="modal-body">
<form id="settingsForm">
<h4 class="settings-section-heading">Preferences</h4>
<div class="form-group">
<label for="themeSelect">Theme</label>
<select id="themeSelect">
<option value="light">Light</option>
<option value="dark">Dark</option>
</select>
</div>
<div class="form-group">
<label for="viewDensity">View Density</label>
<select id="viewDensity">
<option value="comfortable">Comfortable</option>
<option value="compact">Compact</option>
</select>
</div>
<div class="form-group">
<label for="previewLines">Memory Preview Lines</label>
<input type="number" id="previewLines" min="1" max="10" value="3">
</div>
<hr class="settings-divider">
<h4 class="settings-section-heading" style="margin-bottom: var(--space-4);">System Information</h4>
<div class="system-info">
<div class="info-row">
<span class="info-label">Version:</span>
<span class="info-value" id="settingsVersion">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Storage Backend:</span>
<span class="info-value" id="settingsBackend">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Primary Backend:</span>
<span class="info-value" id="settingsPrimaryBackend">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Embedding Model:</span>
<span class="info-value" id="settingsEmbeddingModel">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Embedding Dimensions:</span>
<span class="info-value" id="settingsEmbeddingDim">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Database Size:</span>
<span class="info-value" id="settingsDbSize">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Total Memories:</span>
<span class="info-value" id="settingsTotalMemories">Loading...</span>
</div>
<div class="info-row">
<span class="info-label">Uptime:</span>
<span class="info-value" id="settingsUptime">Loading...</span>
</div>
</div>
<hr class="settings-divider">
<h4 class="settings-section-heading">Backup & Restore</h4>
<div class="backup-settings">
<div class="info-row">
<span class="info-label">Last Backup:</span>
<span class="info-value" id="settingsLastBackup">Never</span>
</div>
<div class="info-row">
<span class="info-label">Backup Count:</span>
<span class="info-value" id="settingsBackupCount">0</span>
</div>
<div class="info-row">
<span class="info-label">Next Scheduled:</span>
<span class="info-value" id="settingsNextBackup">-</span>
</div>
<div class="backup-actions">
<button id="backupNowButton" class="btn btn-sm btn-primary">Create Backup Now</button>
<button id="viewBackupsButton" class="btn btn-sm btn-secondary">View Backups</button>
</div>
</div>
</form>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" id="cancelSettingsBtn">Cancel</button>
<button class="btn btn-primary" id="saveSettingsBtn">Save Settings</button>
</div>
</div>
</div>
<!-- Loading Overlay -->
<div id="loadingOverlay" class="loading-overlay">
<div class="loading-spinner"></div>
<p>Loading...</p>
</div>
<!-- Toast Notifications -->
<div id="toastContainer" class="toast-container"></div>
</div>
<!-- Connection Status Indicator -->
<div id="connectionStatus" class="connection-status">
<div class="status-indicator"></div>
<span class="status-text">Connected</span>
</div>
<!-- Memory Viewer Modal -->
<div id="memoryViewerModal" class="modal" style="display: none;">
<div class="modal-content memory-viewer-content">
<div class="modal-header">
<h2>📝 Document Memory Chunks</h2>
<button class="modal-close" data-action="closeMemoryViewer">×</button>
</div>
<div class="modal-body">
<div class="document-info">
<h3 id="memoryViewerFilename">Loading...</h3>
<p id="memoryViewerStats" class="text-muted">0 chunks found</p>
</div>
<div class="memory-chunks-container">
<div id="memoryChunksList" class="memory-chunks-list">
<!-- Chunks will be populated here -->
</div>
</div>
</div>
<div class="modal-footer">
<button class="btn btn-secondary" data-action="closeMemoryViewer">Close</button>
</div>
</div>
</div>
<script src="/static/app.js?v=8.27.2-TOAST-AND-ICON"></script>
<link rel="stylesheet" href="/static/style.css?v=8.27.2-TOAST-AND-ICON">
</body>
</html>
```
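The API Docs view above lists the REST endpoints the dashboard links to. As a rough illustration only — the request schemas are not shown on this page, so the JSON field names, the bearer-token header, and the base URL below are assumptions; consult `/api/docs` on a running instance for the actual contract — a minimal Python sketch using httpx:

```python
import asyncio
import httpx

BASE_URL = "http://localhost:8000"                  # placeholder; adjust to your deployment
HEADERS = {"Authorization": "Bearer <api-key>"}     # assumption; your instance may not require auth

async def demo():
    async with httpx.AsyncClient(base_url=BASE_URL, headers=HEADERS) as client:
        # Store a memory (field names are illustrative; check /api/docs for the real schema).
        resp = await client.post("/api/memories", json={
            "content": "Remember to rotate the API token",
            "tags": ["ops", "reminder"],
        })
        resp.raise_for_status()

        # Semantic search (payload shape is an assumption; see the Swagger UI).
        results = await client.post("/api/search", json={"query": "API token", "n_results": 5})
        print(results.json())

        # Quick health check.
        print((await client.get("/api/health")).json())

asyncio.run(demo())
```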