This is page 39 of 47. Use http://codebase.md/doobidoo/mcp-memory-service?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/hybrid.py:
--------------------------------------------------------------------------------
```python
1 | # Copyright 2024 Heinrich Krupp
2 | #
3 | # Licensed under the Apache License, Version 2.0 (the "License");
4 | # you may not use this file except in compliance with the License.
5 | # You may obtain a copy of the License at
6 | #
7 | # http://www.apache.org/licenses/LICENSE-2.0
8 | #
9 | # Unless required by applicable law or agreed to in writing, software
10 | # distributed under the License is distributed on an "AS IS" BASIS,
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 | # See the License for the specific language governing permissions and
13 | # limitations under the License.
14 |
15 | """
16 | Hybrid memory storage backend for MCP Memory Service.
17 |
18 | This implementation provides the best of both worlds:
19 | - SQLite-vec as primary storage for ultra-fast reads (~5ms)
20 | - Cloudflare as secondary storage for cloud persistence and multi-device sync
21 | - Background synchronization service for seamless integration
22 | - Graceful degradation when cloud services are unavailable
23 | """
24 |
25 | import asyncio
26 | import logging
27 | import time
28 | from typing import List, Dict, Any, Tuple, Optional
29 | from collections import deque
30 | from dataclasses import dataclass
31 |
32 | from .base import MemoryStorage
33 | from .sqlite_vec import SqliteVecMemoryStorage
34 | from .cloudflare import CloudflareStorage
35 | from ..models.memory import Memory, MemoryQueryResult
36 |
37 | # Import SSE for real-time progress updates
38 | try:
39 | from ..web.sse import sse_manager, create_sync_progress_event, create_sync_completed_event
40 | SSE_AVAILABLE = True
41 | except ImportError:
42 | SSE_AVAILABLE = False
43 |
44 | # Import config to check if limit constants are available
45 | from .. import config as app_config
46 |
47 | # Use getattr to provide fallbacks if attributes don't exist (prevents duplicate defaults)
48 | CLOUDFLARE_D1_MAX_SIZE_GB = getattr(app_config, 'CLOUDFLARE_D1_MAX_SIZE_GB', 10)
49 | CLOUDFLARE_VECTORIZE_MAX_VECTORS = getattr(app_config, 'CLOUDFLARE_VECTORIZE_MAX_VECTORS', 5_000_000)
50 | CLOUDFLARE_MAX_METADATA_SIZE_KB = getattr(app_config, 'CLOUDFLARE_MAX_METADATA_SIZE_KB', 10)
51 | CLOUDFLARE_WARNING_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_WARNING_THRESHOLD_PERCENT', 80)
52 | CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT', 95)
53 | HYBRID_SYNC_ON_STARTUP = getattr(app_config, 'HYBRID_SYNC_ON_STARTUP', True)
54 | HYBRID_MAX_CONTENT_LENGTH = getattr(app_config, 'HYBRID_MAX_CONTENT_LENGTH', 800)
55 | HYBRID_MAX_EMPTY_BATCHES = getattr(app_config, 'HYBRID_MAX_EMPTY_BATCHES', 20)
56 | HYBRID_MIN_CHECK_COUNT = getattr(app_config, 'HYBRID_MIN_CHECK_COUNT', 1000)
57 |
58 | logger = logging.getLogger(__name__)
59 |
60 | @dataclass
61 | class SyncOperation:
62 | """Represents a pending sync operation."""
63 | operation: str # 'store', 'delete', 'update'
64 | memory: Optional[Memory] = None
65 | content_hash: Optional[str] = None
66 | updates: Optional[Dict[str, Any]] = None
67 |     timestamp: Optional[float] = None
68 | retries: int = 0
69 | max_retries: int = 3
70 |
71 | def __post_init__(self):
72 | if self.timestamp is None:
73 | self.timestamp = time.time()
74 |
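# Illustrative sketch (hypothetical `memory` argument; not part of the service
# logic): the three operation kinds queued by the hybrid backend. A 'store'
# carries the full Memory, 'delete' only the content hash, and 'update' the hash
# plus the changed fields; `timestamp` is filled in by __post_init__ when omitted.
def _sync_operation_examples(memory: Memory) -> List[SyncOperation]:
    """Sketch only; never called by the service."""
    return [
        SyncOperation(operation='store', memory=memory),
        SyncOperation(operation='delete', content_hash=memory.content_hash),
        SyncOperation(operation='update', content_hash=memory.content_hash,
                      updates={'tags': ['example-tag']}),
    ]
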
75 | class BackgroundSyncService:
76 | """
77 | Handles background synchronization between SQLite-vec and Cloudflare.
78 |
79 | Features:
80 | - Asynchronous operation queue
81 | - Retry logic with exponential backoff
82 | - Health monitoring and error handling
83 | - Configurable sync intervals and batch sizes
84 | - Graceful degradation when cloud is unavailable
85 | """
86 |
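    # Minimal wiring sketch (names are illustrative): the hybrid backend below
    # constructs this service with its SQLite-vec and Cloudflare stores and then
    # drives it through the public methods defined here:
    #
    #     svc = BackgroundSyncService(primary, secondary, sync_interval=300)
    #     await svc.start()                     # spawns _sync_loop()
    #     await svc.enqueue_operation(op)       # queue a SyncOperation
    #     status = await svc.get_sync_status()  # queue depth, capacity, stats
    #     await svc.stop()                      # drain remaining ops and cancel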
87 | def __init__(self,
88 | primary_storage: SqliteVecMemoryStorage,
89 | secondary_storage: CloudflareStorage,
90 | sync_interval: int = 300, # 5 minutes
91 | batch_size: int = 50,
92 | max_queue_size: int = 1000):
93 | self.primary = primary_storage
94 | self.secondary = secondary_storage
95 | self.sync_interval = sync_interval
96 | self.batch_size = batch_size
97 | self.max_queue_size = max_queue_size
98 |
99 | # Sync queues and state
100 | self.operation_queue = asyncio.Queue(maxsize=max_queue_size)
101 | self.failed_operations = deque(maxlen=100) # Keep track of failed operations
102 | self.is_running = False
103 | self.sync_task = None
104 | self.last_sync_time = 0
105 |
106 | # Drift detection state (v8.25.0+)
107 | self.last_drift_check_time = 0
108 | self.drift_check_enabled = getattr(app_config, 'HYBRID_SYNC_UPDATES', True)
109 | self.drift_check_interval = getattr(app_config, 'HYBRID_DRIFT_CHECK_INTERVAL', 3600)
110 |
111 | self.sync_stats = {
112 | 'operations_processed': 0,
113 | 'operations_failed': 0,
114 | 'last_sync_duration': 0,
115 | 'cloudflare_available': True,
116 | 'last_drift_check': 0,
117 | 'drift_detected_count': 0,
118 | 'drift_synced_count': 0
119 | }
120 |
121 | # Health monitoring
122 | self.consecutive_failures = 0
123 | self.max_consecutive_failures = 5
124 | self.backoff_time = 60 # Start with 1 minute backoff
125 |
126 | # Cloudflare capacity tracking
127 | self.cloudflare_stats = {
128 | 'vector_count': 0,
129 | 'estimated_d1_size_gb': 0,
130 | 'last_capacity_check': 0,
131 | 'approaching_limits': False,
132 | 'limit_warnings': []
133 | }
134 |
135 | async def start(self):
136 | """Start the background sync service."""
137 | if self.is_running:
138 | logger.warning("Background sync service is already running")
139 | return
140 |
141 | self.is_running = True
142 | self.sync_task = asyncio.create_task(self._sync_loop())
143 | logger.info(f"Background sync service started with {self.sync_interval}s interval")
144 |
145 | async def stop(self):
146 | """Stop the background sync service and process remaining operations."""
147 | if not self.is_running:
148 | return
149 |
150 | self.is_running = False
151 |
152 | # Process remaining operations in queue
153 | remaining_operations = []
154 | while not self.operation_queue.empty():
155 | try:
156 | operation = self.operation_queue.get_nowait()
157 | remaining_operations.append(operation)
158 | except asyncio.QueueEmpty:
159 | break
160 |
161 | if remaining_operations:
162 | logger.info(f"Processing {len(remaining_operations)} remaining operations before shutdown")
163 | await self._process_operations_batch(remaining_operations)
164 |
165 | # Cancel the sync task
166 | if self.sync_task:
167 | self.sync_task.cancel()
168 | try:
169 | await self.sync_task
170 | except asyncio.CancelledError:
171 | pass
172 |
173 | logger.info("Background sync service stopped")
174 |
175 | async def enqueue_operation(self, operation: SyncOperation):
176 | """Enqueue a sync operation for background processing."""
177 | try:
178 |             self.operation_queue.put_nowait(operation)
179 | logger.debug(f"Enqueued {operation.operation} operation")
180 | except asyncio.QueueFull:
181 | # If queue is full, process immediately to avoid blocking
182 | logger.warning("Sync queue full, processing operation immediately")
183 | await self._process_single_operation(operation)
184 |
185 | async def force_sync(self) -> Dict[str, Any]:
186 | """Force an immediate full synchronization between backends."""
187 | logger.info("Starting forced sync between primary and secondary storage")
188 | sync_start_time = time.time()
189 |
190 | try:
191 | # Get all memories from primary storage
192 | primary_memories = await self.primary.get_all_memories()
193 |
194 | # Check Cloudflare availability
195 | try:
196 | await self.secondary.get_stats() # Simple health check
197 | cloudflare_available = True
198 | except Exception as e:
199 | logger.warning(f"Cloudflare not available during force sync: {e}")
200 | cloudflare_available = False
201 | self.sync_stats['cloudflare_available'] = False
202 | return {
203 | 'status': 'partial',
204 | 'cloudflare_available': False,
205 | 'primary_memories': len(primary_memories),
206 | 'synced_to_secondary': 0,
207 | 'duration': time.time() - sync_start_time
208 | }
209 |
210 | # Sync from primary to secondary using concurrent operations
211 | async def sync_memory(memory):
212 | try:
213 | success, message = await self.secondary.store(memory)
214 | if success:
215 | return True, None
216 | else:
217 | logger.debug(f"Failed to sync memory to secondary: {message}")
218 | return False, message
219 | except Exception as e:
220 | logger.debug(f"Exception syncing memory to secondary: {e}")
221 | return False, str(e)
222 |
223 | # Process memories concurrently in batches
224 | synced_count = 0
225 | failed_count = 0
226 |
227 | # Process in batches to avoid overwhelming the system
228 | batch_size = min(self.batch_size, 10) # Limit concurrent operations
229 | for i in range(0, len(primary_memories), batch_size):
230 | batch = primary_memories[i:i + batch_size]
231 | results = await asyncio.gather(*[sync_memory(m) for m in batch], return_exceptions=True)
232 |
233 | for result in results:
234 | if isinstance(result, Exception):
235 | failed_count += 1
236 | logger.debug(f"Exception in batch sync: {result}")
237 | elif isinstance(result, tuple):
238 | success, _ = result
239 | if success:
240 | synced_count += 1
241 | else:
242 | failed_count += 1
243 |
244 | sync_duration = time.time() - sync_start_time
245 | self.sync_stats['last_sync_duration'] = sync_duration
246 | self.sync_stats['cloudflare_available'] = cloudflare_available
247 |
248 | logger.info(f"Force sync completed: {synced_count} synced, {failed_count} failed in {sync_duration:.2f}s")
249 |
250 | return {
251 | 'status': 'completed',
252 | 'cloudflare_available': cloudflare_available,
253 | 'primary_memories': len(primary_memories),
254 | 'synced_to_secondary': synced_count,
255 | 'failed_operations': failed_count,
256 | 'duration': sync_duration
257 | }
258 |
259 | except Exception as e:
260 | logger.error(f"Error during force sync: {e}")
261 | return {
262 | 'status': 'error',
263 | 'error': str(e),
264 | 'duration': time.time() - sync_start_time
265 | }
266 |
267 | async def get_sync_status(self) -> Dict[str, Any]:
268 | """Get current sync service status and statistics."""
269 | queue_size = self.operation_queue.qsize()
270 |
271 | status = {
272 | 'is_running': self.is_running,
273 | 'is_paused': not self.is_running,
274 | 'pending_operations': queue_size,
275 | 'failed_operations': len(self.failed_operations),
276 | 'last_sync_time': self.last_sync_time,
277 | 'consecutive_failures': self.consecutive_failures,
278 | 'stats': self.sync_stats.copy(),
279 | 'operations_processed': self.sync_stats.get('operations_processed', 0),
280 | 'operations_failed': self.sync_stats.get('operations_failed', 0),
281 | 'cloudflare_available': self.sync_stats['cloudflare_available'],
282 | 'sync_interval': self.sync_interval,
283 | 'next_sync_in': max(0, self.sync_interval - (time.time() - self.last_sync_time)),
284 | 'capacity': {
285 | 'vector_count': self.cloudflare_stats['vector_count'],
286 | 'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
287 | 'approaching_limits': self.cloudflare_stats['approaching_limits'],
288 | 'warnings': self.cloudflare_stats['limit_warnings']
289 | }
290 | }
291 |
292 | return status
293 |
294 | async def validate_memory_for_cloudflare(self, memory: Memory) -> Tuple[bool, Optional[str]]:
295 | """
296 | Validate if a memory can be synced to Cloudflare.
297 |
298 | Returns:
299 | Tuple of (is_valid, error_message)
300 | """
301 | # Check metadata size
302 | if memory.metadata:
303 | import json
304 | metadata_json = json.dumps(memory.metadata)
305 | metadata_size_kb = len(metadata_json.encode('utf-8')) / 1024
306 |
307 | if metadata_size_kb > CLOUDFLARE_MAX_METADATA_SIZE_KB:
308 | return False, f"Metadata size {metadata_size_kb:.2f}KB exceeds Cloudflare limit of {CLOUDFLARE_MAX_METADATA_SIZE_KB}KB"
309 |
310 | # Check if we're approaching vector count limit
311 | if self.cloudflare_stats['vector_count'] >= CLOUDFLARE_VECTORIZE_MAX_VECTORS:
312 | return False, f"Cloudflare vector limit of {CLOUDFLARE_VECTORIZE_MAX_VECTORS} reached"
313 |
314 | return True, None
315 |
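    # Worked example of the metadata check above, with the default 10 KB limit
    # (CLOUDFLARE_MAX_METADATA_SIZE_KB = 10): a metadata dict whose JSON encoding
    # is 12,288 bytes works out to 12.0 KB and is rejected, while an 8,192-byte
    # (8.0 KB) encoding passes.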
316 | async def check_cloudflare_capacity(self) -> Dict[str, Any]:
317 | """
318 | Check remaining Cloudflare capacity and return status.
319 | """
320 | try:
321 | # Get current stats from Cloudflare
322 | cf_stats = await self.secondary.get_stats()
323 |
324 | # Update our tracking
325 | self.cloudflare_stats['vector_count'] = cf_stats.get('total_memories', 0)
326 | self.cloudflare_stats['last_capacity_check'] = time.time()
327 |
328 | # Calculate usage percentages
329 | vector_usage_percent = (self.cloudflare_stats['vector_count'] / CLOUDFLARE_VECTORIZE_MAX_VECTORS) * 100
330 |
331 | # Clear previous warnings
332 | self.cloudflare_stats['limit_warnings'] = []
333 |
334 | # Check vector count limits
335 | if vector_usage_percent >= CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT:
336 | warning = f"CRITICAL: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
337 | self.cloudflare_stats['limit_warnings'].append(warning)
338 | logger.error(warning)
339 | self.cloudflare_stats['approaching_limits'] = True
340 | elif vector_usage_percent >= CLOUDFLARE_WARNING_THRESHOLD_PERCENT:
341 | warning = f"WARNING: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
342 | self.cloudflare_stats['limit_warnings'].append(warning)
343 | logger.warning(warning)
344 | self.cloudflare_stats['approaching_limits'] = True
345 | else:
346 | self.cloudflare_stats['approaching_limits'] = False
347 |
348 | return {
349 | 'vector_count': self.cloudflare_stats['vector_count'],
350 | 'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
351 | 'vector_usage_percent': vector_usage_percent,
352 | 'approaching_limits': self.cloudflare_stats['approaching_limits'],
353 | 'warnings': self.cloudflare_stats['limit_warnings']
354 | }
355 |
356 | except Exception as e:
357 | logger.error(f"Failed to check Cloudflare capacity: {e}")
358 | return {
359 | 'error': str(e),
360 | 'approaching_limits': False
361 | }
362 |
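    # With the default limits above (sketch, assuming CLOUDFLARE_VECTORIZE_MAX_VECTORS
    # = 5,000,000): vector_usage_percent = vector_count / 5_000_000 * 100, so the
    # 80% warning threshold is reached at 4,000,000 vectors and the 95% critical
    # threshold at 4,750,000 vectors.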
363 | async def _sync_loop(self):
364 | """Main background sync loop."""
365 | logger.info("Background sync loop started")
366 |
367 | while self.is_running:
368 | try:
369 | # Process queued operations
370 | await self._process_operation_queue()
371 |
372 | # Periodic full sync if enough time has passed
373 | current_time = time.time()
374 | if current_time - self.last_sync_time >= self.sync_interval:
375 | await self._periodic_sync()
376 | self.last_sync_time = current_time
377 |
378 | # Sleep before next iteration
379 | await asyncio.sleep(5) # Check every 5 seconds
380 |
381 | except Exception as e:
382 | logger.error(f"Error in sync loop: {e}")
383 | self.consecutive_failures += 1
384 |
385 | if self.consecutive_failures >= self.max_consecutive_failures:
386 | logger.warning(f"Too many consecutive sync failures ({self.consecutive_failures}), backing off for {self.backoff_time}s")
387 | await asyncio.sleep(self.backoff_time)
388 | self.backoff_time = min(self.backoff_time * 2, 1800) # Max 30 minutes
389 | else:
390 | await asyncio.sleep(1)
391 |
392 | async def _process_operation_queue(self):
393 | """Process operations from the queue in batches."""
394 | operations = []
395 |
396 | # Collect up to batch_size operations
397 | for _ in range(self.batch_size):
398 | try:
399 | operation = self.operation_queue.get_nowait()
400 | operations.append(operation)
401 | except asyncio.QueueEmpty:
402 | break
403 |
404 | if operations:
405 | await self._process_operations_batch(operations)
406 |
407 | async def _process_operations_batch(self, operations: List[SyncOperation]):
408 | """Process a batch of sync operations."""
409 | logger.debug(f"Processing batch of {len(operations)} sync operations")
410 |
411 | for operation in operations:
412 | try:
413 | await self._process_single_operation(operation)
414 | self.sync_stats['operations_processed'] += 1
415 |
416 | except Exception as e:
417 | await self._handle_sync_error(e, operation)
418 |
419 | async def _handle_sync_error(self, error: Exception, operation: SyncOperation):
420 | """
421 | Handle sync operation errors with intelligent retry logic.
422 |
423 | Args:
424 | error: The exception that occurred
425 | operation: The failed operation
426 | """
427 | error_str = str(error).lower()
428 |
429 | # Check for specific Cloudflare limit errors
430 | is_limit_error = any(term in error_str for term in [
431 | 'limit exceeded', 'quota exceeded', 'maximum', 'too large',
432 | '413', '507', 'insufficient storage', 'capacity'
433 | ])
434 |
435 | if is_limit_error:
436 | # Don't retry limit errors - they won't succeed
437 | logger.error(f"Cloudflare limit error for {operation.operation}: {error}")
438 | self.sync_stats['operations_failed'] += 1
439 |
440 | # Update capacity tracking
441 | self.cloudflare_stats['approaching_limits'] = True
442 | self.cloudflare_stats['limit_warnings'].append(f"Limit error: {error}")
443 |
444 | # Check capacity to understand the issue
445 | await self.check_cloudflare_capacity()
446 | return
447 |
448 | # Check for temporary/network errors
449 | is_temporary_error = any(term in error_str for term in [
450 | 'timeout', 'connection', 'network', '500', '502', '503', '504',
451 | 'temporarily unavailable', 'retry'
452 | ])
453 |
454 | if is_temporary_error or operation.retries < operation.max_retries:
455 | # Retry temporary errors
456 | logger.warning(f"Temporary error for {operation.operation} (retry {operation.retries + 1}/{operation.max_retries}): {error}")
457 | operation.retries += 1
458 |
459 | if operation.retries < operation.max_retries:
460 | # Add back to queue for retry with exponential backoff
461 | await asyncio.sleep(min(2 ** operation.retries, 60)) # Max 60 second delay
462 | self.failed_operations.append(operation)
463 | else:
464 | logger.error(f"Max retries reached for {operation.operation}")
465 | self.sync_stats['operations_failed'] += 1
466 | else:
467 | # Permanent error - don't retry
468 | logger.error(f"Permanent error for {operation.operation}: {error}")
469 | self.sync_stats['operations_failed'] += 1
470 |
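    # Worked example of the retry behaviour above (default max_retries=3): a
    # temporary error sleeps min(2**retries, 60) seconds and then parks the
    # operation in failed_operations for the next periodic sync, so an operation
    # waits 2s after its first failure and 4s after its second; the third failure
    # exhausts max_retries and is counted in operations_failed. The loop-level
    # backoff in _sync_loop() is separate: it starts at 60s and doubles up to 30
    # minutes after repeated sync-loop failures.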
471 | async def _process_single_operation(self, operation: SyncOperation):
472 | """Process a single sync operation to secondary storage."""
473 | try:
474 | if operation.operation == 'store' and operation.memory:
475 | # Validate memory before syncing
476 | is_valid, validation_error = await self.validate_memory_for_cloudflare(operation.memory)
477 | if not is_valid:
478 | logger.warning(f"Memory validation failed for sync: {validation_error}")
479 | # Don't retry if it's a hard limit
480 | if "exceeds Cloudflare limit" in validation_error or "limit of" in validation_error:
481 | self.sync_stats['operations_failed'] += 1
482 | return # Skip this memory permanently
483 | raise Exception(validation_error)
484 |
485 | success, message = await self.secondary.store(operation.memory)
486 | if not success:
487 | raise Exception(f"Store operation failed: {message}")
488 |
489 | elif operation.operation == 'delete' and operation.content_hash:
490 | success, message = await self.secondary.delete(operation.content_hash)
491 | if not success:
492 | raise Exception(f"Delete operation failed: {message}")
493 |
494 | elif operation.operation == 'update' and operation.content_hash and operation.updates:
495 | success, message = await self.secondary.update_memory_metadata(
496 | operation.content_hash, operation.updates
497 | )
498 | if not success:
499 | raise Exception(f"Update operation failed: {message}")
500 |
501 | # Reset failure counters on success
502 | self.consecutive_failures = 0
503 | self.backoff_time = 60
504 | self.sync_stats['cloudflare_available'] = True
505 |
506 | except Exception as e:
507 | # Mark Cloudflare as potentially unavailable
508 | self.sync_stats['cloudflare_available'] = False
509 | raise
510 |
511 | async def _periodic_sync(self):
512 | """Perform periodic full synchronization."""
513 | logger.debug("Starting periodic sync")
514 |
515 | try:
516 | # Retry any failed operations first
517 | if self.failed_operations:
518 | retry_operations = list(self.failed_operations)
519 | self.failed_operations.clear()
520 | logger.info(f"Retrying {len(retry_operations)} failed operations")
521 | await self._process_operations_batch(retry_operations)
522 |
523 | # Perform a lightweight health check
524 | try:
525 | stats = await self.secondary.get_stats()
526 | logger.debug(f"Secondary storage health check passed: {stats}")
527 | self.sync_stats['cloudflare_available'] = True
528 |
529 | # Check Cloudflare capacity every periodic sync
530 | capacity_status = await self.check_cloudflare_capacity()
531 | if capacity_status.get('approaching_limits'):
532 | logger.warning("Cloudflare approaching capacity limits")
533 | for warning in capacity_status.get('warnings', []):
534 | logger.warning(warning)
535 |
536 | # Periodic drift detection (v8.25.0+)
537 | if self.drift_check_enabled:
538 | time_since_last_check = time.time() - self.last_drift_check_time
539 | if time_since_last_check >= self.drift_check_interval:
540 | logger.info(f"Running periodic drift check (interval: {self.drift_check_interval}s)")
541 | drift_stats = await self._detect_and_sync_drift()
542 | logger.info(f"Drift check complete: {drift_stats}")
543 |
544 | except Exception as e:
545 | logger.warning(f"Secondary storage health check failed: {e}")
546 | self.sync_stats['cloudflare_available'] = False
547 |
548 | except Exception as e:
549 | logger.error(f"Error during periodic sync: {e}")
550 |
551 | async def _detect_and_sync_drift(self, dry_run: bool = False) -> Dict[str, int]:
552 | """
553 | Detect and sync memories with divergent metadata between backends.
554 |
555 | Compares updated_at timestamps to identify metadata drift (tags, types, custom fields)
556 | and synchronizes changes using "newer timestamp wins" strategy.
557 |
558 | Args:
559 | dry_run: If True, detect drift but don't apply changes (preview mode)
560 |
561 | Returns:
562 | Dictionary with drift detection statistics:
563 | - checked: Number of memories examined
564 | - drift_detected: Number of memories with divergent metadata
565 | - synced: Number of memories synchronized
566 | - failed: Number of sync failures
567 | """
568 | if not self.drift_check_enabled:
569 | return {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
570 |
571 | logger.info(f"Starting drift detection scan (dry_run={dry_run})...")
572 | stats = {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
573 |
574 | try:
575 | # Get batch of recently updated memories from Cloudflare
576 | batch_size = getattr(app_config, 'HYBRID_DRIFT_BATCH_SIZE', 100)
577 |
578 | # Strategy: Check memories updated since last drift check
579 | time_threshold = self.last_drift_check_time or (time.time() - self.drift_check_interval)
580 |
581 | # Get recently updated from Cloudflare
582 | if hasattr(self.secondary, 'get_memories_updated_since'):
583 | cf_updated = await self.secondary.get_memories_updated_since(
584 | time_threshold,
585 | limit=batch_size
586 | )
587 | else:
588 | # Fallback: Get recent memories and filter by timestamp
589 | cf_memories = await self.secondary.get_all_memories(limit=batch_size)
590 | cf_updated = [m for m in cf_memories if m.updated_at and m.updated_at >= time_threshold]
591 |
592 | logger.info(f"Found {len(cf_updated)} memories updated in Cloudflare since last check")
593 |
594 | # Compare with local versions
595 | for cf_memory in cf_updated:
596 | stats['checked'] += 1
597 | try:
598 | local_memory = await self.primary.get_by_hash(cf_memory.content_hash)
599 |
600 | if not local_memory:
601 | # Memory missing locally - sync it
602 | stats['drift_detected'] += 1
603 | logger.debug(f"Memory {cf_memory.content_hash[:8]} missing locally, syncing...")
604 | if not dry_run:
605 | success, _ = await self.primary.store(cf_memory)
606 | if success:
607 | stats['synced'] += 1
608 | else:
609 | logger.info(f"[DRY RUN] Would sync missing memory: {cf_memory.content_hash[:8]}")
610 | stats['synced'] += 1
611 | continue
612 |
613 | # Compare updated_at timestamps
614 | cf_updated_at = cf_memory.updated_at or 0
615 | local_updated_at = local_memory.updated_at or 0
616 |
617 | # Allow 1 second tolerance for timestamp precision
618 | if abs(cf_updated_at - local_updated_at) > 1.0:
619 | stats['drift_detected'] += 1
620 | logger.debug(
621 | f"Drift detected for {cf_memory.content_hash[:8]}: "
622 | f"Cloudflare={cf_updated_at:.2f}, Local={local_updated_at:.2f}"
623 | )
624 |
625 | # Use "newer timestamp wins" strategy
626 | if cf_updated_at > local_updated_at:
627 | # Cloudflare is newer - update local
628 | if not dry_run:
629 | success, _ = await self.primary.update_memory_metadata(
630 | cf_memory.content_hash,
631 | {
632 | 'tags': cf_memory.tags,
633 | 'memory_type': cf_memory.memory_type,
634 | 'metadata': cf_memory.metadata,
635 | 'created_at': cf_memory.created_at,
636 | 'created_at_iso': cf_memory.created_at_iso,
637 | 'updated_at': cf_memory.updated_at,
638 | 'updated_at_iso': cf_memory.updated_at_iso,
639 | },
640 | preserve_timestamps=False # Use Cloudflare timestamps
641 | )
642 | if success:
643 | stats['synced'] += 1
644 | logger.info(f"Synced metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
645 | else:
646 | stats['failed'] += 1
647 | else:
648 | logger.info(f"[DRY RUN] Would sync metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
649 | stats['synced'] += 1
650 | else:
651 | # Local is newer - update Cloudflare
652 | if not dry_run:
653 | operation = SyncOperation(
654 | operation='update',
655 | content_hash=local_memory.content_hash,
656 | updates={
657 | 'tags': local_memory.tags,
658 | 'memory_type': local_memory.memory_type,
659 | 'metadata': local_memory.metadata,
660 | }
661 | )
662 | await self.enqueue_operation(operation)
663 | stats['synced'] += 1
664 | logger.info(f"Queued metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
665 | else:
666 | logger.info(f"[DRY RUN] Would queue metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
667 | stats['synced'] += 1
668 |
669 | except Exception as e:
670 | logger.warning(f"Error checking drift for memory: {e}")
671 | stats['failed'] += 1
672 | continue
673 |
674 | # Update tracking
675 | if not dry_run:
676 | self.last_drift_check_time = time.time()
677 | self.sync_stats['last_drift_check'] = self.last_drift_check_time
678 | self.sync_stats['drift_detected_count'] += stats['drift_detected']
679 | self.sync_stats['drift_synced_count'] += stats['synced']
680 |
681 | logger.info(
682 | f"Drift detection complete: checked={stats['checked']}, "
683 | f"drift_detected={stats['drift_detected']}, synced={stats['synced']}, failed={stats['failed']}"
684 | )
685 |
686 | except Exception as e:
687 | logger.error(f"Error during drift detection: {e}")
688 |
689 | return stats
690 |
691 |
692 | class HybridMemoryStorage(MemoryStorage):
693 | """
694 | Hybrid memory storage using SQLite-vec as primary and Cloudflare as secondary.
695 |
696 | This implementation provides:
697 | - Ultra-fast reads and writes (~5ms) via SQLite-vec
698 | - Cloud persistence and multi-device sync via Cloudflare
699 | - Background synchronization with retry logic
700 | - Graceful degradation when cloud services are unavailable
701 | - Full compatibility with the MemoryStorage interface
702 | """
703 |
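    # Minimal usage sketch (placeholder values): the backend is constructed with a
    # SQLite path and, optionally, a Cloudflare config dict; with an incomplete
    # config it falls back to SQLite-only mode (see __init__ below).
    #
    #     storage = HybridMemoryStorage(
    #         sqlite_db_path="/path/to/memory.db",
    #         cloudflare_config={
    #             "api_token": "...",
    #             "account_id": "...",
    #             "vectorize_index": "...",
    #             "d1_database_id": "...",
    #         },
    #     )
    #     await storage.initialize()
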
704 | @property
705 | def max_content_length(self) -> Optional[int]:
706 | """
707 | Maximum content length constrained by Cloudflare secondary storage.
708 | Uses configured hybrid limit (defaults to Cloudflare limit).
709 | """
710 | return HYBRID_MAX_CONTENT_LENGTH
711 |
712 | @property
713 | def supports_chunking(self) -> bool:
714 | """Hybrid backend supports content chunking with metadata linking."""
715 | return True
716 |
717 | def __init__(self,
718 | sqlite_db_path: str,
719 | embedding_model: str = "all-MiniLM-L6-v2",
720 | cloudflare_config: Dict[str, Any] = None,
721 | sync_interval: int = 300,
722 | batch_size: int = 50):
723 | """
724 | Initialize hybrid storage with primary SQLite-vec and secondary Cloudflare.
725 |
726 | Args:
727 | sqlite_db_path: Path to SQLite-vec database file
728 | embedding_model: Embedding model name for SQLite-vec
729 | cloudflare_config: Cloudflare configuration dict
730 | sync_interval: Background sync interval in seconds (default: 5 minutes)
731 | batch_size: Batch size for sync operations (default: 50)
732 | """
733 | self.primary = SqliteVecMemoryStorage(
734 | db_path=sqlite_db_path,
735 | embedding_model=embedding_model
736 | )
737 |
738 | # Initialize Cloudflare storage if config provided
739 | self.secondary = None
740 | self.sync_service = None
741 |
742 | if cloudflare_config and all(key in cloudflare_config for key in
743 | ['api_token', 'account_id', 'vectorize_index', 'd1_database_id']):
744 | self.secondary = CloudflareStorage(**cloudflare_config)
745 | else:
746 | logger.warning("Cloudflare config incomplete, running in SQLite-only mode")
747 |
748 | self.sync_interval = sync_interval
749 | self.batch_size = batch_size
750 | self.initialized = False
751 |
752 | # Initial sync status tracking
753 | self.initial_sync_in_progress = False
754 | self.initial_sync_total = 0
755 | self.initial_sync_completed = 0
756 | self.initial_sync_finished = False
757 |
758 | async def initialize(self) -> None:
759 | """Initialize the hybrid storage system."""
760 | logger.info("Initializing hybrid memory storage...")
761 |
762 | # Always initialize primary storage
763 | await self.primary.initialize()
764 | logger.info("Primary storage (SQLite-vec) initialized")
765 |
766 | # Initialize secondary storage and sync service if available
767 | if self.secondary:
768 | try:
769 | await self.secondary.initialize()
770 | logger.info("Secondary storage (Cloudflare) initialized")
771 |
772 | # Start background sync service
773 | self.sync_service = BackgroundSyncService(
774 | self.primary,
775 | self.secondary,
776 | sync_interval=self.sync_interval,
777 | batch_size=self.batch_size
778 | )
779 | await self.sync_service.start()
780 | logger.info("Background sync service started")
781 |
782 | # Schedule initial sync to run after server startup (non-blocking)
783 | if HYBRID_SYNC_ON_STARTUP:
784 | asyncio.create_task(self._perform_initial_sync_after_startup())
785 | logger.info("Initial sync scheduled to run after server startup")
786 |
787 | except Exception as e:
788 | logger.warning(f"Failed to initialize secondary storage: {e}")
789 | self.secondary = None
790 |
791 | self.initialized = True
792 | logger.info("Hybrid memory storage initialization completed")
793 |
794 | async def _perform_initial_sync_after_startup(self) -> None:
795 | """
796 | Wrapper for initial sync that waits for server startup to complete.
797 | This allows the web server to be accessible during the sync process.
798 | """
799 | # Wait a bit for server to fully start up
800 | await asyncio.sleep(2)
801 | logger.info("Starting initial sync in background (server is now accessible)")
802 | await self._perform_initial_sync()
803 |
804 | async def _sync_memories_from_cloudflare(
805 | self,
806 | sync_type: str = "initial",
807 | broadcast_sse: bool = True,
808 | enable_drift_check: bool = True
809 | ) -> Dict[str, Any]:
810 | """
811 | Shared logic for syncing memories FROM Cloudflare TO local storage.
812 |
813 | Args:
814 | sync_type: Type of sync ("initial" or "manual") for logging/SSE events
815 | broadcast_sse: Whether to broadcast SSE progress events
816 | enable_drift_check: Whether to check for metadata drift (only for initial sync)
817 |
818 | Returns:
819 | Dict with:
820 | - success: bool
821 | - memories_synced: int
822 | - total_checked: int
823 | - message: str
824 | - time_taken_seconds: float
825 | """
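        # Sizing sketch with the defaults used below: batches are pulled from
        # Cloudflare at min(500, batch_size * 5) memories per request (250 with the
        # default batch_size of 50), up to 15 memories are stored concurrently per
        # batch, and the loop stops early either after HYBRID_MAX_EMPTY_BATCHES (20)
        # consecutive batches with nothing to sync once something has been synced,
        # or after HYBRID_MIN_CHECK_COUNT (1000) memories checked with nothing missing.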
826 | import time
827 | sync_start_time = time.time()
828 |
829 | try:
830 | # Get memory count from both storages to compare
831 | primary_stats = await self.primary.get_stats()
832 | secondary_stats = await self.secondary.get_stats()
833 |
834 | primary_count = primary_stats.get('total_memories', 0)
835 | secondary_count = secondary_stats.get('total_memories', 0)
836 |
837 | logger.info(f"{sync_type.capitalize()} sync: Local={primary_count}, Cloudflare={secondary_count}")
838 |
839 | if secondary_count <= primary_count:
840 | logger.info(f"No new memories to sync from Cloudflare ({sync_type} sync)")
841 | return {
842 | 'success': True,
843 | 'memories_synced': 0,
844 | 'total_checked': 0,
845 | 'message': 'No new memories to pull from Cloudflare',
846 | 'time_taken_seconds': round(time.time() - sync_start_time, 3)
847 | }
848 |
849 | # Pull missing memories from Cloudflare using optimized batch processing
850 | missing_count = secondary_count - primary_count
851 | synced_count = 0
852 | batch_size = min(500, self.batch_size * 5) # 5x larger batches for sync
853 | cursor = None
854 | processed_count = 0
855 | consecutive_empty_batches = 0
856 |
857 | # Get all local hashes once for O(1) lookup
858 | local_hashes = await self.primary.get_all_content_hashes()
859 | logger.info(f"Pulling {missing_count} potential memories from Cloudflare...")
860 |
861 | while True:
862 | try:
863 | # Get batch from Cloudflare using cursor-based pagination
864 | logger.debug(f"Fetching batch: cursor={cursor}, batch_size={batch_size}")
865 |
866 | if hasattr(self.secondary, 'get_all_memories_cursor'):
867 | cloudflare_memories = await self.secondary.get_all_memories_cursor(
868 | limit=batch_size,
869 | cursor=cursor
870 | )
871 | else:
872 | cloudflare_memories = await self.secondary.get_all_memories(
873 | limit=batch_size,
874 | offset=processed_count
875 | )
876 |
877 | if not cloudflare_memories:
878 | logger.debug(f"No more memories from Cloudflare at cursor {cursor}")
879 | break
880 |
881 | logger.debug(f"Processing batch of {len(cloudflare_memories)} memories")
882 | batch_checked = 0
883 | batch_missing = 0
884 | batch_synced = 0
885 |
886 | # Parallel processing with concurrency limit
887 | semaphore = asyncio.Semaphore(15)
888 |
889 | async def sync_single_memory(cf_memory):
890 | """Process a single memory with concurrency control."""
891 | nonlocal batch_checked, batch_missing, batch_synced, synced_count, processed_count
892 |
893 | async with semaphore:
894 | batch_checked += 1
895 | processed_count += 1
896 | try:
897 | # Fast O(1) existence check
898 | if cf_memory.content_hash not in local_hashes:
899 | batch_missing += 1
900 | # Memory doesn't exist locally, sync it
901 | success, message = await self.primary.store(cf_memory)
902 | if success:
903 | batch_synced += 1
904 | synced_count += 1
905 | local_hashes.add(cf_memory.content_hash) # Update cache
906 |
907 | if sync_type == "initial":
908 | self.initial_sync_completed = synced_count
909 |
910 | if synced_count % 10 == 0:
911 | logger.info(f"{sync_type.capitalize()} sync progress: {synced_count}/{missing_count} memories synced")
912 |
913 | # Broadcast SSE progress event
914 | if broadcast_sse and SSE_AVAILABLE:
915 | try:
916 | progress_event = create_sync_progress_event(
917 | synced_count=synced_count,
918 | total_count=missing_count,
919 | sync_type=sync_type
920 | )
921 | await sse_manager.broadcast_event(progress_event)
922 | except Exception as e:
923 | logger.debug(f"Failed to broadcast SSE progress: {e}")
924 |
925 | return ('synced', cf_memory.content_hash)
926 | else:
927 | logger.warning(f"Failed to sync memory {cf_memory.content_hash}: {message}")
928 | return ('failed', cf_memory.content_hash, message)
929 | elif enable_drift_check and self.sync_service and self.sync_service.drift_check_enabled:
930 | # Memory exists - check for metadata drift
931 | existing = await self.primary.get_by_hash(cf_memory.content_hash)
932 | if existing:
933 | cf_updated = cf_memory.updated_at or 0
934 | local_updated = existing.updated_at or 0
935 |
936 | # If Cloudflare version is newer, sync metadata
937 | if cf_updated > local_updated + 1.0:
938 | logger.debug(f"Metadata drift detected: {cf_memory.content_hash[:8]}")
939 | success, _ = await self.primary.update_memory_metadata(
940 | cf_memory.content_hash,
941 | {
942 | 'tags': cf_memory.tags,
943 | 'memory_type': cf_memory.memory_type,
944 | 'metadata': cf_memory.metadata,
945 | 'created_at': cf_memory.created_at,
946 | 'created_at_iso': cf_memory.created_at_iso,
947 | 'updated_at': cf_memory.updated_at,
948 | 'updated_at_iso': cf_memory.updated_at_iso,
949 | },
950 | preserve_timestamps=False
951 | )
952 | if success:
953 | batch_synced += 1
954 | synced_count += 1
955 | logger.debug(f"Synced metadata for: {cf_memory.content_hash[:8]}")
956 | return ('drift_synced', cf_memory.content_hash)
957 | return ('skipped', cf_memory.content_hash)
958 | except Exception as e:
959 | logger.warning(f"Error syncing memory {cf_memory.content_hash}: {e}")
960 | return ('error', cf_memory.content_hash, str(e))
961 |
962 | # Process batch in parallel
963 | tasks = [sync_single_memory(mem) for mem in cloudflare_memories]
964 | results = await asyncio.gather(*tasks, return_exceptions=True)
965 | for result in results:
966 | if isinstance(result, Exception):
967 | logger.error(f"Error during {sync_type} sync batch processing: {result}")
968 |
969 | logger.debug(f"Batch complete: checked={batch_checked}, missing={batch_missing}, synced={batch_synced}")
970 |
971 | # Track consecutive empty batches
972 | if batch_synced == 0:
973 | consecutive_empty_batches += 1
974 | logger.debug(f"Empty batch: consecutive={consecutive_empty_batches}/{HYBRID_MAX_EMPTY_BATCHES}")
975 | else:
976 | consecutive_empty_batches = 0
977 |
978 | # Log progress summary
979 | if processed_count > 0 and processed_count % 100 == 0:
980 | logger.info(f"Sync progress: processed={processed_count}, synced={synced_count}/{missing_count}")
981 |
982 | # Update cursor for next batch
983 | if cloudflare_memories and hasattr(self.secondary, 'get_all_memories_cursor'):
984 | cursor = min(memory.created_at for memory in cloudflare_memories if memory.created_at)
985 | logger.debug(f"Next cursor: {cursor}")
986 |
987 | # Early break conditions
988 | if consecutive_empty_batches >= HYBRID_MAX_EMPTY_BATCHES and synced_count > 0:
989 | logger.info(f"Completed after {consecutive_empty_batches} empty batches - {synced_count}/{missing_count} synced")
990 | break
991 | elif processed_count >= HYBRID_MIN_CHECK_COUNT and synced_count == 0:
992 | logger.info(f"No missing memories after checking {processed_count} memories")
993 | break
994 |
995 | await asyncio.sleep(0.01)
996 |
997 | except Exception as e:
998 | # Handle Cloudflare D1 errors
999 | if "400" in str(e) and not hasattr(self.secondary, 'get_all_memories_cursor'):
1000 | logger.error(f"D1 OFFSET limitation at processed_count={processed_count}: {e}")
1001 | logger.warning("Cloudflare D1 OFFSET limits reached - sync incomplete")
1002 | break
1003 | else:
1004 | logger.error(f"Error during {sync_type} sync: {e}")
1005 | break
1006 |
1007 | time_taken = time.time() - sync_start_time
1008 | logger.info(f"{sync_type.capitalize()} sync completed: {synced_count} memories in {time_taken:.2f}s")
1009 |
1010 | # Broadcast SSE completion event
1011 | if broadcast_sse and SSE_AVAILABLE and missing_count > 0:
1012 | try:
1013 | completion_event = create_sync_completed_event(
1014 | synced_count=synced_count,
1015 | total_count=missing_count,
1016 | time_taken_seconds=time_taken,
1017 | sync_type=sync_type
1018 | )
1019 | await sse_manager.broadcast_event(completion_event)
1020 | except Exception as e:
1021 | logger.debug(f"Failed to broadcast SSE completion: {e}")
1022 |
1023 | return {
1024 | 'success': True,
1025 | 'memories_synced': synced_count,
1026 | 'total_checked': processed_count,
1027 | 'message': f'Successfully pulled {synced_count} memories from Cloudflare',
1028 | 'time_taken_seconds': round(time_taken, 3)
1029 | }
1030 |
1031 | except Exception as e:
1032 | logger.error(f"{sync_type.capitalize()} sync failed: {e}")
1033 | return {
1034 | 'success': False,
1035 | 'memories_synced': 0,
1036 | 'total_checked': 0,
1037 | 'message': f'Failed to pull from Cloudflare: {str(e)}',
1038 | 'time_taken_seconds': round(time.time() - sync_start_time, 3)
1039 | }
1040 |
1041 | async def _perform_initial_sync(self) -> None:
1042 | """
1043 | Perform initial sync from Cloudflare to SQLite if enabled.
1044 |
1045 | This downloads all memories from Cloudflare that are missing in local SQLite,
1046 | providing immediate access to existing cloud memories.
1047 | """
1048 | if not HYBRID_SYNC_ON_STARTUP or not self.secondary:
1049 | return
1050 |
1051 | logger.info("Starting initial sync from Cloudflare to SQLite...")
1052 |
1053 | self.initial_sync_in_progress = True
1054 | self.initial_sync_completed = 0
1055 | self.initial_sync_finished = False
1056 |
1057 | try:
1058 | # Set initial_sync_total before calling the shared helper
1059 | primary_stats = await self.primary.get_stats()
1060 | secondary_stats = await self.secondary.get_stats()
1061 | primary_count = primary_stats.get('total_memories', 0)
1062 | secondary_count = secondary_stats.get('total_memories', 0)
1063 |
1064 | if secondary_count > primary_count:
1065 | self.initial_sync_total = secondary_count - primary_count
1066 | else:
1067 | self.initial_sync_total = 0
1068 |
1069 | # Call shared helper method with drift checking enabled
1070 | result = await self._sync_memories_from_cloudflare(
1071 | sync_type="initial",
1072 | broadcast_sse=True,
1073 | enable_drift_check=True
1074 | )
1075 |
1076 | synced_count = result['memories_synced']
1077 |
1078 | # Update sync tracking to reflect actual sync completion
1079 | if synced_count == 0 and self.initial_sync_total > 0:
1080 | # All memories were already present - this is a successful "no-op" sync
1081 | self.initial_sync_completed = self.initial_sync_total
1082 | logger.info(f"Sync completed successfully: All {self.initial_sync_total} memories were already present locally")
1083 |
1084 | self.initial_sync_finished = True
1085 |
1086 | except Exception as e:
1087 | logger.error(f"Initial sync failed: {e}")
1088 | # Don't fail initialization if initial sync fails
1089 | logger.warning("Continuing with hybrid storage despite initial sync failure")
1090 | finally:
1091 | self.initial_sync_in_progress = False
1092 |
1093 | def get_initial_sync_status(self) -> Dict[str, Any]:
1094 | """Get current initial sync status for monitoring."""
1095 | return {
1096 | "in_progress": self.initial_sync_in_progress,
1097 | "total": self.initial_sync_total,
1098 | "completed": self.initial_sync_completed,
1099 | "finished": self.initial_sync_finished,
1100 | "progress_percentage": round((self.initial_sync_completed / max(self.initial_sync_total, 1)) * 100, 1) if self.initial_sync_total > 0 else 0
1101 | }
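    # Example (illustrative, not part of the original source): while an initial sync has
    # pulled 250 of 1000 missing memories, get_initial_sync_status() reports
    #   {"in_progress": True, "total": 1000, "completed": 250,
    #    "finished": False, "progress_percentage": 25.0}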
1102 |
1103 | async def store(self, memory: Memory) -> Tuple[bool, str]:
1104 | """Store a memory in primary storage and queue for secondary sync."""
1105 | # Always store in primary first for immediate availability
1106 | success, message = await self.primary.store(memory)
1107 |
1108 | if success and self.sync_service:
1109 | # Queue for background sync to secondary
1110 | operation = SyncOperation(operation='store', memory=memory)
1111 | await self.sync_service.enqueue_operation(operation)
1112 |
1113 | return success, message
1114 |
1115 | async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
1116 | """Retrieve memories from primary storage (fast)."""
1117 | return await self.primary.retrieve(query, n_results)
1118 |
1119 | async def search(self, query: str, n_results: int = 5, min_similarity: float = 0.0) -> List[MemoryQueryResult]:
1120 | """Search memories in primary storage."""
1121 | return await self.primary.search(query, n_results)
1122 |
1123 | async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
1124 | """Search memories by tags in primary storage with optional time filtering.
1125 |
1126 | This method performs an OR search for tags. The `match_all` (AND) logic
1127 | is handled at the API layer.
1128 |
1129 | Args:
1130 | tags: List of tags to search for
1131 | time_start: Optional Unix timestamp (in seconds) to filter memories created after this time
1132 |
1133 | Returns:
1134 | List of Memory objects matching the tag criteria and time filter
1135 | """
1136 | return await self.primary.search_by_tag(tags, time_start=time_start)
1137 |
1138 | async def search_by_tags(
1139 | self,
1140 | tags: List[str],
1141 | operation: str = "AND",
1142 | time_start: Optional[float] = None,
1143 | time_end: Optional[float] = None
1144 | ) -> List[Memory]:
1145 | """Search memories by tags using consistent operation parameter across backends."""
1146 | normalized_operation = operation.strip().upper() if isinstance(operation, str) else "AND"
1147 | if normalized_operation not in {"AND", "OR"}:
1148 | logger.warning("Unsupported tag operation %s; defaulting to AND", operation)
1149 | normalized_operation = "AND"
1150 |
1151 | return await self.primary.search_by_tags(
1152 | tags,
1153 | operation=normalized_operation,
1154 | time_start=time_start,
1155 | time_end=time_end
1156 | )
1157 |
1158 | async def delete(self, content_hash: str) -> Tuple[bool, str]:
1159 | """Delete a memory from primary storage and queue for secondary sync."""
1160 | success, message = await self.primary.delete(content_hash)
1161 |
1162 | if success and self.sync_service:
1163 | # Queue for background sync to secondary
1164 | operation = SyncOperation(operation='delete', content_hash=content_hash)
1165 | await self.sync_service.enqueue_operation(operation)
1166 |
1167 | return success, message
1168 |
1169 | async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
1170 | """Delete memories by tag from primary storage and queue for secondary sync."""
1171 | # First, get the memories with this tag to get their hashes for sync
1172 | memories_to_delete = await self.primary.search_by_tags([tag])
1173 |
1174 | # Delete from primary
1175 | count_deleted, message = await self.primary.delete_by_tag(tag)
1176 |
1177 | # Queue individual deletes for secondary sync
1178 | if count_deleted > 0 and self.sync_service:
1179 | for memory in memories_to_delete:
1180 | operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
1181 | await self.sync_service.enqueue_operation(operation)
1182 |
1183 | return count_deleted, message
1184 |
1185 | async def delete_by_tags(self, tags: List[str]) -> Tuple[int, str]:
1186 | """
1187 | Delete memories matching ANY of the given tags from primary storage and queue for secondary sync.
1188 |
1189 | Optimized to use primary storage's delete_by_tags if available, otherwise falls back to
1190 | calling delete_by_tag for each tag.
1191 | """
1192 | if not tags:
1193 | return 0, "No tags provided"
1194 |
1195 | # First, get all memories with any of these tags for sync queue
1196 | memories_to_delete = await self.primary.search_by_tags(tags, operation="OR")
1197 |
1198 | # Remove duplicates based on content_hash
1199 | unique_memories = {m.content_hash: m for m in memories_to_delete}.values()
1200 |
1201 | # Delete from primary using optimized method if available
1202 | count_deleted, message = await self.primary.delete_by_tags(tags)
1203 |
1204 | # Queue individual deletes for secondary sync
1205 | if count_deleted > 0 and self.sync_service:
1206 | for memory in unique_memories:
1207 | operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
1208 | await self.sync_service.enqueue_operation(operation)
1209 |
1210 | return count_deleted, message
1211 |
1212 | async def cleanup_duplicates(self) -> Tuple[int, str]:
1213 | """Clean up duplicates in primary storage."""
1214 | # Only cleanup primary, secondary will sync naturally
1215 | return await self.primary.cleanup_duplicates()
1216 |
1217 | async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
1218 | """Update memory metadata in primary storage and queue for secondary sync."""
1219 | success, message = await self.primary.update_memory_metadata(content_hash, updates, preserve_timestamps)
1220 |
1221 | if success and self.sync_service:
1222 | # Queue for background sync to secondary
1223 | operation = SyncOperation(
1224 | operation='update',
1225 | content_hash=content_hash,
1226 | updates=updates
1227 | )
1228 | await self.sync_service.enqueue_operation(operation)
1229 |
1230 | return success, message
1231 |
1232 | async def update_memories_batch(self, memories: List[Memory]) -> List[bool]:
1233 | """
1234 | Update multiple memories in a batch operation with optimal performance.
1235 |
1236 | Delegates directly to primary storage's optimized batch update method,
1237 | then queues secondary sync operations in background.
1238 |
1239 | Args:
1240 | memories: List of Memory objects with updated fields
1241 |
1242 | Returns:
1243 | List of success booleans, one for each memory in the batch
1244 | """
1245 | # Use primary storage's optimized batch update (single transaction)
1246 | results = await self.primary.update_memories_batch(memories)
1247 |
1248 | # Queue successful updates for background sync to secondary
1249 | if self.sync_service:
1250 | for idx, (memory, success) in enumerate(zip(memories, results)):
1251 | if success:
1252 | operation = SyncOperation(
1253 | operation='update',
1254 | content_hash=memory.content_hash,
1255 | updates={
1256 | 'tags': memory.tags,
1257 | 'metadata': memory.metadata,
1258 | 'memory_type': memory.memory_type
1259 | }
1260 | )
1261 |                     # Enqueue for background sync; the queue put is quick, and the actual Cloudflare write happens later
1262 | try:
1263 | await self.sync_service.enqueue_operation(operation)
1264 | except Exception as e:
1265 | logger.warning(f"Failed to queue sync for {memory.content_hash}: {e}")
1266 |
1267 | return results
1268 |
1269 | async def get_stats(self) -> Dict[str, Any]:
1270 | """Get comprehensive statistics from both storage backends."""
1271 | # SQLite-vec get_stats is now async
1272 | primary_stats = await self.primary.get_stats()
1273 |
1274 | stats = {
1275 | "storage_backend": "Hybrid (SQLite-vec + Cloudflare)",
1276 | "primary_backend": "SQLite-vec",
1277 | "secondary_backend": "Cloudflare" if self.secondary else "None",
1278 | "total_memories": primary_stats.get("total_memories", 0),
1279 | "unique_tags": primary_stats.get("unique_tags", 0),
1280 | "memories_this_week": primary_stats.get("memories_this_week", 0),
1281 | "database_size_bytes": primary_stats.get("database_size_bytes", 0),
1282 | "database_size_mb": primary_stats.get("database_size_mb", 0),
1283 | "primary_stats": primary_stats,
1284 | "sync_enabled": self.sync_service is not None
1285 | }
1286 |
1287 | # Add sync service statistics if available
1288 | if self.sync_service:
1289 | sync_status = await self.sync_service.get_sync_status()
1290 | stats["sync_status"] = sync_status
1291 |
1292 | # Add secondary stats if available and healthy
1293 | if self.secondary and self.sync_service and self.sync_service.sync_stats['cloudflare_available']:
1294 | try:
1295 | secondary_stats = await self.secondary.get_stats()
1296 | stats["secondary_stats"] = secondary_stats
1297 | except Exception as e:
1298 | stats["secondary_error"] = str(e)
1299 |
1300 | return stats
1301 |
1302 | async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
1303 | """Get all tags with their usage counts from primary storage."""
1304 | return await self.primary.get_all_tags_with_counts()
1305 |
1306 | async def get_all_tags(self) -> List[str]:
1307 | """Get all unique tags from primary storage."""
1308 | return await self.primary.get_all_tags()
1309 |
1310 | async def get_recent_memories(self, n: int = 10) -> List[Memory]:
1311 | """Get recent memories from primary storage."""
1312 | return await self.primary.get_recent_memories(n)
1313 |
1314 | async def get_largest_memories(self, n: int = 10) -> List[Memory]:
1315 | """Get largest memories by content length from primary storage."""
1316 | return await self.primary.get_largest_memories(n)
1317 |
1318 | async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
1319 | """
1320 | Get memory creation timestamps only, without loading full memory objects.
1321 |
1322 | Delegates to primary storage (SQLite-vec) for optimal performance.
1323 |
1324 | Args:
1325 | days: Optional filter to only get memories from last N days
1326 |
1327 | Returns:
1328 | List of Unix timestamps (float) in descending order (newest first)
1329 | """
1330 | return await self.primary.get_memory_timestamps(days)
1331 |
1332 | async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
1333 | """
1334 | Retrieve memories with combined time filtering and optional semantic search.
1335 |
1336 | Args:
1337 | query: Optional semantic search query. If None, only time filtering is applied.
1338 | n_results: Maximum number of results to return.
1339 | start_timestamp: Optional start time for filtering.
1340 | end_timestamp: Optional end time for filtering.
1341 |
1342 | Returns:
1343 | List of MemoryQueryResult objects.
1344 | """
1345 | return await self.primary.recall(query=query, n_results=n_results, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
1346 |
1347 | async def recall_memory(self, query: str, n_results: int = 5) -> List[Memory]:
1348 | """Recall memories using natural language time expressions."""
1349 | return await self.primary.recall_memory(query, n_results)
1350 |
1351 | async def get_all_memories(self, limit: int = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1352 | """Get all memories from primary storage."""
1353 | return await self.primary.get_all_memories(limit=limit, offset=offset, memory_type=memory_type, tags=tags)
1354 |
1355 | async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
1356 | """Get a memory by its content hash from primary storage."""
1357 | return await self.primary.get_by_hash(content_hash)
1358 |
1359 | async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
1360 | """Get total count of memories from primary storage."""
1361 | return await self.primary.count_all_memories(memory_type=memory_type, tags=tags)
1362 |
1363 | async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
1364 | """Get memories within time range from primary storage."""
1365 | return await self.primary.get_memories_by_time_range(start_time, end_time)
1366 |
1367 | async def close(self):
1368 | """Clean shutdown of hybrid storage system."""
1369 | logger.info("Shutting down hybrid memory storage...")
1370 |
1371 | # Stop sync service first
1372 | if self.sync_service:
1373 | await self.sync_service.stop()
1374 |
1375 | # Close storage backends
1376 | if hasattr(self.primary, 'close') and self.primary.close:
1377 | if asyncio.iscoroutinefunction(self.primary.close):
1378 | await self.primary.close()
1379 | else:
1380 | self.primary.close()
1381 |
1382 | if self.secondary and hasattr(self.secondary, 'close') and self.secondary.close:
1383 | if asyncio.iscoroutinefunction(self.secondary.close):
1384 | await self.secondary.close()
1385 | else:
1386 | self.secondary.close()
1387 |
1388 | logger.info("Hybrid memory storage shutdown completed")
1389 |
1390 | async def force_sync(self) -> Dict[str, Any]:
1391 | """Force immediate synchronization with secondary storage."""
1392 | if not self.sync_service:
1393 | return {
1394 | 'status': 'disabled',
1395 | 'message': 'Background sync service not available'
1396 | }
1397 |
1398 | return await self.sync_service.force_sync()
1399 |
1400 | async def force_pull_sync(self) -> Dict[str, Any]:
1401 | """
1402 | Force immediate pull synchronization FROM Cloudflare TO local SQLite.
1403 |
1404 | This triggers the same logic as initial_sync but can be called on demand.
1405 | Useful for manually refreshing local storage with memories stored via MCP.
1406 |
1407 | Returns:
1408 | Dict with sync results including:
1409 | - success: bool
1410 | - message: str
1411 | - memories_pulled: int
1412 | - time_taken_seconds: float
1413 | """
1414 | # Call shared helper method without drift checking (manual sync doesn't need it)
1415 | result = await self._sync_memories_from_cloudflare(
1416 | sync_type="manual",
1417 | broadcast_sse=True,
1418 | enable_drift_check=False
1419 | )
1420 |
1421 | # Map result to expected return format
1422 | return {
1423 | 'success': result['success'],
1424 | 'message': result['message'],
1425 | 'memories_pulled': result['memories_synced'],
1426 | 'time_taken_seconds': result['time_taken_seconds']
1427 | }
1428 |
1429 | async def get_sync_status(self) -> Dict[str, Any]:
1430 | """Get current background sync status and statistics."""
1431 | if not self.sync_service:
1432 | return {
1433 | 'is_running': False,
1434 | 'pending_operations': 0,
1435 | 'operations_processed': 0,
1436 | 'operations_failed': 0,
1437 | 'last_sync_time': 0,
1438 | 'sync_interval': 0
1439 | }
1440 |
1441 | return await self.sync_service.get_sync_status()
1442 |
1443 | async def pause_sync(self) -> Dict[str, Any]:
1444 | """Pause background sync operations for safe database operations."""
1445 | if not self.sync_service:
1446 | return {
1447 | 'success': False,
1448 | 'message': 'Sync service not available'
1449 | }
1450 |
1451 | try:
1452 | if not self.sync_service.is_running:
1453 | return {
1454 | 'success': True,
1455 | 'message': 'Sync already paused'
1456 | }
1457 |
1458 | # Stop the sync service
1459 | await self.sync_service.stop()
1460 | logger.info("Background sync paused")
1461 |
1462 | return {
1463 | 'success': True,
1464 | 'message': 'Sync paused successfully'
1465 | }
1466 |
1467 | except Exception as e:
1468 | logger.error(f"Failed to pause sync: {e}")
1469 | return {
1470 | 'success': False,
1471 | 'message': f'Failed to pause sync: {str(e)}'
1472 | }
1473 |
1474 | async def resume_sync(self) -> Dict[str, Any]:
1475 | """Resume background sync operations after pause."""
1476 | if not self.sync_service:
1477 | return {
1478 | 'success': False,
1479 | 'message': 'Sync service not available'
1480 | }
1481 |
1482 | try:
1483 | if self.sync_service.is_running:
1484 | return {
1485 | 'success': True,
1486 | 'message': 'Sync already running'
1487 | }
1488 |
1489 | # Start the sync service
1490 | await self.sync_service.start()
1491 | logger.info("Background sync resumed")
1492 |
1493 | return {
1494 | 'success': True,
1495 | 'message': 'Sync resumed successfully'
1496 | }
1497 |
1498 | except Exception as e:
1499 | logger.error(f"Failed to resume sync: {e}")
1500 | return {
1501 | 'success': False,
1502 | 'message': f'Failed to resume sync: {str(e)}'
1503 | }
1504 |
1505 | def sanitized(self, tags):
1506 | """Sanitize and normalize tags to a JSON string.
1507 |
1508 | This method provides compatibility with the storage interface.
1509 | Delegates to primary storage for consistent tag handling.
1510 | """
1511 | return self.primary.sanitized(tags)
1512 |
```
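
Every mutating call in the hybrid class above (`store`, `delete`, `delete_by_tag(s)`, `update_memory_metadata`, `update_memories_batch`) follows the same write-through pattern: apply the change to the fast local primary first, then enqueue a `SyncOperation` so the background sync service replays it against Cloudflare. The minimal sketch below isolates that pattern under stated assumptions; `HybridSketch`, `InMemoryPrimary`, and `QueueSync` are illustrative stand-ins, not classes from this repository.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple


@dataclass
class SyncOp:
    operation: str              # 'store' or 'delete'
    key: str
    value: Optional[str] = None


class InMemoryPrimary:
    """Stand-in for the fast local backend (SQLite-vec in the real service)."""
    def __init__(self) -> None:
        self.data: Dict[str, str] = {}

    async def store(self, key: str, value: str) -> Tuple[bool, str]:
        self.data[key] = value
        return True, "stored locally"


class QueueSync:
    """Stand-in for the background sync service that replays writes to the cloud backend."""
    def __init__(self) -> None:
        self.queue: "asyncio.Queue[SyncOp]" = asyncio.Queue()
        self.replayed: List[SyncOp] = []

    async def enqueue_operation(self, op: SyncOp) -> None:
        await self.queue.put(op)        # fast: the caller never waits on the cloud API

    async def worker(self) -> None:
        while True:
            op = await self.queue.get()
            self.replayed.append(op)    # the real service would call Cloudflare here
            self.queue.task_done()


class HybridSketch:
    """Write-through to primary, queue for secondary: the shape used by the hybrid class."""
    def __init__(self) -> None:
        self.primary = InMemoryPrimary()
        self.sync_service = QueueSync()

    async def store(self, key: str, value: str) -> Tuple[bool, str]:
        success, message = await self.primary.store(key, value)    # 1. local write first
        if success:
            await self.sync_service.enqueue_operation(             # 2. background sync later
                SyncOp(operation="store", key=key, value=value)
            )
        return success, message


async def main() -> None:
    hybrid = HybridSketch()
    worker = asyncio.create_task(hybrid.sync_service.worker())
    await hybrid.store("hash-1", "remember this")
    await hybrid.sync_service.queue.join()      # wait for the background replay to catch up
    print(hybrid.sync_service.replayed)
    worker.cancel()


asyncio.run(main())
```

The write path returns as soon as the primary write and the queue put complete, which is why reads (`retrieve`, `search`, `recall`) can be served from the primary alone while the secondary catches up in the background.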
--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/cloudflare.py:
--------------------------------------------------------------------------------
```python
1 | # Copyright 2024 Heinrich Krupp
2 | #
3 | # Licensed under the Apache License, Version 2.0 (the "License");
4 | # you may not use this file except in compliance with the License.
5 | # You may obtain a copy of the License at
6 | #
7 | # http://www.apache.org/licenses/LICENSE-2.0
8 | #
9 | # Unless required by applicable law or agreed to in writing, software
10 | # distributed under the License is distributed on an "AS IS" BASIS,
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 | # See the License for the specific language governing permissions and
13 | # limitations under the License.
14 |
15 | """
16 | Cloudflare storage backend for MCP Memory Service.
17 | Provides cloud-native storage using Vectorize, D1, and R2.
18 | """
19 |
20 | import json
21 | import logging
22 | import hashlib
23 | import asyncio
24 | import time
25 | from typing import List, Dict, Any, Tuple, Optional
26 | from datetime import datetime, timezone, timedelta
27 | import httpx
28 |
29 | from .base import MemoryStorage
30 | from ..models.memory import Memory, MemoryQueryResult
31 | from ..utils.hashing import generate_content_hash
32 | from ..config import CLOUDFLARE_MAX_CONTENT_LENGTH
33 |
34 | logger = logging.getLogger(__name__)
35 |
36 |
37 | def normalize_tags_for_search(tags: List[str]) -> List[str]:
38 | """Deduplicate and filter empty tag strings.
39 |
40 | Args:
41 | tags: List of tag strings (may contain duplicates or empty strings)
42 |
43 | Returns:
44 | Deduplicated list of non-empty tags
45 | """
46 | return list(dict.fromkeys([tag for tag in tags if tag]))
47 |
48 |
49 | def normalize_operation(operation: Optional[str]) -> str:
50 | """Normalize tag search operation to AND or OR.
51 |
52 | Args:
53 | operation: Raw operation string (case-insensitive)
54 |
55 | Returns:
56 | Normalized operation: "AND" or "OR"
57 | """
58 | if isinstance(operation, str):
59 | normalized = operation.strip().upper() or "AND"
60 | else:
61 | normalized = "AND"
62 |
63 | if normalized not in {"AND", "OR"}:
64 | logger.warning(f"Unsupported operation '{operation}'; defaulting to AND")
65 | normalized = "AND"
66 |
67 | return normalized
68 |
69 |
70 | def build_tag_search_query(tags: List[str], operation: str,
71 | time_start: Optional[float] = None,
72 | time_end: Optional[float] = None) -> Tuple[str, List[Any]]:
73 | """Build SQL query for tag-based search with time filtering.
74 |
75 | Args:
76 | tags: List of deduplicated tags
77 | operation: Search operation ("AND" or "OR")
78 | time_start: Optional start timestamp filter
79 | time_end: Optional end timestamp filter
80 |
81 | Returns:
82 | Tuple of (sql_query, parameters_list)
83 | """
84 | placeholders = ",".join(["?"] * len(tags))
85 | params: List[Any] = list(tags)
86 |
87 | sql = (
88 | "SELECT m.* FROM memories m "
89 | "JOIN memory_tags mt ON m.id = mt.memory_id "
90 | "JOIN tags t ON mt.tag_id = t.id "
91 | f"WHERE t.name IN ({placeholders})"
92 | )
93 |
94 | if time_start is not None:
95 | sql += " AND m.created_at >= ?"
96 | params.append(time_start)
97 |
98 | if time_end is not None:
99 | sql += " AND m.created_at <= ?"
100 | params.append(time_end)
101 |
102 | sql += " GROUP BY m.id"
103 |
104 | if operation == "AND":
105 | sql += " HAVING COUNT(DISTINCT t.name) = ?"
106 | params.append(len(tags))
107 |
108 | sql += " ORDER BY m.created_at DESC"
109 |
110 | return sql, params
111 |
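# Illustration (not part of the module): for tags=["python", "docs"], operation="AND",
# and time_start=1700000000.0, build_tag_search_query() returns roughly this SQL
# (as a single string) plus params ["python", "docs", 1700000000.0, 2]:
#
#   SELECT m.* FROM memories m
#   JOIN memory_tags mt ON m.id = mt.memory_id
#   JOIN tags t ON mt.tag_id = t.id
#   WHERE t.name IN (?,?) AND m.created_at >= ?
#   GROUP BY m.id
#   HAVING COUNT(DISTINCT t.name) = ?
#   ORDER BY m.created_at DESC
#
# The HAVING clause is what turns the tag join into an AND match; it is omitted for OR.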
112 |
113 | class CloudflareStorage(MemoryStorage):
114 | """Cloudflare-based storage backend using Vectorize, D1, and R2."""
115 |
116 | # Content length limit from configuration
117 | _MAX_CONTENT_LENGTH = CLOUDFLARE_MAX_CONTENT_LENGTH
118 |
119 | @property
120 | def max_content_length(self) -> Optional[int]:
121 |         """Maximum content length (CLOUDFLARE_MAX_CONTENT_LENGTH): 800 chars by default, driven by the BGE model's 512-token input limit."""
122 | return self._MAX_CONTENT_LENGTH
123 |
124 | @property
125 | def supports_chunking(self) -> bool:
126 | """Cloudflare backend supports content chunking with metadata linking."""
127 | return True
128 |
129 | def __init__(self,
130 | api_token: str,
131 | account_id: str,
132 | vectorize_index: str,
133 | d1_database_id: str,
134 | r2_bucket: Optional[str] = None,
135 | embedding_model: str = "@cf/baai/bge-base-en-v1.5",
136 | large_content_threshold: int = 1024 * 1024, # 1MB
137 | max_retries: int = 3,
138 | base_delay: float = 1.0):
139 | """
140 | Initialize Cloudflare storage backend.
141 |
142 | Args:
143 | api_token: Cloudflare API token
144 | account_id: Cloudflare account ID
145 | vectorize_index: Vectorize index name
146 | d1_database_id: D1 database ID
147 | r2_bucket: Optional R2 bucket for large content
148 | embedding_model: Workers AI embedding model
149 | large_content_threshold: Size threshold for R2 storage
150 | max_retries: Maximum retry attempts for API calls
151 | base_delay: Base delay for exponential backoff
152 | """
153 | self.api_token = api_token
154 | self.account_id = account_id
155 | self.vectorize_index = vectorize_index
156 | self.d1_database_id = d1_database_id
157 | self.r2_bucket = r2_bucket
158 | self.embedding_model = embedding_model
159 | self.large_content_threshold = large_content_threshold
160 | self.max_retries = max_retries
161 | self.base_delay = base_delay
162 |
163 | # API endpoints
164 | self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
165 | self.vectorize_url = f"{self.base_url}/vectorize/v2/indexes/{vectorize_index}"
166 | self.d1_url = f"{self.base_url}/d1/database/{d1_database_id}"
167 | self.ai_url = f"{self.base_url}/ai/run/{embedding_model}"
168 |
169 | if r2_bucket:
170 | self.r2_url = f"{self.base_url}/r2/buckets/{r2_bucket}/objects"
171 |
172 | # HTTP client with connection pooling
173 | self.client = None
174 | self._initialized = False
175 |
176 | # Embedding cache for performance
177 | self._embedding_cache = {}
178 | self._cache_max_size = 1000
179 |
180 | async def _get_client(self) -> httpx.AsyncClient:
181 | """Get or create HTTP client with connection pooling."""
182 | if self.client is None:
183 | headers = {
184 | "Authorization": f"Bearer {self.api_token}",
185 | "Content-Type": "application/json"
186 | }
187 | self.client = httpx.AsyncClient(
188 | headers=headers,
189 | timeout=httpx.Timeout(30.0),
190 | limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
191 | )
192 | return self.client
193 |
194 | async def _retry_request(self, method: str, url: str, **kwargs) -> httpx.Response:
195 | """Make HTTP request with exponential backoff retry logic."""
196 | client = await self._get_client()
197 |
198 | for attempt in range(self.max_retries + 1):
199 | try:
200 | response = await client.request(method, url, **kwargs)
201 |
202 | # Handle rate limiting
203 | if response.status_code == 429:
204 | if attempt < self.max_retries:
205 | delay = self.base_delay * (2 ** attempt)
206 | logger.warning(f"Rate limited, retrying in {delay}s (attempt {attempt + 1}/{self.max_retries + 1})")
207 | await asyncio.sleep(delay)
208 | continue
209 | else:
210 | raise httpx.HTTPError(f"Rate limited after {self.max_retries} retries")
211 |
212 | # Handle server errors
213 | if response.status_code >= 500:
214 | if attempt < self.max_retries:
215 | delay = self.base_delay * (2 ** attempt)
216 | logger.warning(f"Server error {response.status_code}, retrying in {delay}s")
217 | await asyncio.sleep(delay)
218 | continue
219 |
220 | response.raise_for_status()
221 | return response
222 |
223 | except (httpx.NetworkError, httpx.TimeoutException) as e:
224 | if attempt < self.max_retries:
225 | delay = self.base_delay * (2 ** attempt)
226 | logger.warning(f"Network error: {e}, retrying in {delay}s")
227 | await asyncio.sleep(delay)
228 | continue
229 | raise
230 |
231 | raise httpx.HTTPError(f"Failed after {self.max_retries} retries")
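    # Worked example (comment only): with the defaults max_retries=3 and base_delay=1.0,
    # a request that keeps getting 429s sleeps base_delay * 2**attempt between attempts,
    # i.e. 1s, 2s, then 4s across four attempts, and raises httpx.HTTPError if the fourth
    # attempt is still rate limited. Network errors and 5xx responses follow the same schedule.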
232 |
233 | async def _generate_embedding(self, text: str) -> List[float]:
234 | """Generate embedding using Workers AI or cache."""
235 | # Check cache first
236 | text_hash = hashlib.sha256(text.encode()).hexdigest()
237 | if text_hash in self._embedding_cache:
238 | return self._embedding_cache[text_hash]
239 |
240 | try:
241 | # Use Workers AI to generate embedding
242 | payload = {"text": [text]}
243 | response = await self._retry_request("POST", self.ai_url, json=payload)
244 | result = response.json()
245 |
246 | if result.get("success") and "result" in result:
247 | embedding = result["result"]["data"][0]
248 |
249 | # Cache the embedding (with size limit)
250 | if len(self._embedding_cache) >= self._cache_max_size:
251 | # Remove oldest entry (simple FIFO)
252 | oldest_key = next(iter(self._embedding_cache))
253 | del self._embedding_cache[oldest_key]
254 |
255 | self._embedding_cache[text_hash] = embedding
256 | return embedding
257 | else:
258 | raise ValueError(f"Workers AI embedding failed: {result}")
259 |
260 | except Exception as e:
261 | logger.error(f"Failed to generate embedding with Workers AI: {e}")
262 | # TODO: Implement fallback to local sentence-transformers
263 | raise ValueError(f"Embedding generation failed: {e}")
264 |
265 | async def initialize(self) -> None:
266 | """Initialize the Cloudflare storage backend."""
267 | if self._initialized:
268 | return
269 |
270 | logger.info("Initializing Cloudflare storage backend...")
271 |
272 | try:
273 | # Initialize D1 database schema
274 | await self._initialize_d1_schema()
275 |
276 | # Verify Vectorize index exists
277 | await self._verify_vectorize_index()
278 |
279 | # Verify R2 bucket if configured
280 | if self.r2_bucket:
281 | await self._verify_r2_bucket()
282 |
283 | self._initialized = True
284 | logger.info("Cloudflare storage backend initialized successfully")
285 |
286 | except Exception as e:
287 | logger.error(f"Failed to initialize Cloudflare storage: {e}")
288 | raise
289 |
290 | async def _initialize_d1_schema(self) -> None:
291 | """Initialize D1 database schema."""
292 | schema_sql = """
293 | -- Memory metadata table
294 | CREATE TABLE IF NOT EXISTS memories (
295 | id INTEGER PRIMARY KEY AUTOINCREMENT,
296 | content_hash TEXT UNIQUE NOT NULL,
297 | content TEXT NOT NULL,
298 | memory_type TEXT,
299 | created_at REAL NOT NULL,
300 | created_at_iso TEXT NOT NULL,
301 | updated_at REAL,
302 | updated_at_iso TEXT,
303 | metadata_json TEXT,
304 | vector_id TEXT UNIQUE,
305 | content_size INTEGER DEFAULT 0,
306 | r2_key TEXT
307 | );
308 |
309 | -- Tags table
310 | CREATE TABLE IF NOT EXISTS tags (
311 | id INTEGER PRIMARY KEY AUTOINCREMENT,
312 | name TEXT UNIQUE NOT NULL
313 | );
314 |
315 | -- Memory-tag relationships
316 | CREATE TABLE IF NOT EXISTS memory_tags (
317 | memory_id INTEGER,
318 | tag_id INTEGER,
319 | PRIMARY KEY (memory_id, tag_id),
320 | FOREIGN KEY (memory_id) REFERENCES memories(id) ON DELETE CASCADE,
321 | FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
322 | );
323 |
324 | -- Indexes for performance
325 | CREATE INDEX IF NOT EXISTS idx_memories_content_hash ON memories(content_hash);
326 | CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at);
327 | CREATE INDEX IF NOT EXISTS idx_memories_vector_id ON memories(vector_id);
328 | CREATE INDEX IF NOT EXISTS idx_tags_name ON tags(name);
329 | """
330 |
331 | payload = {"sql": schema_sql}
332 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
333 | result = response.json()
334 |
335 | if not result.get("success"):
336 | raise ValueError(f"Failed to initialize D1 schema: {result}")
337 |
338 | async def _verify_vectorize_index(self) -> None:
339 | """Verify Vectorize index exists and is accessible."""
340 | try:
341 | response = await self._retry_request("GET", f"{self.vectorize_url}")
342 | result = response.json()
343 |
344 | if not result.get("success"):
345 | raise ValueError(f"Vectorize index not accessible: {result}")
346 |
347 | logger.info(f"Vectorize index verified: {self.vectorize_index}")
348 |
349 | except httpx.HTTPStatusError as e:
350 | if e.response.status_code == 404:
351 | raise ValueError(f"Vectorize index '{self.vectorize_index}' not found")
352 | raise
353 |
354 | async def _verify_r2_bucket(self) -> None:
355 | """Verify R2 bucket exists and is accessible."""
356 | try:
357 | # Try to list objects (empty list is fine)
358 | response = await self._retry_request("GET", f"{self.r2_url}?max-keys=1")
359 | logger.info(f"R2 bucket verified: {self.r2_bucket}")
360 |
361 | except httpx.HTTPStatusError as e:
362 | if e.response.status_code == 404:
363 | raise ValueError(f"R2 bucket '{self.r2_bucket}' not found")
364 | raise
365 |
366 | async def store(self, memory: Memory) -> Tuple[bool, str]:
367 | """Store a memory in Cloudflare storage."""
368 | try:
369 | # Generate embedding for the content
370 | embedding = await self._generate_embedding(memory.content)
371 |
372 | # Determine storage strategy based on content size
373 | content_size = len(memory.content.encode('utf-8'))
374 | use_r2 = self.r2_bucket and content_size > self.large_content_threshold
375 |
376 | # Store large content in R2 if needed
377 | r2_key = None
378 | stored_content = memory.content
379 |
380 | if use_r2:
381 | r2_key = f"content/{memory.content_hash}.txt"
382 | await self._store_r2_content(r2_key, memory.content)
383 | stored_content = f"[R2 Content: {r2_key}]" # Placeholder in D1
384 |
385 | # Store vector in Vectorize
386 | vector_id = memory.content_hash
387 | vector_metadata = {
388 | "content_hash": memory.content_hash,
389 | "memory_type": memory.memory_type or "standard",
390 | "tags": ",".join(memory.tags) if memory.tags else "",
391 | "created_at": memory.created_at_iso or datetime.now().isoformat()
392 | }
393 |
394 | await self._store_vectorize_vector(vector_id, embedding, vector_metadata)
395 |
396 | # Store metadata in D1
397 | await self._store_d1_memory(memory, vector_id, content_size, r2_key, stored_content)
398 |
399 | logger.info(f"Successfully stored memory: {memory.content_hash}")
400 | return True, f"Memory stored successfully (vector_id: {vector_id})"
401 |
402 | except Exception as e:
403 | logger.error(f"Failed to store memory {memory.content_hash}: {e}")
404 | return False, f"Storage failed: {str(e)}"
405 |
406 | async def _store_vectorize_vector(self, vector_id: str, embedding: List[float], metadata: Dict[str, Any]) -> None:
407 | """Store vector in Vectorize."""
408 |         # Vectors are stored without a Vectorize namespace for now
409 | vector_data = {
410 | "id": vector_id,
411 | "values": embedding,
412 | "metadata": metadata
413 | }
414 |
415 | # Convert to NDJSON format as required by the HTTP API
416 | import json
417 | ndjson_content = json.dumps(vector_data) + "\n"
418 |
419 | try:
420 | # Send as raw NDJSON data with correct Content-Type header
421 | client = await self._get_client()
422 |
423 | # Override headers for this specific request
424 | headers = {
425 | "Authorization": f"Bearer {self.api_token}",
426 | "Content-Type": "application/x-ndjson"
427 | }
428 |
429 | response = await client.post(
430 | f"{self.vectorize_url}/upsert",
431 | content=ndjson_content.encode("utf-8"),
432 | headers=headers
433 | )
434 |
435 | # Log response status for debugging (avoid logging headers/body for security)
436 | logger.info(f"Vectorize response status: {response.status_code}")
437 | response_text = response.text
438 | if response.status_code != 200:
439 | # Only log response body on errors, and truncate to avoid credential exposure
440 | truncated_response = response_text[:200] + "..." if len(response_text) > 200 else response_text
441 | logger.warning(f"Vectorize error response (truncated): {truncated_response}")
442 |
443 | if response.status_code != 200:
444 | raise ValueError(f"HTTP {response.status_code}: {response_text}")
445 |
446 | result = response.json()
447 | if not result.get("success"):
448 | raise ValueError(f"Failed to store vector: {result}")
449 |
450 | except Exception as e:
451 | # Add more detailed error logging
452 | logger.error(f"Vectorize insert failed: {e}")
453 | logger.error(f"Vector data was: {vector_data}")
454 | logger.error(f"NDJSON content: {ndjson_content.strip()}")
455 | logger.error(f"URL was: {self.vectorize_url}/upsert")
456 | raise ValueError(f"Failed to store vector: {e}")
457 |
458 | async def _store_d1_memory(self, memory: Memory, vector_id: str, content_size: int, r2_key: Optional[str], stored_content: str) -> None:
459 | """Store memory metadata in D1."""
460 | # Insert memory record
461 | insert_sql = """
462 | INSERT INTO memories (
463 | content_hash, content, memory_type, created_at, created_at_iso,
464 | updated_at, updated_at_iso, metadata_json, vector_id, content_size, r2_key
465 | ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
466 | """
467 |
468 | now = time.time()
469 | now_iso = datetime.now().isoformat()
470 |
471 | params = [
472 | memory.content_hash,
473 | stored_content,
474 | memory.memory_type,
475 | memory.created_at or now,
476 | memory.created_at_iso or now_iso,
477 | memory.updated_at or now,
478 | memory.updated_at_iso or now_iso,
479 | json.dumps(memory.metadata) if memory.metadata else None,
480 | vector_id,
481 | content_size,
482 | r2_key
483 | ]
484 |
485 | payload = {"sql": insert_sql, "params": params}
486 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
487 | result = response.json()
488 |
489 | if not result.get("success"):
490 | raise ValueError(f"Failed to store memory in D1: {result}")
491 |
492 | # Store tags if present
493 | if memory.tags:
494 | memory_id = result["result"][0]["meta"]["last_row_id"]
495 | await self._store_d1_tags(memory_id, memory.tags)
496 |
497 | async def _store_d1_tags(self, memory_id: int, tags: List[str]) -> None:
498 | """Store tags for a memory in D1."""
499 | for tag in tags:
500 | # Insert tag if not exists
501 | tag_sql = "INSERT OR IGNORE INTO tags (name) VALUES (?)"
502 | payload = {"sql": tag_sql, "params": [tag]}
503 | await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
504 |
505 | # Link tag to memory
506 | link_sql = """
507 | INSERT INTO memory_tags (memory_id, tag_id)
508 | SELECT ?, id FROM tags WHERE name = ?
509 | """
510 | payload = {"sql": link_sql, "params": [memory_id, tag]}
511 | await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
512 |
513 | async def _store_r2_content(self, key: str, content: str) -> None:
514 | """Store content in R2."""
515 | response = await self._retry_request(
516 | "PUT",
517 | f"{self.r2_url}/{key}",
518 | content=content.encode('utf-8'),
519 | headers={"Content-Type": "text/plain"}
520 | )
521 |
522 | if response.status_code not in [200, 201]:
523 | raise ValueError(f"Failed to store content in R2: {response.status_code}")
524 |
525 | async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
526 | """Retrieve memories by semantic search."""
527 | try:
528 | # Generate query embedding
529 | query_embedding = await self._generate_embedding(query)
530 |
531 | # Search Vectorize (without namespace for now)
532 | search_payload = {
533 | "vector": query_embedding,
534 | "topK": n_results,
535 | "returnMetadata": "all",
536 | "returnValues": False
537 | }
538 |
539 | response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
540 | result = response.json()
541 |
542 | if not result.get("success"):
543 | raise ValueError(f"Vectorize query failed: {result}")
544 |
545 | matches = result.get("result", {}).get("matches", [])
546 |
547 | # Convert to MemoryQueryResult objects
548 | results = []
549 | for match in matches:
550 | memory = await self._load_memory_from_match(match)
551 | if memory:
552 | query_result = MemoryQueryResult(
553 | memory=memory,
554 | relevance_score=match.get("score", 0.0)
555 | )
556 | results.append(query_result)
557 |
558 | logger.info(f"Retrieved {len(results)} memories for query")
559 | return results
560 |
561 | except Exception as e:
562 | logger.error(f"Failed to retrieve memories: {e}")
563 | return []
564 |
565 | async def _load_memory_from_match(self, match: Dict[str, Any]) -> Optional[Memory]:
566 | """Load full memory from Vectorize match."""
567 | try:
568 | vector_id = match.get("id")
569 | metadata = match.get("metadata", {})
570 | content_hash = metadata.get("content_hash")
571 |
572 | if not content_hash:
573 | logger.warning(f"No content_hash in vector metadata: {vector_id}")
574 | return None
575 |
576 | # Load from D1
577 | sql = "SELECT * FROM memories WHERE content_hash = ?"
578 | payload = {"sql": sql, "params": [content_hash]}
579 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
580 | result = response.json()
581 |
582 | if not result.get("success") or not result.get("result", [{}])[0].get("results"):
583 | logger.warning(f"Memory not found in D1: {content_hash}")
584 | return None
585 |
586 | row = result["result"][0]["results"][0]
587 |
588 | # Load content from R2 if needed
589 | content = row["content"]
590 | if row.get("r2_key") and content.startswith("[R2 Content:"):
591 | content = await self._load_r2_content(row["r2_key"])
592 |
593 | # Load tags
594 | tags = await self._load_memory_tags(row["id"])
595 |
596 | # Reconstruct Memory object
597 | memory = Memory(
598 | content=content,
599 | content_hash=content_hash,
600 | tags=tags,
601 | memory_type=row.get("memory_type"),
602 | metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
603 | created_at=row.get("created_at"),
604 | created_at_iso=row.get("created_at_iso"),
605 | updated_at=row.get("updated_at"),
606 | updated_at_iso=row.get("updated_at_iso")
607 | )
608 |
609 | return memory
610 |
611 | except Exception as e:
612 | logger.error(f"Failed to load memory from match: {e}")
613 | return None
614 |
615 | async def _load_r2_content(self, r2_key: str) -> str:
616 | """Load content from R2."""
617 | response = await self._retry_request("GET", f"{self.r2_url}/{r2_key}")
618 | return response.text
619 |
620 | async def _load_memory_tags(self, memory_id: int) -> List[str]:
621 | """Load tags for a memory from D1."""
622 | sql = """
623 | SELECT t.name FROM tags t
624 | JOIN memory_tags mt ON t.id = mt.tag_id
625 | WHERE mt.memory_id = ?
626 | """
627 | payload = {"sql": sql, "params": [memory_id]}
628 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
629 | result = response.json()
630 |
631 | if result.get("success") and result.get("result", [{}])[0].get("results"):
632 | return [row["name"] for row in result["result"][0]["results"]]
633 |
634 | return []
635 |
636 | async def search_by_tags(
637 | self,
638 | tags: List[str],
639 | operation: str = "AND",
640 | time_start: Optional[float] = None,
641 | time_end: Optional[float] = None
642 | ) -> List[Memory]:
643 | """Search memories by tags with AND/OR semantics and optional time filtering."""
644 | return await self._search_by_tags_internal(
645 | tags=tags,
646 | operation=operation,
647 | time_start=time_start,
648 | time_end=time_end
649 | )
650 |
651 | async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
652 | """Search memories by tags with optional time filtering (legacy OR behavior)."""
653 | return await self._search_by_tags_internal(
654 | tags=tags,
655 | operation="OR",
656 | time_start=time_start,
657 | time_end=None
658 | )
659 |
660 | async def _search_by_tags_internal(
661 | self,
662 | tags: List[str],
663 | operation: Optional[str] = None,
664 | time_start: Optional[float] = None,
665 | time_end: Optional[float] = None
666 | ) -> List[Memory]:
667 | """Shared implementation for tag-based queries with optional time filtering."""
668 | try:
669 | if not tags:
670 | return []
671 |
672 | deduped_tags = normalize_tags_for_search(tags)
673 | if not deduped_tags:
674 | return []
675 |
676 | normalized_operation = normalize_operation(operation)
677 | sql, params = build_tag_search_query(deduped_tags, normalized_operation,
678 | time_start, time_end)
679 |
680 | payload = {"sql": sql, "params": params}
681 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
682 | result = response.json()
683 |
684 | if not result.get("success"):
685 | raise ValueError(f"D1 tag search failed: {result}")
686 |
687 | rows = result.get("result", [{}])[0].get("results") or []
688 | memories: List[Memory] = []
689 |
690 | for row in rows:
691 | memory = await self._load_memory_from_row(row)
692 | if memory:
693 | memories.append(memory)
694 |
695 | logger.info(
696 | "Found %d memories with tags: %s (operation: %s)",
697 | len(memories),
698 | deduped_tags,
699 | normalized_operation
700 | )
701 | return memories
702 |
703 | except Exception as e:
704 | logger.error(
705 | "Failed to search memories by tags %s with operation %s: %s",
706 | tags,
707 | operation,
708 | e
709 | )
710 | return []
711 |
712 | async def _load_memory_from_row(self, row: Dict[str, Any]) -> Optional[Memory]:
713 | """Load memory from D1 row data."""
714 | try:
715 | # Load content from R2 if needed
716 | content = row["content"]
717 | if row.get("r2_key") and content.startswith("[R2 Content:"):
718 | content = await self._load_r2_content(row["r2_key"])
719 |
720 | # Load tags
721 | tags = await self._load_memory_tags(row["id"])
722 |
723 | memory = Memory(
724 | content=content,
725 | content_hash=row["content_hash"],
726 | tags=tags,
727 | memory_type=row.get("memory_type"),
728 | metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
729 | created_at=row.get("created_at"),
730 | created_at_iso=row.get("created_at_iso"),
731 | updated_at=row.get("updated_at"),
732 | updated_at_iso=row.get("updated_at_iso")
733 | )
734 |
735 | return memory
736 |
737 | except Exception as e:
738 | logger.error(f"Failed to load memory from row: {e}")
739 | return None
740 |
741 | async def delete(self, content_hash: str) -> Tuple[bool, str]:
742 | """Delete a memory by its hash."""
743 | try:
744 | # Find memory in D1
745 | sql = "SELECT * FROM memories WHERE content_hash = ?"
746 | payload = {"sql": sql, "params": [content_hash]}
747 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
748 | result = response.json()
749 |
750 | if not result.get("success") or not result.get("result", [{}])[0].get("results"):
751 | return False, f"Memory not found: {content_hash}"
752 |
753 | row = result["result"][0]["results"][0]
754 | memory_id = row["id"]
755 | vector_id = row.get("vector_id")
756 | r2_key = row.get("r2_key")
757 |
758 | # Delete from Vectorize
759 | if vector_id:
760 | await self._delete_vectorize_vector(vector_id)
761 |
762 | # Delete from R2 if present
763 | if r2_key:
764 | await self._delete_r2_content(r2_key)
765 |
766 | # Delete from D1 (tags will be cascade deleted)
767 | delete_sql = "DELETE FROM memories WHERE id = ?"
768 | payload = {"sql": delete_sql, "params": [memory_id]}
769 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
770 | result = response.json()
771 |
772 | if not result.get("success"):
773 | raise ValueError(f"Failed to delete from D1: {result}")
774 |
775 | logger.info(f"Successfully deleted memory: {content_hash}")
776 | return True, "Memory deleted successfully"
777 |
778 | except Exception as e:
779 | logger.error(f"Failed to delete memory {content_hash}: {e}")
780 | return False, f"Deletion failed: {str(e)}"
781 |
782 | async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
783 | """Get a memory by its content hash using direct O(1) D1 lookup."""
784 | try:
785 | # Query D1 for the memory
786 | sql = "SELECT * FROM memories WHERE content_hash = ?"
787 | payload = {"sql": sql, "params": [content_hash]}
788 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
789 | result = response.json()
790 |
791 | if not result.get("success") or not result.get("result", [{}])[0].get("results"):
792 | return None
793 |
794 | row = result["result"][0]["results"][0]
795 |
796 | # Load content from R2 if needed
797 | content = row["content"]
798 | if row.get("r2_key") and content.startswith("[R2 Content:"):
799 | content = await self._load_r2_content(row["r2_key"])
800 |
801 | # Load tags
802 | tags = await self._load_memory_tags(row["id"])
803 |
804 | # Construct Memory object
805 | memory = Memory(
806 | content=content,
807 | content_hash=content_hash,
808 | tags=tags,
809 | memory_type=row.get("memory_type"),
810 | metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
811 | created_at=row.get("created_at"),
812 | created_at_iso=row.get("created_at_iso"),
813 | updated_at=row.get("updated_at"),
814 | updated_at_iso=row.get("updated_at_iso")
815 | )
816 |
817 | return memory
818 |
819 | except Exception as e:
820 | logger.error(f"Failed to get memory by hash {content_hash}: {e}")
821 | return None
822 |
823 | async def _delete_vectorize_vector(self, vector_id: str) -> None:
824 | """Delete vector from Vectorize."""
825 | # Correct endpoint uses underscores, not hyphens
826 | payload = {"ids": [vector_id]}
827 |
828 | response = await self._retry_request("POST", f"{self.vectorize_url}/delete_by_ids", json=payload)
829 | result = response.json()
830 |
831 | if not result.get("success"):
832 | logger.warning(f"Failed to delete vector from Vectorize: {result}")
833 |
834 | async def delete_vectors_by_ids(self, vector_ids: List[str]) -> Dict[str, Any]:
835 | """Delete multiple vectors from Vectorize by their IDs."""
836 | payload = {"ids": vector_ids}
837 | response = await self._retry_request(
838 | "POST",
839 | f"{self.vectorize_url}/delete_by_ids",
840 | json=payload
841 | )
842 | return response.json()
843 |
844 | async def _delete_r2_content(self, r2_key: str) -> None:
845 | """Delete content from R2."""
846 | try:
847 | response = await self._retry_request("DELETE", f"{self.r2_url}/{r2_key}")
848 | if response.status_code not in [200, 204, 404]: # 404 is fine if already deleted
849 | logger.warning(f"Failed to delete R2 content: {response.status_code}")
850 | except Exception as e:
851 | logger.warning(f"Failed to delete R2 content {r2_key}: {e}")
852 |
853 | async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
854 | """Delete memories by tag."""
855 | try:
856 | # Find memories with the tag
857 | memories = await self.search_by_tag([tag])
858 |
859 | deleted_count = 0
860 | for memory in memories:
861 | success, _ = await self.delete(memory.content_hash)
862 | if success:
863 | deleted_count += 1
864 |
865 | logger.info(f"Deleted {deleted_count} memories with tag: {tag}")
866 | return deleted_count, f"Deleted {deleted_count} memories"
867 |
868 | except Exception as e:
869 | logger.error(f"Failed to delete by tag {tag}: {e}")
870 | return 0, f"Deletion failed: {str(e)}"
871 |
872 | async def cleanup_duplicates(self) -> Tuple[int, str]:
873 | """Remove duplicate memories based on content hash."""
874 | try:
875 | # Find duplicates in D1
876 | sql = """
877 | SELECT content_hash, COUNT(*) as count, MIN(id) as keep_id
878 | FROM memories
879 | GROUP BY content_hash
880 | HAVING COUNT(*) > 1
881 | """
882 |
883 | payload = {"sql": sql}
884 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
885 | result = response.json()
886 |
887 | if not result.get("success"):
888 | raise ValueError(f"Failed to find duplicates: {result}")
889 |
890 | duplicate_groups = result.get("result", [{}])[0].get("results", [])
891 |
892 | total_deleted = 0
893 | for group in duplicate_groups:
894 | content_hash = group["content_hash"]
895 | keep_id = group["keep_id"]
896 |
897 | # Delete all except the first one
898 | delete_sql = "DELETE FROM memories WHERE content_hash = ? AND id != ?"
899 | payload = {"sql": delete_sql, "params": [content_hash, keep_id]}
900 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
901 | result = response.json()
902 |
903 | if result.get("success") and result.get("result", [{}])[0].get("meta"):
904 | deleted = result["result"][0]["meta"].get("changes", 0)
905 | total_deleted += deleted
906 |
907 | logger.info(f"Cleaned up {total_deleted} duplicate memories")
908 | return total_deleted, f"Removed {total_deleted} duplicates"
909 |
910 | except Exception as e:
911 | logger.error(f"Failed to cleanup duplicates: {e}")
912 | return 0, f"Cleanup failed: {str(e)}"
913 |
914 | async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
915 | """Update memory metadata without recreating the entry."""
916 | try:
917 | # Build update SQL
918 | update_fields = []
919 | params = []
920 |
921 | if "metadata" in updates:
922 | update_fields.append("metadata_json = ?")
923 | params.append(json.dumps(updates["metadata"]))
924 |
925 | if "memory_type" in updates:
926 | update_fields.append("memory_type = ?")
927 | params.append(updates["memory_type"])
928 |
929 | if "tags" in updates:
930 | # Handle tags separately - they require relational updates
931 | pass
932 |
933 | # Handle timestamp updates based on preserve_timestamps flag
934 | now = time.time()
935 | now_iso = datetime.now().isoformat()
936 |
937 | if not preserve_timestamps:
938 | # When preserve_timestamps=False, use timestamps from updates dict if provided
939 | # This allows syncing timestamps from source (e.g., SQLite → Cloudflare)
940 | # Always preserve created_at (never reset to current time!)
941 | if "created_at" in updates:
942 | update_fields.append("created_at = ?")
943 | params.append(updates["created_at"])
944 | if "created_at_iso" in updates:
945 | update_fields.append("created_at_iso = ?")
946 | params.append(updates["created_at_iso"])
947 | # Use updated_at from updates or current time
948 | if "updated_at" in updates:
949 | update_fields.append("updated_at = ?")
950 | params.append(updates["updated_at"])
951 | else:
952 | update_fields.append("updated_at = ?")
953 | params.append(now)
954 | if "updated_at_iso" in updates:
955 | update_fields.append("updated_at_iso = ?")
956 | params.append(updates["updated_at_iso"])
957 | else:
958 | update_fields.append("updated_at_iso = ?")
959 | params.append(now_iso)
960 | else:
961 | # preserve_timestamps=True: only update updated_at to current time
962 | update_fields.append("updated_at = ?")
963 | update_fields.append("updated_at_iso = ?")
964 | params.extend([now, now_iso])
965 |
966 | if not update_fields:
967 | return True, "No updates needed"
968 |
969 | # Update memory record
970 | sql = f"UPDATE memories SET {', '.join(update_fields)} WHERE content_hash = ?"
971 | params.append(content_hash)
972 |
973 | payload = {"sql": sql, "params": params}
974 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
975 | result = response.json()
976 |
977 | if not result.get("success"):
978 | raise ValueError(f"Failed to update memory: {result}")
979 |
980 | # Handle tag updates if provided
981 | if "tags" in updates:
982 | await self._update_memory_tags(content_hash, updates["tags"])
983 |
984 | logger.info(f"Successfully updated memory metadata: {content_hash}")
985 | return True, "Memory metadata updated successfully"
986 |
987 | except Exception as e:
988 | logger.error(f"Failed to update memory metadata {content_hash}: {e}")
989 | return False, f"Update failed: {str(e)}"
990 |
991 | async def _update_memory_tags(self, content_hash: str, new_tags: List[str]) -> None:
992 | """Update tags for a memory."""
993 | # Get memory ID
994 | sql = "SELECT id FROM memories WHERE content_hash = ?"
995 | payload = {"sql": sql, "params": [content_hash]}
996 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
997 | result = response.json()
998 |
999 | if not result.get("success") or not result.get("result", [{}])[0].get("results"):
1000 | raise ValueError(f"Memory not found: {content_hash}")
1001 |
1002 | memory_id = result["result"][0]["results"][0]["id"]
1003 |
1004 | # Delete existing tag relationships
1005 | delete_sql = "DELETE FROM memory_tags WHERE memory_id = ?"
1006 | payload = {"sql": delete_sql, "params": [memory_id]}
1007 | await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1008 |
1009 | # Add new tags
1010 | if new_tags:
1011 | await self._store_d1_tags(memory_id, new_tags)
1012 |
1013 | async def get_stats(self) -> Dict[str, Any]:
1014 | """Get storage statistics."""
1015 | try:
1016 | # Calculate timestamp for memories from last 7 days
1017 | week_ago = time.time() - (7 * 24 * 60 * 60)
1018 |
1019 | # Get memory count and size from D1
1020 | sql = f"""
1021 | SELECT
1022 | COUNT(*) as total_memories,
1023 | SUM(content_size) as total_content_size,
1024 | COUNT(DISTINCT vector_id) as total_vectors,
1025 | COUNT(r2_key) as r2_stored_count,
1026 | (SELECT COUNT(*) FROM tags) as unique_tags,
1027 | (SELECT COUNT(*) FROM memories WHERE created_at >= {week_ago}) as memories_this_week
1028 | FROM memories
1029 | """
1030 |
1031 | payload = {"sql": sql}
1032 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1033 | result = response.json()
1034 |
1035 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1036 | stats = result["result"][0]["results"][0]
1037 |
1038 | return {
1039 | "total_memories": stats.get("total_memories", 0),
1040 | "unique_tags": stats.get("unique_tags", 0),
1041 | "memories_this_week": stats.get("memories_this_week", 0),
1042 | "total_content_size_bytes": stats.get("total_content_size", 0),
1043 | "total_vectors": stats.get("total_vectors", 0),
1044 | "r2_stored_count": stats.get("r2_stored_count", 0),
1045 | "storage_backend": "cloudflare",
1046 | "vectorize_index": self.vectorize_index,
1047 | "d1_database": self.d1_database_id,
1048 | "r2_bucket": self.r2_bucket,
1049 | "status": "operational"
1050 | }
1051 |
1052 | return {
1053 | "total_memories": 0,
1054 | "unique_tags": 0,
1055 | "memories_this_week": 0,
1056 | "storage_backend": "cloudflare",
1057 | "status": "operational"
1058 | }
1059 |
1060 | except Exception as e:
1061 | logger.error(f"Failed to get stats: {e}")
1062 | return {
1063 | "total_memories": 0,
1064 | "unique_tags": 0,
1065 | "memories_this_week": 0,
1066 | "storage_backend": "cloudflare",
1067 | "status": "error",
1068 | "error": str(e)
1069 | }
1070 |
1071 | async def get_all_tags(self) -> List[str]:
1072 | """Get all unique tags in the storage."""
1073 | try:
1074 | sql = "SELECT name FROM tags ORDER BY name"
1075 | payload = {"sql": sql}
1076 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1077 | result = response.json()
1078 |
1079 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1080 | return [row["name"] for row in result["result"][0]["results"]]
1081 |
1082 | return []
1083 |
1084 | except Exception as e:
1085 | logger.error(f"Failed to get all tags: {e}")
1086 | return []
1087 |
1088 | async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
1089 | """Get all tags with their usage counts."""
1090 | try:
1091 | sql = """
1092 | SELECT t.name as tag, COUNT(mt.memory_id) as count
1093 | FROM tags t
1094 | LEFT JOIN memory_tags mt ON t.id = mt.tag_id
1095 | GROUP BY t.id
1096 | ORDER BY count DESC, t.name ASC
1097 | """
1098 | payload = {"sql": sql}
1099 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1100 | result = response.json()
1101 |
1102 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1103 |                 return list(result["result"][0]["results"])
1104 |
1105 | return []
1106 |
1107 | except Exception as e:
1108 | logger.error(f"Failed to get tags with counts: {e}")
1109 | return []
1110 |
1111 | async def get_recent_memories(self, n: int = 10) -> List[Memory]:
1112 | """Get n most recent memories."""
1113 | try:
1114 | sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
1115 | payload = {"sql": sql, "params": [n]}
1116 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1117 | result = response.json()
1118 |
1119 | memories = []
1120 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1121 | for row in result["result"][0]["results"]:
1122 | memory = await self._load_memory_from_row(row)
1123 | if memory:
1124 | memories.append(memory)
1125 |
1126 | logger.info(f"Retrieved {len(memories)} recent memories")
1127 | return memories
1128 |
1129 | except Exception as e:
1130 | logger.error(f"Failed to get recent memories: {e}")
1131 | return []
1132 |
1133 | async def get_largest_memories(self, n: int = 10) -> List[Memory]:
1134 | """Get n largest memories by content length."""
1135 | try:
1136 | sql = "SELECT * FROM memories ORDER BY LENGTH(content) DESC LIMIT ?"
1137 | payload = {"sql": sql, "params": [n]}
1138 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1139 | result = response.json()
1140 |
1141 | memories = []
1142 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1143 | for row in result["result"][0]["results"]:
1144 | memory = await self._load_memory_from_row(row)
1145 | if memory:
1146 | memories.append(memory)
1147 |
1148 | logger.info(f"Retrieved {len(memories)} largest memories")
1149 | return memories
1150 |
1151 | except Exception as e:
1152 | logger.error(f"Failed to get largest memories: {e}")
1153 | return []
1154 |
1155 | async def _fetch_d1_timestamps(self, cutoff_timestamp: Optional[float] = None) -> List[float]:
1156 | """Fetch timestamps from D1 database with optional time filter.
1157 |
1158 | Args:
1159 | cutoff_timestamp: Optional Unix timestamp to filter memories (>=)
1160 |
1161 | Returns:
1162 | List of Unix timestamps (float) in descending order
1163 |
1164 | Raises:
1165 | Exception: If D1 query fails
1166 | """
1167 | if cutoff_timestamp is not None:
1168 | sql = "SELECT created_at FROM memories WHERE created_at >= ? ORDER BY created_at DESC"
1169 | payload = {"sql": sql, "params": [cutoff_timestamp]}
1170 | else:
1171 | sql = "SELECT created_at FROM memories ORDER BY created_at DESC"
1172 | payload = {"sql": sql, "params": []}
1173 |
1174 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1175 | result = response.json()
1176 |
1177 | timestamps = []
1178 | if result.get("success") and result.get("result", [{}])[0].get("results"):
1179 | for row in result["result"][0]["results"]:
1180 | if row.get("created_at") is not None:
1181 | timestamps.append(float(row["created_at"]))
1182 |
1183 | return timestamps
1184 |
1185 | async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
1186 | """
1187 | Get memory creation timestamps only, without loading full memory objects.
1188 |
1189 | This is an optimized method for analytics that only needs timestamps,
1190 | avoiding the overhead of loading full memory content and embeddings.
1191 |
1192 | Args:
1193 | days: Optional filter to only get memories from last N days
1194 |
1195 | Returns:
1196 | List of Unix timestamps (float) in descending order (newest first)
1197 | """
1198 | try:
1199 | cutoff_timestamp = None
1200 | if days is not None:
1201 | cutoff = datetime.now(timezone.utc) - timedelta(days=days)
1202 | cutoff_timestamp = cutoff.timestamp()
1203 |
1204 | timestamps = await self._fetch_d1_timestamps(cutoff_timestamp)
1205 | logger.info(f"Retrieved {len(timestamps)} memory timestamps")
1206 | return timestamps
1207 |
1208 | except Exception as e:
1209 | logger.error(f"Failed to get memory timestamps: {e}")
1210 | return []
1211 |
1212 | def sanitized(self, tags):
1213 | """Sanitize and normalize tags to a JSON string.
1214 |
1215 | This method provides compatibility with the storage backend interface.
1216 | """
1217 | if tags is None:
1218 | return json.dumps([])
1219 |
1220 | # If we get a string, split it into an array
1221 | if isinstance(tags, str):
1222 | tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
1223 | # If we get an array, use it directly
1224 | elif isinstance(tags, list):
1225 | tags = [str(tag).strip() for tag in tags if str(tag).strip()]
1226 | else:
1227 | return json.dumps([])
1228 |
1229 | # Return JSON string representation of the array
1230 | return json.dumps(tags)
1231 |
1232 | async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
1233 | """
1234 | Retrieve memories with combined time filtering and optional semantic search.
1235 |
1236 | Args:
1237 | query: Optional semantic search query. If None, only time filtering is applied.
1238 | n_results: Maximum number of results to return.
1239 | start_timestamp: Optional start time for filtering.
1240 | end_timestamp: Optional end time for filtering.
1241 |
1242 | Returns:
1243 | List of MemoryQueryResult objects.
1244 | """
1245 | try:
1246 | # Build time filtering WHERE clause for D1
1247 | time_conditions = []
1248 | params = []
1249 |
1250 | if start_timestamp is not None:
1251 | time_conditions.append("created_at >= ?")
1252 | params.append(float(start_timestamp))
1253 |
1254 | if end_timestamp is not None:
1255 | time_conditions.append("created_at <= ?")
1256 | params.append(float(end_timestamp))
1257 |
1258 | time_where = " AND ".join(time_conditions) if time_conditions else ""
1259 |
1260 | logger.info(f"Recall - Time filtering conditions: {time_where}, params: {params}")
1261 |
1262 | # Determine search strategy
1263 | if query and query.strip():
1264 | # Combined semantic search with time filtering
1265 | logger.info(f"Recall - Using semantic search with query: '{query}'")
1266 |
1267 | try:
1268 | # Generate query embedding
1269 | query_embedding = await self._generate_embedding(query)
1270 |
1271 | # Search Vectorize with semantic query
1272 | search_payload = {
1273 | "vector": query_embedding,
1274 | "topK": n_results,
1275 | "returnMetadata": "all",
1276 | "returnValues": False
1277 | }
1278 |
1279 | # Add time filtering to vectorize metadata if specified
1280 | if time_conditions:
1281 | # Note: Vectorize metadata filtering capabilities may be limited
1282 | # We'll filter after retrieval for now
1283 | logger.info("Recall - Time filtering will be applied post-retrieval from Vectorize")
1284 |
1285 | response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
1286 | result = response.json()
1287 |
1288 | if not result.get("success"):
1289 | raise ValueError(f"Vectorize query failed: {result}")
1290 |
1291 | matches = result.get("result", {}).get("matches", [])
1292 |
1293 | # Convert matches to MemoryQueryResult objects with time filtering
1294 | results = []
1295 | for match in matches:
1296 | memory = await self._load_memory_from_match(match)
1297 | if memory:
1298 | # Apply time filtering if needed
1299 | if start_timestamp is not None and memory.created_at and memory.created_at < start_timestamp:
1300 | continue
1301 | if end_timestamp is not None and memory.created_at and memory.created_at > end_timestamp:
1302 | continue
1303 |
1304 | query_result = MemoryQueryResult(
1305 | memory=memory,
1306 | relevance_score=match.get("score", 0.0)
1307 | )
1308 | results.append(query_result)
1309 |
1310 | logger.info(f"Recall - Retrieved {len(results)} memories with semantic search and time filtering")
1311 | return results[:n_results] # Ensure we don't exceed n_results
1312 |
1313 | except Exception as e:
1314 | logger.error(f"Recall - Semantic search failed, falling back to time-based search: {e}")
1315 | # Fall through to time-based search
1316 |
1317 | # Time-based search only (or fallback)
1318 |             logger.info("Recall - Using time-based search only")
1319 |
1320 | # Build D1 query for time-based retrieval
1321 | if time_where:
1322 | sql = f"SELECT * FROM memories WHERE {time_where} ORDER BY created_at DESC LIMIT ?"
1323 | params.append(n_results)
1324 | else:
1325 | # No time filters, get most recent
1326 | sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
1327 | params = [n_results]
1328 |
1329 | payload = {"sql": sql, "params": params}
1330 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1331 | result = response.json()
1332 |
1333 | if not result.get("success"):
1334 | raise ValueError(f"D1 query failed: {result}")
1335 |
1336 | # Convert D1 results to MemoryQueryResult objects
1337 | results = []
1338 | if result.get("result", [{}])[0].get("results"):
1339 | for row in result["result"][0]["results"]:
1340 | memory = await self._load_memory_from_row(row)
1341 | if memory:
1342 | # For time-based search without semantic query, use timestamp as relevance
1343 | relevance_score = memory.created_at or 0.0
1344 | query_result = MemoryQueryResult(
1345 | memory=memory,
1346 | relevance_score=relevance_score
1347 | )
1348 | results.append(query_result)
1349 |
1350 | logger.info(f"Recall - Retrieved {len(results)} memories with time-based search")
1351 | return results
1352 |
1353 | except Exception as e:
1354 | logger.error(f"Recall failed: {e}")
1355 | return []
1356 |
1357 |     async def get_all_memories(self, limit: Optional[int] = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1358 | """
1359 | Get all memories in storage ordered by creation time (newest first).
1360 |
1361 | Args:
1362 | limit: Maximum number of memories to return (None for all)
1363 | offset: Number of memories to skip (for pagination)
1364 | memory_type: Optional filter by memory type
1365 |             tags: Optional filter by tags (memories must match ALL of the provided tags)
1366 |
1367 | Returns:
1368 | List of Memory objects ordered by created_at DESC, optionally filtered by type and tags
1369 | """
1370 | try:
1371 | # Build SQL query with optional memory_type and tags filters
1372 | sql = "SELECT m.* FROM memories m"
1373 | joins = ""
1374 | where_conditions = []
1375 | params: List[Any] = []
1376 |
1377 | if memory_type is not None:
1378 | where_conditions.append("m.memory_type = ?")
1379 | params.append(memory_type)
1380 |
1381 | tag_count = 0
1382 | if tags:
1383 | tag_count = len(tags)
1384 | placeholders = ",".join(["?"] * tag_count)
1385 | joins += " " + """
1386 | INNER JOIN memory_tags mt ON m.id = mt.memory_id
1387 | INNER JOIN tags t ON mt.tag_id = t.id
1388 | """.strip().replace("\n", " ")
1389 | where_conditions.append(f"t.name IN ({placeholders})")
1390 | params.extend(tags)
1391 |
1392 | if joins:
1393 | sql += joins
1394 |
1395 | if where_conditions:
1396 | sql += " WHERE " + " AND ".join(where_conditions)
1397 |
1398 | if tags:
1399 | sql += " GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?"
1400 |
1401 | sql += " ORDER BY m.created_at DESC"
1402 |
1403 | query_params = params.copy()
1404 | if tags:
1405 | query_params.append(tag_count)
1406 |
1407 | if limit is not None:
1408 | sql += " LIMIT ?"
1409 | query_params.append(limit)
1410 |
1411 | if offset > 0:
1412 | sql += " OFFSET ?"
1413 | query_params.append(offset)
1414 |
1415 | payload = {"sql": sql, "params": query_params}
1416 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1417 | result = response.json()
1418 |
1419 | if not result.get("success"):
1420 | raise ValueError(f"D1 query failed: {result}")
1421 |
1422 | memories = []
1423 | if result.get("result", [{}])[0].get("results"):
1424 | for row in result["result"][0]["results"]:
1425 | memory = await self._load_memory_from_row(row)
1426 | if memory:
1427 | memories.append(memory)
1428 |
1429 | logger.debug(f"Retrieved {len(memories)} memories from D1")
1430 | return memories
1431 |
1432 | except Exception as e:
1433 | logger.error(f"Error getting all memories: {str(e)}")
1434 | return []
1435 |
1436 | def _row_to_memory(self, row: Dict[str, Any]) -> Memory:
1437 | """Convert D1 row to Memory object without loading tags (for bulk operations)."""
1438 |         # Content may be an R2 placeholder for large memories; it is not fetched here
1439 | content = row["content"]
1440 | if row.get("r2_key") and content.startswith("[R2 Content:"):
1441 | # For bulk operations, we don't load R2 content to avoid additional requests
1442 | # Just keep the placeholder
1443 | pass
1444 |
1445 | return Memory(
1446 | content=content,
1447 | content_hash=row["content_hash"],
1448 | tags=[], # Skip tag loading for bulk operations
1449 | memory_type=row.get("memory_type"),
1450 | metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
1451 | created_at=row.get("created_at"),
1452 | created_at_iso=row.get("created_at_iso"),
1453 | updated_at=row.get("updated_at"),
1454 | updated_at_iso=row.get("updated_at_iso")
1455 | )
1456 |
1457 | async def get_all_memories_bulk(
1458 | self,
1459 | include_tags: bool = False
1460 | ) -> List[Memory]:
1461 | """
1462 | Efficiently load all memories from D1.
1463 |
1464 | If include_tags=False, skips N+1 tag queries for better performance.
1465 | Useful for maintenance scripts that don't need tag information.
1466 |
1467 | Args:
1468 | include_tags: Whether to load tags for each memory (slower but complete)
1469 |
1470 | Returns:
1471 | List of Memory objects ordered by created_at DESC
1472 | """
1473 | query = "SELECT * FROM memories ORDER BY created_at DESC"
1474 | payload = {"sql": query}
1475 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1476 | result = response.json()
1477 |
1478 | if not result.get("success"):
1479 | raise ValueError(f"D1 query failed: {result}")
1480 |
1481 | memories = []
1482 | if result.get("result", [{}])[0].get("results"):
1483 | for row in result["result"][0]["results"]:
1484 | if include_tags:
1485 | memory = await self._load_memory_from_row(row)
1486 | else:
1487 | memory = self._row_to_memory(row)
1488 | if memory:
1489 | memories.append(memory)
1490 |
1491 | logger.debug(f"Bulk loaded {len(memories)} memories from D1")
1492 | return memories
1493 |
1494 |     async def get_all_memories_cursor(self, limit: Optional[int] = None, cursor: Optional[float] = None, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1495 | """
1496 | Get all memories using cursor-based pagination to avoid D1 OFFSET limitations.
1497 |
1498 | This method uses timestamp-based cursors instead of OFFSET, which is more efficient
1499 | and avoids Cloudflare D1's OFFSET limitations that cause 400 Bad Request errors.
1500 |
1501 | Args:
1502 | limit: Maximum number of memories to return (None for all)
1503 | cursor: Timestamp cursor for pagination (created_at value from last result)
1504 | memory_type: Optional filter by memory type
1505 | tags: Optional filter by tags (matches ANY of the provided tags)
1506 |
1507 | Returns:
1508 | List of Memory objects ordered by created_at DESC, starting after cursor
1509 | """
1510 | try:
1511 | # Build SQL query with cursor-based pagination
1512 | sql = "SELECT * FROM memories"
1513 | params = []
1514 | where_conditions = []
1515 |
1516 | # Add cursor condition (timestamp-based pagination)
1517 | if cursor is not None:
1518 | where_conditions.append("created_at < ?")
1519 | params.append(cursor)
1520 |
1521 | # Add memory_type filter if specified
1522 | if memory_type is not None:
1523 | where_conditions.append("memory_type = ?")
1524 | params.append(memory_type)
1525 |
1526 | # Add tags filter if specified (using LIKE for tag matching)
1527 | if tags and len(tags) > 0:
1528 | tag_conditions = " OR ".join(["tags LIKE ?" for _ in tags])
1529 | where_conditions.append(f"({tag_conditions})")
1530 | params.extend([f"%{tag}%" for tag in tags])
1531 |
1532 | # Apply WHERE clause if we have any conditions
1533 | if where_conditions:
1534 | sql += " WHERE " + " AND ".join(where_conditions)
1535 |
1536 | sql += " ORDER BY created_at DESC"
1537 |
1538 | if limit is not None:
1539 | sql += " LIMIT ?"
1540 | params.append(limit)
1541 |
1542 | payload = {"sql": sql, "params": params}
1543 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1544 | result = response.json()
1545 |
1546 | if not result.get("success"):
1547 | raise ValueError(f"D1 query failed: {result}")
1548 |
1549 | memories = []
1550 | if result.get("result", [{}])[0].get("results"):
1551 | for row in result["result"][0]["results"]:
1552 | memory = await self._load_memory_from_row(row)
1553 | if memory:
1554 | memories.append(memory)
1555 |
1556 | logger.debug(f"Retrieved {len(memories)} memories from D1 with cursor-based pagination")
1557 | return memories
1558 |
1559 | except Exception as e:
1560 | logger.error(f"Error getting memories with cursor: {str(e)}")
1561 | return []
1562 |
1563 | async def get_memories_updated_since(self, timestamp: float, limit: int = 100) -> List[Memory]:
1564 | """
1565 | Get memories updated since a specific timestamp (v8.25.0+).
1566 |
1567 | Used by hybrid backend drift detection to find memories with metadata
1568 | changes that need to be synchronized.
1569 |
1570 | Args:
1571 | timestamp: Unix timestamp (seconds since epoch)
1572 | limit: Maximum number of memories to return (default: 100)
1573 |
1574 | Returns:
1575 | List of Memory objects updated after the timestamp, ordered by updated_at DESC
1576 | """
1577 | try:
1578 | # Convert timestamp to ISO format for D1 query
1579 | from datetime import datetime
1580 | dt = datetime.utcfromtimestamp(timestamp)
1581 | iso_timestamp = dt.isoformat()
1582 |
1583 | query = """
1584 | SELECT content, content_hash, tags, memory_type, metadata,
1585 | created_at, created_at_iso, updated_at, updated_at_iso
1586 | FROM memories
1587 | WHERE updated_at_iso > ?
1588 | ORDER BY updated_at DESC
1589 | LIMIT ?
1590 | """
1591 |
1592 | payload = {"sql": query, "params": [iso_timestamp, limit]}
1593 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1594 | result = response.json()
1595 |
1596 | if not result.get("success"):
1597 | logger.warning(f"Failed to get updated memories: {result.get('error')}")
1598 | return []
1599 |
1600 | memories = []
1601 | if result.get("result", [{}])[0].get("results"):
1602 | for row in result["result"][0]["results"]:
1603 | memory = await self._load_memory_from_row(row)
1604 | if memory:
1605 | memories.append(memory)
1606 |
1607 | logger.debug(f"Retrieved {len(memories)} memories updated since {iso_timestamp}")
1608 | return memories
1609 |
1610 | except Exception as e:
1611 | logger.error(f"Error getting updated memories: {e}")
1612 | return []
1613 |
1614 | async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
1615 | """
1616 | Get memories within a specific time range (v8.31.0+).
1617 |
1618 | Pushes date-range filtering to D1 database layer for better performance.
1619 | This optimization reduces network transfer and enables efficient queries
1620 | on databases with >10,000 memories.
1621 |
1622 | Benefits:
1623 | - Reduces network transfer (only relevant memories)
1624 | - Leverages D1 indexes on created_at
1625 | - Scales to databases with >10,000 memories
1626 | - 10x performance improvement over application-layer filtering
1627 |
1628 | Args:
1629 | start_time: Start timestamp (Unix seconds, inclusive)
1630 | end_time: End timestamp (Unix seconds, inclusive)
1631 |
1632 | Returns:
1633 | List of Memory objects within the time range, ordered by created_at DESC
1634 | """
1635 | try:
1636 | # Build SQL query with BETWEEN clause for efficient filtering
1637 | # Use SELECT m.* to get all columns (tags are loaded separately via _load_memory_tags)
1638 | sql = """
1639 | SELECT m.*
1640 | FROM memories m
1641 | WHERE m.created_at BETWEEN ? AND ?
1642 | ORDER BY m.created_at DESC
1643 | """
1644 |
1645 | payload = {"sql": sql, "params": [start_time, end_time]}
1646 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1647 | result = response.json()
1648 |
1649 | if not result.get("success"):
1650 | logger.error(f"D1 query failed: {result}")
1651 | return []
1652 |
1653 | memories = []
1654 | if result.get("result", [{}])[0].get("results"):
1655 | for row in result["result"][0]["results"]:
1656 | memory = await self._load_memory_from_row(row)
1657 | if memory:
1658 | memories.append(memory)
1659 |
1660 | logger.info(f"Retrieved {len(memories)} memories in time range {start_time}-{end_time}")
1661 | return memories
1662 |
1663 | except Exception as e:
1664 | logger.error(f"Error getting memories by time range: {str(e)}")
1665 | return []
1666 |
1667 | async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
1668 | """
1669 | Get total count of memories in storage.
1670 |
1671 | Args:
1672 | memory_type: Optional filter by memory type
1673 |             tags: Optional filter by tags (memories matching ALL of the tags)
1674 |
1675 | Returns:
1676 | Total number of memories, optionally filtered by type and/or tags
1677 | """
1678 | try:
1679 | # Build query with filters
1680 | base_sql = "SELECT m.id FROM memories m"
1681 | joins = ""
1682 | conditions = []
1683 | params: List[Any] = []
1684 |
1685 | if memory_type is not None:
1686 | conditions.append('m.memory_type = ?')
1687 | params.append(memory_type)
1688 |
1689 | tag_count = 0
1690 | if tags:
1691 | tag_count = len(tags)
1692 | placeholders = ','.join(['?'] * tag_count)
1693 | joins += " " + """
1694 | INNER JOIN memory_tags mt ON m.id = mt.memory_id
1695 | INNER JOIN tags t ON mt.tag_id = t.id
1696 | """.strip().replace("\n", " ")
1697 | conditions.append(f't.name IN ({placeholders})')
1698 | params.extend(tags)
1699 |
1700 | sql = base_sql + joins
1701 | if conditions:
1702 | sql += ' WHERE ' + ' AND '.join(conditions)
1703 |
1704 | if tags:
1705 | sql += ' GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?'
1706 |
1707 | count_sql = f'SELECT COUNT(*) as count FROM ({sql}) AS subquery'
1708 |
1709 | count_params = params.copy()
1710 | if tags:
1711 | count_params.append(tag_count)
1712 |
1713 | payload = {"sql": count_sql, "params": count_params}
1714 | response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1715 | result = response.json()
1716 |
1717 | if not result.get("success"):
1718 | raise ValueError(f"D1 query failed: {result}")
1719 |
1720 | if result.get("result", [{}])[0].get("results"):
1721 | count = result["result"][0]["results"][0].get("count", 0)
1722 | return int(count)
1723 |
1724 | return 0
1725 |
1726 | except Exception as e:
1727 | logger.error(f"Error counting memories: {str(e)}")
1728 | return 0
1729 |
1730 | async def close(self) -> None:
1731 | """Close the storage backend and cleanup resources."""
1732 | if self.client:
1733 | await self.client.aclose()
1734 | self.client = None
1735 |
1736 | # Clear embedding cache
1737 | self._embedding_cache.clear()
1738 |
1739 | logger.info("Cloudflare storage backend closed")
1740 |
```
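
The usage sketches below are illustrative only: they assume an already-initialized instance of the backend class above (referred to simply as `storage`), and every helper name is invented for the example rather than taken from the repository. First, the timestamp semantics of `update_memory_metadata`: with the default `preserve_timestamps=True` only `updated_at`/`updated_at_iso` are bumped, while `preserve_timestamps=False` lets the caller carry timestamps over from a source backend without ever resetting `created_at`.

```python
async def retag_and_sync(storage, content_hash: str,
                         src_created_at: float, src_updated_at: float) -> None:
    # Normal edit: tags/metadata change, created_at is preserved and
    # updated_at is bumped to "now" by the backend.
    ok, msg = await storage.update_memory_metadata(
        content_hash,
        {"tags": ["reviewed"], "metadata": {"reviewer": "alice"}},
    )
    print("edit:", ok, msg)

    # Sync path: reuse the source backend's timestamps instead of letting
    # the target invent new ones (created_at is only written if provided).
    ok, msg = await storage.update_memory_metadata(
        content_hash,
        {"created_at": src_created_at, "updated_at": src_updated_at},
        preserve_timestamps=False,
    )
    print("sync:", ok, msg)
```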
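
`cleanup_duplicates` deduplicates directly in D1 (`GROUP BY content_hash HAVING COUNT(*) > 1`, keeping the row with the lowest `id`), and `get_stats` runs a single aggregate query. A minimal maintenance pass under the same `storage` assumption:

```python
async def maintenance_pass(storage) -> None:
    removed, message = await storage.cleanup_duplicates()
    print(f"dedup: {message}")

    stats = await storage.get_stats()
    print(f"{stats['total_memories']} memories, "
          f"{stats['unique_tags']} tags, status={stats['status']}")
```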
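
The `recall` method combines an optional semantic query with a time window: with a query it hits Vectorize first and applies the timestamp bounds after retrieval, and it falls back to a pure D1 time query if the semantic path fails. A sketch of a "last 24 hours" recall, again assuming an initialized `storage`:

```python
import time
from typing import Optional

async def recall_last_day(storage, query: Optional[str] = None, n_results: int = 10):
    now = time.time()
    results = await storage.recall(
        query=query,                      # None -> pure time-based retrieval
        n_results=n_results,
        start_timestamp=now - 24 * 60 * 60,
        end_timestamp=now,
    )
    for r in results:
        # relevance_score is a similarity score for semantic hits and the raw
        # created_at timestamp for time-only retrieval.
        print(f"{r.relevance_score:14.3f}  {r.memory.content_hash[:12]}")
    return results
```

Because the window is applied after the vector lookup, a combined query can return fewer than `n_results` rows even when older in-window memories exist.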
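
Since D1 rejects large `OFFSET` values, `get_all_memories_cursor` pages by the `created_at` of the last row instead. An illustrative paging loop (helper name and page size are arbitrary):

```python
from typing import AsyncIterator

async def iter_all_memories(storage, page_size: int = 500) -> AsyncIterator:
    cursor = None
    while True:
        page = await storage.get_all_memories_cursor(limit=page_size, cursor=cursor)
        if not page:
            break
        for memory in page:
            yield memory
        # Rows come back ordered created_at DESC, so the last row is the
        # oldest of the page and becomes the cursor for the next request.
        cursor = page[-1].created_at
```

Note that the backend's strict `created_at < ?` comparison skips any rows sharing the cursor timestamp exactly, which is normally acceptable for float timestamps.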
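
`count_all_memories` and `get_all_memories` share the relational tag filter with the `HAVING COUNT(DISTINCT t.name)` clause, so both require every listed tag to be present, whereas `get_all_memories_cursor` uses a `LIKE`-based any-match. A sketch pairing the count with offset pagination:

```python
from typing import List

async def fetch_tagged(storage, tags: List[str], memory_type: str = "note",
                       page_size: int = 100) -> List:
    total = await storage.count_all_memories(memory_type=memory_type, tags=tags)
    print(f"{total} memories of type {memory_type!r} carrying all of {tags}")

    collected, offset = [], 0
    while offset < total:
        batch = await storage.get_all_memories(
            limit=page_size, offset=offset, memory_type=memory_type, tags=tags
        )
        if not batch:
            break
        collected.extend(batch)
        offset += page_size
    return collected
```

For very large collections the cursor-based iterator above is the safer route, given the OFFSET limits noted in the docstrings.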
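
`get_memory_timestamps` exists so analytics can avoid loading content and embeddings; it returns bare Unix timestamps, newest first. A sketch that buckets activity per UTC day:

```python
from collections import Counter
from datetime import datetime, timezone

async def memories_per_day(storage, days: int = 30) -> Counter:
    timestamps = await storage.get_memory_timestamps(days=days)
    return Counter(
        datetime.fromtimestamp(ts, tz=timezone.utc).date().isoformat()
        for ts in timestamps
    )
```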
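
Finally, `get_memories_updated_since` and `get_memories_by_time_range` back the hybrid backend's drift detection and date-range queries. A sketch of a periodic drift check, where the `last_sync` bookkeeping is assumed to live elsewhere:

```python
import time

async def detect_drift(storage, last_sync: float):
    # Memories whose metadata changed on the Cloudflare side since last_sync.
    changed = await storage.get_memories_updated_since(last_sync, limit=100)
    drifted = {m.content_hash for m in changed}

    # Everything created in the same window, filtered on the D1 side.
    recent = await storage.get_memories_by_time_range(last_sync, time.time())
    return drifted, recent
```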