This is page 39 of 47. Use http://codebase.md/doobidoo/mcp-memory-service?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── amp-bridge.md
│   │   ├── amp-pr-automator.md
│   │   ├── code-quality-guard.md
│   │   ├── gemini-pr-automator.md
│   │   └── github-release-manager.md
│   ├── settings.local.json.backup
│   └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── feature_request.yml
│   │   └── performance_issue.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── bridge-tests.yml
│       ├── CACHE_FIX.md
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── cleanup-images.yml.disabled
│       ├── dev-setup-validation.yml
│       ├── docker-publish.yml
│       ├── LATEST_FIXES.md
│       ├── main-optimized.yml.disabled
│       ├── main.yml
│       ├── publish-and-test.yml
│       ├── README_OPTIMIZATION.md
│       ├── release-tag.yml.disabled
│       ├── release.yml
│       ├── roadmap-review-reminder.yml
│       ├── SECRET_CONDITIONAL_FIX.md
│       └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│   ├── .gitignore
│   └── reports
│       └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│   ├── deployment
│   │   ├── deploy_fastmcp_fixed.sh
│   │   ├── deploy_http_with_mcp.sh
│   │   └── deploy_mcp_v4.sh
│   ├── deployment-configs
│   │   ├── empty_config.yml
│   │   └── smithery.yaml
│   ├── development
│   │   └── test_fastmcp.py
│   ├── docs-removed-2025-08-23
│   │   ├── authentication.md
│   │   ├── claude_integration.md
│   │   ├── claude-code-compatibility.md
│   │   ├── claude-code-integration.md
│   │   ├── claude-code-quickstart.md
│   │   ├── claude-desktop-setup.md
│   │   ├── complete-setup-guide.md
│   │   ├── database-synchronization.md
│   │   ├── development
│   │   │   ├── autonomous-memory-consolidation.md
│   │   │   ├── CLEANUP_PLAN.md
│   │   │   ├── CLEANUP_README.md
│   │   │   ├── CLEANUP_SUMMARY.md
│   │   │   ├── dream-inspired-memory-consolidation.md
│   │   │   ├── hybrid-slm-memory-consolidation.md
│   │   │   ├── mcp-milestone.md
│   │   │   ├── multi-client-architecture.md
│   │   │   ├── test-results.md
│   │   │   └── TIMESTAMP_FIX_SUMMARY.md
│   │   ├── distributed-sync.md
│   │   ├── invocation_guide.md
│   │   ├── macos-intel.md
│   │   ├── master-guide.md
│   │   ├── mcp-client-configuration.md
│   │   ├── multi-client-server.md
│   │   ├── service-installation.md
│   │   ├── sessions
│   │   │   └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│   │   ├── UBUNTU_SETUP.md
│   │   ├── ubuntu.md
│   │   ├── windows-setup.md
│   │   └── windows.md
│   ├── docs-root-cleanup-2025-08-23
│   │   ├── AWESOME_LIST_SUBMISSION.md
│   │   ├── CLOUDFLARE_IMPLEMENTATION.md
│   │   ├── DOCUMENTATION_ANALYSIS.md
│   │   ├── DOCUMENTATION_CLEANUP_PLAN.md
│   │   ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│   │   ├── LITESTREAM_SETUP_GUIDE.md
│   │   ├── lm_studio_system_prompt.md
│   │   ├── PYTORCH_DOWNLOAD_FIX.md
│   │   └── README-ORIGINAL-BACKUP.md
│   ├── investigations
│   │   └── MACOS_HOOKS_INVESTIGATION.md
│   ├── litestream-configs-v6.3.0
│   │   ├── install_service.sh
│   │   ├── litestream_master_config_fixed.yml
│   │   ├── litestream_master_config.yml
│   │   ├── litestream_replica_config_fixed.yml
│   │   ├── litestream_replica_config.yml
│   │   ├── litestream_replica_simple.yml
│   │   ├── litestream-http.service
│   │   ├── litestream.service
│   │   └── requirements-cloudflare.txt
│   ├── release-notes
│   │   └── release-notes-v7.1.4.md
│   └── setup-development
│       ├── README.md
│       ├── setup_consolidation_mdns.sh
│       ├── STARTUP_SETUP_GUIDE.md
│       └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│   ├── memory-context.md
│   ├── memory-health.md
│   ├── memory-ingest-dir.md
│   ├── memory-ingest.md
│   ├── memory-recall.md
│   ├── memory-search.md
│   ├── memory-store.md
│   ├── README.md
│   └── session-start.md
├── claude-hooks
│   ├── config.json
│   ├── config.template.json
│   ├── CONFIGURATION.md
│   ├── core
│   │   ├── memory-retrieval.js
│   │   ├── mid-conversation.js
│   │   ├── session-end.js
│   │   ├── session-start.js
│   │   └── topic-change.js
│   ├── debug-pattern-test.js
│   ├── install_claude_hooks_windows.ps1
│   ├── install_hooks.py
│   ├── memory-mode-controller.js
│   ├── MIGRATION.md
│   ├── README-NATURAL-TRIGGERS.md
│   ├── README-phase2.md
│   ├── README.md
│   ├── simple-test.js
│   ├── statusline.sh
│   ├── test-adaptive-weights.js
│   ├── test-dual-protocol-hook.js
│   ├── test-mcp-hook.js
│   ├── test-natural-triggers.js
│   ├── test-recency-scoring.js
│   ├── tests
│   │   ├── integration-test.js
│   │   ├── phase2-integration-test.js
│   │   ├── test-code-execution.js
│   │   ├── test-cross-session.json
│   │   ├── test-session-tracking.json
│   │   └── test-threading.json
│   ├── utilities
│   │   ├── adaptive-pattern-detector.js
│   │   ├── context-formatter.js
│   │   ├── context-shift-detector.js
│   │   ├── conversation-analyzer.js
│   │   ├── dynamic-context-updater.js
│   │   ├── git-analyzer.js
│   │   ├── mcp-client.js
│   │   ├── memory-client.js
│   │   ├── memory-scorer.js
│   │   ├── performance-manager.js
│   │   ├── project-detector.js
│   │   ├── session-tracker.js
│   │   ├── tiered-conversation-monitor.js
│   │   └── version-checker.js
│   └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│   ├── amp-cli-bridge.md
│   ├── api
│   │   ├── code-execution-interface.md
│   │   ├── memory-metadata-api.md
│   │   ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_REPORT.md
│   │   └── tag-standardization.md
│   ├── architecture
│   │   ├── search-enhancement-spec.md
│   │   └── search-examples.md
│   ├── architecture.md
│   ├── archive
│   │   └── obsolete-workflows
│   │       ├── load_memory_context.md
│   │       └── README.md
│   ├── assets
│   │   └── images
│   │       ├── dashboard-v3.3.0-preview.png
│   │       ├── memory-awareness-hooks-example.png
│   │       ├── project-infographic.svg
│   │       └── README.md
│   ├── CLAUDE_CODE_QUICK_REFERENCE.md
│   ├── cloudflare-setup.md
│   ├── deployment
│   │   ├── docker.md
│   │   ├── dual-service.md
│   │   ├── production-guide.md
│   │   └── systemd-service.md
│   ├── development
│   │   ├── ai-agent-instructions.md
│   │   ├── code-quality
│   │   │   ├── phase-2a-completion.md
│   │   │   ├── phase-2a-handle-get-prompt.md
│   │   │   ├── phase-2a-index.md
│   │   │   ├── phase-2a-install-package.md
│   │   │   └── phase-2b-session-summary.md
│   │   ├── code-quality-workflow.md
│   │   ├── dashboard-workflow.md
│   │   ├── issue-management.md
│   │   ├── pr-review-guide.md
│   │   ├── refactoring-notes.md
│   │   ├── release-checklist.md
│   │   └── todo-tracker.md
│   ├── docker-optimized-build.md
│   ├── document-ingestion.md
│   ├── DOCUMENTATION_AUDIT.md
│   ├── enhancement-roadmap-issue-14.md
│   ├── examples
│   │   ├── analysis-scripts.js
│   │   ├── maintenance-session-example.md
│   │   ├── memory-distribution-chart.jsx
│   │   └── tag-schema.json
│   ├── first-time-setup.md
│   ├── glama-deployment.md
│   ├── guides
│   │   ├── advanced-command-examples.md
│   │   ├── chromadb-migration.md
│   │   ├── commands-vs-mcp-server.md
│   │   ├── mcp-enhancements.md
│   │   ├── mdns-service-discovery.md
│   │   ├── memory-consolidation-guide.md
│   │   ├── migration.md
│   │   ├── scripts.md
│   │   └── STORAGE_BACKENDS.md
│   ├── HOOK_IMPROVEMENTS.md
│   ├── hooks
│   │   └── phase2-code-execution-migration.md
│   ├── http-server-management.md
│   ├── ide-compatability.md
│   ├── IMAGE_RETENTION_POLICY.md
│   ├── images
│   │   └── dashboard-placeholder.md
│   ├── implementation
│   │   ├── health_checks.md
│   │   └── performance.md
│   ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│   ├── integration
│   │   ├── homebrew.md
│   │   └── multi-client.md
│   ├── integrations
│   │   ├── gemini.md
│   │   ├── groq-bridge.md
│   │   ├── groq-integration-summary.md
│   │   └── groq-model-comparison.md
│   ├── integrations.md
│   ├── legacy
│   │   └── dual-protocol-hooks.md
│   ├── LM_STUDIO_COMPATIBILITY.md
│   ├── maintenance
│   │   └── memory-maintenance.md
│   ├── mastery
│   │   ├── api-reference.md
│   │   ├── architecture-overview.md
│   │   ├── configuration-guide.md
│   │   ├── local-setup-and-run.md
│   │   ├── testing-guide.md
│   │   └── troubleshooting.md
│   ├── migration
│   │   └── code-execution-api-quick-start.md
│   ├── natural-memory-triggers
│   │   ├── cli-reference.md
│   │   ├── installation-guide.md
│   │   └── performance-optimization.md
│   ├── oauth-setup.md
│   ├── pr-graphql-integration.md
│   ├── quick-setup-cloudflare-dual-environment.md
│   ├── README.md
│   ├── remote-configuration-wiki-section.md
│   ├── research
│   │   ├── code-execution-interface-implementation.md
│   │   └── code-execution-interface-summary.md
│   ├── ROADMAP.md
│   ├── sqlite-vec-backend.md
│   ├── statistics
│   │   ├── charts
│   │   │   ├── activity_patterns.png
│   │   │   ├── contributors.png
│   │   │   ├── growth_trajectory.png
│   │   │   ├── monthly_activity.png
│   │   │   └── october_sprint.png
│   │   ├── data
│   │   │   ├── activity_by_day.csv
│   │   │   ├── activity_by_hour.csv
│   │   │   ├── contributors.csv
│   │   │   └── monthly_activity.csv
│   │   ├── generate_charts.py
│   │   └── REPOSITORY_STATISTICS.md
│   ├── technical
│   │   ├── development.md
│   │   ├── memory-migration.md
│   │   ├── migration-log.md
│   │   ├── sqlite-vec-embedding-fixes.md
│   │   └── tag-storage.md
│   ├── testing
│   │   └── regression-tests.md
│   ├── testing-cloudflare-backend.md
│   ├── troubleshooting
│   │   ├── cloudflare-api-token-setup.md
│   │   ├── cloudflare-authentication.md
│   │   ├── general.md
│   │   ├── hooks-quick-reference.md
│   │   ├── pr162-schema-caching-issue.md
│   │   ├── session-end-hooks.md
│   │   └── sync-issues.md
│   └── tutorials
│       ├── advanced-techniques.md
│       ├── data-analysis.md
│       └── demo-session-walkthrough.md
├── examples
│   ├── claude_desktop_config_template.json
│   ├── claude_desktop_config_windows.json
│   ├── claude-desktop-http-config.json
│   ├── config
│   │   └── claude_desktop_config.json
│   ├── http-mcp-bridge.js
│   ├── memory_export_template.json
│   ├── README.md
│   ├── setup
│   │   └── setup_multi_client_complete.py
│   └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│   ├── .claude
│   │   └── settings.local.json
│   ├── archive
│   │   └── check_missing_timestamps.py
│   ├── backup
│   │   ├── backup_memories.py
│   │   ├── backup_sqlite_vec.sh
│   │   ├── export_distributable_memories.sh
│   │   └── restore_memories.py
│   ├── benchmarks
│   │   ├── benchmark_code_execution_api.py
│   │   ├── benchmark_hybrid_sync.py
│   │   └── benchmark_server_caching.py
│   ├── database
│   │   ├── analyze_sqlite_vec_db.py
│   │   ├── check_sqlite_vec_status.py
│   │   ├── db_health_check.py
│   │   └── simple_timestamp_check.py
│   ├── development
│   │   ├── debug_server_initialization.py
│   │   ├── find_orphaned_files.py
│   │   ├── fix_mdns.sh
│   │   ├── fix_sitecustomize.py
│   │   ├── remote_ingest.sh
│   │   ├── setup-git-merge-drivers.sh
│   │   ├── uv-lock-merge.sh
│   │   └── verify_hybrid_sync.py
│   ├── hooks
│   │   └── pre-commit
│   ├── installation
│   │   ├── install_linux_service.py
│   │   ├── install_macos_service.py
│   │   ├── install_uv.py
│   │   ├── install_windows_service.py
│   │   ├── install.py
│   │   ├── setup_backup_cron.sh
│   │   ├── setup_claude_mcp.sh
│   │   └── setup_cloudflare_resources.py
│   ├── linux
│   │   ├── service_status.sh
│   │   ├── start_service.sh
│   │   ├── stop_service.sh
│   │   ├── uninstall_service.sh
│   │   └── view_logs.sh
│   ├── maintenance
│   │   ├── assign_memory_types.py
│   │   ├── check_memory_types.py
│   │   ├── cleanup_corrupted_encoding.py
│   │   ├── cleanup_memories.py
│   │   ├── cleanup_organize.py
│   │   ├── consolidate_memory_types.py
│   │   ├── consolidation_mappings.json
│   │   ├── delete_orphaned_vectors_fixed.py
│   │   ├── fast_cleanup_duplicates_with_tracking.sh
│   │   ├── find_all_duplicates.py
│   │   ├── find_cloudflare_duplicates.py
│   │   ├── find_duplicates.py
│   │   ├── memory-types.md
│   │   ├── README.md
│   │   ├── recover_timestamps_from_cloudflare.py
│   │   ├── regenerate_embeddings.py
│   │   ├── repair_malformed_tags.py
│   │   ├── repair_memories.py
│   │   ├── repair_sqlite_vec_embeddings.py
│   │   ├── repair_zero_embeddings.py
│   │   ├── restore_from_json_export.py
│   │   └── scan_todos.sh
│   ├── migration
│   │   ├── cleanup_mcp_timestamps.py
│   │   ├── legacy
│   │   │   └── migrate_chroma_to_sqlite.py
│   │   ├── mcp-migration.py
│   │   ├── migrate_sqlite_vec_embeddings.py
│   │   ├── migrate_storage.py
│   │   ├── migrate_tags.py
│   │   ├── migrate_timestamps.py
│   │   ├── migrate_to_cloudflare.py
│   │   ├── migrate_to_sqlite_vec.py
│   │   ├── migrate_v5_enhanced.py
│   │   ├── TIMESTAMP_CLEANUP_README.md
│   │   └── verify_mcp_timestamps.py
│   ├── pr
│   │   ├── amp_collect_results.sh
│   │   ├── amp_detect_breaking_changes.sh
│   │   ├── amp_generate_tests.sh
│   │   ├── amp_pr_review.sh
│   │   ├── amp_quality_gate.sh
│   │   ├── amp_suggest_fixes.sh
│   │   ├── auto_review.sh
│   │   ├── detect_breaking_changes.sh
│   │   ├── generate_tests.sh
│   │   ├── lib
│   │   │   └── graphql_helpers.sh
│   │   ├── quality_gate.sh
│   │   ├── resolve_threads.sh
│   │   ├── run_pyscn_analysis.sh
│   │   ├── run_quality_checks.sh
│   │   ├── thread_status.sh
│   │   └── watch_reviews.sh
│   ├── quality
│   │   ├── fix_dead_code_install.sh
│   │   ├── phase1_dead_code_analysis.md
│   │   ├── phase2_complexity_analysis.md
│   │   ├── README_PHASE1.md
│   │   ├── README_PHASE2.md
│   │   ├── track_pyscn_metrics.sh
│   │   └── weekly_quality_review.sh
│   ├── README.md
│   ├── run
│   │   ├── run_mcp_memory.sh
│   │   ├── run-with-uv.sh
│   │   └── start_sqlite_vec.sh
│   ├── run_memory_server.py
│   ├── server
│   │   ├── check_http_server.py
│   │   ├── check_server_health.py
│   │   ├── memory_offline.py
│   │   ├── preload_models.py
│   │   ├── run_http_server.py
│   │   ├── run_memory_server.py
│   │   ├── start_http_server.bat
│   │   └── start_http_server.sh
│   ├── service
│   │   ├── deploy_dual_services.sh
│   │   ├── install_http_service.sh
│   │   ├── mcp-memory-http.service
│   │   ├── mcp-memory.service
│   │   ├── memory_service_manager.sh
│   │   ├── service_control.sh
│   │   ├── service_utils.py
│   │   └── update_service.sh
│   ├── sync
│   │   ├── check_drift.py
│   │   ├── claude_sync_commands.py
│   │   ├── export_memories.py
│   │   ├── import_memories.py
│   │   ├── litestream
│   │   │   ├── apply_local_changes.sh
│   │   │   ├── enhanced_memory_store.sh
│   │   │   ├── init_staging_db.sh
│   │   │   ├── io.litestream.replication.plist
│   │   │   ├── manual_sync.sh
│   │   │   ├── memory_sync.sh
│   │   │   ├── pull_remote_changes.sh
│   │   │   ├── push_to_remote.sh
│   │   │   ├── README.md
│   │   │   ├── resolve_conflicts.sh
│   │   │   ├── setup_local_litestream.sh
│   │   │   ├── setup_remote_litestream.sh
│   │   │   ├── staging_db_init.sql
│   │   │   ├── stash_local_changes.sh
│   │   │   ├── sync_from_remote_noconfig.sh
│   │   │   └── sync_from_remote.sh
│   │   ├── README.md
│   │   ├── safe_cloudflare_update.sh
│   │   ├── sync_memory_backends.py
│   │   └── sync_now.py
│   ├── testing
│   │   ├── run_complete_test.py
│   │   ├── run_memory_test.sh
│   │   ├── simple_test.py
│   │   ├── test_cleanup_logic.py
│   │   ├── test_cloudflare_backend.py
│   │   ├── test_docker_functionality.py
│   │   ├── test_installation.py
│   │   ├── test_mdns.py
│   │   ├── test_memory_api.py
│   │   ├── test_memory_simple.py
│   │   ├── test_migration.py
│   │   ├── test_search_api.py
│   │   ├── test_sqlite_vec_embeddings.py
│   │   ├── test_sse_events.py
│   │   ├── test-connection.py
│   │   └── test-hook.js
│   ├── utils
│   │   ├── claude_commands_utils.py
│   │   ├── generate_personalized_claude_md.sh
│   │   ├── groq
│   │   ├── groq_agent_bridge.py
│   │   ├── list-collections.py
│   │   ├── memory_wrapper_uv.py
│   │   ├── query_memories.py
│   │   ├── smithery_wrapper.py
│   │   ├── test_groq_bridge.sh
│   │   └── uv_wrapper.py
│   └── validation
│       ├── check_dev_setup.py
│       ├── check_documentation_links.py
│       ├── diagnose_backend_config.py
│       ├── validate_configuration_complete.py
│       ├── validate_memories.py
│       ├── validate_migration.py
│       ├── validate_timestamp_integrity.py
│       ├── verify_environment.py
│       ├── verify_pytorch_windows.py
│       └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│   └── mcp_memory_service
│       ├── __init__.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── operations.py
│       │   ├── sync_wrapper.py
│       │   └── types.py
│       ├── backup
│       │   ├── __init__.py
│       │   └── scheduler.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── ingestion.py
│       │   ├── main.py
│       │   └── utils.py
│       ├── config.py
│       ├── consolidation
│       │   ├── __init__.py
│       │   ├── associations.py
│       │   ├── base.py
│       │   ├── clustering.py
│       │   ├── compression.py
│       │   ├── consolidator.py
│       │   ├── decay.py
│       │   ├── forgetting.py
│       │   ├── health.py
│       │   └── scheduler.py
│       ├── dependency_check.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── client.py
│       │   └── mdns_service.py
│       ├── embeddings
│       │   ├── __init__.py
│       │   └── onnx_embeddings.py
│       ├── ingestion
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chunker.py
│       │   ├── csv_loader.py
│       │   ├── json_loader.py
│       │   ├── pdf_loader.py
│       │   ├── registry.py
│       │   ├── semtools_loader.py
│       │   └── text_loader.py
│       ├── lm_studio_compat.py
│       ├── mcp_server.py
│       ├── models
│       │   ├── __init__.py
│       │   └── memory.py
│       ├── server.py
│       ├── services
│       │   ├── __init__.py
│       │   └── memory_service.py
│       ├── storage
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloudflare.py
│       │   ├── factory.py
│       │   ├── http_client.py
│       │   ├── hybrid.py
│       │   └── sqlite_vec.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── exporter.py
│       │   ├── importer.py
│       │   └── litestream_config.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cache_manager.py
│       │   ├── content_splitter.py
│       │   ├── db_utils.py
│       │   ├── debug.py
│       │   ├── document_processing.py
│       │   ├── gpu_detection.py
│       │   ├── hashing.py
│       │   ├── http_server_manager.py
│       │   ├── port_detection.py
│       │   ├── system_detection.py
│       │   └── time_parser.py
│       └── web
│           ├── __init__.py
│           ├── api
│           │   ├── __init__.py
│           │   ├── analytics.py
│           │   ├── backup.py
│           │   ├── consolidation.py
│           │   ├── documents.py
│           │   ├── events.py
│           │   ├── health.py
│           │   ├── manage.py
│           │   ├── mcp.py
│           │   ├── memories.py
│           │   ├── search.py
│           │   └── sync.py
│           ├── app.py
│           ├── dependencies.py
│           ├── oauth
│           │   ├── __init__.py
│           │   ├── authorization.py
│           │   ├── discovery.py
│           │   ├── middleware.py
│           │   ├── models.py
│           │   ├── registration.py
│           │   └── storage.py
│           ├── sse.py
│           └── static
│               ├── app.js
│               ├── index.html
│               ├── README.md
│               ├── sse_test.html
│               └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── test_compact_types.py
│   │   └── test_operations.py
│   ├── bridge
│   │   ├── mock_responses.js
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   └── test_http_mcp_bridge.js
│   ├── conftest.py
│   ├── consolidation
│   │   ├── __init__.py
│   │   ├── conftest.py
│   │   ├── test_associations.py
│   │   ├── test_clustering.py
│   │   ├── test_compression.py
│   │   ├── test_consolidator.py
│   │   ├── test_decay.py
│   │   └── test_forgetting.py
│   ├── contracts
│   │   └── api-specification.yml
│   ├── integration
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── test_api_key_fallback.py
│   │   ├── test_api_memories_chronological.py
│   │   ├── test_api_tag_time_search.py
│   │   ├── test_api_with_memory_service.py
│   │   ├── test_bridge_integration.js
│   │   ├── test_cli_interfaces.py
│   │   ├── test_cloudflare_connection.py
│   │   ├── test_concurrent_clients.py
│   │   ├── test_data_serialization_consistency.py
│   │   ├── test_http_server_startup.py
│   │   ├── test_mcp_memory.py
│   │   ├── test_mdns_integration.py
│   │   ├── test_oauth_basic_auth.py
│   │   ├── test_oauth_flow.py
│   │   ├── test_server_handlers.py
│   │   └── test_store_memory.py
│   ├── performance
│   │   ├── test_background_sync.py
│   │   └── test_hybrid_live.py
│   ├── README.md
│   ├── smithery
│   │   └── test_smithery.py
│   ├── sqlite
│   │   └── simple_sqlite_vec_test.py
│   ├── test_client.py
│   ├── test_content_splitting.py
│   ├── test_database.py
│   ├── test_hybrid_cloudflare_limits.py
│   ├── test_hybrid_storage.py
│   ├── test_memory_ops.py
│   ├── test_semantic_search.py
│   ├── test_sqlite_vec_storage.py
│   ├── test_time_parser.py
│   ├── test_timestamp_preservation.py
│   ├── timestamp
│   │   ├── test_hook_vs_manual_storage.py
│   │   ├── test_issue99_final_validation.py
│   │   ├── test_search_retrieval_inconsistency.py
│   │   ├── test_timestamp_issue.py
│   │   └── test_timestamp_simple.py
│   └── unit
│       ├── conftest.py
│       ├── test_cloudflare_storage.py
│       ├── test_csv_loader.py
│       ├── test_fastapi_dependencies.py
│       ├── test_import.py
│       ├── test_json_loader.py
│       ├── test_mdns_simple.py
│       ├── test_mdns.py
│       ├── test_memory_service.py
│       ├── test_memory.py
│       ├── test_semtools_loader.py
│       ├── test_storage_interface_compatibility.py
│       └── test_tag_time_filtering.py
├── tools
│   ├── docker
│   │   ├── DEPRECATED.md
│   │   ├── docker-compose.http.yml
│   │   ├── docker-compose.pythonpath.yml
│   │   ├── docker-compose.standalone.yml
│   │   ├── docker-compose.uv.yml
│   │   ├── docker-compose.yml
│   │   ├── docker-entrypoint-persistent.sh
│   │   ├── docker-entrypoint-unified.sh
│   │   ├── docker-entrypoint.sh
│   │   ├── Dockerfile
│   │   ├── Dockerfile.glama
│   │   ├── Dockerfile.slim
│   │   ├── README.md
│   │   └── test-docker-modes.sh
│   └── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/hybrid.py:
--------------------------------------------------------------------------------
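
The listing below implements the hybrid backend described in its module docstring. As a minimal, illustrative sketch of the background sync machinery it defines (the `demo` coroutine and the `primary`, `secondary`, and `memory` parameter names are hypothetical; already-configured `SqliteVecMemoryStorage` and `CloudflareStorage` instances are assumed), the service could be driven roughly like this:

```python
from mcp_memory_service.storage.hybrid import BackgroundSyncService, SyncOperation


async def demo(primary, secondary, memory):
    """Sketch: drive the background sync service directly."""
    sync = BackgroundSyncService(primary, secondary, sync_interval=300, batch_size=50)
    await sync.start()                     # spawns the _sync_loop background task

    # Queue cloud replication of a memory that was already stored locally.
    await sync.enqueue_operation(SyncOperation(operation='store', memory=memory))

    status = await sync.get_sync_status()  # pending operations, capacity warnings, ...
    result = await sync.force_sync()       # push all primary memories to Cloudflare now
    await sync.stop()                      # drains remaining queued operations
    return status, result
```

In the actual backend, `HybridMemoryStorage` (defined near the end of this file) is the class expected to own this wiring; the sketch only exercises the sync service's public surface.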

```python
   1 | # Copyright 2024 Heinrich Krupp
   2 | #
   3 | # Licensed under the Apache License, Version 2.0 (the "License");
   4 | # you may not use this file except in compliance with the License.
   5 | # You may obtain a copy of the License at
   6 | #
   7 | #     http://www.apache.org/licenses/LICENSE-2.0
   8 | #
   9 | # Unless required by applicable law or agreed to in writing, software
  10 | # distributed under the License is distributed on an "AS IS" BASIS,
  11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 | # See the License for the specific language governing permissions and
  13 | # limitations under the License.
  14 | 
  15 | """
  16 | Hybrid memory storage backend for MCP Memory Service.
  17 | 
  18 | This implementation provides the best of both worlds:
  19 | - SQLite-vec as primary storage for ultra-fast reads (~5ms)
  20 | - Cloudflare as secondary storage for cloud persistence and multi-device sync
  21 | - Background synchronization service for seamless integration
  22 | - Graceful degradation when cloud services are unavailable
  23 | """
  24 | 
  25 | import asyncio
  26 | import logging
  27 | import time
  28 | from typing import List, Dict, Any, Tuple, Optional
  29 | from collections import deque
  30 | from dataclasses import dataclass
  31 | 
  32 | from .base import MemoryStorage
  33 | from .sqlite_vec import SqliteVecMemoryStorage
  34 | from .cloudflare import CloudflareStorage
  35 | from ..models.memory import Memory, MemoryQueryResult
  36 | 
  37 | # Import SSE for real-time progress updates
  38 | try:
  39 |     from ..web.sse import sse_manager, create_sync_progress_event, create_sync_completed_event
  40 |     SSE_AVAILABLE = True
  41 | except ImportError:
  42 |     SSE_AVAILABLE = False
  43 | 
  44 | # Import config to check if limit constants are available
  45 | from .. import config as app_config
  46 | 
  47 | # Use getattr to provide fallbacks if attributes don't exist (prevents duplicate defaults)
  48 | CLOUDFLARE_D1_MAX_SIZE_GB = getattr(app_config, 'CLOUDFLARE_D1_MAX_SIZE_GB', 10)
  49 | CLOUDFLARE_VECTORIZE_MAX_VECTORS = getattr(app_config, 'CLOUDFLARE_VECTORIZE_MAX_VECTORS', 5_000_000)
  50 | CLOUDFLARE_MAX_METADATA_SIZE_KB = getattr(app_config, 'CLOUDFLARE_MAX_METADATA_SIZE_KB', 10)
  51 | CLOUDFLARE_WARNING_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_WARNING_THRESHOLD_PERCENT', 80)
  52 | CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT = getattr(app_config, 'CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT', 95)
  53 | HYBRID_SYNC_ON_STARTUP = getattr(app_config, 'HYBRID_SYNC_ON_STARTUP', True)
  54 | HYBRID_MAX_CONTENT_LENGTH = getattr(app_config, 'HYBRID_MAX_CONTENT_LENGTH', 800)
  55 | HYBRID_MAX_EMPTY_BATCHES = getattr(app_config, 'HYBRID_MAX_EMPTY_BATCHES', 20)
  56 | HYBRID_MIN_CHECK_COUNT = getattr(app_config, 'HYBRID_MIN_CHECK_COUNT', 1000)
  57 | 
  58 | logger = logging.getLogger(__name__)
  59 | 
  60 | @dataclass
  61 | class SyncOperation:
  62 |     """Represents a pending sync operation."""
  63 |     operation: str  # 'store', 'delete', 'update'
  64 |     memory: Optional[Memory] = None
  65 |     content_hash: Optional[str] = None
  66 |     updates: Optional[Dict[str, Any]] = None
  67 |     timestamp: float = None
  68 |     retries: int = 0
  69 |     max_retries: int = 3
  70 | 
  71 |     def __post_init__(self):
  72 |         if self.timestamp is None:
  73 |             self.timestamp = time.time()
  74 | 
  75 | class BackgroundSyncService:
  76 |     """
  77 |     Handles background synchronization between SQLite-vec and Cloudflare.
  78 | 
  79 |     Features:
  80 |     - Asynchronous operation queue
  81 |     - Retry logic with exponential backoff
  82 |     - Health monitoring and error handling
  83 |     - Configurable sync intervals and batch sizes
  84 |     - Graceful degradation when cloud is unavailable
  85 |     """
  86 | 
  87 |     def __init__(self,
  88 |                  primary_storage: SqliteVecMemoryStorage,
  89 |                  secondary_storage: CloudflareStorage,
  90 |                  sync_interval: int = 300,  # 5 minutes
  91 |                  batch_size: int = 50,
  92 |                  max_queue_size: int = 1000):
  93 |         self.primary = primary_storage
  94 |         self.secondary = secondary_storage
  95 |         self.sync_interval = sync_interval
  96 |         self.batch_size = batch_size
  97 |         self.max_queue_size = max_queue_size
  98 | 
  99 |         # Sync queues and state
 100 |         self.operation_queue = asyncio.Queue(maxsize=max_queue_size)
 101 |         self.failed_operations = deque(maxlen=100)  # Keep track of failed operations
 102 |         self.is_running = False
 103 |         self.sync_task = None
 104 |         self.last_sync_time = 0
 105 | 
 106 |         # Drift detection state (v8.25.0+)
 107 |         self.last_drift_check_time = 0
 108 |         self.drift_check_enabled = getattr(app_config, 'HYBRID_SYNC_UPDATES', True)
 109 |         self.drift_check_interval = getattr(app_config, 'HYBRID_DRIFT_CHECK_INTERVAL', 3600)
 110 | 
 111 |         self.sync_stats = {
 112 |             'operations_processed': 0,
 113 |             'operations_failed': 0,
 114 |             'last_sync_duration': 0,
 115 |             'cloudflare_available': True,
 116 |             'last_drift_check': 0,
 117 |             'drift_detected_count': 0,
 118 |             'drift_synced_count': 0
 119 |         }
 120 | 
 121 |         # Health monitoring
 122 |         self.consecutive_failures = 0
 123 |         self.max_consecutive_failures = 5
 124 |         self.backoff_time = 60  # Start with 1 minute backoff
 125 | 
 126 |         # Cloudflare capacity tracking
 127 |         self.cloudflare_stats = {
 128 |             'vector_count': 0,
 129 |             'estimated_d1_size_gb': 0,
 130 |             'last_capacity_check': 0,
 131 |             'approaching_limits': False,
 132 |             'limit_warnings': []
 133 |         }
 134 | 
 135 |     async def start(self):
 136 |         """Start the background sync service."""
 137 |         if self.is_running:
 138 |             logger.warning("Background sync service is already running")
 139 |             return
 140 | 
 141 |         self.is_running = True
 142 |         self.sync_task = asyncio.create_task(self._sync_loop())
 143 |         logger.info(f"Background sync service started with {self.sync_interval}s interval")
 144 | 
 145 |     async def stop(self):
 146 |         """Stop the background sync service and process remaining operations."""
 147 |         if not self.is_running:
 148 |             return
 149 | 
 150 |         self.is_running = False
 151 | 
 152 |         # Process remaining operations in queue
 153 |         remaining_operations = []
 154 |         while not self.operation_queue.empty():
 155 |             try:
 156 |                 operation = self.operation_queue.get_nowait()
 157 |                 remaining_operations.append(operation)
 158 |             except asyncio.QueueEmpty:
 159 |                 break
 160 | 
 161 |         if remaining_operations:
 162 |             logger.info(f"Processing {len(remaining_operations)} remaining operations before shutdown")
 163 |             await self._process_operations_batch(remaining_operations)
 164 | 
 165 |         # Cancel the sync task
 166 |         if self.sync_task:
 167 |             self.sync_task.cancel()
 168 |             try:
 169 |                 await self.sync_task
 170 |             except asyncio.CancelledError:
 171 |                 pass
 172 | 
 173 |         logger.info("Background sync service stopped")
 174 | 
 175 |     async def enqueue_operation(self, operation: SyncOperation):
 176 |         """Enqueue a sync operation for background processing."""
 177 |         try:
 178 |             self.operation_queue.put_nowait(operation)
 179 |             logger.debug(f"Enqueued {operation.operation} operation")
 180 |         except asyncio.QueueFull:
 181 |             # If queue is full, process immediately to avoid blocking
 182 |             logger.warning("Sync queue full, processing operation immediately")
 183 |             await self._process_single_operation(operation)
 184 | 
 185 |     async def force_sync(self) -> Dict[str, Any]:
 186 |         """Force an immediate full synchronization between backends."""
 187 |         logger.info("Starting forced sync between primary and secondary storage")
 188 |         sync_start_time = time.time()
 189 | 
 190 |         try:
 191 |             # Get all memories from primary storage
 192 |             primary_memories = await self.primary.get_all_memories()
 193 | 
 194 |             # Check Cloudflare availability
 195 |             try:
 196 |                 await self.secondary.get_stats()  # Simple health check
 197 |                 cloudflare_available = True
 198 |             except Exception as e:
 199 |                 logger.warning(f"Cloudflare not available during force sync: {e}")
 200 |                 cloudflare_available = False
 201 |                 self.sync_stats['cloudflare_available'] = False
 202 |                 return {
 203 |                     'status': 'partial',
 204 |                     'cloudflare_available': False,
 205 |                     'primary_memories': len(primary_memories),
 206 |                     'synced_to_secondary': 0,
 207 |                     'duration': time.time() - sync_start_time
 208 |                 }
 209 | 
 210 |             # Sync from primary to secondary using concurrent operations
 211 |             async def sync_memory(memory):
 212 |                 try:
 213 |                     success, message = await self.secondary.store(memory)
 214 |                     if success:
 215 |                         return True, None
 216 |                     else:
 217 |                         logger.debug(f"Failed to sync memory to secondary: {message}")
 218 |                         return False, message
 219 |                 except Exception as e:
 220 |                     logger.debug(f"Exception syncing memory to secondary: {e}")
 221 |                     return False, str(e)
 222 | 
 223 |             # Process memories concurrently in batches
 224 |             synced_count = 0
 225 |             failed_count = 0
 226 | 
 227 |             # Process in batches to avoid overwhelming the system
 228 |             batch_size = min(self.batch_size, 10)  # Limit concurrent operations
 229 |             for i in range(0, len(primary_memories), batch_size):
 230 |                 batch = primary_memories[i:i + batch_size]
 231 |                 results = await asyncio.gather(*[sync_memory(m) for m in batch], return_exceptions=True)
 232 | 
 233 |                 for result in results:
 234 |                     if isinstance(result, Exception):
 235 |                         failed_count += 1
 236 |                         logger.debug(f"Exception in batch sync: {result}")
 237 |                     elif isinstance(result, tuple):
 238 |                         success, _ = result
 239 |                         if success:
 240 |                             synced_count += 1
 241 |                         else:
 242 |                             failed_count += 1
 243 | 
 244 |             sync_duration = time.time() - sync_start_time
 245 |             self.sync_stats['last_sync_duration'] = sync_duration
 246 |             self.sync_stats['cloudflare_available'] = cloudflare_available
 247 | 
 248 |             logger.info(f"Force sync completed: {synced_count} synced, {failed_count} failed in {sync_duration:.2f}s")
 249 | 
 250 |             return {
 251 |                 'status': 'completed',
 252 |                 'cloudflare_available': cloudflare_available,
 253 |                 'primary_memories': len(primary_memories),
 254 |                 'synced_to_secondary': synced_count,
 255 |                 'failed_operations': failed_count,
 256 |                 'duration': sync_duration
 257 |             }
 258 | 
 259 |         except Exception as e:
 260 |             logger.error(f"Error during force sync: {e}")
 261 |             return {
 262 |                 'status': 'error',
 263 |                 'error': str(e),
 264 |                 'duration': time.time() - sync_start_time
 265 |             }
 266 | 
 267 |     async def get_sync_status(self) -> Dict[str, Any]:
 268 |         """Get current sync service status and statistics."""
 269 |         queue_size = self.operation_queue.qsize()
 270 | 
 271 |         status = {
 272 |             'is_running': self.is_running,
 273 |             'is_paused': not self.is_running,
 274 |             'pending_operations': queue_size,
 275 |             'failed_operations': len(self.failed_operations),
 276 |             'last_sync_time': self.last_sync_time,
 277 |             'consecutive_failures': self.consecutive_failures,
 278 |             'stats': self.sync_stats.copy(),
 279 |             'operations_processed': self.sync_stats.get('operations_processed', 0),
 280 |             'operations_failed': self.sync_stats.get('operations_failed', 0),
 281 |             'cloudflare_available': self.sync_stats['cloudflare_available'],
 282 |             'sync_interval': self.sync_interval,
 283 |             'next_sync_in': max(0, self.sync_interval - (time.time() - self.last_sync_time)),
 284 |             'capacity': {
 285 |                 'vector_count': self.cloudflare_stats['vector_count'],
 286 |                 'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
 287 |                 'approaching_limits': self.cloudflare_stats['approaching_limits'],
 288 |                 'warnings': self.cloudflare_stats['limit_warnings']
 289 |             }
 290 |         }
 291 | 
 292 |         return status
 293 | 
 294 |     async def validate_memory_for_cloudflare(self, memory: Memory) -> Tuple[bool, Optional[str]]:
 295 |         """
 296 |         Validate if a memory can be synced to Cloudflare.
 297 | 
 298 |         Returns:
 299 |             Tuple of (is_valid, error_message)
 300 |         """
 301 |         # Check metadata size
 302 |         if memory.metadata:
 303 |             import json
 304 |             metadata_json = json.dumps(memory.metadata)
 305 |             metadata_size_kb = len(metadata_json.encode('utf-8')) / 1024
 306 | 
 307 |             if metadata_size_kb > CLOUDFLARE_MAX_METADATA_SIZE_KB:
 308 |                 return False, f"Metadata size {metadata_size_kb:.2f}KB exceeds Cloudflare limit of {CLOUDFLARE_MAX_METADATA_SIZE_KB}KB"
 309 | 
 310 |         # Check if we're approaching vector count limit
 311 |         if self.cloudflare_stats['vector_count'] >= CLOUDFLARE_VECTORIZE_MAX_VECTORS:
 312 |             return False, f"Cloudflare vector limit of {CLOUDFLARE_VECTORIZE_MAX_VECTORS} reached"
 313 | 
 314 |         return True, None
 315 | 
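    # Illustrative example of the size check above: metadata that serializes to
    # roughly 12KB of JSON exceeds the default CLOUDFLARE_MAX_METADATA_SIZE_KB of 10,
    # so the memory is reported as invalid for Cloudflare sync and is later skipped
    # permanently rather than retried (see _process_single_operation).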
 316 |     async def check_cloudflare_capacity(self) -> Dict[str, Any]:
 317 |         """
 318 |         Check remaining Cloudflare capacity and return status.
 319 |         """
 320 |         try:
 321 |             # Get current stats from Cloudflare
 322 |             cf_stats = await self.secondary.get_stats()
 323 | 
 324 |             # Update our tracking
 325 |             self.cloudflare_stats['vector_count'] = cf_stats.get('total_memories', 0)
 326 |             self.cloudflare_stats['last_capacity_check'] = time.time()
 327 | 
 328 |             # Calculate usage percentages
 329 |             vector_usage_percent = (self.cloudflare_stats['vector_count'] / CLOUDFLARE_VECTORIZE_MAX_VECTORS) * 100
 330 | 
 331 |             # Clear previous warnings
 332 |             self.cloudflare_stats['limit_warnings'] = []
 333 | 
 334 |             # Check vector count limits
 335 |             if vector_usage_percent >= CLOUDFLARE_CRITICAL_THRESHOLD_PERCENT:
 336 |                 warning = f"CRITICAL: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
 337 |                 self.cloudflare_stats['limit_warnings'].append(warning)
 338 |                 logger.error(warning)
 339 |                 self.cloudflare_stats['approaching_limits'] = True
 340 |             elif vector_usage_percent >= CLOUDFLARE_WARNING_THRESHOLD_PERCENT:
 341 |                 warning = f"WARNING: Vector usage at {vector_usage_percent:.1f}% ({self.cloudflare_stats['vector_count']:,}/{CLOUDFLARE_VECTORIZE_MAX_VECTORS:,})"
 342 |                 self.cloudflare_stats['limit_warnings'].append(warning)
 343 |                 logger.warning(warning)
 344 |                 self.cloudflare_stats['approaching_limits'] = True
 345 |             else:
 346 |                 self.cloudflare_stats['approaching_limits'] = False
 347 | 
 348 |             return {
 349 |                 'vector_count': self.cloudflare_stats['vector_count'],
 350 |                 'vector_limit': CLOUDFLARE_VECTORIZE_MAX_VECTORS,
 351 |                 'vector_usage_percent': vector_usage_percent,
 352 |                 'approaching_limits': self.cloudflare_stats['approaching_limits'],
 353 |                 'warnings': self.cloudflare_stats['limit_warnings']
 354 |             }
 355 | 
 356 |         except Exception as e:
 357 |             logger.error(f"Failed to check Cloudflare capacity: {e}")
 358 |             return {
 359 |                 'error': str(e),
 360 |                 'approaching_limits': False
 361 |             }
 362 | 
 363 |     async def _sync_loop(self):
 364 |         """Main background sync loop."""
 365 |         logger.info("Background sync loop started")
 366 | 
 367 |         while self.is_running:
 368 |             try:
 369 |                 # Process queued operations
 370 |                 await self._process_operation_queue()
 371 | 
 372 |                 # Periodic full sync if enough time has passed
 373 |                 current_time = time.time()
 374 |                 if current_time - self.last_sync_time >= self.sync_interval:
 375 |                     await self._periodic_sync()
 376 |                     self.last_sync_time = current_time
 377 | 
 378 |                 # Sleep before next iteration
 379 |                 await asyncio.sleep(5)  # Check every 5 seconds
 380 | 
 381 |             except Exception as e:
 382 |                 logger.error(f"Error in sync loop: {e}")
 383 |                 self.consecutive_failures += 1
 384 | 
 385 |                 if self.consecutive_failures >= self.max_consecutive_failures:
 386 |                     logger.warning(f"Too many consecutive sync failures ({self.consecutive_failures}), backing off for {self.backoff_time}s")
 387 |                     await asyncio.sleep(self.backoff_time)
 388 |                     self.backoff_time = min(self.backoff_time * 2, 1800)  # Max 30 minutes
 389 |                 else:
 390 |                     await asyncio.sleep(1)
 391 | 
 392 |     async def _process_operation_queue(self):
 393 |         """Process operations from the queue in batches."""
 394 |         operations = []
 395 | 
 396 |         # Collect up to batch_size operations
 397 |         for _ in range(self.batch_size):
 398 |             try:
 399 |                 operation = self.operation_queue.get_nowait()
 400 |                 operations.append(operation)
 401 |             except asyncio.QueueEmpty:
 402 |                 break
 403 | 
 404 |         if operations:
 405 |             await self._process_operations_batch(operations)
 406 | 
 407 |     async def _process_operations_batch(self, operations: List[SyncOperation]):
 408 |         """Process a batch of sync operations."""
 409 |         logger.debug(f"Processing batch of {len(operations)} sync operations")
 410 | 
 411 |         for operation in operations:
 412 |             try:
 413 |                 await self._process_single_operation(operation)
 414 |                 self.sync_stats['operations_processed'] += 1
 415 | 
 416 |             except Exception as e:
 417 |                 await self._handle_sync_error(e, operation)
 418 | 
 419 |     async def _handle_sync_error(self, error: Exception, operation: SyncOperation):
 420 |         """
 421 |         Handle sync operation errors with intelligent retry logic.
 422 | 
 423 |         Args:
 424 |             error: The exception that occurred
 425 |             operation: The failed operation
 426 |         """
 427 |         error_str = str(error).lower()
 428 | 
 429 |         # Check for specific Cloudflare limit errors
 430 |         is_limit_error = any(term in error_str for term in [
 431 |             'limit exceeded', 'quota exceeded', 'maximum', 'too large',
 432 |             '413', '507', 'insufficient storage', 'capacity'
 433 |         ])
 434 | 
 435 |         if is_limit_error:
 436 |             # Don't retry limit errors - they won't succeed
 437 |             logger.error(f"Cloudflare limit error for {operation.operation}: {error}")
 438 |             self.sync_stats['operations_failed'] += 1
 439 | 
 440 |             # Update capacity tracking
 441 |             self.cloudflare_stats['approaching_limits'] = True
 442 |             self.cloudflare_stats['limit_warnings'].append(f"Limit error: {error}")
 443 | 
 444 |             # Check capacity to understand the issue
 445 |             await self.check_cloudflare_capacity()
 446 |             return
 447 | 
 448 |         # Check for temporary/network errors
 449 |         is_temporary_error = any(term in error_str for term in [
 450 |             'timeout', 'connection', 'network', '500', '502', '503', '504',
 451 |             'temporarily unavailable', 'retry'
 452 |         ])
 453 | 
 454 |         if is_temporary_error or operation.retries < operation.max_retries:
 455 |             # Retry temporary errors
 456 |             logger.warning(f"Temporary error for {operation.operation} (retry {operation.retries + 1}/{operation.max_retries}): {error}")
 457 |             operation.retries += 1
 458 | 
 459 |             if operation.retries < operation.max_retries:
 460 |                 # Add back to queue for retry with exponential backoff
 461 |                 await asyncio.sleep(min(2 ** operation.retries, 60))  # Max 60 second delay
 462 |                 self.failed_operations.append(operation)
 463 |             else:
 464 |                 logger.error(f"Max retries reached for {operation.operation}")
 465 |                 self.sync_stats['operations_failed'] += 1
 466 |         else:
 467 |             # Permanent error - don't retry
 468 |             logger.error(f"Permanent error for {operation.operation}: {error}")
 469 |             self.sync_stats['operations_failed'] += 1
 470 | 
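    # Illustrative retry timing derived from the logic above: a temporary error is
    # retried after min(2 ** retries, 60) seconds, i.e. 2s after the first failure
    # and 4s after the second; the third failure hits max_retries and the operation
    # is counted as failed. Independently, repeated sync-loop errors double
    # backoff_time from 60s up to its 1800s (30 minute) cap.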
 471 |     async def _process_single_operation(self, operation: SyncOperation):
 472 |         """Process a single sync operation to secondary storage."""
 473 |         try:
 474 |             if operation.operation == 'store' and operation.memory:
 475 |                 # Validate memory before syncing
 476 |                 is_valid, validation_error = await self.validate_memory_for_cloudflare(operation.memory)
 477 |                 if not is_valid:
 478 |                     logger.warning(f"Memory validation failed for sync: {validation_error}")
 479 |                     # Don't retry if it's a hard limit
 480 |                     if "exceeds Cloudflare limit" in validation_error or "limit of" in validation_error:
 481 |                         self.sync_stats['operations_failed'] += 1
 482 |                         return  # Skip this memory permanently
 483 |                     raise Exception(validation_error)
 484 | 
 485 |                 success, message = await self.secondary.store(operation.memory)
 486 |                 if not success:
 487 |                     raise Exception(f"Store operation failed: {message}")
 488 | 
 489 |             elif operation.operation == 'delete' and operation.content_hash:
 490 |                 success, message = await self.secondary.delete(operation.content_hash)
 491 |                 if not success:
 492 |                     raise Exception(f"Delete operation failed: {message}")
 493 | 
 494 |             elif operation.operation == 'update' and operation.content_hash and operation.updates:
 495 |                 success, message = await self.secondary.update_memory_metadata(
 496 |                     operation.content_hash, operation.updates
 497 |                 )
 498 |                 if not success:
 499 |                     raise Exception(f"Update operation failed: {message}")
 500 | 
 501 |             # Reset failure counters on success
 502 |             self.consecutive_failures = 0
 503 |             self.backoff_time = 60
 504 |             self.sync_stats['cloudflare_available'] = True
 505 | 
 506 |         except Exception as e:
 507 |             # Mark Cloudflare as potentially unavailable
 508 |             self.sync_stats['cloudflare_available'] = False
 509 |             raise
 510 | 
 511 |     async def _periodic_sync(self):
 512 |         """Perform periodic full synchronization."""
 513 |         logger.debug("Starting periodic sync")
 514 | 
 515 |         try:
 516 |             # Retry any failed operations first
 517 |             if self.failed_operations:
 518 |                 retry_operations = list(self.failed_operations)
 519 |                 self.failed_operations.clear()
 520 |                 logger.info(f"Retrying {len(retry_operations)} failed operations")
 521 |                 await self._process_operations_batch(retry_operations)
 522 | 
 523 |             # Perform a lightweight health check
 524 |             try:
 525 |                 stats = await self.secondary.get_stats()
 526 |                 logger.debug(f"Secondary storage health check passed: {stats}")
 527 |                 self.sync_stats['cloudflare_available'] = True
 528 | 
 529 |                 # Check Cloudflare capacity every periodic sync
 530 |                 capacity_status = await self.check_cloudflare_capacity()
 531 |                 if capacity_status.get('approaching_limits'):
 532 |                     logger.warning("Cloudflare approaching capacity limits")
 533 |                     for warning in capacity_status.get('warnings', []):
 534 |                         logger.warning(warning)
 535 | 
 536 |                 # Periodic drift detection (v8.25.0+)
 537 |                 if self.drift_check_enabled:
 538 |                     time_since_last_check = time.time() - self.last_drift_check_time
 539 |                     if time_since_last_check >= self.drift_check_interval:
 540 |                         logger.info(f"Running periodic drift check (interval: {self.drift_check_interval}s)")
 541 |                         drift_stats = await self._detect_and_sync_drift()
 542 |                         logger.info(f"Drift check complete: {drift_stats}")
 543 | 
 544 |             except Exception as e:
 545 |                 logger.warning(f"Secondary storage health check failed: {e}")
 546 |                 self.sync_stats['cloudflare_available'] = False
 547 | 
 548 |         except Exception as e:
 549 |             logger.error(f"Error during periodic sync: {e}")
 550 | 
 551 |     async def _detect_and_sync_drift(self, dry_run: bool = False) -> Dict[str, int]:
 552 |         """
 553 |         Detect and sync memories with divergent metadata between backends.
 554 | 
 555 |         Compares updated_at timestamps to identify metadata drift (tags, types, custom fields)
 556 |         and synchronizes changes using "newer timestamp wins" strategy.
 557 | 
 558 |         Args:
 559 |             dry_run: If True, detect drift but don't apply changes (preview mode)
 560 | 
 561 |         Returns:
 562 |             Dictionary with drift detection statistics:
 563 |             - checked: Number of memories examined
 564 |             - drift_detected: Number of memories with divergent metadata
 565 |             - synced: Number of memories synchronized
 566 |             - failed: Number of sync failures
 567 |         """
 568 |         if not self.drift_check_enabled:
 569 |             return {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
 570 | 
 571 |         logger.info(f"Starting drift detection scan (dry_run={dry_run})...")
 572 |         stats = {'checked': 0, 'drift_detected': 0, 'synced': 0, 'failed': 0}
 573 | 
 574 |         try:
 575 |             # Get batch of recently updated memories from Cloudflare
 576 |             batch_size = getattr(app_config, 'HYBRID_DRIFT_BATCH_SIZE', 100)
 577 | 
 578 |             # Strategy: Check memories updated since last drift check
 579 |             time_threshold = self.last_drift_check_time or (time.time() - self.drift_check_interval)
 580 | 
 581 |             # Get recently updated from Cloudflare
 582 |             if hasattr(self.secondary, 'get_memories_updated_since'):
 583 |                 cf_updated = await self.secondary.get_memories_updated_since(
 584 |                     time_threshold,
 585 |                     limit=batch_size
 586 |                 )
 587 |             else:
 588 |                 # Fallback: Get recent memories and filter by timestamp
 589 |                 cf_memories = await self.secondary.get_all_memories(limit=batch_size)
 590 |                 cf_updated = [m for m in cf_memories if m.updated_at and m.updated_at >= time_threshold]
 591 | 
 592 |             logger.info(f"Found {len(cf_updated)} memories updated in Cloudflare since last check")
 593 | 
 594 |             # Compare with local versions
 595 |             for cf_memory in cf_updated:
 596 |                 stats['checked'] += 1
 597 |                 try:
 598 |                     local_memory = await self.primary.get_by_hash(cf_memory.content_hash)
 599 | 
 600 |                     if not local_memory:
 601 |                         # Memory missing locally - sync it
 602 |                         stats['drift_detected'] += 1
 603 |                         logger.debug(f"Memory {cf_memory.content_hash[:8]} missing locally, syncing...")
 604 |                         if not dry_run:
 605 |                             success, _ = await self.primary.store(cf_memory)
 606 |                             if success:
 607 |                                 stats['synced'] += 1
 608 |                         else:
 609 |                             logger.info(f"[DRY RUN] Would sync missing memory: {cf_memory.content_hash[:8]}")
 610 |                             stats['synced'] += 1
 611 |                         continue
 612 | 
 613 |                     # Compare updated_at timestamps
 614 |                     cf_updated_at = cf_memory.updated_at or 0
 615 |                     local_updated_at = local_memory.updated_at or 0
 616 | 
 617 |                     # Allow 1 second tolerance for timestamp precision
 618 |                     if abs(cf_updated_at - local_updated_at) > 1.0:
 619 |                         stats['drift_detected'] += 1
 620 |                         logger.debug(
 621 |                             f"Drift detected for {cf_memory.content_hash[:8]}: "
 622 |                             f"Cloudflare={cf_updated_at:.2f}, Local={local_updated_at:.2f}"
 623 |                         )
 624 | 
 625 |                         # Use "newer timestamp wins" strategy
 626 |                         if cf_updated_at > local_updated_at:
 627 |                             # Cloudflare is newer - update local
 628 |                             if not dry_run:
 629 |                                 success, _ = await self.primary.update_memory_metadata(
 630 |                                     cf_memory.content_hash,
 631 |                                     {
 632 |                                         'tags': cf_memory.tags,
 633 |                                         'memory_type': cf_memory.memory_type,
 634 |                                         'metadata': cf_memory.metadata,
 635 |                                         'created_at': cf_memory.created_at,
 636 |                                         'created_at_iso': cf_memory.created_at_iso,
 637 |                                         'updated_at': cf_memory.updated_at,
 638 |                                         'updated_at_iso': cf_memory.updated_at_iso,
 639 |                                     },
 640 |                                     preserve_timestamps=False  # Use Cloudflare timestamps
 641 |                                 )
 642 |                                 if success:
 643 |                                     stats['synced'] += 1
 644 |                                     logger.info(f"Synced metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
 645 |                                 else:
 646 |                                     stats['failed'] += 1
 647 |                             else:
 648 |                                 logger.info(f"[DRY RUN] Would sync metadata from Cloudflare → local: {cf_memory.content_hash[:8]}")
 649 |                                 stats['synced'] += 1
 650 |                         else:
 651 |                             # Local is newer - update Cloudflare
 652 |                             if not dry_run:
 653 |                                 operation = SyncOperation(
 654 |                                     operation='update',
 655 |                                     content_hash=local_memory.content_hash,
 656 |                                     updates={
 657 |                                         'tags': local_memory.tags,
 658 |                                         'memory_type': local_memory.memory_type,
 659 |                                         'metadata': local_memory.metadata,
 660 |                                     }
 661 |                                 )
 662 |                                 await self.enqueue_operation(operation)
 663 |                                 stats['synced'] += 1
 664 |                                 logger.info(f"Queued metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
 665 |                             else:
 666 |                                 logger.info(f"[DRY RUN] Would queue metadata sync from local → Cloudflare: {local_memory.content_hash[:8]}")
 667 |                                 stats['synced'] += 1
 668 | 
 669 |                 except Exception as e:
 670 |                     logger.warning(f"Error checking drift for memory: {e}")
 671 |                     stats['failed'] += 1
 672 |                     continue
 673 | 
 674 |             # Update tracking
 675 |             if not dry_run:
 676 |                 self.last_drift_check_time = time.time()
 677 |                 self.sync_stats['last_drift_check'] = self.last_drift_check_time
 678 |                 self.sync_stats['drift_detected_count'] += stats['drift_detected']
 679 |                 self.sync_stats['drift_synced_count'] += stats['synced']
 680 | 
 681 |             logger.info(
 682 |                 f"Drift detection complete: checked={stats['checked']}, "
 683 |                 f"drift_detected={stats['drift_detected']}, synced={stats['synced']}, failed={stats['failed']}"
 684 |             )
 685 | 
 686 |         except Exception as e:
 687 |             logger.error(f"Error during drift detection: {e}")
 688 | 
 689 |         return stats
 690 | 
 691 | 
 692 | class HybridMemoryStorage(MemoryStorage):
 693 |     """
 694 |     Hybrid memory storage using SQLite-vec as primary and Cloudflare as secondary.
 695 | 
 696 |     This implementation provides:
 697 |     - Ultra-fast reads and writes (~5ms) via SQLite-vec
 698 |     - Cloud persistence and multi-device sync via Cloudflare
 699 |     - Background synchronization with retry logic
 700 |     - Graceful degradation when cloud services are unavailable
 701 |     - Full compatibility with the MemoryStorage interface
 702 |     """
 703 | 
 704 |     @property
 705 |     def max_content_length(self) -> Optional[int]:
 706 |         """
 707 |         Maximum content length constrained by Cloudflare secondary storage.
 708 |         Uses configured hybrid limit (defaults to Cloudflare limit).
 709 |         """
 710 |         return HYBRID_MAX_CONTENT_LENGTH
 711 | 
 712 |     @property
 713 |     def supports_chunking(self) -> bool:
 714 |         """Hybrid backend supports content chunking with metadata linking."""
 715 |         return True
 716 | 
 717 |     def __init__(self,
 718 |                  sqlite_db_path: str,
 719 |                  embedding_model: str = "all-MiniLM-L6-v2",
 720 |                  cloudflare_config: Optional[Dict[str, Any]] = None,
 721 |                  sync_interval: int = 300,
 722 |                  batch_size: int = 50):
 723 |         """
 724 |         Initialize hybrid storage with primary SQLite-vec and secondary Cloudflare.
 725 | 
 726 |         Args:
 727 |             sqlite_db_path: Path to SQLite-vec database file
 728 |             embedding_model: Embedding model name for SQLite-vec
 729 |             cloudflare_config: Cloudflare configuration dict
 730 |             sync_interval: Background sync interval in seconds (default: 5 minutes)
 731 |             batch_size: Batch size for sync operations (default: 50)
 732 |         """
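        # Illustrative cloudflare_config shape (values are placeholders, not real
        # credentials); all four keys below must be present, otherwise the instance
        # falls back to SQLite-only mode:
        # {
        #     "api_token": "<token>",
        #     "account_id": "<account-id>",
        #     "vectorize_index": "<index-name>",
        #     "d1_database_id": "<database-id>",
        # }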
 733 |         self.primary = SqliteVecMemoryStorage(
 734 |             db_path=sqlite_db_path,
 735 |             embedding_model=embedding_model
 736 |         )
 737 | 
 738 |         # Initialize Cloudflare storage if config provided
 739 |         self.secondary = None
 740 |         self.sync_service = None
 741 | 
 742 |         if cloudflare_config and all(key in cloudflare_config for key in
 743 |                                     ['api_token', 'account_id', 'vectorize_index', 'd1_database_id']):
 744 |             self.secondary = CloudflareStorage(**cloudflare_config)
 745 |         else:
 746 |             logger.warning("Cloudflare config incomplete, running in SQLite-only mode")
 747 | 
 748 |         self.sync_interval = sync_interval
 749 |         self.batch_size = batch_size
 750 |         self.initialized = False
 751 | 
 752 |         # Initial sync status tracking
 753 |         self.initial_sync_in_progress = False
 754 |         self.initial_sync_total = 0
 755 |         self.initial_sync_completed = 0
 756 |         self.initial_sync_finished = False
 757 | 
 758 |     async def initialize(self) -> None:
 759 |         """Initialize the hybrid storage system."""
 760 |         logger.info("Initializing hybrid memory storage...")
 761 | 
 762 |         # Always initialize primary storage
 763 |         await self.primary.initialize()
 764 |         logger.info("Primary storage (SQLite-vec) initialized")
 765 | 
 766 |         # Initialize secondary storage and sync service if available
 767 |         if self.secondary:
 768 |             try:
 769 |                 await self.secondary.initialize()
 770 |                 logger.info("Secondary storage (Cloudflare) initialized")
 771 | 
 772 |                 # Start background sync service
 773 |                 self.sync_service = BackgroundSyncService(
 774 |                     self.primary,
 775 |                     self.secondary,
 776 |                     sync_interval=self.sync_interval,
 777 |                     batch_size=self.batch_size
 778 |                 )
 779 |                 await self.sync_service.start()
 780 |                 logger.info("Background sync service started")
 781 | 
 782 |                 # Schedule initial sync to run after server startup (non-blocking)
 783 |                 if HYBRID_SYNC_ON_STARTUP:
 784 |                     asyncio.create_task(self._perform_initial_sync_after_startup())
 785 |                     logger.info("Initial sync scheduled to run after server startup")
 786 | 
 787 |             except Exception as e:
 788 |                 logger.warning(f"Failed to initialize secondary storage: {e}")
 789 |                 self.secondary = None
 790 | 
 791 |         self.initialized = True
 792 |         logger.info("Hybrid memory storage initialization completed")
 793 | 
 794 |     async def _perform_initial_sync_after_startup(self) -> None:
 795 |         """
 796 |         Wrapper for initial sync that waits for server startup to complete.
 797 |         This allows the web server to be accessible during the sync process.
 798 |         """
 799 |         # Wait a bit for server to fully start up
 800 |         await asyncio.sleep(2)
 801 |         logger.info("Starting initial sync in background (server is now accessible)")
 802 |         await self._perform_initial_sync()
 803 | 
 804 |     async def _sync_memories_from_cloudflare(
 805 |         self,
 806 |         sync_type: str = "initial",
 807 |         broadcast_sse: bool = True,
 808 |         enable_drift_check: bool = True
 809 |     ) -> Dict[str, Any]:
 810 |         """
 811 |         Shared logic for syncing memories FROM Cloudflare TO local storage.
 812 | 
 813 |         Args:
 814 |             sync_type: Type of sync ("initial" or "manual") for logging/SSE events
 815 |             broadcast_sse: Whether to broadcast SSE progress events
 816 |             enable_drift_check: Whether to check for metadata drift (only for initial sync)
 817 | 
 818 |         Returns:
 819 |             Dict with:
 820 |             - success: bool
 821 |             - memories_synced: int
 822 |             - total_checked: int
 823 |             - message: str
 824 |             - time_taken_seconds: float
 825 |         """
 826 |         import time
 827 |         sync_start_time = time.time()
 828 | 
 829 |         try:
 830 |             # Get memory count from both storages to compare
 831 |             primary_stats = await self.primary.get_stats()
 832 |             secondary_stats = await self.secondary.get_stats()
 833 | 
 834 |             primary_count = primary_stats.get('total_memories', 0)
 835 |             secondary_count = secondary_stats.get('total_memories', 0)
 836 | 
 837 |             logger.info(f"{sync_type.capitalize()} sync: Local={primary_count}, Cloudflare={secondary_count}")
 838 | 
 839 |             if secondary_count <= primary_count:
 840 |                 logger.info(f"No new memories to sync from Cloudflare ({sync_type} sync)")
 841 |                 return {
 842 |                     'success': True,
 843 |                     'memories_synced': 0,
 844 |                     'total_checked': 0,
 845 |                     'message': 'No new memories to pull from Cloudflare',
 846 |                     'time_taken_seconds': round(time.time() - sync_start_time, 3)
 847 |                 }
 848 | 
 849 |             # Pull missing memories from Cloudflare using optimized batch processing
 850 |             missing_count = secondary_count - primary_count
 851 |             synced_count = 0
 852 |             batch_size = min(500, self.batch_size * 5)  # 5x larger batches for sync
 853 |             cursor = None
 854 |             processed_count = 0
 855 |             consecutive_empty_batches = 0
 856 | 
 857 |             # Get all local hashes once for O(1) lookup
 858 |             local_hashes = await self.primary.get_all_content_hashes()
 859 |             logger.info(f"Pulling {missing_count} potential memories from Cloudflare...")
 860 | 
 861 |             while True:
 862 |                 try:
 863 |                     # Get batch from Cloudflare using cursor-based pagination
 864 |                     logger.debug(f"Fetching batch: cursor={cursor}, batch_size={batch_size}")
 865 | 
 866 |                     if hasattr(self.secondary, 'get_all_memories_cursor'):
 867 |                         cloudflare_memories = await self.secondary.get_all_memories_cursor(
 868 |                             limit=batch_size,
 869 |                             cursor=cursor
 870 |                         )
 871 |                     else:
 872 |                         cloudflare_memories = await self.secondary.get_all_memories(
 873 |                             limit=batch_size,
 874 |                             offset=processed_count
 875 |                         )
 876 | 
 877 |                     if not cloudflare_memories:
 878 |                         logger.debug(f"No more memories from Cloudflare at cursor {cursor}")
 879 |                         break
 880 | 
 881 |                     logger.debug(f"Processing batch of {len(cloudflare_memories)} memories")
 882 |                     batch_checked = 0
 883 |                     batch_missing = 0
 884 |                     batch_synced = 0
 885 | 
 886 |                     # Parallel processing with concurrency limit
 887 |                     semaphore = asyncio.Semaphore(15)
 888 | 
 889 |                     async def sync_single_memory(cf_memory):
 890 |                         """Process a single memory with concurrency control."""
 891 |                         nonlocal batch_checked, batch_missing, batch_synced, synced_count, processed_count
 892 | 
 893 |                         async with semaphore:
 894 |                             batch_checked += 1
 895 |                             processed_count += 1
 896 |                             try:
 897 |                                 # Fast O(1) existence check
 898 |                                 if cf_memory.content_hash not in local_hashes:
 899 |                                     batch_missing += 1
 900 |                                     # Memory doesn't exist locally, sync it
 901 |                                     success, message = await self.primary.store(cf_memory)
 902 |                                     if success:
 903 |                                         batch_synced += 1
 904 |                                         synced_count += 1
 905 |                                         local_hashes.add(cf_memory.content_hash)  # Update cache
 906 | 
 907 |                                         if sync_type == "initial":
 908 |                                             self.initial_sync_completed = synced_count
 909 | 
 910 |                                         if synced_count % 10 == 0:
 911 |                                             logger.info(f"{sync_type.capitalize()} sync progress: {synced_count}/{missing_count} memories synced")
 912 | 
 913 |                                             # Broadcast SSE progress event
 914 |                                             if broadcast_sse and SSE_AVAILABLE:
 915 |                                                 try:
 916 |                                                     progress_event = create_sync_progress_event(
 917 |                                                         synced_count=synced_count,
 918 |                                                         total_count=missing_count,
 919 |                                                         sync_type=sync_type
 920 |                                                     )
 921 |                                                     await sse_manager.broadcast_event(progress_event)
 922 |                                                 except Exception as e:
 923 |                                                     logger.debug(f"Failed to broadcast SSE progress: {e}")
 924 | 
 925 |                                         return ('synced', cf_memory.content_hash)
 926 |                                     else:
 927 |                                         logger.warning(f"Failed to sync memory {cf_memory.content_hash}: {message}")
 928 |                                         return ('failed', cf_memory.content_hash, message)
 929 |                                 elif enable_drift_check and self.sync_service and self.sync_service.drift_check_enabled:
 930 |                                     # Memory exists - check for metadata drift
 931 |                                     existing = await self.primary.get_by_hash(cf_memory.content_hash)
 932 |                                     if existing:
 933 |                                         cf_updated = cf_memory.updated_at or 0
 934 |                                         local_updated = existing.updated_at or 0
 935 | 
 936 |                                         # If Cloudflare version is newer, sync metadata
 937 |                                         if cf_updated > local_updated + 1.0:
 938 |                                             logger.debug(f"Metadata drift detected: {cf_memory.content_hash[:8]}")
 939 |                                             success, _ = await self.primary.update_memory_metadata(
 940 |                                                 cf_memory.content_hash,
 941 |                                                 {
 942 |                                                     'tags': cf_memory.tags,
 943 |                                                     'memory_type': cf_memory.memory_type,
 944 |                                                     'metadata': cf_memory.metadata,
 945 |                                                     'created_at': cf_memory.created_at,
 946 |                                                     'created_at_iso': cf_memory.created_at_iso,
 947 |                                                     'updated_at': cf_memory.updated_at,
 948 |                                                     'updated_at_iso': cf_memory.updated_at_iso,
 949 |                                                 },
 950 |                                                 preserve_timestamps=False
 951 |                                             )
 952 |                                             if success:
 953 |                                                 batch_synced += 1
 954 |                                                 synced_count += 1
 955 |                                                 logger.debug(f"Synced metadata for: {cf_memory.content_hash[:8]}")
 956 |                                                 return ('drift_synced', cf_memory.content_hash)
 957 |                                 return ('skipped', cf_memory.content_hash)
 958 |                             except Exception as e:
 959 |                                 logger.warning(f"Error syncing memory {cf_memory.content_hash}: {e}")
 960 |                                 return ('error', cf_memory.content_hash, str(e))
 961 | 
 962 |                     # Process batch in parallel
 963 |                     tasks = [sync_single_memory(mem) for mem in cloudflare_memories]
 964 |                     results = await asyncio.gather(*tasks, return_exceptions=True)
 965 |                     for result in results:
 966 |                         if isinstance(result, Exception):
 967 |                             logger.error(f"Error during {sync_type} sync batch processing: {result}")
 968 | 
 969 |                     logger.debug(f"Batch complete: checked={batch_checked}, missing={batch_missing}, synced={batch_synced}")
 970 | 
 971 |                     # Track consecutive empty batches
 972 |                     if batch_synced == 0:
 973 |                         consecutive_empty_batches += 1
 974 |                         logger.debug(f"Empty batch: consecutive={consecutive_empty_batches}/{HYBRID_MAX_EMPTY_BATCHES}")
 975 |                     else:
 976 |                         consecutive_empty_batches = 0
 977 | 
 978 |                     # Log progress summary
 979 |                     if processed_count > 0 and processed_count % 100 == 0:
 980 |                         logger.info(f"Sync progress: processed={processed_count}, synced={synced_count}/{missing_count}")
 981 | 
 982 |                     # Update cursor for next batch
 983 |                     if cloudflare_memories and hasattr(self.secondary, 'get_all_memories_cursor'):
 984 |                         cursor = min(memory.created_at for memory in cloudflare_memories if memory.created_at)
 985 |                         logger.debug(f"Next cursor: {cursor}")
 986 | 
 987 |                     # Early break conditions
 988 |                     if consecutive_empty_batches >= HYBRID_MAX_EMPTY_BATCHES and synced_count > 0:
 989 |                         logger.info(f"Completed after {consecutive_empty_batches} empty batches - {synced_count}/{missing_count} synced")
 990 |                         break
 991 |                     elif processed_count >= HYBRID_MIN_CHECK_COUNT and synced_count == 0:
 992 |                         logger.info(f"No missing memories after checking {processed_count} memories")
 993 |                         break
 994 | 
 995 |                     await asyncio.sleep(0.01)
 996 | 
 997 |                 except Exception as e:
 998 |                     # Handle Cloudflare D1 errors
 999 |                     if "400" in str(e) and not hasattr(self.secondary, 'get_all_memories_cursor'):
1000 |                         logger.error(f"D1 OFFSET limitation at processed_count={processed_count}: {e}")
1001 |                         logger.warning("Cloudflare D1 OFFSET limits reached - sync incomplete")
1002 |                         break
1003 |                     else:
1004 |                         logger.error(f"Error during {sync_type} sync: {e}")
1005 |                         break
1006 | 
1007 |             time_taken = time.time() - sync_start_time
1008 |             logger.info(f"{sync_type.capitalize()} sync completed: {synced_count} memories in {time_taken:.2f}s")
1009 | 
1010 |             # Broadcast SSE completion event
1011 |             if broadcast_sse and SSE_AVAILABLE and missing_count > 0:
1012 |                 try:
1013 |                     completion_event = create_sync_completed_event(
1014 |                         synced_count=synced_count,
1015 |                         total_count=missing_count,
1016 |                         time_taken_seconds=time_taken,
1017 |                         sync_type=sync_type
1018 |                     )
1019 |                     await sse_manager.broadcast_event(completion_event)
1020 |                 except Exception as e:
1021 |                     logger.debug(f"Failed to broadcast SSE completion: {e}")
1022 | 
1023 |             return {
1024 |                 'success': True,
1025 |                 'memories_synced': synced_count,
1026 |                 'total_checked': processed_count,
1027 |                 'message': f'Successfully pulled {synced_count} memories from Cloudflare',
1028 |                 'time_taken_seconds': round(time_taken, 3)
1029 |             }
1030 | 
1031 |         except Exception as e:
1032 |             logger.error(f"{sync_type.capitalize()} sync failed: {e}")
1033 |             return {
1034 |                 'success': False,
1035 |                 'memories_synced': 0,
1036 |                 'total_checked': 0,
1037 |                 'message': f'Failed to pull from Cloudflare: {str(e)}',
1038 |                 'time_taken_seconds': round(time.time() - sync_start_time, 3)
1039 |             }
1040 | 
1041 |     async def _perform_initial_sync(self) -> None:
1042 |         """
1043 |         Perform initial sync from Cloudflare to SQLite if enabled.
1044 | 
1045 |         This downloads all memories from Cloudflare that are missing in local SQLite,
1046 |         providing immediate access to existing cloud memories.
1047 |         """
1048 |         if not HYBRID_SYNC_ON_STARTUP or not self.secondary:
1049 |             return
1050 | 
1051 |         logger.info("Starting initial sync from Cloudflare to SQLite...")
1052 | 
1053 |         self.initial_sync_in_progress = True
1054 |         self.initial_sync_completed = 0
1055 |         self.initial_sync_finished = False
1056 | 
1057 |         try:
1058 |             # Set initial_sync_total before calling the shared helper
1059 |             primary_stats = await self.primary.get_stats()
1060 |             secondary_stats = await self.secondary.get_stats()
1061 |             primary_count = primary_stats.get('total_memories', 0)
1062 |             secondary_count = secondary_stats.get('total_memories', 0)
1063 | 
1064 |             if secondary_count > primary_count:
1065 |                 self.initial_sync_total = secondary_count - primary_count
1066 |             else:
1067 |                 self.initial_sync_total = 0
1068 | 
1069 |             # Call shared helper method with drift checking enabled
1070 |             result = await self._sync_memories_from_cloudflare(
1071 |                 sync_type="initial",
1072 |                 broadcast_sse=True,
1073 |                 enable_drift_check=True
1074 |             )
1075 | 
1076 |             synced_count = result['memories_synced']
1077 | 
1078 |             # Update sync tracking to reflect actual sync completion
1079 |             if synced_count == 0 and self.initial_sync_total > 0:
1080 |                 # All memories were already present - this is a successful "no-op" sync
1081 |                 self.initial_sync_completed = self.initial_sync_total
1082 |                 logger.info(f"Sync completed successfully: All {self.initial_sync_total} memories were already present locally")
1083 | 
1084 |             self.initial_sync_finished = True
1085 | 
1086 |         except Exception as e:
1087 |             logger.error(f"Initial sync failed: {e}")
1088 |             # Don't fail initialization if initial sync fails
1089 |             logger.warning("Continuing with hybrid storage despite initial sync failure")
1090 |         finally:
1091 |             self.initial_sync_in_progress = False
1092 | 
1093 |     def get_initial_sync_status(self) -> Dict[str, Any]:
1094 |         """Get current initial sync status for monitoring."""
1095 |         return {
1096 |             "in_progress": self.initial_sync_in_progress,
1097 |             "total": self.initial_sync_total,
1098 |             "completed": self.initial_sync_completed,
1099 |             "finished": self.initial_sync_finished,
1100 |             "progress_percentage": round((self.initial_sync_completed / max(self.initial_sync_total, 1)) * 100, 1) if self.initial_sync_total > 0 else 0
1101 |         }
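        # Example return value mid-sync (illustrative numbers):
        # {"in_progress": True, "total": 200, "completed": 50,
        #  "finished": False, "progress_percentage": 25.0}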
1102 | 
1103 |     async def store(self, memory: Memory) -> Tuple[bool, str]:
1104 |         """Store a memory in primary storage and queue for secondary sync."""
1105 |         # Always store in primary first for immediate availability
1106 |         success, message = await self.primary.store(memory)
1107 | 
1108 |         if success and self.sync_service:
1109 |             # Queue for background sync to secondary
1110 |             operation = SyncOperation(operation='store', memory=memory)
1111 |             await self.sync_service.enqueue_operation(operation)
1112 | 
1113 |         return success, message
1114 | 
1115 |     async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
1116 |         """Retrieve memories from primary storage (fast)."""
1117 |         return await self.primary.retrieve(query, n_results)
1118 | 
1119 |     async def search(self, query: str, n_results: int = 5, min_similarity: float = 0.0) -> List[MemoryQueryResult]:
1120 |         """Search memories in primary storage."""
1121 |         return await self.primary.search(query, n_results)
1122 | 
1123 |     async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
1124 |         """Search memories by tags in primary storage with optional time filtering.
1125 | 
1126 |         This method performs an OR search for tags. The `match_all` (AND) logic
1127 |         is handled at the API layer.
1128 | 
1129 |         Args:
1130 |             tags: List of tags to search for
1131 |             time_start: Optional Unix timestamp (in seconds) to filter memories created after this time
1132 | 
1133 |         Returns:
1134 |             List of Memory objects matching the tag criteria and time filter
1135 |         """
1136 |         return await self.primary.search_by_tag(tags, time_start=time_start)
1137 | 
1138 |     async def search_by_tags(
1139 |         self,
1140 |         tags: List[str],
1141 |         operation: str = "AND",
1142 |         time_start: Optional[float] = None,
1143 |         time_end: Optional[float] = None
1144 |     ) -> List[Memory]:
1145 |         """Search memories by tags using consistent operation parameter across backends."""
1146 |         normalized_operation = operation.strip().upper() if isinstance(operation, str) else "AND"
1147 |         if normalized_operation not in {"AND", "OR"}:
1148 |             logger.warning("Unsupported tag operation %s; defaulting to AND", operation)
1149 |             normalized_operation = "AND"
1150 | 
1151 |         return await self.primary.search_by_tags(
1152 |             tags,
1153 |             operation=normalized_operation,
1154 |             time_start=time_start,
1155 |             time_end=time_end
1156 |         )
1157 | 
1158 |     async def delete(self, content_hash: str) -> Tuple[bool, str]:
1159 |         """Delete a memory from primary storage and queue for secondary sync."""
1160 |         success, message = await self.primary.delete(content_hash)
1161 | 
1162 |         if success and self.sync_service:
1163 |             # Queue for background sync to secondary
1164 |             operation = SyncOperation(operation='delete', content_hash=content_hash)
1165 |             await self.sync_service.enqueue_operation(operation)
1166 | 
1167 |         return success, message
1168 | 
1169 |     async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
1170 |         """Delete memories by tag from primary storage and queue for secondary sync."""
1171 |         # First, get the memories with this tag to get their hashes for sync
1172 |         memories_to_delete = await self.primary.search_by_tags([tag])
1173 | 
1174 |         # Delete from primary
1175 |         count_deleted, message = await self.primary.delete_by_tag(tag)
1176 | 
1177 |         # Queue individual deletes for secondary sync
1178 |         if count_deleted > 0 and self.sync_service:
1179 |             for memory in memories_to_delete:
1180 |                 operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
1181 |                 await self.sync_service.enqueue_operation(operation)
1182 | 
1183 |         return count_deleted, message
1184 | 
1185 |     async def delete_by_tags(self, tags: List[str]) -> Tuple[int, str]:
1186 |         """
1187 |         Delete memories matching ANY of the given tags from primary storage and queue for secondary sync.
1188 | 
1189 |         Optimized to use primary storage's delete_by_tags if available, otherwise falls back to
1190 |         calling delete_by_tag for each tag.
1191 |         """
1192 |         if not tags:
1193 |             return 0, "No tags provided"
1194 | 
1195 |         # First, get all memories with any of these tags for sync queue
1196 |         memories_to_delete = await self.primary.search_by_tags(tags, operation="OR")
1197 | 
1198 |         # Remove duplicates based on content_hash
1199 |         unique_memories = {m.content_hash: m for m in memories_to_delete}.values()
1200 | 
1201 |         # Delete from primary using optimized method if available
1202 |         count_deleted, message = await self.primary.delete_by_tags(tags)
1203 | 
1204 |         # Queue individual deletes for secondary sync
1205 |         if count_deleted > 0 and self.sync_service:
1206 |             for memory in unique_memories:
1207 |                 operation = SyncOperation(operation='delete', content_hash=memory.content_hash)
1208 |                 await self.sync_service.enqueue_operation(operation)
1209 | 
1210 |         return count_deleted, message
1211 | 
1212 |     async def cleanup_duplicates(self) -> Tuple[int, str]:
1213 |         """Clean up duplicates in primary storage."""
1214 |         # Only cleanup primary, secondary will sync naturally
1215 |         return await self.primary.cleanup_duplicates()
1216 | 
1217 |     async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
1218 |         """Update memory metadata in primary storage and queue for secondary sync."""
1219 |         success, message = await self.primary.update_memory_metadata(content_hash, updates, preserve_timestamps)
1220 | 
1221 |         if success and self.sync_service:
1222 |             # Queue for background sync to secondary
1223 |             operation = SyncOperation(
1224 |                 operation='update',
1225 |                 content_hash=content_hash,
1226 |                 updates=updates
1227 |             )
1228 |             await self.sync_service.enqueue_operation(operation)
1229 | 
1230 |         return success, message
1231 | 
1232 |     async def update_memories_batch(self, memories: List[Memory]) -> List[bool]:
1233 |         """
1234 |         Update multiple memories in a batch operation with optimal performance.
1235 | 
1236 |         Delegates directly to primary storage's optimized batch update method,
1237 |         then queues secondary sync operations in background.
1238 | 
1239 |         Args:
1240 |             memories: List of Memory objects with updated fields
1241 | 
1242 |         Returns:
1243 |             List of success booleans, one for each memory in the batch
1244 |         """
1245 |         # Use primary storage's optimized batch update (single transaction)
1246 |         results = await self.primary.update_memories_batch(memories)
1247 | 
1248 |         # Queue successful updates for background sync to secondary
1249 |         if self.sync_service:
1250 |             for idx, (memory, success) in enumerate(zip(memories, results)):
1251 |                 if success:
1252 |                     operation = SyncOperation(
1253 |                         operation='update',
1254 |                         content_hash=memory.content_hash,
1255 |                         updates={
1256 |                             'tags': memory.tags,
1257 |                             'metadata': memory.metadata,
1258 |                             'memory_type': memory.memory_type
1259 |                         }
1260 |                     )
1261 |                     # enqueue_operation() only appends to the in-memory queue; the actual Cloudflare write happens later in the background worker
1262 |                     try:
1263 |                         await self.sync_service.enqueue_operation(operation)
1264 |                     except Exception as e:
1265 |                         logger.warning(f"Failed to queue sync for {memory.content_hash}: {e}")
1266 | 
1267 |         return results
1268 | 
1269 |     async def get_stats(self) -> Dict[str, Any]:
1270 |         """Get comprehensive statistics from both storage backends."""
1271 |         # SQLite-vec get_stats is now async
1272 |         primary_stats = await self.primary.get_stats()
1273 | 
1274 |         stats = {
1275 |             "storage_backend": "Hybrid (SQLite-vec + Cloudflare)",
1276 |             "primary_backend": "SQLite-vec",
1277 |             "secondary_backend": "Cloudflare" if self.secondary else "None",
1278 |             "total_memories": primary_stats.get("total_memories", 0),
1279 |             "unique_tags": primary_stats.get("unique_tags", 0),
1280 |             "memories_this_week": primary_stats.get("memories_this_week", 0),
1281 |             "database_size_bytes": primary_stats.get("database_size_bytes", 0),
1282 |             "database_size_mb": primary_stats.get("database_size_mb", 0),
1283 |             "primary_stats": primary_stats,
1284 |             "sync_enabled": self.sync_service is not None
1285 |         }
1286 | 
1287 |         # Add sync service statistics if available
1288 |         if self.sync_service:
1289 |             sync_status = await self.sync_service.get_sync_status()
1290 |             stats["sync_status"] = sync_status
1291 | 
1292 |         # Add secondary stats if available and healthy
1293 |         if self.secondary and self.sync_service and self.sync_service.sync_stats['cloudflare_available']:
1294 |             try:
1295 |                 secondary_stats = await self.secondary.get_stats()
1296 |                 stats["secondary_stats"] = secondary_stats
1297 |             except Exception as e:
1298 |                 stats["secondary_error"] = str(e)
1299 | 
1300 |         return stats
1301 | 
1302 |     async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
1303 |         """Get all tags with their usage counts from primary storage."""
1304 |         return await self.primary.get_all_tags_with_counts()
1305 | 
1306 |     async def get_all_tags(self) -> List[str]:
1307 |         """Get all unique tags from primary storage."""
1308 |         return await self.primary.get_all_tags()
1309 | 
1310 |     async def get_recent_memories(self, n: int = 10) -> List[Memory]:
1311 |         """Get recent memories from primary storage."""
1312 |         return await self.primary.get_recent_memories(n)
1313 | 
1314 |     async def get_largest_memories(self, n: int = 10) -> List[Memory]:
1315 |         """Get largest memories by content length from primary storage."""
1316 |         return await self.primary.get_largest_memories(n)
1317 | 
1318 |     async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
1319 |         """
1320 |         Get memory creation timestamps only, without loading full memory objects.
1321 | 
1322 |         Delegates to primary storage (SQLite-vec) for optimal performance.
1323 | 
1324 |         Args:
1325 |             days: Optional filter to only get memories from last N days
1326 | 
1327 |         Returns:
1328 |             List of Unix timestamps (float) in descending order (newest first)
1329 |         """
1330 |         return await self.primary.get_memory_timestamps(days)
1331 | 
1332 |     async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
1333 |         """
1334 |         Retrieve memories with combined time filtering and optional semantic search.
1335 | 
1336 |         Args:
1337 |             query: Optional semantic search query. If None, only time filtering is applied.
1338 |             n_results: Maximum number of results to return.
1339 |             start_timestamp: Optional start time for filtering.
1340 |             end_timestamp: Optional end time for filtering.
1341 | 
1342 |         Returns:
1343 |             List of MemoryQueryResult objects.
1344 |         """
1345 |         return await self.primary.recall(query=query, n_results=n_results, start_timestamp=start_timestamp, end_timestamp=end_timestamp)
1346 | 
1347 |     async def recall_memory(self, query: str, n_results: int = 5) -> List[Memory]:
1348 |         """Recall memories using natural language time expressions."""
1349 |         return await self.primary.recall_memory(query, n_results)
1350 | 
1351 |     async def get_all_memories(self, limit: Optional[int] = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1352 |         """Get all memories from primary storage."""
1353 |         return await self.primary.get_all_memories(limit=limit, offset=offset, memory_type=memory_type, tags=tags)
1354 | 
1355 |     async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
1356 |         """Get a memory by its content hash from primary storage."""
1357 |         return await self.primary.get_by_hash(content_hash)
1358 | 
1359 |     async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
1360 |         """Get total count of memories from primary storage."""
1361 |         return await self.primary.count_all_memories(memory_type=memory_type, tags=tags)
1362 | 
1363 |     async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
1364 |         """Get memories within time range from primary storage."""
1365 |         return await self.primary.get_memories_by_time_range(start_time, end_time)
1366 | 
1367 |     async def close(self):
1368 |         """Clean shutdown of hybrid storage system."""
1369 |         logger.info("Shutting down hybrid memory storage...")
1370 | 
1371 |         # Stop sync service first
1372 |         if self.sync_service:
1373 |             await self.sync_service.stop()
1374 | 
1375 |         # Close storage backends
1376 |         if hasattr(self.primary, 'close') and self.primary.close:
1377 |             if asyncio.iscoroutinefunction(self.primary.close):
1378 |                 await self.primary.close()
1379 |             else:
1380 |                 self.primary.close()
1381 | 
1382 |         if self.secondary and hasattr(self.secondary, 'close') and self.secondary.close:
1383 |             if asyncio.iscoroutinefunction(self.secondary.close):
1384 |                 await self.secondary.close()
1385 |             else:
1386 |                 self.secondary.close()
1387 | 
1388 |         logger.info("Hybrid memory storage shutdown completed")
1389 | 
1390 |     async def force_sync(self) -> Dict[str, Any]:
1391 |         """Force immediate synchronization with secondary storage."""
1392 |         if not self.sync_service:
1393 |             return {
1394 |                 'status': 'disabled',
1395 |                 'message': 'Background sync service not available'
1396 |             }
1397 | 
1398 |         return await self.sync_service.force_sync()
1399 | 
1400 |     async def force_pull_sync(self) -> Dict[str, Any]:
1401 |         """
1402 |         Force immediate pull synchronization FROM Cloudflare TO local SQLite.
1403 | 
1404 |         This triggers the same logic as initial_sync but can be called on demand.
1405 |         Useful for manually refreshing local storage with memories stored via MCP.
1406 | 
1407 |         Returns:
1408 |             Dict with sync results including:
1409 |             - success: bool
1410 |             - message: str
1411 |             - memories_pulled: int
1412 |             - time_taken_seconds: float
1413 |         """
1414 |         # Call shared helper method without drift checking (manual sync doesn't need it)
1415 |         result = await self._sync_memories_from_cloudflare(
1416 |             sync_type="manual",
1417 |             broadcast_sse=True,
1418 |             enable_drift_check=False
1419 |         )
1420 | 
1421 |         # Map result to expected return format
1422 |         return {
1423 |             'success': result['success'],
1424 |             'message': result['message'],
1425 |             'memories_pulled': result['memories_synced'],
1426 |             'time_taken_seconds': result['time_taken_seconds']
1427 |         }
1428 | 
1429 |     async def get_sync_status(self) -> Dict[str, Any]:
1430 |         """Get current background sync status and statistics."""
1431 |         if not self.sync_service:
1432 |             return {
1433 |                 'is_running': False,
1434 |                 'pending_operations': 0,
1435 |                 'operations_processed': 0,
1436 |                 'operations_failed': 0,
1437 |                 'last_sync_time': 0,
1438 |                 'sync_interval': 0
1439 |             }
1440 | 
1441 |         return await self.sync_service.get_sync_status()
1442 | 
1443 |     async def pause_sync(self) -> Dict[str, Any]:
1444 |         """Pause background sync operations for safe database operations."""
1445 |         if not self.sync_service:
1446 |             return {
1447 |                 'success': False,
1448 |                 'message': 'Sync service not available'
1449 |             }
1450 | 
1451 |         try:
1452 |             if not self.sync_service.is_running:
1453 |                 return {
1454 |                     'success': True,
1455 |                     'message': 'Sync already paused'
1456 |                 }
1457 | 
1458 |             # Stop the sync service
1459 |             await self.sync_service.stop()
1460 |             logger.info("Background sync paused")
1461 | 
1462 |             return {
1463 |                 'success': True,
1464 |                 'message': 'Sync paused successfully'
1465 |             }
1466 | 
1467 |         except Exception as e:
1468 |             logger.error(f"Failed to pause sync: {e}")
1469 |             return {
1470 |                 'success': False,
1471 |                 'message': f'Failed to pause sync: {str(e)}'
1472 |             }
1473 | 
1474 |     async def resume_sync(self) -> Dict[str, Any]:
1475 |         """Resume background sync operations after pause."""
1476 |         if not self.sync_service:
1477 |             return {
1478 |                 'success': False,
1479 |                 'message': 'Sync service not available'
1480 |             }
1481 | 
1482 |         try:
1483 |             if self.sync_service.is_running:
1484 |                 return {
1485 |                     'success': True,
1486 |                     'message': 'Sync already running'
1487 |                 }
1488 | 
1489 |             # Start the sync service
1490 |             await self.sync_service.start()
1491 |             logger.info("Background sync resumed")
1492 | 
1493 |             return {
1494 |                 'success': True,
1495 |                 'message': 'Sync resumed successfully'
1496 |             }
1497 | 
1498 |         except Exception as e:
1499 |             logger.error(f"Failed to resume sync: {e}")
1500 |             return {
1501 |                 'success': False,
1502 |                 'message': f'Failed to resume sync: {str(e)}'
1503 |             }
1504 | 
1505 |     def sanitized(self, tags):
1506 |         """Sanitize and normalize tags to a JSON string.
1507 | 
1508 |         This method provides compatibility with the storage interface.
1509 |         Delegates to primary storage for consistent tag handling.
1510 |         """
1511 |         return self.primary.sanitized(tags)
1512 | 
```
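
For orientation, a minimal usage sketch of `HybridMemoryStorage` is shown below. It is an illustrative assumption, not an example taken from the repository: the database path, Cloudflare credential placeholders, index name, and the exact `Memory` constructor arguments are hypothetical, and only the methods visible above (`initialize`, `store`, `retrieve`, `get_sync_status`, `close`) are exercised.

```python
# Hypothetical usage sketch for HybridMemoryStorage; values are placeholders.
import asyncio

from mcp_memory_service.models.memory import Memory
from mcp_memory_service.storage.hybrid import HybridMemoryStorage
from mcp_memory_service.utils.hashing import generate_content_hash


async def main() -> None:
    # All four keys are required for the Cloudflare secondary to be enabled;
    # otherwise the instance runs in SQLite-only mode.
    cloudflare_config = {
        "api_token": "<cf-api-token>",
        "account_id": "<cf-account-id>",
        "vectorize_index": "<vectorize-index>",
        "d1_database_id": "<d1-database-id>",
    }

    storage = HybridMemoryStorage(
        sqlite_db_path="./memories.db",      # assumed local path
        cloudflare_config=cloudflare_config,
        sync_interval=300,                   # background sync every 5 minutes
        batch_size=50,
    )
    await storage.initialize()

    # Writes land in the SQLite-vec primary immediately and are queued for
    # background sync to Cloudflare.
    content = "Hybrid storage keeps reads local and syncs to the cloud in the background."
    memory = Memory(
        content=content,
        content_hash=generate_content_hash(content),  # assumed constructor arguments
        tags=["example"],
        memory_type="note",
    )
    print(await storage.store(memory))

    # Reads are always served from the local primary.
    results = await storage.retrieve("hybrid storage", n_results=3)
    print(f"retrieved {len(results)} result(s)")

    print(await storage.get_sync_status())
    await storage.close()


if __name__ == "__main__":
    asyncio.run(main())
```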

--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/cloudflare.py:
--------------------------------------------------------------------------------

```python
   1 | # Copyright 2024 Heinrich Krupp
   2 | #
   3 | # Licensed under the Apache License, Version 2.0 (the "License");
   4 | # you may not use this file except in compliance with the License.
   5 | # You may obtain a copy of the License at
   6 | #
   7 | #     http://www.apache.org/licenses/LICENSE-2.0
   8 | #
   9 | # Unless required by applicable law or agreed to in writing, software
  10 | # distributed under the License is distributed on an "AS IS" BASIS,
  11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12 | # See the License for the specific language governing permissions and
  13 | # limitations under the License.
  14 | 
  15 | """
  16 | Cloudflare storage backend for MCP Memory Service.
  17 | Provides cloud-native storage using Vectorize, D1, and R2.
  18 | """
  19 | 
  20 | import json
  21 | import logging
  22 | import hashlib
  23 | import asyncio
  24 | import time
  25 | from typing import List, Dict, Any, Tuple, Optional
  26 | from datetime import datetime, timezone, timedelta
  27 | import httpx
  28 | 
  29 | from .base import MemoryStorage
  30 | from ..models.memory import Memory, MemoryQueryResult
  31 | from ..utils.hashing import generate_content_hash
  32 | from ..config import CLOUDFLARE_MAX_CONTENT_LENGTH
  33 | 
  34 | logger = logging.getLogger(__name__)
  35 | 
  36 | 
  37 | def normalize_tags_for_search(tags: List[str]) -> List[str]:
  38 |     """Deduplicate and filter empty tag strings.
  39 | 
  40 |     Args:
  41 |         tags: List of tag strings (may contain duplicates or empty strings)
  42 | 
  43 |     Returns:
  44 |         Deduplicated list of non-empty tags
  45 |     """
  46 |     return list(dict.fromkeys([tag for tag in tags if tag]))
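    # e.g. normalize_tags_for_search(["a", "", "a", "b"]) -> ["a", "b"] (order preserved)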
  47 | 
  48 | 
  49 | def normalize_operation(operation: Optional[str]) -> str:
  50 |     """Normalize tag search operation to AND or OR.
  51 | 
  52 |     Args:
  53 |         operation: Raw operation string (case-insensitive)
  54 | 
  55 |     Returns:
  56 |         Normalized operation: "AND" or "OR"
  57 |     """
  58 |     if isinstance(operation, str):
  59 |         normalized = operation.strip().upper() or "AND"
  60 |     else:
  61 |         normalized = "AND"
  62 | 
  63 |     if normalized not in {"AND", "OR"}:
  64 |         logger.warning(f"Unsupported operation '{operation}'; defaulting to AND")
  65 |         normalized = "AND"
  66 | 
  67 |     return normalized
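    # e.g. normalize_operation(" or ") -> "OR"; normalize_operation(None) -> "AND";
    # an unrecognised value such as "XOR" logs a warning and falls back to "AND".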
  68 | 
  69 | 
  70 | def build_tag_search_query(tags: List[str], operation: str,
  71 |                           time_start: Optional[float] = None,
  72 |                           time_end: Optional[float] = None) -> Tuple[str, List[Any]]:
  73 |     """Build SQL query for tag-based search with time filtering.
  74 | 
  75 |     Args:
  76 |         tags: List of deduplicated tags
  77 |         operation: Search operation ("AND" or "OR")
  78 |         time_start: Optional start timestamp filter
  79 |         time_end: Optional end timestamp filter
  80 | 
  81 |     Returns:
  82 |         Tuple of (sql_query, parameters_list)
  83 |     """
  84 |     placeholders = ",".join(["?"] * len(tags))
  85 |     params: List[Any] = list(tags)
  86 | 
  87 |     sql = (
  88 |         "SELECT m.* FROM memories m "
  89 |         "JOIN memory_tags mt ON m.id = mt.memory_id "
  90 |         "JOIN tags t ON mt.tag_id = t.id "
  91 |         f"WHERE t.name IN ({placeholders})"
  92 |     )
  93 | 
  94 |     if time_start is not None:
  95 |         sql += " AND m.created_at >= ?"
  96 |         params.append(time_start)
  97 | 
  98 |     if time_end is not None:
  99 |         sql += " AND m.created_at <= ?"
 100 |         params.append(time_end)
 101 | 
 102 |     sql += " GROUP BY m.id"
 103 | 
 104 |     if operation == "AND":
 105 |         sql += " HAVING COUNT(DISTINCT t.name) = ?"
 106 |         params.append(len(tags))
 107 | 
 108 |     sql += " ORDER BY m.created_at DESC"
 109 | 
 110 |     return sql, params
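    # Illustrative output for build_tag_search_query(["python", "docs"], "AND"):
    #   SELECT m.* FROM memories m JOIN memory_tags mt ON m.id = mt.memory_id
    #   JOIN tags t ON mt.tag_id = t.id WHERE t.name IN (?,?)
    #   GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ? ORDER BY m.created_at DESC
    # with params == ["python", "docs", 2]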
 111 | 
 112 | 
 113 | class CloudflareStorage(MemoryStorage):
 114 |     """Cloudflare-based storage backend using Vectorize, D1, and R2."""
 115 | 
 116 |     # Content length limit from configuration
 117 |     _MAX_CONTENT_LENGTH = CLOUDFLARE_MAX_CONTENT_LENGTH
 118 | 
 119 |     @property
 120 |     def max_content_length(self) -> Optional[int]:
 121 |         """Maximum content length from CLOUDFLARE_MAX_CONTENT_LENGTH (default 800 chars, matching the BGE model's 512-token limit)."""
 122 |         return self._MAX_CONTENT_LENGTH
 123 | 
 124 |     @property
 125 |     def supports_chunking(self) -> bool:
 126 |         """Cloudflare backend supports content chunking with metadata linking."""
 127 |         return True
 128 | 
 129 |     def __init__(self,
 130 |                  api_token: str,
 131 |                  account_id: str,
 132 |                  vectorize_index: str,
 133 |                  d1_database_id: str,
 134 |                  r2_bucket: Optional[str] = None,
 135 |                  embedding_model: str = "@cf/baai/bge-base-en-v1.5",
 136 |                  large_content_threshold: int = 1024 * 1024,  # 1MB
 137 |                  max_retries: int = 3,
 138 |                  base_delay: float = 1.0):
 139 |         """
 140 |         Initialize Cloudflare storage backend.
 141 | 
 142 |         Args:
 143 |             api_token: Cloudflare API token
 144 |             account_id: Cloudflare account ID
 145 |             vectorize_index: Vectorize index name
 146 |             d1_database_id: D1 database ID
 147 |             r2_bucket: Optional R2 bucket for large content
 148 |             embedding_model: Workers AI embedding model
 149 |             large_content_threshold: Size threshold for R2 storage
 150 |             max_retries: Maximum retry attempts for API calls
 151 |             base_delay: Base delay for exponential backoff
 152 |         """
 153 |         self.api_token = api_token
 154 |         self.account_id = account_id
 155 |         self.vectorize_index = vectorize_index
 156 |         self.d1_database_id = d1_database_id
 157 |         self.r2_bucket = r2_bucket
 158 |         self.embedding_model = embedding_model
 159 |         self.large_content_threshold = large_content_threshold
 160 |         self.max_retries = max_retries
 161 |         self.base_delay = base_delay
 162 | 
 163 |         # API endpoints
 164 |         self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
 165 |         self.vectorize_url = f"{self.base_url}/vectorize/v2/indexes/{vectorize_index}"
 166 |         self.d1_url = f"{self.base_url}/d1/database/{d1_database_id}"
 167 |         self.ai_url = f"{self.base_url}/ai/run/{embedding_model}"
 168 | 
 169 |         if r2_bucket:
 170 |             self.r2_url = f"{self.base_url}/r2/buckets/{r2_bucket}/objects"
 171 | 
 172 |         # HTTP client with connection pooling
 173 |         self.client = None
 174 |         self._initialized = False
 175 | 
 176 |         # Embedding cache for performance
 177 |         self._embedding_cache = {}
 178 |         self._cache_max_size = 1000
 179 |     
 180 |     async def _get_client(self) -> httpx.AsyncClient:
 181 |         """Get or create HTTP client with connection pooling."""
 182 |         if self.client is None:
 183 |             headers = {
 184 |                 "Authorization": f"Bearer {self.api_token}",
 185 |                 "Content-Type": "application/json"
 186 |             }
 187 |             self.client = httpx.AsyncClient(
 188 |                 headers=headers,
 189 |                 timeout=httpx.Timeout(30.0),
 190 |                 limits=httpx.Limits(max_connections=10, max_keepalive_connections=5)
 191 |             )
 192 |         return self.client
 193 |     
 194 |     async def _retry_request(self, method: str, url: str, **kwargs) -> httpx.Response:
 195 |         """Make HTTP request with exponential backoff retry logic."""
 196 |         client = await self._get_client()
 197 |         
 198 |         for attempt in range(self.max_retries + 1):
 199 |             try:
 200 |                 response = await client.request(method, url, **kwargs)
 201 |                 
 202 |                 # Handle rate limiting
 203 |                 if response.status_code == 429:
 204 |                     if attempt < self.max_retries:
 205 |                         delay = self.base_delay * (2 ** attempt)
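                        # With the defaults (base_delay=1.0, max_retries=3) this yields
                        # waits of 1s, 2s and 4s before the 2nd, 3rd and 4th attempts.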
 206 |                         logger.warning(f"Rate limited, retrying in {delay}s (attempt {attempt + 1}/{self.max_retries + 1})")
 207 |                         await asyncio.sleep(delay)
 208 |                         continue
 209 |                     else:
 210 |                         raise httpx.HTTPError(f"Rate limited after {self.max_retries} retries")
 211 |                 
 212 |                 # Handle server errors
 213 |                 if response.status_code >= 500:
 214 |                     if attempt < self.max_retries:
 215 |                         delay = self.base_delay * (2 ** attempt)
 216 |                         logger.warning(f"Server error {response.status_code}, retrying in {delay}s")
 217 |                         await asyncio.sleep(delay)
 218 |                         continue
 219 |                 
 220 |                 response.raise_for_status()
 221 |                 return response
 222 |                 
 223 |             except (httpx.NetworkError, httpx.TimeoutException) as e:
 224 |                 if attempt < self.max_retries:
 225 |                     delay = self.base_delay * (2 ** attempt)
 226 |                     logger.warning(f"Network error: {e}, retrying in {delay}s")
 227 |                     await asyncio.sleep(delay)
 228 |                     continue
 229 |                 raise
 230 |         
 231 |         raise httpx.HTTPError(f"Failed after {self.max_retries} retries")
 232 |     
 233 |     async def _generate_embedding(self, text: str) -> List[float]:
 234 |         """Generate embedding using Workers AI or cache."""
 235 |         # Check cache first
 236 |         text_hash = hashlib.sha256(text.encode()).hexdigest()
 237 |         if text_hash in self._embedding_cache:
 238 |             return self._embedding_cache[text_hash]
 239 |         
 240 |         try:
 241 |             # Use Workers AI to generate embedding
 242 |             payload = {"text": [text]}
 243 |             response = await self._retry_request("POST", self.ai_url, json=payload)
 244 |             result = response.json()
 245 |             
 246 |             if result.get("success") and "result" in result:
 247 |                 embedding = result["result"]["data"][0]
 248 |                 
 249 |                 # Cache the embedding (with size limit)
 250 |                 if len(self._embedding_cache) >= self._cache_max_size:
 251 |                     # Remove oldest entry (simple FIFO)
 252 |                     oldest_key = next(iter(self._embedding_cache))
 253 |                     del self._embedding_cache[oldest_key]
 254 |                 
 255 |                 self._embedding_cache[text_hash] = embedding
 256 |                 return embedding
 257 |             else:
 258 |                 raise ValueError(f"Workers AI embedding failed: {result}")
 259 |                 
 260 |         except Exception as e:
 261 |             logger.error(f"Failed to generate embedding with Workers AI: {e}")
 262 |             # TODO: Implement fallback to local sentence-transformers
 263 |             raise ValueError(f"Embedding generation failed: {e}")
 264 |     
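    # Editor's note (illustrative): the Workers AI response consumed above is expected
    # to look roughly like {"success": true, "result": {"data": [[0.01, -0.12, ...]]}},
    # i.e. one embedding vector per input text. The cache is a plain dict keyed by the
    # SHA-256 of the text; eviction is FIFO, relying on dict insertion order, once
    # self._cache_max_size entries have been stored.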
 265 |     async def initialize(self) -> None:
 266 |         """Initialize the Cloudflare storage backend."""
 267 |         if self._initialized:
 268 |             return
 269 |         
 270 |         logger.info("Initializing Cloudflare storage backend...")
 271 |         
 272 |         try:
 273 |             # Initialize D1 database schema
 274 |             await self._initialize_d1_schema()
 275 |             
 276 |             # Verify Vectorize index exists
 277 |             await self._verify_vectorize_index()
 278 |             
 279 |             # Verify R2 bucket if configured
 280 |             if self.r2_bucket:
 281 |                 await self._verify_r2_bucket()
 282 |             
 283 |             self._initialized = True
 284 |             logger.info("Cloudflare storage backend initialized successfully")
 285 |             
 286 |         except Exception as e:
 287 |             logger.error(f"Failed to initialize Cloudflare storage: {e}")
 288 |             raise
 289 |     
 290 |     async def _initialize_d1_schema(self) -> None:
 291 |         """Initialize D1 database schema."""
 292 |         schema_sql = """
 293 |         -- Memory metadata table
 294 |         CREATE TABLE IF NOT EXISTS memories (
 295 |             id INTEGER PRIMARY KEY AUTOINCREMENT,
 296 |             content_hash TEXT UNIQUE NOT NULL,
 297 |             content TEXT NOT NULL,
 298 |             memory_type TEXT,
 299 |             created_at REAL NOT NULL,
 300 |             created_at_iso TEXT NOT NULL,
 301 |             updated_at REAL,
 302 |             updated_at_iso TEXT,
 303 |             metadata_json TEXT,
 304 |             vector_id TEXT UNIQUE,
 305 |             content_size INTEGER DEFAULT 0,
 306 |             r2_key TEXT
 307 |         );
 308 |         
 309 |         -- Tags table
 310 |         CREATE TABLE IF NOT EXISTS tags (
 311 |             id INTEGER PRIMARY KEY AUTOINCREMENT,
 312 |             name TEXT UNIQUE NOT NULL
 313 |         );
 314 |         
 315 |         -- Memory-tag relationships
 316 |         CREATE TABLE IF NOT EXISTS memory_tags (
 317 |             memory_id INTEGER,
 318 |             tag_id INTEGER,
 319 |             PRIMARY KEY (memory_id, tag_id),
 320 |             FOREIGN KEY (memory_id) REFERENCES memories(id) ON DELETE CASCADE,
 321 |             FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
 322 |         );
 323 |         
 324 |         -- Indexes for performance
 325 |         CREATE INDEX IF NOT EXISTS idx_memories_content_hash ON memories(content_hash);
 326 |         CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at);
 327 |         CREATE INDEX IF NOT EXISTS idx_memories_vector_id ON memories(vector_id);
 328 |         CREATE INDEX IF NOT EXISTS idx_tags_name ON tags(name);
 329 |         """
 330 |         
 331 |         payload = {"sql": schema_sql}
 332 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 333 |         result = response.json()
 334 |         
 335 |         if not result.get("success"):
 336 |             raise ValueError(f"Failed to initialize D1 schema: {result}")
 337 |     
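    # Editor's sketch: how a single stored memory maps onto the three Cloudflare
    # services used by this backend (derived from the schema above and store() below):
    #
    #   D1 memories row  -> content (or an "[R2 Content: ...]" placeholder), metadata,
    #                       timestamps, vector_id, content_size, r2_key
    #   Vectorize vector -> id = content_hash, values = embedding,
    #                       metadata = {content_hash, memory_type, tags, created_at}
    #   R2 object        -> key "content/<content_hash>.txt", only for content larger
    #                       than self.large_content_threshold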
 338 |     async def _verify_vectorize_index(self) -> None:
 339 |         """Verify Vectorize index exists and is accessible."""
 340 |         try:
 341 |             response = await self._retry_request("GET", self.vectorize_url)
 342 |             result = response.json()
 343 |             
 344 |             if not result.get("success"):
 345 |                 raise ValueError(f"Vectorize index not accessible: {result}")
 346 |                 
 347 |             logger.info(f"Vectorize index verified: {self.vectorize_index}")
 348 |             
 349 |         except httpx.HTTPStatusError as e:
 350 |             if e.response.status_code == 404:
 351 |                 raise ValueError(f"Vectorize index '{self.vectorize_index}' not found")
 352 |             raise
 353 |     
 354 |     async def _verify_r2_bucket(self) -> None:
 355 |         """Verify R2 bucket exists and is accessible."""
 356 |         try:
 357 |             # Try to list objects (empty list is fine)
 358 |             await self._retry_request("GET", f"{self.r2_url}?max-keys=1")
 359 |             logger.info(f"R2 bucket verified: {self.r2_bucket}")
 360 |             
 361 |         except httpx.HTTPStatusError as e:
 362 |             if e.response.status_code == 404:
 363 |                 raise ValueError(f"R2 bucket '{self.r2_bucket}' not found")
 364 |             raise
 365 |     
 366 |     async def store(self, memory: Memory) -> Tuple[bool, str]:
 367 |         """Store a memory in Cloudflare storage."""
 368 |         try:
 369 |             # Generate embedding for the content
 370 |             embedding = await self._generate_embedding(memory.content)
 371 |             
 372 |             # Determine storage strategy based on content size
 373 |             content_size = len(memory.content.encode('utf-8'))
 374 |             use_r2 = self.r2_bucket and content_size > self.large_content_threshold
 375 |             
 376 |             # Store large content in R2 if needed
 377 |             r2_key = None
 378 |             stored_content = memory.content
 379 |             
 380 |             if use_r2:
 381 |                 r2_key = f"content/{memory.content_hash}.txt"
 382 |                 await self._store_r2_content(r2_key, memory.content)
 383 |                 stored_content = f"[R2 Content: {r2_key}]"  # Placeholder in D1
 384 |             
 385 |             # Store vector in Vectorize
 386 |             vector_id = memory.content_hash
 387 |             vector_metadata = {
 388 |                 "content_hash": memory.content_hash,
 389 |                 "memory_type": memory.memory_type or "standard",
 390 |                 "tags": ",".join(memory.tags) if memory.tags else "",
 391 |                 "created_at": memory.created_at_iso or datetime.now().isoformat()
 392 |             }
 393 |             
 394 |             await self._store_vectorize_vector(vector_id, embedding, vector_metadata)
 395 |             
 396 |             # Store metadata in D1
 397 |             await self._store_d1_memory(memory, vector_id, content_size, r2_key, stored_content)
 398 |             
 399 |             logger.info(f"Successfully stored memory: {memory.content_hash}")
 400 |             return True, f"Memory stored successfully (vector_id: {vector_id})"
 401 |             
 402 |         except Exception as e:
 403 |             logger.error(f"Failed to store memory {memory.content_hash}: {e}")
 404 |             return False, f"Storage failed: {str(e)}"
 405 |     
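    # Editor's note: store() writes in the order embedding -> optional R2 object ->
    # Vectorize vector -> D1 row. There is no cross-service transaction, so a failure
    # partway through can leave an orphaned vector or R2 object behind; the method
    # reports (False, reason) to the caller instead of rolling back.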
 406 |     async def _store_vectorize_vector(self, vector_id: str, embedding: List[float], metadata: Dict[str, Any]) -> None:
 407 |         """Store vector in Vectorize."""
 408 |         # Namespace is intentionally omitted; vectors go to the index's default namespace
 409 |         vector_data = {
 410 |             "id": vector_id,
 411 |             "values": embedding,
 412 |             "metadata": metadata
 413 |         }
 414 |         
 415 |         # Convert to NDJSON (one JSON object per line) as required by the
 416 |         # Vectorize HTTP upsert API; json is already imported at module level.
 417 |         ndjson_content = json.dumps(vector_data) + "\n"
 418 |         
 419 |         try:
 420 |             # Send as raw NDJSON data with correct Content-Type header
 421 |             client = await self._get_client()
 422 |             
 423 |             # Override headers for this specific request
 424 |             headers = {
 425 |                 "Authorization": f"Bearer {self.api_token}",
 426 |                 "Content-Type": "application/x-ndjson"
 427 |             }
 428 |             
 429 |             response = await client.post(
 430 |                 f"{self.vectorize_url}/upsert",
 431 |                 content=ndjson_content.encode("utf-8"),
 432 |                 headers=headers
 433 |             )
 434 |             
 435 |             # Log response status for debugging (avoid logging headers/body for security)
 436 |             logger.info(f"Vectorize response status: {response.status_code}")
 437 |             response_text = response.text
 438 |             if response.status_code != 200:
 439 |                 # Only log the response body on errors, truncated to avoid credential exposure
 440 |                 truncated_response = response_text[:200] + "..." if len(response_text) > 200 else response_text
 441 |                 logger.warning(f"Vectorize error response (truncated): {truncated_response}")
 442 |                 # Surface the full body to the caller for diagnosis
 443 |                 raise ValueError(f"HTTP {response.status_code}: {response_text}")
 444 |             
 445 |             
 446 |             result = response.json()
 447 |             if not result.get("success"):
 448 |                 raise ValueError(f"Failed to store vector: {result}")
 449 |                 
 450 |         except Exception as e:
 451 |             # Log enough context to debug without dumping the full embedding or
 452 |             # memory content into the logs (consistent with the note above)
 453 |             logger.error(f"Vectorize insert failed: {e}")
 454 |             logger.error(f"Vector id: {vector_id}, metadata keys: {list(metadata.keys())}")
 455 |             logger.error(f"URL was: {self.vectorize_url}/upsert")
 456 |             raise ValueError(f"Failed to store vector: {e}")
 457 |     
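    # Editor's illustration: the NDJSON body sent to /upsert above is a single line
    # such as (embedding truncated):
    #
    #   {"id": "<content_hash>", "values": [0.01, -0.12, ...], "metadata": {"content_hash": "...", "memory_type": "standard", "tags": "a,b", "created_at": "..."}}
    #
    # followed by a trailing newline; multiple vectors would be one JSON object per line.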
 458 |     async def _store_d1_memory(self, memory: Memory, vector_id: str, content_size: int, r2_key: Optional[str], stored_content: str) -> None:
 459 |         """Store memory metadata in D1."""
 460 |         # Insert memory record
 461 |         insert_sql = """
 462 |         INSERT INTO memories (
 463 |             content_hash, content, memory_type, created_at, created_at_iso,
 464 |             updated_at, updated_at_iso, metadata_json, vector_id, content_size, r2_key
 465 |         ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
 466 |         """
 467 |         
 468 |         now = time.time()
 469 |         now_iso = datetime.now().isoformat()
 470 |         
 471 |         params = [
 472 |             memory.content_hash,
 473 |             stored_content,
 474 |             memory.memory_type,
 475 |             memory.created_at or now,
 476 |             memory.created_at_iso or now_iso,
 477 |             memory.updated_at or now,
 478 |             memory.updated_at_iso or now_iso,
 479 |             json.dumps(memory.metadata) if memory.metadata else None,
 480 |             vector_id,
 481 |             content_size,
 482 |             r2_key
 483 |         ]
 484 |         
 485 |         payload = {"sql": insert_sql, "params": params}
 486 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 487 |         result = response.json()
 488 |         
 489 |         if not result.get("success"):
 490 |             raise ValueError(f"Failed to store memory in D1: {result}")
 491 |         
 492 |         # Store tags if present
 493 |         if memory.tags:
 494 |             memory_id = result["result"][0]["meta"]["last_row_id"]
 495 |             await self._store_d1_tags(memory_id, memory.tags)
 496 |     
 497 |     async def _store_d1_tags(self, memory_id: int, tags: List[str]) -> None:
 498 |         """Store tags for a memory in D1."""
 499 |         for tag in tags:
 500 |             # Insert tag if not exists
 501 |             tag_sql = "INSERT OR IGNORE INTO tags (name) VALUES (?)"
 502 |             payload = {"sql": tag_sql, "params": [tag]}
 503 |             await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 504 |             
 505 |             # Link tag to memory
 506 |             link_sql = """
 507 |             INSERT INTO memory_tags (memory_id, tag_id)
 508 |             SELECT ?, id FROM tags WHERE name = ?
 509 |             """
 510 |             payload = {"sql": link_sql, "params": [memory_id, tag]}
 511 |             await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 512 |     
 513 |     async def _store_r2_content(self, key: str, content: str) -> None:
 514 |         """Store content in R2."""
 515 |         response = await self._retry_request(
 516 |             "PUT", 
 517 |             f"{self.r2_url}/{key}",
 518 |             content=content.encode('utf-8'),
 519 |             headers={"Content-Type": "text/plain"}
 520 |         )
 521 |         
 522 |         if response.status_code not in [200, 201]:
 523 |             raise ValueError(f"Failed to store content in R2: {response.status_code}")
 524 |     
 525 |     async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
 526 |         """Retrieve memories by semantic search."""
 527 |         try:
 528 |             # Generate query embedding
 529 |             query_embedding = await self._generate_embedding(query)
 530 |             
 531 |             # Search Vectorize (without namespace for now)
 532 |             search_payload = {
 533 |                 "vector": query_embedding,
 534 |                 "topK": n_results,
 535 |                 "returnMetadata": "all",
 536 |                 "returnValues": False
 537 |             }
 538 |             
 539 |             response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
 540 |             result = response.json()
 541 |             
 542 |             if not result.get("success"):
 543 |                 raise ValueError(f"Vectorize query failed: {result}")
 544 |             
 545 |             matches = result.get("result", {}).get("matches", [])
 546 |             
 547 |             # Convert to MemoryQueryResult objects
 548 |             results = []
 549 |             for match in matches:
 550 |                 memory = await self._load_memory_from_match(match)
 551 |                 if memory:
 552 |                     query_result = MemoryQueryResult(
 553 |                         memory=memory,
 554 |                         relevance_score=match.get("score", 0.0)
 555 |                     )
 556 |                     results.append(query_result)
 557 |             
 558 |             logger.info(f"Retrieved {len(results)} memories for query")
 559 |             return results
 560 |             
 561 |         except Exception as e:
 562 |             logger.error(f"Failed to retrieve memories: {e}")
 563 |             return []
 564 |     
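    # Usage sketch (editor's illustration; names other than the methods defined in this
    # class are assumptions):
    #
    #   storage = ...            # an initialized instance of this backend
    #   await storage.initialize()
    #   results = await storage.retrieve("deployment checklist", n_results=3)
    #   for r in results:
    #       print(r.relevance_score, r.memory.content_hash, r.memory.tags)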
 565 |     async def _load_memory_from_match(self, match: Dict[str, Any]) -> Optional[Memory]:
 566 |         """Load full memory from Vectorize match."""
 567 |         try:
 568 |             vector_id = match.get("id")
 569 |             metadata = match.get("metadata", {})
 570 |             content_hash = metadata.get("content_hash")
 571 |             
 572 |             if not content_hash:
 573 |                 logger.warning(f"No content_hash in vector metadata: {vector_id}")
 574 |                 return None
 575 |             
 576 |             # Load from D1
 577 |             sql = "SELECT * FROM memories WHERE content_hash = ?"
 578 |             payload = {"sql": sql, "params": [content_hash]}
 579 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 580 |             result = response.json()
 581 |             
 582 |             if not result.get("success") or not result.get("result", [{}])[0].get("results"):
 583 |                 logger.warning(f"Memory not found in D1: {content_hash}")
 584 |                 return None
 585 |             
 586 |             row = result["result"][0]["results"][0]
 587 |             
 588 |             # Load content from R2 if needed
 589 |             content = row["content"]
 590 |             if row.get("r2_key") and content.startswith("[R2 Content:"):
 591 |                 content = await self._load_r2_content(row["r2_key"])
 592 |             
 593 |             # Load tags
 594 |             tags = await self._load_memory_tags(row["id"])
 595 |             
 596 |             # Reconstruct Memory object
 597 |             memory = Memory(
 598 |                 content=content,
 599 |                 content_hash=content_hash,
 600 |                 tags=tags,
 601 |                 memory_type=row.get("memory_type"),
 602 |                 metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
 603 |                 created_at=row.get("created_at"),
 604 |                 created_at_iso=row.get("created_at_iso"),
 605 |                 updated_at=row.get("updated_at"),
 606 |                 updated_at_iso=row.get("updated_at_iso")
 607 |             )
 608 |             
 609 |             return memory
 610 |             
 611 |         except Exception as e:
 612 |             logger.error(f"Failed to load memory from match: {e}")
 613 |             return None
 614 |     
 615 |     async def _load_r2_content(self, r2_key: str) -> str:
 616 |         """Load content from R2."""
 617 |         response = await self._retry_request("GET", f"{self.r2_url}/{r2_key}")
 618 |         return response.text
 619 |     
 620 |     async def _load_memory_tags(self, memory_id: int) -> List[str]:
 621 |         """Load tags for a memory from D1."""
 622 |         sql = """
 623 |         SELECT t.name FROM tags t
 624 |         JOIN memory_tags mt ON t.id = mt.tag_id
 625 |         WHERE mt.memory_id = ?
 626 |         """
 627 |         payload = {"sql": sql, "params": [memory_id]}
 628 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 629 |         result = response.json()
 630 |         
 631 |         if result.get("success") and result.get("result", [{}])[0].get("results"):
 632 |             return [row["name"] for row in result["result"][0]["results"]]
 633 |         
 634 |         return []
 635 |     
 636 |     async def search_by_tags(
 637 |         self,
 638 |         tags: List[str],
 639 |         operation: str = "AND",
 640 |         time_start: Optional[float] = None,
 641 |         time_end: Optional[float] = None
 642 |     ) -> List[Memory]:
 643 |         """Search memories by tags with AND/OR semantics and optional time filtering."""
 644 |         return await self._search_by_tags_internal(
 645 |             tags=tags,
 646 |             operation=operation,
 647 |             time_start=time_start,
 648 |             time_end=time_end
 649 |         )
 650 | 
 651 |     async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
 652 |         """Search memories by tags with optional time filtering (legacy OR behavior)."""
 653 |         return await self._search_by_tags_internal(
 654 |             tags=tags,
 655 |             operation="OR",
 656 |             time_start=time_start,
 657 |             time_end=None
 658 |         )
 659 | 
 660 |     async def _search_by_tags_internal(
 661 |         self,
 662 |         tags: List[str],
 663 |         operation: Optional[str] = None,
 664 |         time_start: Optional[float] = None,
 665 |         time_end: Optional[float] = None
 666 |     ) -> List[Memory]:
 667 |         """Shared implementation for tag-based queries with optional time filtering."""
 668 |         try:
 669 |             if not tags:
 670 |                 return []
 671 | 
 672 |             deduped_tags = normalize_tags_for_search(tags)
 673 |             if not deduped_tags:
 674 |                 return []
 675 | 
 676 |             normalized_operation = normalize_operation(operation)
 677 |             sql, params = build_tag_search_query(deduped_tags, normalized_operation,
 678 |                                                 time_start, time_end)
 679 | 
 680 |             payload = {"sql": sql, "params": params}
 681 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 682 |             result = response.json()
 683 | 
 684 |             if not result.get("success"):
 685 |                 raise ValueError(f"D1 tag search failed: {result}")
 686 | 
 687 |             rows = result.get("result", [{}])[0].get("results") or []
 688 |             memories: List[Memory] = []
 689 | 
 690 |             for row in rows:
 691 |                 memory = await self._load_memory_from_row(row)
 692 |                 if memory:
 693 |                     memories.append(memory)
 694 | 
 695 |             logger.info(
 696 |                 "Found %d memories with tags: %s (operation: %s)",
 697 |                 len(memories),
 698 |                 deduped_tags,
 699 |                 normalized_operation
 700 |             )
 701 |             return memories
 702 | 
 703 |         except Exception as e:
 704 |             logger.error(
 705 |                 "Failed to search memories by tags %s with operation %s: %s",
 706 |                 tags,
 707 |                 operation,
 708 |                 e
 709 |             )
 710 |             return []
 711 |     
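    # Editor's note on tag semantics (based on the two public wrappers above):
    #
    #   await storage.search_by_tags(["python", "bug"], operation="AND")  # must have both tags
    #   await storage.search_by_tags(["python", "bug"], operation="OR")   # either tag matches
    #   await storage.search_by_tag(["python", "bug"])                    # legacy OR behavior
    #
    # The actual SQL comes from build_tag_search_query(), which is defined elsewhere in
    # the package and not shown in this excerpt.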
 712 |     async def _load_memory_from_row(self, row: Dict[str, Any]) -> Optional[Memory]:
 713 |         """Load memory from D1 row data."""
 714 |         try:
 715 |             # Load content from R2 if needed
 716 |             content = row["content"]
 717 |             if row.get("r2_key") and content.startswith("[R2 Content:"):
 718 |                 content = await self._load_r2_content(row["r2_key"])
 719 |             
 720 |             # Load tags
 721 |             tags = await self._load_memory_tags(row["id"])
 722 |             
 723 |             memory = Memory(
 724 |                 content=content,
 725 |                 content_hash=row["content_hash"],
 726 |                 tags=tags,
 727 |                 memory_type=row.get("memory_type"),
 728 |                 metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
 729 |                 created_at=row.get("created_at"),
 730 |                 created_at_iso=row.get("created_at_iso"),
 731 |                 updated_at=row.get("updated_at"),
 732 |                 updated_at_iso=row.get("updated_at_iso")
 733 |             )
 734 |             
 735 |             return memory
 736 |             
 737 |         except Exception as e:
 738 |             logger.error(f"Failed to load memory from row: {e}")
 739 |             return None
 740 |     
 741 |     async def delete(self, content_hash: str) -> Tuple[bool, str]:
 742 |         """Delete a memory by its hash."""
 743 |         try:
 744 |             # Find memory in D1
 745 |             sql = "SELECT * FROM memories WHERE content_hash = ?"
 746 |             payload = {"sql": sql, "params": [content_hash]}
 747 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 748 |             result = response.json()
 749 |             
 750 |             if not result.get("success") or not result.get("result", [{}])[0].get("results"):
 751 |                 return False, f"Memory not found: {content_hash}"
 752 |             
 753 |             row = result["result"][0]["results"][0]
 754 |             memory_id = row["id"]
 755 |             vector_id = row.get("vector_id")
 756 |             r2_key = row.get("r2_key")
 757 |             
 758 |             # Delete from Vectorize
 759 |             if vector_id:
 760 |                 await self._delete_vectorize_vector(vector_id)
 761 |             
 762 |             # Delete from R2 if present
 763 |             if r2_key:
 764 |                 await self._delete_r2_content(r2_key)
 765 |             
 766 |             # Delete from D1 (tags will be cascade deleted)
 767 |             delete_sql = "DELETE FROM memories WHERE id = ?"
 768 |             payload = {"sql": delete_sql, "params": [memory_id]}
 769 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 770 |             result = response.json()
 771 |             
 772 |             if not result.get("success"):
 773 |                 raise ValueError(f"Failed to delete from D1: {result}")
 774 |             
 775 |             logger.info(f"Successfully deleted memory: {content_hash}")
 776 |             return True, "Memory deleted successfully"
 777 | 
 778 |         except Exception as e:
 779 |             logger.error(f"Failed to delete memory {content_hash}: {e}")
 780 |             return False, f"Deletion failed: {str(e)}"
 781 | 
 782 |     async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
 783 |         """Get a memory by its content hash using direct O(1) D1 lookup."""
 784 |         try:
 785 |             # Query D1 for the memory
 786 |             sql = "SELECT * FROM memories WHERE content_hash = ?"
 787 |             payload = {"sql": sql, "params": [content_hash]}
 788 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 789 |             result = response.json()
 790 | 
 791 |             if not result.get("success") or not result.get("result", [{}])[0].get("results"):
 792 |                 return None
 793 | 
 794 |             row = result["result"][0]["results"][0]
 795 | 
 796 |             # Load content from R2 if needed
 797 |             content = row["content"]
 798 |             if row.get("r2_key") and content.startswith("[R2 Content:"):
 799 |                 content = await self._load_r2_content(row["r2_key"])
 800 | 
 801 |             # Load tags
 802 |             tags = await self._load_memory_tags(row["id"])
 803 | 
 804 |             # Construct Memory object
 805 |             memory = Memory(
 806 |                 content=content,
 807 |                 content_hash=content_hash,
 808 |                 tags=tags,
 809 |                 memory_type=row.get("memory_type"),
 810 |                 metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
 811 |                 created_at=row.get("created_at"),
 812 |                 created_at_iso=row.get("created_at_iso"),
 813 |                 updated_at=row.get("updated_at"),
 814 |                 updated_at_iso=row.get("updated_at_iso")
 815 |             )
 816 | 
 817 |             return memory
 818 | 
 819 |         except Exception as e:
 820 |             logger.error(f"Failed to get memory by hash {content_hash}: {e}")
 821 |             return None
 822 | 
 823 |     async def _delete_vectorize_vector(self, vector_id: str) -> None:
 824 |         """Delete vector from Vectorize."""
 825 |         # Correct endpoint uses underscores, not hyphens
 826 |         payload = {"ids": [vector_id]}
 827 | 
 828 |         response = await self._retry_request("POST", f"{self.vectorize_url}/delete_by_ids", json=payload)
 829 |         result = response.json()
 830 | 
 831 |         if not result.get("success"):
 832 |             logger.warning(f"Failed to delete vector from Vectorize: {result}")
 833 | 
 834 |     async def delete_vectors_by_ids(self, vector_ids: List[str]) -> Dict[str, Any]:
 835 |         """Delete multiple vectors from Vectorize by their IDs."""
 836 |         payload = {"ids": vector_ids}
 837 |         response = await self._retry_request(
 838 |             "POST",
 839 |             f"{self.vectorize_url}/delete_by_ids",
 840 |             json=payload
 841 |         )
 842 |         return response.json()
 843 | 
 844 |     async def _delete_r2_content(self, r2_key: str) -> None:
 845 |         """Delete content from R2."""
 846 |         try:
 847 |             response = await self._retry_request("DELETE", f"{self.r2_url}/{r2_key}")
 848 |             if response.status_code not in [200, 204, 404]:  # 404 is fine if already deleted
 849 |                 logger.warning(f"Failed to delete R2 content: {response.status_code}")
 850 |         except Exception as e:
 851 |             logger.warning(f"Failed to delete R2 content {r2_key}: {e}")
 852 |     
 853 |     async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
 854 |         """Delete memories by tag."""
 855 |         try:
 856 |             # Find memories with the tag
 857 |             memories = await self.search_by_tag([tag])
 858 |             
 859 |             deleted_count = 0
 860 |             for memory in memories:
 861 |                 success, _ = await self.delete(memory.content_hash)
 862 |                 if success:
 863 |                     deleted_count += 1
 864 |             
 865 |             logger.info(f"Deleted {deleted_count} memories with tag: {tag}")
 866 |             return deleted_count, f"Deleted {deleted_count} memories"
 867 |             
 868 |         except Exception as e:
 869 |             logger.error(f"Failed to delete by tag {tag}: {e}")
 870 |             return 0, f"Deletion failed: {str(e)}"
 871 |     
 872 |     async def cleanup_duplicates(self) -> Tuple[int, str]:
 873 |         """Remove duplicate memories based on content hash."""
 874 |         try:
 875 |             # Find duplicates in D1
 876 |             sql = """
 877 |             SELECT content_hash, COUNT(*) as count, MIN(id) as keep_id
 878 |             FROM memories
 879 |             GROUP BY content_hash
 880 |             HAVING COUNT(*) > 1
 881 |             """
 882 |             
 883 |             payload = {"sql": sql}
 884 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 885 |             result = response.json()
 886 |             
 887 |             if not result.get("success"):
 888 |                 raise ValueError(f"Failed to find duplicates: {result}")
 889 |             
 890 |             duplicate_groups = result.get("result", [{}])[0].get("results", [])
 891 |             
 892 |             total_deleted = 0
 893 |             for group in duplicate_groups:
 894 |                 content_hash = group["content_hash"]
 895 |                 keep_id = group["keep_id"]
 896 |                 
 897 |                 # Delete all except the first one
 898 |                 delete_sql = "DELETE FROM memories WHERE content_hash = ? AND id != ?"
 899 |                 payload = {"sql": delete_sql, "params": [content_hash, keep_id]}
 900 |                 response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 901 |                 result = response.json()
 902 |                 
 903 |                 if result.get("success") and result.get("result", [{}])[0].get("meta"):
 904 |                     deleted = result["result"][0]["meta"].get("changes", 0)
 905 |                     total_deleted += deleted
 906 |             
 907 |             logger.info(f"Cleaned up {total_deleted} duplicate memories")
 908 |             return total_deleted, f"Removed {total_deleted} duplicates"
 909 |             
 910 |         except Exception as e:
 911 |             logger.error(f"Failed to cleanup duplicates: {e}")
 912 |             return 0, f"Cleanup failed: {str(e)}"
 913 |     
 914 |     async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
 915 |         """Update memory metadata without recreating the entry."""
 916 |         try:
 917 |             # Build update SQL
 918 |             update_fields = []
 919 |             params = []
 920 |             
 921 |             if "metadata" in updates:
 922 |                 update_fields.append("metadata_json = ?")
 923 |                 params.append(json.dumps(updates["metadata"]))
 924 |             
 925 |             if "memory_type" in updates:
 926 |                 update_fields.append("memory_type = ?")
 927 |                 params.append(updates["memory_type"])
 928 |             
 929 |             if "tags" in updates:
 930 |                 # Handle tags separately - they require relational updates
 931 |                 pass
 932 | 
 933 |             # Handle timestamp updates based on preserve_timestamps flag
 934 |             now = time.time()
 935 |             now_iso = datetime.now().isoformat()
 936 | 
 937 |             if not preserve_timestamps:
 938 |                 # When preserve_timestamps=False, use timestamps from updates dict if provided
 939 |                 # This allows syncing timestamps from source (e.g., SQLite → Cloudflare)
 940 |                 # Always preserve created_at (never reset to current time!)
 941 |                 if "created_at" in updates:
 942 |                     update_fields.append("created_at = ?")
 943 |                     params.append(updates["created_at"])
 944 |                 if "created_at_iso" in updates:
 945 |                     update_fields.append("created_at_iso = ?")
 946 |                     params.append(updates["created_at_iso"])
 947 |                 # Use updated_at from updates or current time
 948 |                 if "updated_at" in updates:
 949 |                     update_fields.append("updated_at = ?")
 950 |                     params.append(updates["updated_at"])
 951 |                 else:
 952 |                     update_fields.append("updated_at = ?")
 953 |                     params.append(now)
 954 |                 if "updated_at_iso" in updates:
 955 |                     update_fields.append("updated_at_iso = ?")
 956 |                     params.append(updates["updated_at_iso"])
 957 |                 else:
 958 |                     update_fields.append("updated_at_iso = ?")
 959 |                     params.append(now_iso)
 960 |             else:
 961 |                 # preserve_timestamps=True: only update updated_at to current time
 962 |                 update_fields.append("updated_at = ?")
 963 |                 update_fields.append("updated_at_iso = ?")
 964 |                 params.extend([now, now_iso])
 965 |             
 966 |             if not update_fields:
 967 |                 return True, "No updates needed"
 968 |             
 969 |             # Update memory record
 970 |             sql = f"UPDATE memories SET {', '.join(update_fields)} WHERE content_hash = ?"
 971 |             params.append(content_hash)
 972 |             
 973 |             payload = {"sql": sql, "params": params}
 974 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 975 |             result = response.json()
 976 |             
 977 |             if not result.get("success"):
 978 |                 raise ValueError(f"Failed to update memory: {result}")
 979 |             
 980 |             # Handle tag updates if provided
 981 |             if "tags" in updates:
 982 |                 await self._update_memory_tags(content_hash, updates["tags"])
 983 |             
 984 |             logger.info(f"Successfully updated memory metadata: {content_hash}")
 985 |             return True, "Memory metadata updated successfully"
 986 |             
 987 |         except Exception as e:
 988 |             logger.error(f"Failed to update memory metadata {content_hash}: {e}")
 989 |             return False, f"Update failed: {str(e)}"
 990 |     
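    # Editor's summary of the preserve_timestamps flag handled above:
    #
    #   preserve_timestamps=True  (default) -> created_at/created_at_iso untouched,
    #                                          updated_at/updated_at_iso set to "now"
    #   preserve_timestamps=False           -> created_at*/updated_at* taken from the
    #                                          updates dict when present (e.g. when
    #                                          syncing from another backend), otherwise
    #                                          updated_at* falls back to "now"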
 991 |     async def _update_memory_tags(self, content_hash: str, new_tags: List[str]) -> None:
 992 |         """Update tags for a memory."""
 993 |         # Get memory ID
 994 |         sql = "SELECT id FROM memories WHERE content_hash = ?"
 995 |         payload = {"sql": sql, "params": [content_hash]}
 996 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
 997 |         result = response.json()
 998 |         
 999 |         if not result.get("success") or not result.get("result", [{}])[0].get("results"):
1000 |             raise ValueError(f"Memory not found: {content_hash}")
1001 |         
1002 |         memory_id = result["result"][0]["results"][0]["id"]
1003 |         
1004 |         # Delete existing tag relationships
1005 |         delete_sql = "DELETE FROM memory_tags WHERE memory_id = ?"
1006 |         payload = {"sql": delete_sql, "params": [memory_id]}
1007 |         await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1008 |         
1009 |         # Add new tags
1010 |         if new_tags:
1011 |             await self._store_d1_tags(memory_id, new_tags)
1012 |     
1013 |     async def get_stats(self) -> Dict[str, Any]:
1014 |         """Get storage statistics."""
1015 |         try:
1016 |             # Calculate timestamp for memories from last 7 days
1017 |             week_ago = time.time() - (7 * 24 * 60 * 60)
1018 | 
1019 |             # Get memory count and size from D1
1020 |             sql = f"""
1021 |             SELECT
1022 |                 COUNT(*) as total_memories,
1023 |                 SUM(content_size) as total_content_size,
1024 |                 COUNT(DISTINCT vector_id) as total_vectors,
1025 |                 COUNT(r2_key) as r2_stored_count,
1026 |                 (SELECT COUNT(*) FROM tags) as unique_tags,
1027 |                 (SELECT COUNT(*) FROM memories WHERE created_at >= {week_ago}) as memories_this_week
1028 |             FROM memories
1029 |             """
1030 | 
1031 |             payload = {"sql": sql}
1032 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1033 |             result = response.json()
1034 | 
1035 |             if result.get("success") and result.get("result", [{}])[0].get("results"):
1036 |                 stats = result["result"][0]["results"][0]
1037 | 
1038 |                 return {
1039 |                     "total_memories": stats.get("total_memories", 0),
1040 |                     "unique_tags": stats.get("unique_tags", 0),
1041 |                     "memories_this_week": stats.get("memories_this_week", 0),
1042 |                     "total_content_size_bytes": stats.get("total_content_size", 0),
1043 |                     "total_vectors": stats.get("total_vectors", 0),
1044 |                     "r2_stored_count": stats.get("r2_stored_count", 0),
1045 |                     "storage_backend": "cloudflare",
1046 |                     "vectorize_index": self.vectorize_index,
1047 |                     "d1_database": self.d1_database_id,
1048 |                     "r2_bucket": self.r2_bucket,
1049 |                     "status": "operational"
1050 |                 }
1051 | 
1052 |             return {
1053 |                 "total_memories": 0,
1054 |                 "unique_tags": 0,
1055 |                 "memories_this_week": 0,
1056 |                 "storage_backend": "cloudflare",
1057 |                 "status": "operational"
1058 |             }
1059 | 
1060 |         except Exception as e:
1061 |             logger.error(f"Failed to get stats: {e}")
1062 |             return {
1063 |                 "total_memories": 0,
1064 |                 "unique_tags": 0,
1065 |                 "memories_this_week": 0,
1066 |                 "storage_backend": "cloudflare",
1067 |                 "status": "error",
1068 |                 "error": str(e)
1069 |             }
1070 |     
1071 |     async def get_all_tags(self) -> List[str]:
1072 |         """Get all unique tags in the storage."""
1073 |         try:
1074 |             sql = "SELECT name FROM tags ORDER BY name"
1075 |             payload = {"sql": sql}
1076 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1077 |             result = response.json()
1078 | 
1079 |             if result.get("success") and result.get("result", [{}])[0].get("results"):
1080 |                 return [row["name"] for row in result["result"][0]["results"]]
1081 | 
1082 |             return []
1083 | 
1084 |         except Exception as e:
1085 |             logger.error(f"Failed to get all tags: {e}")
1086 |             return []
1087 | 
1088 |     async def get_all_tags_with_counts(self) -> List[Dict[str, Any]]:
1089 |         """Get all tags with their usage counts."""
1090 |         try:
1091 |             sql = """
1092 |                 SELECT t.name as tag, COUNT(mt.memory_id) as count
1093 |                 FROM tags t
1094 |                 LEFT JOIN memory_tags mt ON t.id = mt.tag_id
1095 |                 GROUP BY t.id
1096 |                 ORDER BY count DESC, t.name ASC
1097 |             """
1098 |             payload = {"sql": sql}
1099 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1100 |             result = response.json()
1101 | 
1102 |             if result.get("success") and result.get("result", [{}])[0].get("results"):
1103 |                 return [row for row in result["result"][0]["results"]]
1104 | 
1105 |             return []
1106 | 
1107 |         except Exception as e:
1108 |             logger.error(f"Failed to get tags with counts: {e}")
1109 |             return []
1110 |     
1111 |     async def get_recent_memories(self, n: int = 10) -> List[Memory]:
1112 |         """Get n most recent memories."""
1113 |         try:
1114 |             sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
1115 |             payload = {"sql": sql, "params": [n]}
1116 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1117 |             result = response.json()
1118 | 
1119 |             memories = []
1120 |             if result.get("success") and result.get("result", [{}])[0].get("results"):
1121 |                 for row in result["result"][0]["results"]:
1122 |                     memory = await self._load_memory_from_row(row)
1123 |                     if memory:
1124 |                         memories.append(memory)
1125 | 
1126 |             logger.info(f"Retrieved {len(memories)} recent memories")
1127 |             return memories
1128 | 
1129 |         except Exception as e:
1130 |             logger.error(f"Failed to get recent memories: {e}")
1131 |             return []
1132 | 
1133 |     async def get_largest_memories(self, n: int = 10) -> List[Memory]:
1134 |         """Get n largest memories by content length."""
1135 |         try:
1136 |             sql = "SELECT * FROM memories ORDER BY LENGTH(content) DESC LIMIT ?"
1137 |             payload = {"sql": sql, "params": [n]}
1138 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1139 |             result = response.json()
1140 | 
1141 |             memories = []
1142 |             if result.get("success") and result.get("result", [{}])[0].get("results"):
1143 |                 for row in result["result"][0]["results"]:
1144 |                     memory = await self._load_memory_from_row(row)
1145 |                     if memory:
1146 |                         memories.append(memory)
1147 | 
1148 |             logger.info(f"Retrieved {len(memories)} largest memories")
1149 |             return memories
1150 | 
1151 |         except Exception as e:
1152 |             logger.error(f"Failed to get largest memories: {e}")
1153 |             return []
1154 | 
1155 |     async def _fetch_d1_timestamps(self, cutoff_timestamp: Optional[float] = None) -> List[float]:
1156 |         """Fetch timestamps from D1 database with optional time filter.
1157 | 
1158 |         Args:
1159 |             cutoff_timestamp: Optional Unix timestamp to filter memories (>=)
1160 | 
1161 |         Returns:
1162 |             List of Unix timestamps (float) in descending order
1163 | 
1164 |         Raises:
1165 |             Exception: If D1 query fails
1166 |         """
1167 |         if cutoff_timestamp is not None:
1168 |             sql = "SELECT created_at FROM memories WHERE created_at >= ? ORDER BY created_at DESC"
1169 |             payload = {"sql": sql, "params": [cutoff_timestamp]}
1170 |         else:
1171 |             sql = "SELECT created_at FROM memories ORDER BY created_at DESC"
1172 |             payload = {"sql": sql, "params": []}
1173 | 
1174 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1175 |         result = response.json()
1176 | 
1177 |         timestamps = []
1178 |         if result.get("success") and result.get("result", [{}])[0].get("results"):
1179 |             for row in result["result"][0]["results"]:
1180 |                 if row.get("created_at") is not None:
1181 |                     timestamps.append(float(row["created_at"]))
1182 | 
1183 |         return timestamps
1184 | 
1185 |     async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
1186 |         """
1187 |         Get memory creation timestamps only, without loading full memory objects.
1188 | 
1189 |         This is an optimized method for analytics that only needs timestamps,
1190 |         avoiding the overhead of loading full memory content and embeddings.
1191 | 
1192 |         Args:
1193 |             days: Optional filter to only get memories from last N days
1194 | 
1195 |         Returns:
1196 |             List of Unix timestamps (float) in descending order (newest first)
1197 |         """
1198 |         try:
1199 |             cutoff_timestamp = None
1200 |             if days is not None:
1201 |                 cutoff = datetime.now(timezone.utc) - timedelta(days=days)
1202 |                 cutoff_timestamp = cutoff.timestamp()
1203 | 
1204 |             timestamps = await self._fetch_d1_timestamps(cutoff_timestamp)
1205 |             logger.info(f"Retrieved {len(timestamps)} memory timestamps")
1206 |             return timestamps
1207 | 
1208 |         except Exception as e:
1209 |             logger.error(f"Failed to get memory timestamps: {e}")
1210 |             return []
1211 | 
1212 |     def sanitized(self, tags):
1213 |         """Sanitize and normalize tags to a JSON string.
1214 | 
1215 |         This method provides compatibility with the storage backend interface.
1216 |         """
1217 |         if tags is None:
1218 |             return json.dumps([])
1219 |         
1220 |         # If we get a string, split it into an array
1221 |         if isinstance(tags, str):
1222 |             tags = [tag.strip() for tag in tags.split(",") if tag.strip()]
1223 |         # If we get a list, normalize each element to a stripped string
1224 |         elif isinstance(tags, list):
1225 |             tags = [str(tag).strip() for tag in tags if str(tag).strip()]
1226 |         else:
1227 |             return json.dumps([])
1228 |                 
1229 |         # Return JSON string representation of the array
1230 |         return json.dumps(tags)
1231 |     
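    # Editor's examples for sanitized() above (illustrative):
    #
    #   self.sanitized("alpha, beta")       -> '["alpha", "beta"]'
    #   self.sanitized(["alpha", " beta "]) -> '["alpha", "beta"]'
    #   self.sanitized(None)                -> '[]'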
1232 |     async def recall(self, query: Optional[str] = None, n_results: int = 5, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None) -> List[MemoryQueryResult]:
1233 |         """
1234 |         Retrieve memories with combined time filtering and optional semantic search.
1235 | 
1236 |         Args:
1237 |             query: Optional semantic search query. If None, only time filtering is applied.
1238 |             n_results: Maximum number of results to return.
1239 |             start_timestamp: Optional start time for filtering.
1240 |             end_timestamp: Optional end time for filtering.
1241 | 
1242 |         Returns:
1243 |             List of MemoryQueryResult objects.
1244 |         """
1245 |         try:
1246 |             # Build time filtering WHERE clause for D1
1247 |             time_conditions = []
1248 |             params = []
1249 | 
1250 |             if start_timestamp is not None:
1251 |                 time_conditions.append("created_at >= ?")
1252 |                 params.append(float(start_timestamp))
1253 | 
1254 |             if end_timestamp is not None:
1255 |                 time_conditions.append("created_at <= ?")
1256 |                 params.append(float(end_timestamp))
1257 | 
1258 |             time_where = " AND ".join(time_conditions) if time_conditions else ""
1259 | 
1260 |             logger.info(f"Recall - Time filtering conditions: {time_where}, params: {params}")
1261 | 
1262 |             # Determine search strategy
1263 |             if query and query.strip():
1264 |                 # Combined semantic search with time filtering
1265 |                 logger.info(f"Recall - Using semantic search with query: '{query}'")
1266 | 
1267 |                 try:
1268 |                     # Generate query embedding
1269 |                     query_embedding = await self._generate_embedding(query)
1270 | 
1271 |                     # Search Vectorize with semantic query
1272 |                     search_payload = {
1273 |                         "vector": query_embedding,
1274 |                         "topK": n_results,
1275 |                         "returnMetadata": "all",
1276 |                         "returnValues": False
1277 |                     }
1278 | 
1279 |                     # Add time filtering to vectorize metadata if specified
1280 |                     if time_conditions:
1281 |                         # Note: Vectorize metadata filtering capabilities may be limited
1282 |                         # We'll filter after retrieval for now
1283 |                         logger.info("Recall - Time filtering will be applied post-retrieval from Vectorize")
1284 | 
1285 |                     response = await self._retry_request("POST", f"{self.vectorize_url}/query", json=search_payload)
1286 |                     result = response.json()
1287 | 
1288 |                     if not result.get("success"):
1289 |                         raise ValueError(f"Vectorize query failed: {result}")
1290 | 
1291 |                     matches = result.get("result", {}).get("matches", [])
1292 | 
1293 |                     # Convert matches to MemoryQueryResult objects with time filtering
1294 |                     results = []
1295 |                     for match in matches:
1296 |                         memory = await self._load_memory_from_match(match)
1297 |                         if memory:
1298 |                             # Apply time filtering if needed
1299 |                             if start_timestamp is not None and memory.created_at and memory.created_at < start_timestamp:
1300 |                                 continue
1301 |                             if end_timestamp is not None and memory.created_at and memory.created_at > end_timestamp:
1302 |                                 continue
1303 | 
1304 |                             query_result = MemoryQueryResult(
1305 |                                 memory=memory,
1306 |                                 relevance_score=match.get("score", 0.0)
1307 |                             )
1308 |                             results.append(query_result)
1309 | 
1310 |                     logger.info(f"Recall - Retrieved {len(results)} memories with semantic search and time filtering")
1311 |                     return results[:n_results]  # Ensure we don't exceed n_results
1312 | 
1313 |                 except Exception as e:
1314 |                     logger.error(f"Recall - Semantic search failed, falling back to time-based search: {e}")
1315 |                     # Fall through to time-based search
1316 | 
1317 |             # Time-based search only (or fallback)
1318 |             logger.info("Recall - Using time-based search only")
1319 | 
1320 |             # Build D1 query for time-based retrieval
1321 |             if time_where:
1322 |                 sql = f"SELECT * FROM memories WHERE {time_where} ORDER BY created_at DESC LIMIT ?"
1323 |                 params.append(n_results)
1324 |             else:
1325 |                 # No time filters, get most recent
1326 |                 sql = "SELECT * FROM memories ORDER BY created_at DESC LIMIT ?"
1327 |                 params = [n_results]
1328 | 
1329 |             payload = {"sql": sql, "params": params}
1330 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1331 |             result = response.json()
1332 | 
1333 |             if not result.get("success"):
1334 |                 raise ValueError(f"D1 query failed: {result}")
1335 | 
1336 |             # Convert D1 results to MemoryQueryResult objects
1337 |             results = []
1338 |             if result.get("result", [{}])[0].get("results"):
1339 |                 for row in result["result"][0]["results"]:
1340 |                     memory = await self._load_memory_from_row(row)
1341 |                     if memory:
1342 |                         # For time-based search without semantic query, use timestamp as relevance
1343 |                         relevance_score = memory.created_at or 0.0
1344 |                         query_result = MemoryQueryResult(
1345 |                             memory=memory,
1346 |                             relevance_score=relevance_score
1347 |                         )
1348 |                         results.append(query_result)
1349 | 
1350 |             logger.info(f"Recall - Retrieved {len(results)} memories with time-based search")
1351 |             return results
1352 | 
1353 |         except Exception as e:
1354 |             logger.error(f"Recall failed: {e}")
1355 |             return []
1356 | 
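    # Editor's note: recall() above tries semantic search first (Vectorize query, then
    # per-result time filtering in Python, since Vectorize metadata filters are not used
    # here), and falls back to a pure D1 recency query if the semantic path fails or no
    # query string is given. In the fallback, relevance_score is simply the created_at
    # timestamp, so newer memories rank higher.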
1357 |     async def get_all_memories(self, limit: Optional[int] = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1358 |         """
1359 |         Get all memories in storage ordered by creation time (newest first).
1360 | 
1361 |         Args:
1362 |             limit: Maximum number of memories to return (None for all)
1363 |             offset: Number of memories to skip (for pagination)
1364 |             memory_type: Optional filter by memory type
1365 |             tags: Optional filter by tags (memory must have ALL of the provided tags)
1366 | 
1367 |         Returns:
1368 |             List of Memory objects ordered by created_at DESC, optionally filtered by type and tags
1369 |         """
1370 |         try:
1371 |             # Build SQL query with optional memory_type and tags filters
1372 |             sql = "SELECT m.* FROM memories m"
1373 |             joins = ""
1374 |             where_conditions = []
1375 |             params: List[Any] = []
1376 | 
1377 |             if memory_type is not None:
1378 |                 where_conditions.append("m.memory_type = ?")
1379 |                 params.append(memory_type)
1380 | 
1381 |             tag_count = 0
1382 |             if tags:
1383 |                 tag_count = len(tags)
1384 |                 placeholders = ",".join(["?"] * tag_count)
1385 |                 joins += " " + """
1386 |                     INNER JOIN memory_tags mt ON m.id = mt.memory_id
1387 |                     INNER JOIN tags t ON mt.tag_id = t.id
1388 |                 """.strip().replace("\n", " ")
1389 |                 where_conditions.append(f"t.name IN ({placeholders})")
1390 |                 params.extend(tags)
1391 | 
1392 |             if joins:
1393 |                 sql += joins
1394 | 
1395 |             if where_conditions:
1396 |                 sql += " WHERE " + " AND ".join(where_conditions)
1397 | 
1398 |             if tags:
1399 |                 sql += " GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?"
1400 | 
1401 |             sql += " ORDER BY m.created_at DESC"
1402 | 
1403 |             query_params = params.copy()
1404 |             if tags:
1405 |                 query_params.append(tag_count)
1406 | 
1407 |             if limit is not None:
1408 |                 sql += " LIMIT ?"
1409 |                 query_params.append(limit)
1410 |             if offset > 0:
1411 |                 # SQLite/D1 only accept OFFSET after a LIMIT; LIMIT -1 means "no limit"
1412 |                 sql += " OFFSET ?" if limit is not None else " LIMIT -1 OFFSET ?"
1413 |                 query_params.append(offset)
1414 | 
1415 |             payload = {"sql": sql, "params": query_params}
1416 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1417 |             result = response.json()
1418 | 
1419 |             if not result.get("success"):
1420 |                 raise ValueError(f"D1 query failed: {result}")
1421 | 
1422 |             memories = []
1423 |             if result.get("result", [{}])[0].get("results"):
1424 |                 for row in result["result"][0]["results"]:
1425 |                     memory = await self._load_memory_from_row(row)
1426 |                     if memory:
1427 |                         memories.append(memory)
1428 | 
1429 |             logger.debug(f"Retrieved {len(memories)} memories from D1")
1430 |             return memories
1431 | 
1432 |         except Exception as e:
1433 |             logger.error(f"Error getting all memories: {str(e)}")
1434 |             return []
1435 | 
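    # --- Editor's sketch (not part of the original source). It illustrates the SQL this
    # method assembles; `storage` is assumed to be an initialized instance of this backend.
    # A hypothetical call such as
    #     await storage.get_all_memories(limit=20, memory_type="note", tags=["python", "mcp"])
    # would produce roughly:
    #     SELECT m.* FROM memories m
    #         INNER JOIN memory_tags mt ON m.id = mt.memory_id
    #         INNER JOIN tags t ON mt.tag_id = t.id
    #     WHERE m.memory_type = ? AND t.name IN (?,?)
    #     GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?
    #     ORDER BY m.created_at DESC LIMIT ?
    # with params ["note", "python", "mcp", 2, 20]. The HAVING clause is what enforces the
    # "must carry ALL requested tags" semantics documented above.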
1436 |     def _row_to_memory(self, row: Dict[str, Any]) -> Memory:
1437 |         """Convert D1 row to Memory object without loading tags (for bulk operations)."""
1438 |         # Load content from R2 if needed
1439 |         content = row["content"]
1440 |         if row.get("r2_key") and content.startswith("[R2 Content:"):
1441 |             # For bulk operations, we don't load R2 content to avoid additional requests
1442 |             # Just keep the placeholder
1443 |             pass
1444 | 
1445 |         return Memory(
1446 |             content=content,
1447 |             content_hash=row["content_hash"],
1448 |             tags=[],  # Skip tag loading for bulk operations
1449 |             memory_type=row.get("memory_type"),
1450 |             metadata=json.loads(row["metadata_json"]) if row.get("metadata_json") else {},
1451 |             created_at=row.get("created_at"),
1452 |             created_at_iso=row.get("created_at_iso"),
1453 |             updated_at=row.get("updated_at"),
1454 |             updated_at_iso=row.get("updated_at_iso")
1455 |         )
1456 | 
1457 |     async def get_all_memories_bulk(
1458 |         self,
1459 |         include_tags: bool = False
1460 |     ) -> List[Memory]:
1461 |         """
1462 |         Efficiently load all memories from D1.
1463 | 
1464 |         If include_tags=False, skips N+1 tag queries for better performance.
1465 |         Useful for maintenance scripts that don't need tag information.
1466 | 
1467 |         Args:
1468 |             include_tags: Whether to load tags for each memory (slower but complete)
1469 | 
1470 |         Returns:
1471 |             List of Memory objects ordered by created_at DESC
1472 |         """
1473 |         query = "SELECT * FROM memories ORDER BY created_at DESC"
1474 |         payload = {"sql": query}
1475 |         response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1476 |         result = response.json()
1477 | 
1478 |         if not result.get("success"):
1479 |             raise ValueError(f"D1 query failed: {result}")
1480 | 
1481 |         memories = []
1482 |         if result.get("result", [{}])[0].get("results"):
1483 |             for row in result["result"][0]["results"]:
1484 |                 if include_tags:
1485 |                     memory = await self._load_memory_from_row(row)
1486 |                 else:
1487 |                     memory = self._row_to_memory(row)
1488 |                 if memory:
1489 |                     memories.append(memory)
1490 | 
1491 |         logger.debug(f"Bulk loaded {len(memories)} memories from D1")
1492 |         return memories
1493 | 
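    # --- Editor's sketch (not part of the original source). A maintenance script that only
    # needs content hashes can skip the per-memory tag lookups entirely; `storage` is assumed
    # to be an initialized instance of this backend:
    #
    #     memories = await storage.get_all_memories_bulk(include_tags=False)
    #     hashes = {m.content_hash for m in memories}
    #
    # Passing include_tags=True routes each row through _load_memory_from_row() instead,
    # restoring full tag information at the cost of additional tag queries per memory.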
1494 |     async def get_all_memories_cursor(self, limit: Optional[int] = None, cursor: Optional[float] = None, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
1495 |         """
1496 |         Get all memories using cursor-based pagination to avoid D1 OFFSET limitations.
1497 | 
1498 |         This method uses timestamp-based cursors instead of OFFSET, which is more efficient
1499 |         and avoids Cloudflare D1's OFFSET limitations that cause 400 Bad Request errors.
1500 | 
1501 |         Args:
1502 |             limit: Maximum number of memories to return (None for all)
1503 |             cursor: Timestamp cursor for pagination (created_at value from last result)
1504 |             memory_type: Optional filter by memory type
1505 |             tags: Optional filter by tags (matches ANY of the provided tags)
1506 | 
1507 |         Returns:
1508 |             List of Memory objects ordered by created_at DESC, starting after cursor
1509 |         """
1510 |         try:
1511 |             # Build SQL query with cursor-based pagination
1512 |             sql = "SELECT * FROM memories"
1513 |             params = []
1514 |             where_conditions = []
1515 | 
1516 |             # Add cursor condition (timestamp-based pagination)
1517 |             if cursor is not None:
1518 |                 where_conditions.append("created_at < ?")
1519 |                 params.append(cursor)
1520 | 
1521 |             # Add memory_type filter if specified
1522 |             if memory_type is not None:
1523 |                 where_conditions.append("memory_type = ?")
1524 |                 params.append(memory_type)
1525 | 
1526 |             # Add tags filter if specified (LIKE-based substring match, so a filter like "py" also matches "python")
1527 |             if tags and len(tags) > 0:
1528 |                 tag_conditions = " OR ".join(["tags LIKE ?" for _ in tags])
1529 |                 where_conditions.append(f"({tag_conditions})")
1530 |                 params.extend([f"%{tag}%" for tag in tags])
1531 | 
1532 |             # Apply WHERE clause if we have any conditions
1533 |             if where_conditions:
1534 |                 sql += " WHERE " + " AND ".join(where_conditions)
1535 | 
1536 |             sql += " ORDER BY created_at DESC"
1537 | 
1538 |             if limit is not None:
1539 |                 sql += " LIMIT ?"
1540 |                 params.append(limit)
1541 | 
1542 |             payload = {"sql": sql, "params": params}
1543 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1544 |             result = response.json()
1545 | 
1546 |             if not result.get("success"):
1547 |                 raise ValueError(f"D1 query failed: {result}")
1548 | 
1549 |             memories = []
1550 |             if result.get("result", [{}])[0].get("results"):
1551 |                 for row in result["result"][0]["results"]:
1552 |                     memory = await self._load_memory_from_row(row)
1553 |                     if memory:
1554 |                         memories.append(memory)
1555 | 
1556 |             logger.debug(f"Retrieved {len(memories)} memories from D1 with cursor-based pagination")
1557 |             return memories
1558 | 
1559 |         except Exception as e:
1560 |             logger.error(f"Error getting memories with cursor: {str(e)}")
1561 |             return []
1562 | 
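    # --- Editor's sketch (not part of the original source). Cursor-based paging feeds the
    # created_at value of the last returned row back in as the next cursor; `storage` is
    # assumed to be an initialized instance and `process()` a caller-supplied function:
    #
    #     page = await storage.get_all_memories_cursor(limit=500)
    #     while page:
    #         process(page)
    #         page = await storage.get_all_memories_cursor(limit=500, cursor=page[-1].created_at)
    #
    # Because the cursor is a strict "created_at < ?" bound, rows that share an identical
    # created_at across a page boundary could be skipped; callers needing exactness should
    # account for that edge case.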
1563 |     async def get_memories_updated_since(self, timestamp: float, limit: int = 100) -> List[Memory]:
1564 |         """
1565 |         Get memories updated since a specific timestamp (v8.25.0+).
1566 | 
1567 |         Used by hybrid backend drift detection to find memories with metadata
1568 |         changes that need to be synchronized.
1569 | 
1570 |         Args:
1571 |             timestamp: Unix timestamp (seconds since epoch)
1572 |             limit: Maximum number of memories to return (default: 100)
1573 | 
1574 |         Returns:
1575 |             List of Memory objects updated after the timestamp, ordered by updated_at DESC
1576 |         """
1577 |         try:
1578 |             # Convert timestamp to ISO format for D1 query
1579 |             from datetime import datetime, timezone
1580 |             dt = datetime.fromtimestamp(timestamp, tz=timezone.utc).replace(tzinfo=None)  # utcfromtimestamp() is deprecated in Python 3.12+
1581 |             iso_timestamp = dt.isoformat()
1582 | 
1583 |             query = """
1584 |             SELECT content, content_hash, tags, memory_type, metadata,
1585 |                    created_at, created_at_iso, updated_at, updated_at_iso
1586 |             FROM memories
1587 |             WHERE updated_at_iso > ?
1588 |             ORDER BY updated_at DESC
1589 |             LIMIT ?
1590 |             """
1591 | 
1592 |             payload = {"sql": query, "params": [iso_timestamp, limit]}
1593 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1594 |             result = response.json()
1595 | 
1596 |             if not result.get("success"):
1597 |                 logger.warning(f"Failed to get updated memories: {result.get('error')}")
1598 |                 return []
1599 | 
1600 |             memories = []
1601 |             if result.get("result", [{}])[0].get("results"):
1602 |                 for row in result["result"][0]["results"]:
1603 |                     memory = await self._load_memory_from_row(row)
1604 |                     if memory:
1605 |                         memories.append(memory)
1606 | 
1607 |             logger.debug(f"Retrieved {len(memories)} memories updated since {iso_timestamp}")
1608 |             return memories
1609 | 
1610 |         except Exception as e:
1611 |             logger.error(f"Error getting updated memories: {e}")
1612 |             return []
1613 | 
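    # --- Editor's sketch (not part of the original source). A drift-detection pass might look
    # roughly like this; `storage` is assumed to be an initialized instance and the one-hour
    # window is purely illustrative:
    #
    #     import time
    #     last_sync = time.time() - 3600   # e.g. "changed within the last hour"
    #     changed = await storage.get_memories_updated_since(last_sync, limit=100)
    #     for memory in changed:
    #         ...  # re-synchronize metadata to the secondary backend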
1614 |     async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
1615 |         """
1616 |         Get memories within a specific time range (v8.31.0+).
1617 | 
1618 |         Pushes date-range filtering to D1 database layer for better performance.
1619 |         This optimization reduces network transfer and enables efficient queries
1620 |         on databases with >10,000 memories.
1621 | 
1622 |         Benefits:
1623 |         - Reduces network transfer (only relevant memories)
1624 |         - Leverages D1 indexes on created_at
1625 |         - Scales to databases with >10,000 memories
1626 |         - 10x performance improvement over application-layer filtering
1627 | 
1628 |         Args:
1629 |             start_time: Start timestamp (Unix seconds, inclusive)
1630 |             end_time: End timestamp (Unix seconds, inclusive)
1631 | 
1632 |         Returns:
1633 |             List of Memory objects within the time range, ordered by created_at DESC
1634 |         """
1635 |         try:
1636 |             # Build SQL query with BETWEEN clause for efficient filtering
1637 |             # Use SELECT m.* to get all columns (tags are loaded separately via _load_memory_tags)
1638 |             sql = """
1639 |                 SELECT m.*
1640 |                 FROM memories m
1641 |                 WHERE m.created_at BETWEEN ? AND ?
1642 |                 ORDER BY m.created_at DESC
1643 |             """
1644 | 
1645 |             payload = {"sql": sql, "params": [start_time, end_time]}
1646 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1647 |             result = response.json()
1648 | 
1649 |             if not result.get("success"):
1650 |                 logger.error(f"D1 query failed: {result}")
1651 |                 return []
1652 | 
1653 |             memories = []
1654 |             if result.get("result", [{}])[0].get("results"):
1655 |                 for row in result["result"][0]["results"]:
1656 |                     memory = await self._load_memory_from_row(row)
1657 |                     if memory:
1658 |                         memories.append(memory)
1659 | 
1660 |             logger.info(f"Retrieved {len(memories)} memories in time range {start_time}-{end_time}")
1661 |             return memories
1662 | 
1663 |         except Exception as e:
1664 |             logger.error(f"Error getting memories by time range: {str(e)}")
1665 |             return []
1666 | 
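    # --- Editor's sketch (not part of the original source). The method expects Unix seconds,
    # so calendar dates need converting first; `storage` is assumed to be an initialized
    # instance of this backend:
    #
    #     from datetime import datetime, timezone
    #     start = datetime(2025, 1, 1, tzinfo=timezone.utc).timestamp()
    #     end = datetime(2025, 1, 31, 23, 59, 59, tzinfo=timezone.utc).timestamp()
    #     january = await storage.get_memories_by_time_range(start, end)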
1667 |     async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
1668 |         """
1669 |         Get total count of memories in storage.
1670 | 
1671 |         Args:
1672 |             memory_type: Optional filter by memory type
1673 |             tags: Optional filter by tags (memories must carry ALL of the provided tags)
1674 | 
1675 |         Returns:
1676 |             Total number of memories, optionally filtered by type and/or tags
1677 |         """
1678 |         try:
1679 |             # Build query with filters
1680 |             base_sql = "SELECT m.id FROM memories m"
1681 |             joins = ""
1682 |             conditions = []
1683 |             params: List[Any] = []
1684 | 
1685 |             if memory_type is not None:
1686 |                 conditions.append('m.memory_type = ?')
1687 |                 params.append(memory_type)
1688 | 
1689 |             tag_count = 0
1690 |             if tags:
1691 |                 tag_count = len(tags)
1692 |                 placeholders = ','.join(['?'] * tag_count)
1693 |                 joins += " " + """
1694 |                     INNER JOIN memory_tags mt ON m.id = mt.memory_id
1695 |                     INNER JOIN tags t ON mt.tag_id = t.id
1696 |                 """.strip().replace("\n", " ")
1697 |                 conditions.append(f't.name IN ({placeholders})')
1698 |                 params.extend(tags)
1699 | 
1700 |             sql = base_sql + joins
1701 |             if conditions:
1702 |                 sql += ' WHERE ' + ' AND '.join(conditions)
1703 | 
1704 |             if tags:
1705 |                 sql += ' GROUP BY m.id HAVING COUNT(DISTINCT t.name) = ?'
1706 | 
1707 |             count_sql = f'SELECT COUNT(*) as count FROM ({sql}) AS subquery'
1708 | 
1709 |             count_params = params.copy()
1710 |             if tags:
1711 |                 count_params.append(tag_count)
1712 | 
1713 |             payload = {"sql": count_sql, "params": count_params}
1714 |             response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
1715 |             result = response.json()
1716 | 
1717 |             if not result.get("success"):
1718 |                 raise ValueError(f"D1 query failed: {result}")
1719 | 
1720 |             if result.get("result", [{}])[0].get("results"):
1721 |                 count = result["result"][0]["results"][0].get("count", 0)
1722 |                 return int(count)
1723 | 
1724 |             return 0
1725 | 
1726 |         except Exception as e:
1727 |             logger.error(f"Error counting memories: {str(e)}")
1728 |             return 0
1729 | 
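    # --- Editor's sketch (not part of the original source). The count uses the same join and
    # HAVING logic as get_all_memories(), so the two pair naturally for a paged listing;
    # `storage` is assumed to be an initialized instance of this backend:
    #
    #     total = await storage.count_all_memories(memory_type="note", tags=["python"])
    #     first_page = await storage.get_all_memories(limit=50, offset=0,
    #                                                 memory_type="note", tags=["python"])
    #     pages = (total + 49) // 50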
1730 |     async def close(self) -> None:
1731 |         """Close the storage backend and cleanup resources."""
1732 |         if self.client:
1733 |             await self.client.aclose()
1734 |             self.client = None
1735 | 
1736 |         # Clear embedding cache
1737 |         self._embedding_cache.clear()
1738 | 
1739 |         logger.info("Cloudflare storage backend closed")
1740 | 
```