This is page 25 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/integration/test_api_with_memory_service.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for API endpoints using MemoryService.
These tests verify that the API layer correctly integrates with
MemoryService for all memory operations and maintains consistent behavior.
"""
import pytest
import pytest_asyncio
import tempfile
import os
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi.testclient import TestClient
from mcp_memory_service.web.dependencies import set_storage, get_memory_service
from mcp_memory_service.services.memory_service import MemoryService
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
# Test Fixtures
@pytest.fixture
def temp_db():
"""Create a temporary database for testing."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = os.path.join(tmpdir, "test.db")
yield db_path
@pytest_asyncio.fixture
async def initialized_storage(temp_db):
"""Create and initialize a real SQLite storage backend."""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
yield storage
storage.close()
@pytest.fixture
def test_app(initialized_storage):
"""Create a FastAPI test application with initialized storage."""
# Import here to avoid circular dependencies
from mcp_memory_service.web.app import app
# Set storage for the app
set_storage(initialized_storage)
client = TestClient(app)
yield client
@pytest.fixture
def mock_storage():
"""Create a mock storage for isolated testing."""
storage = AsyncMock()
return storage
@pytest.fixture
def mock_memory_service(mock_storage):
"""Create a MemoryService with mock storage."""
return MemoryService(storage=mock_storage)
@pytest.fixture
def sample_memory():
"""Create a sample memory for testing."""
return Memory(
content="Integration test memory",
content_hash="test_hash_123",
tags=["integration", "test"],
memory_type="note",
metadata={"source": "test"},
created_at=1698765432.0,
updated_at=1698765432.0
)
# Test API Store Memory Endpoint
@pytest.mark.asyncio
async def test_api_store_memory_uses_service(mock_storage):
"""Test that POST /api/memories uses MemoryService."""
mock_storage.store.return_value = None
# Create service
service = MemoryService(storage=mock_storage)
# Simulate API call through service
result = await service.store_memory(
content="Test API storage",
tags=["api", "test"],
memory_type="note"
)
assert result["success"] is True
assert "memory" in result
mock_storage.store.assert_called_once()
@pytest.mark.asyncio
async def test_api_store_memory_hostname_from_header(mock_storage):
"""Test that X-Client-Hostname header is processed correctly."""
mock_storage.store.return_value = None
service = MemoryService(storage=mock_storage)
# Simulate API call with hostname
result = await service.store_memory(
content="Test with hostname",
tags=["test"],
client_hostname="client-machine"
)
# Verify hostname tag was added
stored_memory = mock_storage.store.call_args.args[0]
assert "source:client-machine" in stored_memory.tags
assert stored_memory.metadata["hostname"] == "client-machine"
@pytest.mark.asyncio
async def test_api_store_memory_hostname_from_request_body(mock_storage):
"""Test that client_hostname in request body works."""
mock_storage.store.return_value = None
service = MemoryService(storage=mock_storage)
# Simulate API call with hostname in body
result = await service.store_memory(
content="Test",
client_hostname="body-hostname"
)
stored_memory = mock_storage.store.call_args.args[0]
assert "source:body-hostname" in stored_memory.tags
# Test API List Memories Endpoint
@pytest.mark.asyncio
async def test_api_list_memories_uses_database_filtering(mock_storage):
"""Test that GET /api/memories uses database-level filtering."""
# Setup mock to return limited results
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 1000
service = MemoryService(storage=mock_storage)
# Request page 1 with 10 items from 1000 total
result = await service.list_memories(page=1, page_size=10)
# CRITICAL: Verify only 10 items requested, not all 1000
# This proves database-level filtering, not O(n) loading
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["limit"] == 10
assert call_kwargs["offset"] == 0
assert result["total"] == 1000
assert result["has_more"] is True
@pytest.mark.asyncio
async def test_api_list_memories_pagination_through_service(mock_storage):
"""Test end-to-end pagination workflow."""
# Create mock memories
memories = [
Memory(
content=f"Memory {i}",
content_hash=f"hash_{i}",
tags=["test"],
memory_type="note",
metadata={},
created_at=1698765432.0 + i,
updated_at=1698765432.0 + i
)
for i in range(25)
]
# Page 1: First 10 memories
mock_storage.get_all_memories.return_value = memories[:10]
mock_storage.count_all_memories.return_value = 25
service = MemoryService(storage=mock_storage)
page1 = await service.list_memories(page=1, page_size=10)
assert page1["page"] == 1
assert page1["page_size"] == 10
assert page1["total"] == 25
assert page1["has_more"] is True
assert len(page1["memories"]) == 10
# Page 2: Next 10 memories
mock_storage.get_all_memories.return_value = memories[10:20]
page2 = await service.list_memories(page=2, page_size=10)
assert page2["page"] == 2
assert page2["has_more"] is True
# Page 3: Last 5 memories
mock_storage.get_all_memories.return_value = memories[20:25]
page3 = await service.list_memories(page=3, page_size=10)
assert page3["page"] == 3
assert page3["has_more"] is False
assert len(page3["memories"]) == 5
@pytest.mark.asyncio
async def test_api_list_memories_tag_filter(mock_storage):
"""Test filtering by tag through API."""
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 0
service = MemoryService(storage=mock_storage)
result = await service.list_memories(page=1, page_size=10, tag="important")
# Verify tag passed to storage as list
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["tags"] == ["important"]
@pytest.mark.asyncio
async def test_api_list_memories_type_filter(mock_storage):
"""Test filtering by memory type through API."""
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 0
service = MemoryService(storage=mock_storage)
result = await service.list_memories(page=1, page_size=10, memory_type="reference")
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["memory_type"] == "reference"
@pytest.mark.asyncio
async def test_api_list_memories_combined_filters(mock_storage):
"""Test combining tag and type filters."""
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 0
service = MemoryService(storage=mock_storage)
result = await service.list_memories(
page=1,
page_size=10,
tag="work",
memory_type="task"
)
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["tags"] == ["work"]
assert call_kwargs["memory_type"] == "task"
# Test API Search Endpoints
@pytest.mark.asyncio
async def test_api_semantic_search_uses_service(mock_storage, sample_memory):
"""Test POST /api/search uses MemoryService."""
mock_storage.retrieve.return_value = [sample_memory]
service = MemoryService(storage=mock_storage)
result = await service.retrieve_memories(query="test query", n_results=5)
assert result["query"] == "test query"
assert result["count"] == 1
mock_storage.retrieve.assert_called_once()
@pytest.mark.asyncio
async def test_api_tag_search_uses_service(mock_storage, sample_memory):
"""Test POST /api/search/by-tag uses MemoryService."""
mock_storage.search_by_tags.return_value = [sample_memory]
service = MemoryService(storage=mock_storage)
result = await service.search_by_tag(tags=["test"], match_all=False)
assert result["tags"] == ["test"]
assert result["match_type"] == "ANY"
assert result["count"] == 1
@pytest.mark.asyncio
async def test_api_time_search_uses_service(mock_storage, sample_memory):
"""Test POST /api/search/by-time flow (if applicable)."""
# Note: Time search might use retrieve_memories with time filters
mock_storage.retrieve.return_value = [sample_memory]
service = MemoryService(storage=mock_storage)
# Simulate time-based search
result = await service.retrieve_memories(query="last week", n_results=10)
assert "memories" in result
# Test API Delete Endpoint
@pytest.mark.asyncio
async def test_api_delete_memory_uses_service(mock_storage):
"""Test DELETE /api/memories/{hash} uses MemoryService."""
mock_storage.delete_memory.return_value = True
service = MemoryService(storage=mock_storage)
result = await service.delete_memory("test_hash_123")
assert result["success"] is True
assert result["content_hash"] == "test_hash_123"
mock_storage.delete_memory.assert_called_once_with("test_hash_123")
@pytest.mark.asyncio
async def test_api_delete_memory_not_found(mock_storage):
"""Test deleting non-existent memory returns proper response."""
mock_storage.delete_memory.return_value = False
service = MemoryService(storage=mock_storage)
result = await service.delete_memory("nonexistent")
assert result["success"] is False
# Test API Get Memory Endpoint
@pytest.mark.asyncio
async def test_api_get_memory_by_hash_uses_service(mock_storage, sample_memory):
"""Test GET /api/memories/{hash} uses MemoryService."""
mock_storage.get_by_hash.return_value = sample_memory
service = MemoryService(storage=mock_storage)
result = await service.get_memory_by_hash("test_hash_123")
assert result["found"] is True
assert result["memory"]["content_hash"] == "test_hash_123"
mock_storage.get_by_hash.assert_called_once_with("test_hash_123")
# Test Dependency Injection
def test_get_memory_service_dependency_injection():
"""Test that get_memory_service creates service with correct storage."""
from mcp_memory_service.web.dependencies import get_memory_service
# Create mock storage
mock_storage = MagicMock()
# Override dependency
def override_get_storage():
return mock_storage
# Get service
service = get_memory_service(storage=mock_storage)
assert isinstance(service, MemoryService)
assert service.storage == mock_storage
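# In a full FastAPI test, the same substitution is typically done via dependency
# overrides (illustrative sketch; assumes the app's routes depend on get_storage):
#   from mcp_memory_service.web.app import app
#   from mcp_memory_service.web.dependencies import get_storage
#   app.dependency_overrides[get_storage] = lambda: mock_storage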
# Performance and Scaling Tests
@pytest.mark.asyncio
async def test_list_memories_performance_with_large_dataset(mock_storage):
"""
Test that list_memories remains efficient with large datasets.
This verifies the fix for O(n) memory loading anti-pattern.
"""
# Simulate 10,000 memories in database
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 10000
service = MemoryService(storage=mock_storage)
# Request just 20 items
result = await service.list_memories(page=1, page_size=20)
# CRITICAL: Verify we only queried for 20 items, not all 10,000
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["limit"] == 20
assert call_kwargs["offset"] == 0
# This proves database-level filtering prevents loading 10,000 records
assert result["total"] == 10000
assert result["has_more"] is True
@pytest.mark.asyncio
async def test_tag_filter_performance(mock_storage):
"""Test that tag filtering happens at database level."""
mock_storage.get_all_memories.return_value = []
mock_storage.count_all_memories.return_value = 50
service = MemoryService(storage=mock_storage)
result = await service.list_memories(page=1, page_size=10, tag="important")
# Verify tag filter passed to database query
call_kwargs = mock_storage.get_all_memories.call_args.kwargs
assert call_kwargs["tags"] == ["important"]
# Result should only reflect filtered count
assert result["total"] == 50 # Only memories matching tag
# Error Handling Tests
@pytest.mark.asyncio
async def test_api_handles_storage_errors_gracefully(mock_storage):
"""Test that API returns proper errors when storage fails."""
mock_storage.get_all_memories.side_effect = Exception("Database connection lost")
service = MemoryService(storage=mock_storage)
result = await service.list_memories(page=1, page_size=10)
assert result["success"] is False
assert "error" in result
assert "Database connection lost" in result["error"]
@pytest.mark.asyncio
async def test_api_validates_input_through_service(mock_storage):
"""Test that validation errors from storage are handled."""
mock_storage.store.side_effect = ValueError("Invalid content format")
service = MemoryService(storage=mock_storage)
result = await service.store_memory(content="invalid")
assert result["success"] is False
assert "Invalid memory data" in result["error"]
# Consistency Tests
@pytest.mark.asyncio
async def test_api_and_mcp_use_same_service_logic(mock_storage):
"""
Test that API and MCP tools use the same MemoryService logic.
This verifies the DRY principle - both interfaces share the same
business logic through MemoryService.
"""
service = MemoryService(storage=mock_storage)
# Store through service (used by both API and MCP)
mock_storage.store.return_value = None
result1 = await service.store_memory(content="Test", tags=["shared"])
# Retrieve through service (used by both API and MCP)
mock_storage.retrieve.return_value = []
result2 = await service.retrieve_memories(query="test")
# Both operations used the same service
assert result1["success"] is True
assert "memories" in result2
@pytest.mark.asyncio
async def test_response_format_consistency(mock_storage, sample_memory):
"""Test that all service methods return consistently formatted responses."""
mock_storage.get_all_memories.return_value = [sample_memory]
mock_storage.count_all_memories.return_value = 1
mock_storage.retrieve.return_value = [sample_memory]
mock_storage.search_by_tags.return_value = [sample_memory]
service = MemoryService(storage=mock_storage)
# Get responses from different methods
list_result = await service.list_memories(page=1, page_size=10)
retrieve_result = await service.retrieve_memories(query="test")
tag_result = await service.search_by_tag(tags="test")
# All should have consistently formatted memories
list_memory = list_result["memories"][0]
retrieve_memory = retrieve_result["memories"][0]
tag_memory = tag_result["memories"][0]
# Verify all have same format
required_fields = ["content", "content_hash", "tags", "memory_type", "created_at"]
for field in required_fields:
assert field in list_memory
assert field in retrieve_memory
assert field in tag_memory
# Real Storage Integration Test (End-to-End)
@pytest.mark.asyncio
@pytest.mark.integration
async def test_end_to_end_workflow_with_real_storage(temp_db):
"""
End-to-end test with real SQLite storage (not mocked).
This verifies the complete integration stack works correctly.
"""
# Create real storage
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
# Create service with real storage
service = MemoryService(storage=storage)
# Store a memory
store_result = await service.store_memory(
content="End-to-end test memory",
tags=["e2e", "integration"],
memory_type="test"
)
assert store_result["success"] is True
# List memories
list_result = await service.list_memories(page=1, page_size=10)
assert len(list_result["memories"]) > 0
# Search by tag
tag_result = await service.search_by_tag(tags="e2e")
assert len(tag_result["memories"]) > 0
# Get specific memory
content_hash = store_result["memory"]["content_hash"]
get_result = await service.get_memory_by_hash(content_hash)
assert get_result["found"] is True
# Delete memory
delete_result = await service.delete_memory(content_hash)
assert delete_result["success"] is True
# Verify deleted
get_after_delete = await service.get_memory_by_hash(content_hash)
assert get_after_delete["found"] is False
finally:
storage.close()
# Real HTTP API Integration Tests with TestClient
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_store_memory_endpoint(temp_db):
"""
Test POST /api/memories endpoint with real HTTP request.
Uses TestClient to make actual HTTP request to FastAPI app.
"""
# Create real storage
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
# Import app and set storage
from mcp_memory_service.web.app import app
set_storage(storage)
# Create TestClient
client = TestClient(app)
# Make HTTP POST request
response = client.post(
"/api/memories",
json={
"content": "HTTP API test memory",
"tags": ["http", "api", "test"],
"memory_type": "note"
}
)
# Verify response
assert response.status_code == 200
data = response.json()
assert data["success"] is True
assert "memory" in data
assert data["memory"]["content"] == "HTTP API test memory"
assert "http" in data["memory"]["tags"]
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_list_memories_endpoint(temp_db):
"""
Test GET /api/memories endpoint with real HTTP request.
Verifies pagination and filtering work through HTTP API.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store test memories first
service = MemoryService(storage=storage)
for i in range(5):
await service.store_memory(
content=f"Test memory {i}",
tags=["test"],
memory_type="note"
)
# Make HTTP GET request
client = TestClient(app)
response = client.get("/api/memories?page=1&page_size=10")
# Verify response
assert response.status_code == 200
data = response.json()
assert "memories" in data
assert len(data["memories"]) == 5
assert data["total"] == 5
assert data["page"] == 1
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_search_endpoint(temp_db):
"""
Test POST /api/search endpoint with real HTTP request.
Verifies semantic search works through HTTP API.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store searchable memory
service = MemoryService(storage=storage)
await service.store_memory(
content="Python programming language tutorial",
tags=["python", "tutorial"],
memory_type="reference"
)
# Make HTTP POST request for search
client = TestClient(app)
response = client.post(
"/api/search",
json={"query": "python tutorial", "limit": 5}
)
# Verify response
assert response.status_code == 200
data = response.json()
assert "memories" in data
assert data["query"] == "python tutorial"
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_search_by_tag_endpoint(temp_db):
"""
Test POST /api/search/by-tag endpoint with real HTTP request.
Verifies tag search works through HTTP API.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store memories with tags
service = MemoryService(storage=storage)
await service.store_memory(
content="Important work item",
tags=["important", "work"],
memory_type="task"
)
await service.store_memory(
content="Personal note",
tags=["personal"],
memory_type="note"
)
# Search by tag via HTTP
client = TestClient(app)
response = client.post(
"/api/search/by-tag",
json={"tags": ["important"], "limit": 10}
)
# Verify response
assert response.status_code == 200
data = response.json()
assert len(data["memories"]) == 1
assert "important" in data["memories"][0]["tags"]
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_get_memory_by_hash_endpoint(temp_db):
"""
Test GET /api/memories/{hash} endpoint with real HTTP request.
Verifies retrieving specific memory by hash works.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store a memory
service = MemoryService(storage=storage)
store_result = await service.store_memory(
content="Memory to retrieve",
tags=["test"],
memory_type="note"
)
content_hash = store_result["memory"]["content_hash"]
# Retrieve via HTTP
client = TestClient(app)
response = client.get(f"/api/memories/{content_hash}")
# Verify response
assert response.status_code == 200
data = response.json()
assert data["content"] == "Memory to retrieve"
assert data["content_hash"] == content_hash
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_delete_memory_endpoint(temp_db):
"""
Test DELETE /api/memories/{hash} endpoint with real HTTP request.
Verifies deletion works through HTTP API.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store a memory
service = MemoryService(storage=storage)
store_result = await service.store_memory(
content="Memory to delete",
tags=["test"],
memory_type="note"
)
content_hash = store_result["memory"]["content_hash"]
# Delete via HTTP
client = TestClient(app)
response = client.delete(f"/api/memories/{content_hash}")
# Verify response
assert response.status_code == 200
data = response.json()
assert data["success"] is True
# Verify memory is gone
get_response = client.get(f"/api/memories/{content_hash}")
assert get_response.status_code == 404
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_pagination_with_real_data(temp_db):
"""
Test pagination through HTTP API with multiple pages.
Verifies database-level pagination prevents O(n) loading.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
# Store 25 memories
service = MemoryService(storage=storage)
for i in range(25):
await service.store_memory(
content=f"Pagination test {i}",
tags=["pagination"],
memory_type="note"
)
client = TestClient(app)
# Page 1: First 10
response1 = client.get("/api/memories?page=1&page_size=10")
assert response1.status_code == 200
data1 = response1.json()
assert len(data1["memories"]) == 10
assert data1["total"] == 25
assert data1["has_more"] is True
# Page 2: Next 10
response2 = client.get("/api/memories?page=2&page_size=10")
data2 = response2.json()
assert len(data2["memories"]) == 10
assert data2["has_more"] is True
# Page 3: Last 5
response3 = client.get("/api/memories?page=3&page_size=10")
data3 = response3.json()
assert len(data3["memories"]) == 5
assert data3["has_more"] is False
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_error_handling_invalid_json(temp_db):
"""
Test that HTTP API handles malformed JSON gracefully.
This would have caught v8.12.0 syntax errors.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
client = TestClient(app)
# Send malformed JSON
response = client.post(
"/api/memories",
data="{'this': 'is not valid json}", # Missing quote
headers={"Content-Type": "application/json"}
)
# Should return 400 or 422, not 500
assert response.status_code in [400, 422]
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_client_hostname_header(temp_db):
"""
Test that X-Client-Hostname header is processed correctly.
Verifies hostname tagging works through real HTTP request.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
client = TestClient(app)
# Send request with hostname header
response = client.post(
"/api/memories",
json={
"content": "Test with hostname",
"tags": ["test"]
},
headers={"X-Client-Hostname": "test-machine"}
)
# Verify hostname tag added
assert response.status_code == 200
data = response.json()
assert "source:test-machine" in data["memory"]["tags"]
assert data["memory"]["metadata"]["hostname"] == "test-machine"
finally:
storage.close()
@pytest.mark.asyncio
@pytest.mark.integration
async def test_http_api_complete_crud_workflow(temp_db):
"""
Complete end-to-end CRUD workflow through real HTTP API.
This verifies the entire HTTP API stack works correctly.
"""
storage = SqliteVecMemoryStorage(temp_db)
await storage.initialize()
try:
from mcp_memory_service.web.app import app
set_storage(storage)
client = TestClient(app)
# CREATE: Store a memory
create_response = client.post(
"/api/memories",
json={
"content": "CRUD test memory",
"tags": ["crud", "test"],
"memory_type": "note"
}
)
assert create_response.status_code == 200
content_hash = create_response.json()["memory"]["content_hash"]
# READ: List all memories
list_response = client.get("/api/memories")
assert list_response.status_code == 200
assert len(list_response.json()["memories"]) > 0
# READ: Get specific memory
get_response = client.get(f"/api/memories/{content_hash}")
assert get_response.status_code == 200
assert get_response.json()["content"] == "CRUD test memory"
# UPDATE: Search for memory
search_response = client.post(
"/api/search",
json={"query": "CRUD test", "limit": 5}
)
assert search_response.status_code == 200
assert len(search_response.json()["memories"]) > 0
# DELETE: Remove memory
delete_response = client.delete(f"/api/memories/{content_hash}")
assert delete_response.status_code == 200
assert delete_response.json()["success"] is True
# VERIFY: Memory is gone
verify_response = client.get(f"/api/memories/{content_hash}")
assert verify_response.status_code == 404
finally:
storage.close()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/documents.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Document Upload API Endpoints
Provides REST API endpoints for document ingestion through the web dashboard.
Supports single file upload, batch upload, progress tracking, and upload history.
"""
import os
import re
import uuid
import asyncio
import logging
import tempfile
from typing import List, Dict, Any, Optional
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse, unquote
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from ...ingestion import get_loader_for_file, SUPPORTED_FORMATS
from ...models.memory import Memory
from ...utils import create_memory_from_chunk, _process_and_store_chunk
from ...utils.hashing import generate_content_hash
from ..dependencies import get_storage
logger = logging.getLogger(__name__)
router = APIRouter()
# Constants
MAX_TAG_LENGTH = 100
def parse_and_validate_tags(tags: str) -> List[str]:
"""
Parse and validate tags from user input.
Handles comma-separated and space-separated tags, removes file:// prefixes,
sanitizes path separators, and validates tag lengths.
Args:
tags: Raw tag string (comma or space separated)
Returns:
List of cleaned and validated tags
Raises:
HTTPException: If any tag exceeds MAX_TAG_LENGTH
"""
if not tags or not tags.strip():
return []
# Split by comma OR space
raw_tags = tags.replace(',', ' ').split()
tag_list = []
for tag in raw_tags:
clean_tag = tag.strip()
if not clean_tag:
continue
# Remove file:// protocol prefixes and extract filename
# Uses urllib.parse for robust handling of URL-encoded chars and different path formats
if clean_tag.startswith('file://'):
path_str = unquote(urlparse(clean_tag).path)
# On Windows, urlparse may add a leading slash (e.g., /C:/...), which needs to be removed
if os.name == 'nt' and path_str.startswith('/') and len(path_str) > 2 and path_str[2] == ':':
path_str = path_str[1:]
clean_tag = Path(path_str).name
# Remove common path separators to create clean tag names
clean_tag = re.sub(r'[/\\]', '_', clean_tag)
# Validate tag length - raise error instead of silently dropping
if len(clean_tag) > MAX_TAG_LENGTH:
raise HTTPException(
status_code=400,
detail=f"Tag '{clean_tag[:100]}...' exceeds maximum length of {MAX_TAG_LENGTH} characters. "
f"Please use shorter, more descriptive tags."
)
tag_list.append(clean_tag)
return tag_list
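# Example (illustrative):
#   parse_and_validate_tags("docs, file:///C:/Users/me/notes/todo.md")
#   -> ["docs", "todo.md"]    (file:// URL reduced to its basename)
#   parse_and_validate_tags("a/b c\\d")
#   -> ["a_b", "c_d"]         (path separators replaced with underscores)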
async def ensure_storage_initialized():
"""Ensure storage is initialized for web API usage."""
logger.info("🔍 Checking storage availability...")
try:
# Try to get storage
storage = get_storage()
logger.info("✅ Storage already available")
return storage
except Exception as e:
logger.warning(f"⚠️ Storage not available ({e}), attempting to initialize...")
try:
# Import and initialize storage
from ..dependencies import create_storage_backend, set_storage
logger.info("🏗️ Creating storage backend...")
storage = await create_storage_backend()
set_storage(storage)
logger.info("✅ Storage initialized successfully in API context")
return storage
except Exception as init_error:
logger.error(f"❌ Failed to initialize storage: {init_error}")
logger.error(f"Full error: {str(init_error)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
# Don't raise HTTPException here since this is called from background tasks
raise init_error
# In-memory storage for upload tracking (in production, use database)
upload_sessions = {}
# Note: UploadRequest and BatchUploadRequest models removed - not used
# Endpoints read parameters directly from form data
class UploadStatus(BaseModel):
upload_id: str
status: str # queued, processing, completed, failed
filename: str = ""
file_size: int = 0
chunks_processed: int = 0
chunks_stored: int = 0
total_chunks: int = 0
progress: float = 0.0
errors: List[str] = []
created_at: datetime
completed_at: Optional[datetime] = None
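# Example status payload (illustrative values) as serialized for
# GET /status/{upload_id}:
#   {"upload_id": "3f2c...", "status": "processing", "filename": "notes.pdf",
#    "file_size": 48211, "chunks_processed": 12, "chunks_stored": 11,
#    "total_chunks": 0, "progress": 100.0, "errors": [],
#    "created_at": "2025-01-01T12:00:00", "completed_at": null}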
@router.post("/upload", response_model=Dict[str, Any])
async def upload_document(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
tags: str = Form(""),
chunk_size: int = Form(1000),
chunk_overlap: int = Form(200),
memory_type: str = Form("document")
):
"""
Upload and ingest a single document.
Args:
file: The document file to upload
tags: Comma-separated list of tags
chunk_size: Target chunk size in characters
chunk_overlap: Chunk overlap in characters
memory_type: Type label for memories
Returns:
Upload session information with ID for tracking
Uses FastAPI BackgroundTasks for proper async processing.
"""
logger.info(f"🚀 Document upload endpoint called with file: {file.filename}")
try:
# Read file content
file_content = await file.read()
file_size = len(file_content)
logger.info(f"File content length: {file_size} bytes")
# Validate file type
file_ext = Path(file.filename).suffix.lower().lstrip('.')
if file_ext not in SUPPORTED_FORMATS:
supported = ", ".join(f".{ext}" for ext in SUPPORTED_FORMATS.keys())
raise HTTPException(
status_code=400,
detail=f"Unsupported file type: .{file_ext}. Supported: {supported}"
)
# Parse and validate tags
tag_list = parse_and_validate_tags(tags)
# Create upload session
upload_id = str(uuid.uuid4())
# Create secure temporary file (avoids path traversal vulnerability)
# Extract safe file extension for suffix
file_ext = Path(file.filename).suffix if file.filename else ""
temp_file = tempfile.NamedTemporaryFile(
delete=False,
prefix=f"{upload_id}_",
suffix=file_ext
)
temp_path = temp_file.name
# Save uploaded file temporarily
with temp_file:
temp_file.write(file_content)
# Initialize upload session
session = UploadStatus(
upload_id=upload_id,
status="queued",
filename=file.filename,
file_size=file_size,
created_at=datetime.now()
)
upload_sessions[upload_id] = session
# Start background processing
background_tasks.add_task(
process_single_file_upload,
upload_id,
temp_path,
file.filename,
tag_list,
chunk_size,
chunk_overlap,
memory_type
)
return {
"upload_id": upload_id,
"status": "queued",
"message": f"Document {file.filename} queued for processing"
}
except Exception as e:
logger.error(f"Upload error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
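# Client-side usage sketch (illustrative; assumes this router is mounted under
# /api/documents and the server listens on localhost:8000):
#
#   import requests
#   with open("notes.pdf", "rb") as f:
#       resp = requests.post(
#           "http://localhost:8000/api/documents/upload",
#           files={"file": ("notes.pdf", f, "application/pdf")},
#           data={"tags": "docs,reference", "chunk_size": "1000"},
#       )
#   upload_id = resp.json()["upload_id"]
#   status = requests.get(
#       f"http://localhost:8000/api/documents/status/{upload_id}"
#   ).json()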
@router.post("/batch-upload", response_model=Dict[str, Any])
async def batch_upload_documents(
background_tasks: BackgroundTasks,
files: List[UploadFile] = File(...),
tags: str = Form(""),
chunk_size: int = Form(1000),
chunk_overlap: int = Form(200),
memory_type: str = Form("document")
):
"""
Upload and ingest multiple documents in batch.
Args:
files: List of document files to upload
tags: Comma-separated list of tags
chunk_size: Target chunk size in characters
chunk_overlap: Chunk overlap in characters
memory_type: Type label for memories
Returns:
Batch upload session information
"""
try:
if not files:
raise HTTPException(status_code=400, detail="No files provided")
# Parse and validate tags
tag_list = parse_and_validate_tags(tags)
# Create batch upload session
batch_id = str(uuid.uuid4())
temp_paths = []
# Validate and save all files
for file in files:
file_ext = Path(file.filename).suffix.lower().lstrip('.')
if file_ext not in SUPPORTED_FORMATS:
supported = ", ".join(f".{ext}" for ext in SUPPORTED_FORMATS.keys())
raise HTTPException(
status_code=400,
detail=f"Unsupported file type for {file.filename}: {file_ext}. Supported: {supported}"
)
# Create secure temporary file (avoids path traversal vulnerability)
content = await file.read()
safe_ext = Path(file.filename).suffix.lower() if file.filename else ""
temp_file = tempfile.NamedTemporaryFile(
delete=False,
prefix=f"{batch_id}_",
suffix=safe_ext
)
temp_path = temp_file.name
with temp_file:
temp_file.write(content)
temp_paths.append((file.filename, temp_path, len(content)))
# Calculate total file size for the batch
total_file_size = sum(file_size for _, _, file_size in temp_paths)
# Initialize batch session
session = UploadStatus(
upload_id=batch_id,
status="queued",
filename=f"Batch ({len(files)} files)",
file_size=total_file_size,
created_at=datetime.now()
)
upload_sessions[batch_id] = session
# Start background processing
background_tasks.add_task(
process_batch_upload,
batch_id,
temp_paths,
tag_list,
chunk_size,
chunk_overlap,
memory_type
)
return {
"upload_id": batch_id,
"status": "queued",
"message": f"Batch of {len(files)} documents queued for processing"
}
except Exception as e:
logger.error(f"Batch upload error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/status/{upload_id}", response_model=UploadStatus)
async def get_upload_status(upload_id: str):
"""
Get the status of an upload session.
Args:
upload_id: The upload session ID
Returns:
Current upload status
"""
if upload_id not in upload_sessions:
raise HTTPException(status_code=404, detail="Upload session not found")
return upload_sessions[upload_id]
@router.get("/history", response_model=Dict[str, List[Dict[str, Any]]])
async def get_upload_history():
"""
Get the history of all uploads.
Returns:
List of completed uploads with metadata
"""
logger.info("Documents history endpoint called")
try:
# History is assembled from the in-memory session map (no storage access
# required); in production, this would query a persistent database instead.
history = []
for session in upload_sessions.values():
if session.status in ["completed", "failed"]:
history.append({
"upload_id": session.upload_id,
"filename": session.filename,
"file_size": session.file_size,
"status": session.status,
"chunks_processed": session.chunks_processed,
"chunks_stored": session.chunks_stored,
"progress": session.progress,
"errors": session.errors,
"created_at": session.created_at.isoformat(),
"completed_at": session.completed_at.isoformat() if session.completed_at else None
})
# Sort by creation time, most recent first
history.sort(key=lambda x: x["created_at"], reverse=True)
return {"uploads": history}
except Exception as e:
logger.error(f"Error in get_upload_history: {e}")
# Return empty history on error so the UI doesn't break
return {"uploads": []}
async def process_single_file_upload(
upload_id: str,
file_path: str,
filename: str,
tags: List[str],
chunk_size: int,
chunk_overlap: int,
memory_type: str
):
"""Background task to process a single document upload."""
try:
logger.info(f"Starting document processing: {upload_id} - {filename}")
session = upload_sessions[upload_id]
session.status = "processing"
# Get storage
storage = await ensure_storage_initialized()
# Get appropriate loader
file_path_obj = Path(file_path)
loader = get_loader_for_file(file_path_obj)
if loader is None:
raise ValueError(f"No loader available for file: {filename}")
# Configure loader
loader.chunk_size = chunk_size
loader.chunk_overlap = chunk_overlap
chunks_processed = 0
chunks_stored = 0
# Process chunks from the file
async for chunk in loader.extract_chunks(file_path_obj):
chunks_processed += 1
try:
# Add file-specific tags
all_tags = tags.copy()
all_tags.append(f"source_file:{filename}")
all_tags.append(f"file_type:{file_path_obj.suffix.lstrip('.')}")
all_tags.append(f"upload_id:{upload_id}")
if chunk.metadata.get('tags'):
# Handle tags from chunk metadata (can be string or list)
chunk_tags = chunk.metadata['tags']
if isinstance(chunk_tags, str):
# Split comma-separated string into list
chunk_tags = [tag.strip() for tag in chunk_tags.split(',') if tag.strip()]
all_tags.extend(chunk_tags)
# Add upload_id to metadata
chunk_metadata = chunk.metadata.copy() if chunk.metadata else {}
chunk_metadata['upload_id'] = upload_id
chunk_metadata['source_file'] = filename
# Create memory object
memory = Memory(
content=chunk.content,
content_hash=generate_content_hash(chunk.content, chunk_metadata),
tags=list(set(all_tags)), # Remove duplicates
memory_type=memory_type,
metadata=chunk_metadata
)
# Store the memory
success, error = await storage.store(memory)
if success:
chunks_stored += 1
else:
session.errors.append(f"Chunk {chunk.chunk_index}: {error}")
except Exception as e:
session.errors.append(f"Chunk {chunk.chunk_index}: {str(e)}")
# Update progress
session.chunks_processed = chunks_processed
session.chunks_stored = chunks_stored
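            # NOTE: the total chunk count is unknown until the loader is
            # exhausted, so this ratio reads 100% once any chunk is processed;
            # the definitive value is set after the loop completes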
session.progress = (chunks_processed / max(chunks_processed, 1)) * 100
# Mark as completed
session.status = "completed"
session.completed_at = datetime.now()
session.progress = 100.0
logger.info(f"Document processing completed: {upload_id}, {chunks_stored}/{chunks_processed} chunks")
return {"chunks_processed": chunks_processed, "chunks_stored": chunks_stored}
except Exception as e:
logger.error(f"Document processing error: {str(e)}")
session = upload_sessions.get(upload_id)
if session:
session.status = "failed"
session.errors.append(str(e))
session.completed_at = datetime.now()
finally:
# Clean up temp file (always executed)
try:
os.unlink(file_path)
except Exception as cleanup_error:
logger.debug(f"Could not delete temp file {file_path}: {cleanup_error}")
async def process_batch_upload(
batch_id: str,
file_info: List[tuple], # (filename, temp_path, size)
tags: List[str],
chunk_size: int,
chunk_overlap: int,
memory_type: str
):
"""Background task to process a batch document upload."""
try:
logger.info(f"Starting batch processing: {batch_id}")
session = upload_sessions[batch_id]
session.status = "processing"
# Get storage
storage = await ensure_storage_initialized()
total_files = len(file_info)
processed_files = 0
total_chunks_processed = 0
total_chunks_stored = 0
all_errors = []
for filename, temp_path, file_size in file_info:
try:
# Get appropriate loader
file_path_obj = Path(temp_path)
loader = get_loader_for_file(file_path_obj)
if loader is None:
all_errors.append(f"{filename}: No loader available")
processed_files += 1
continue
# Configure loader
loader.chunk_size = chunk_size
loader.chunk_overlap = chunk_overlap
file_chunks_processed = 0
file_chunks_stored = 0
# Process chunks from this file
async for chunk in loader.extract_chunks(file_path_obj):
file_chunks_processed += 1
total_chunks_processed += 1
# Process and store the chunk
success, error = await _process_and_store_chunk(
chunk,
storage,
filename,
base_tags=tags.copy(),
memory_type=memory_type,
context_tags={
"source_file": filename,
"file_type": file_path_obj.suffix.lstrip('.'),
"upload_id": batch_id
},
extra_metadata={
"upload_id": batch_id,
"source_file": filename
}
)
if success:
file_chunks_stored += 1
total_chunks_stored += 1
else:
all_errors.append(error)
processed_files += 1
except Exception as e:
all_errors.append(f"{filename}: {str(e)}")
processed_files += 1
finally:
# Clean up temp file (always executed)
try:
os.unlink(temp_path)
except Exception as cleanup_error:
logger.debug(f"Could not delete temp file {temp_path}: {cleanup_error}")
# Finalize batch
session.status = "completed" if total_chunks_stored > 0 else "failed"
session.completed_at = datetime.now()
session.chunks_processed = total_chunks_processed
session.chunks_stored = total_chunks_stored
session.progress = 100.0
session.errors = all_errors
logger.info(f"Batch processing completed: {batch_id}, {total_chunks_stored}/{total_chunks_processed} chunks")
except Exception as e:
logger.error(f"Batch processing error: {str(e)}")
session = upload_sessions.get(batch_id)
if session:
session.status = "failed"
session.errors.append(str(e))
session.completed_at = datetime.now()
# Note: send_progress_update removed - progress tracking via polling instead
# Clean up old completed sessions periodically
@router.on_event("startup") # TODO: Migrate to lifespan context manager in app.py (FastAPI 0.109+)
async def cleanup_old_sessions():
"""Clean up old completed upload sessions."""
async def cleanup():
while True:
await asyncio.sleep(3600) # Clean up every hour
current_time = datetime.now()
to_remove = []
for upload_id, session in upload_sessions.items():
if session.status in ["completed", "failed"]:
# Keep sessions for 24 hours after completion
if session.completed_at and (current_time - session.completed_at).total_seconds() > 86400:
to_remove.append(upload_id)
for upload_id in to_remove:
del upload_sessions[upload_id]
logger.debug(f"Cleaned up old upload session: {upload_id}")
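    # NOTE: per the asyncio docs, create_task results should normally be
    # retained; without a saved reference the cleanup task can be
    # garbage-collected mid-flight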
asyncio.create_task(cleanup())
@router.delete("/remove/{upload_id}")
async def remove_document(upload_id: str, remove_from_memory: bool = True):
"""
Remove a document and optionally its memories.
Args:
upload_id: The upload session ID
remove_from_memory: Whether to delete associated memories (default: True)
Returns:
Removal status with count of memories deleted
"""
logger.info(f"Remove document request for upload_id: {upload_id}, remove_from_memory: {remove_from_memory}")
# Get session info if available (may not exist after server restart)
session = upload_sessions.get(upload_id)
filename = session.filename if session else "Unknown file"
memories_deleted = 0
try:
if remove_from_memory:
# Get storage
storage = get_storage()
# Search by tag pattern: upload_id:{upload_id}
upload_tag = f"upload_id:{upload_id}"
logger.info(f"Searching for memories with tag: {upload_tag}")
try:
# Delete all memories with this upload_id tag
count, _ = await storage.delete_by_tags([upload_tag])
memories_deleted = count
logger.info(f"Deleted {memories_deleted} memories with tag {upload_tag}")
# If we deleted memories but don't have session info, try to get filename from first memory
if memories_deleted > 0 and not session:
                    # The matching memories were already deleted, so the original
                    # filename cannot be recovered; fall back to a generic label
filename = f"Document (upload_id: {upload_id[:8]}...)"
except Exception as e:
logger.warning(f"Could not delete memories by tag: {e}")
# If deletion fails and we don't know about this upload, return 404
if not session:
raise HTTPException(
status_code=404,
detail=f"Upload ID not found and no memories with tag '{upload_tag}'"
)
memories_deleted = 0
# Remove upload session if it exists
if session:
del upload_sessions[upload_id]
return {
"status": "success",
"upload_id": upload_id,
"filename": filename,
"memories_deleted": memories_deleted,
"message": f"Document '{filename}' removed successfully"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error removing document: {str(e)}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Failed to remove document: {str(e)}")
@router.delete("/remove-by-tags")
async def remove_documents_by_tags(tags: List[str]):
"""
Remove documents by their tags.
Args:
tags: List of tags to search for
Returns:
Removal status with affected upload IDs and memory counts
"""
logger.info(f"Remove documents by tags request: {tags}")
try:
# Get storage
storage = get_storage()
        # Delete memories by tags; delete_by_tags returns a (count, message)
        # tuple, as used in remove_document above
        count, _ = await storage.delete_by_tags(tags)
        memories_deleted = count
        # Finding affected upload sessions would require storing tags on the
        # session object; until then this returns an empty list
        affected_sessions = []
return {
"status": "success",
"tags": tags,
"memories_deleted": memories_deleted,
"affected_uploads": affected_sessions,
"message": f"Deleted {memories_deleted} memories matching tags"
}
except Exception as e:
logger.error(f"Error removing documents by tags: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to remove documents: {str(e)}")
@router.get("/search-content/{upload_id}")
async def search_document_content(upload_id: str, limit: int = 1000):
"""
Search for all memories associated with an upload.
Args:
upload_id: The upload session ID
limit: Maximum number of results to return (default: 1000)
Returns:
List of memories with their content and metadata
"""
logger.info(f"Search document content for upload_id: {upload_id}, limit: {limit}")
# Get session info if available (may not exist after server restart)
session = upload_sessions.get(upload_id)
# If no session, we'll still try to find memories by upload_id tag
if not session:
logger.info(f"No upload session found for {upload_id}, searching by tag only")
try:
# Get storage
storage = get_storage()
# Search for memories with upload_id tag
upload_tag = f"upload_id:{upload_id}"
logger.info(f"Searching for memories with tag: {upload_tag}")
# Use tag search (search_by_tags doesn't support limit parameter)
all_memories = await storage.search_by_tags([upload_tag])
# If no memories found and no session, this upload_id doesn't exist
if not all_memories and not session:
raise HTTPException(status_code=404, detail=f"No memories found for upload_id: {upload_id}")
# Apply limit after retrieval
memories = all_memories[:limit] if limit and limit > 0 else all_memories
# Format results
results = []
for memory in memories:
# Handle created_at (stored as float timestamp)
created_at_str = None
if memory.created_at:
if isinstance(memory.created_at, float):
created_at_str = datetime.fromtimestamp(memory.created_at).isoformat()
elif hasattr(memory.created_at, 'isoformat'):
created_at_str = memory.created_at.isoformat()
results.append({
"content_hash": memory.content_hash,
"content": memory.content,
"tags": memory.tags,
"metadata": memory.metadata,
"created_at": created_at_str,
"chunk_index": memory.metadata.get('chunk_index', 0) if memory.metadata else 0,
"page": memory.metadata.get('page', None) if memory.metadata else None
})
# Sort by chunk index
results.sort(key=lambda x: x.get('chunk_index', 0))
# Get filename from session or from first memory's metadata
filename = session.filename if session else None
if not filename and results:
# Try to get from first memory's metadata
first_memory_metadata = results[0].get('metadata', {})
filename = first_memory_metadata.get('source_file', f"Document (upload_id: {upload_id[:8]}...)")
return {
"status": "success",
"upload_id": upload_id,
"filename": filename or "Unknown Document",
"total_found": len(results),
"memories": results
}
    except HTTPException:
        raise
    except Exception as e:
logger.error(f"Error searching document content: {str(e)}")
# Get filename from session if available
filename = session.filename if session else f"Document (upload_id: {upload_id[:8]}...)"
# Return empty results instead of error to avoid breaking UI
return {
"status": "partial",
"upload_id": upload_id,
"filename": filename,
"total_found": 0,
"memories": [],
"error": str(e)
}
```
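For orientation, a minimal client sketch against this router (a sketch, not repository code: the base URL, mount prefix, batch route, and form-field names are assumptions, since the batch endpoint's decorator appears on an earlier page; the `/status/{upload_id}` route is as defined above):

```python
import time
import requests  # assumes the requests package is installed

BASE = "http://localhost:8000/api/documents"  # assumed mount point

with open("notes.md", "rb") as f:
    resp = requests.post(
        f"{BASE}/batch-upload",  # hypothetical route name for the batch endpoint
        files=[("files", ("notes.md", f, "text/markdown"))],
        data={"tags": "docs,example", "chunk_size": 1000,
              "chunk_overlap": 200, "memory_type": "document"},
    )
resp.raise_for_status()
upload_id = resp.json()["upload_id"]

# Poll the status endpoint until the background task finishes.
while True:
    status = requests.get(f"{BASE}/status/{upload_id}").json()
    if status["status"] in ("completed", "failed"):
        print(status["status"], status.get("chunks_stored"), "chunks stored")
        break
    time.sleep(1)
```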
--------------------------------------------------------------------------------
/tests/test_sqlite_vec_storage.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive tests for SQLite-vec storage backend.
"""
import pytest
import pytest_asyncio
import asyncio
import tempfile
import os
import shutil
import json
from unittest.mock import Mock, patch
import time
# Skip tests if sqlite-vec is not available
try:
import sqlite_vec
SQLITE_VEC_AVAILABLE = True
except ImportError:
SQLITE_VEC_AVAILABLE = False
from src.mcp_memory_service.models.memory import Memory, MemoryQueryResult
from src.mcp_memory_service.utils.hashing import generate_content_hash
if SQLITE_VEC_AVAILABLE:
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
# Skip all tests if sqlite-vec is not available
pytestmark = pytest.mark.skipif(not SQLITE_VEC_AVAILABLE, reason="sqlite-vec not available")
class TestSqliteVecStorage:
"""Test suite for SQLite-vec storage functionality."""
@pytest_asyncio.fixture
async def storage(self):
"""Create a test storage instance."""
temp_dir = tempfile.mkdtemp()
db_path = os.path.join(temp_dir, "test_memory.db")
storage = SqliteVecMemoryStorage(db_path)
await storage.initialize()
yield storage
# Cleanup
if storage.conn:
storage.conn.close()
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.fixture
def sample_memory(self):
"""Create a sample memory for testing."""
content = "This is a test memory for SQLite-vec storage"
return Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["test", "sqlite-vec"],
memory_type="note",
metadata={"priority": "medium", "category": "testing"}
)
@pytest.mark.asyncio
async def test_initialization(self):
"""Test storage initialization."""
temp_dir = tempfile.mkdtemp()
db_path = os.path.join(temp_dir, "test_init.db")
try:
storage = SqliteVecMemoryStorage(db_path)
await storage.initialize()
# Check that database file was created
assert os.path.exists(db_path)
# Check that connection is established
assert storage.conn is not None
# Check that table was created
cursor = storage.conn.execute('''
SELECT name FROM sqlite_master
WHERE type='table' AND name='memories'
''')
assert cursor.fetchone() is not None
storage.close()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_store_memory(self, storage, sample_memory):
"""Test storing a memory."""
success, message = await storage.store(sample_memory)
assert success
assert "successfully" in message.lower()
# Verify memory was stored
cursor = storage.conn.execute(
'SELECT content_hash FROM memories WHERE content_hash = ?',
(sample_memory.content_hash,)
)
result = cursor.fetchone()
assert result is not None
assert result[0] == sample_memory.content_hash
@pytest.mark.asyncio
async def test_store_duplicate_memory(self, storage, sample_memory):
"""Test that duplicate memories are rejected."""
# Store the memory first time
success, message = await storage.store(sample_memory)
assert success
# Try to store the same memory again
success, message = await storage.store(sample_memory)
assert not success
assert "duplicate" in message.lower()
@pytest.mark.asyncio
async def test_retrieve_memory(self, storage, sample_memory):
"""Test retrieving memories using semantic search."""
# Store the memory
await storage.store(sample_memory)
# Retrieve using semantic search
results = await storage.retrieve("test memory sqlite", n_results=5)
assert len(results) > 0
assert isinstance(results[0], MemoryQueryResult)
assert results[0].memory.content_hash == sample_memory.content_hash
assert results[0].relevance_score >= 0.0
assert results[0].debug_info["backend"] == "sqlite-vec"
@pytest.mark.asyncio
async def test_retrieve_no_results(self, storage):
"""Test retrieving when no memories match."""
results = await storage.retrieve("nonexistent query", n_results=5)
assert len(results) == 0
@pytest.mark.asyncio
async def test_search_by_tag(self, storage, sample_memory):
"""Test searching memories by tags."""
# Store the memory
await storage.store(sample_memory)
# Search by existing tag
results = await storage.search_by_tag(["test"])
assert len(results) == 1
assert results[0].content_hash == sample_memory.content_hash
# Search by non-existent tag
results = await storage.search_by_tag(["nonexistent"])
assert len(results) == 0
# Search by multiple tags
results = await storage.search_by_tag(["test", "sqlite-vec"])
assert len(results) == 1
@pytest.mark.asyncio
async def test_search_by_empty_tags(self, storage):
"""Test searching with empty tags list."""
results = await storage.search_by_tag([])
assert len(results) == 0
@pytest.mark.asyncio
async def test_delete_memory(self, storage, sample_memory):
"""Test deleting a memory by content hash."""
# Store the memory
await storage.store(sample_memory)
# Delete the memory
success, message = await storage.delete(sample_memory.content_hash)
assert success
assert sample_memory.content_hash in message
# Verify memory was deleted
cursor = storage.conn.execute(
'SELECT content_hash FROM memories WHERE content_hash = ?',
(sample_memory.content_hash,)
)
assert cursor.fetchone() is None
@pytest.mark.asyncio
async def test_delete_nonexistent_memory(self, storage):
"""Test deleting a non-existent memory."""
nonexistent_hash = "nonexistent123456789"
success, message = await storage.delete(nonexistent_hash)
assert not success
assert "not found" in message.lower()
@pytest.mark.asyncio
async def test_delete_by_tag(self, storage):
"""Test deleting memories by tag."""
# Store multiple memories with different tags
memory1 = Memory(
content="Memory 1",
content_hash=generate_content_hash("Memory 1"),
tags=["tag1", "shared"]
)
memory2 = Memory(
content="Memory 2",
content_hash=generate_content_hash("Memory 2"),
tags=["tag2", "shared"]
)
memory3 = Memory(
content="Memory 3",
content_hash=generate_content_hash("Memory 3"),
tags=["tag3"]
)
await storage.store(memory1)
await storage.store(memory2)
await storage.store(memory3)
# Delete by shared tag
count, message = await storage.delete_by_tag("shared")
assert count == 2
assert "deleted 2 memories" in message.lower()
# Verify correct memories were deleted
remaining = await storage.search_by_tag(["tag3"])
assert len(remaining) == 1
assert remaining[0].content_hash == memory3.content_hash
@pytest.mark.asyncio
async def test_delete_by_nonexistent_tag(self, storage):
"""Test deleting by a non-existent tag."""
count, message = await storage.delete_by_tag("nonexistent")
assert count == 0
assert "no memories found" in message.lower()
@pytest.mark.asyncio
async def test_cleanup_duplicates(self, storage):
"""Test cleaning up duplicate memories."""
# Create memory
content = "Duplicate test memory"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["duplicate"]
)
# Store the memory
await storage.store(memory)
# Manually insert a duplicate (bypassing duplicate check)
embedding = storage._generate_embedding(content)
storage.conn.execute('''
INSERT INTO memories (
content_embedding, content_hash, content, tags, memory_type,
metadata, created_at, updated_at, created_at_iso, updated_at_iso
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
sqlite_vec.serialize_float32(embedding),
memory.content_hash,
content,
"duplicate",
None,
"{}",
time.time(),
time.time(),
"2024-01-01T00:00:00Z",
"2024-01-01T00:00:00Z"
))
storage.conn.commit()
# Clean up duplicates
count, message = await storage.cleanup_duplicates()
assert count == 1
assert "removed 1 duplicate" in message.lower()
# Verify only one copy remains
cursor = storage.conn.execute(
'SELECT COUNT(*) FROM memories WHERE content_hash = ?',
(memory.content_hash,)
)
assert cursor.fetchone()[0] == 1
@pytest.mark.asyncio
async def test_cleanup_no_duplicates(self, storage, sample_memory):
"""Test cleanup when no duplicates exist."""
await storage.store(sample_memory)
count, message = await storage.cleanup_duplicates()
assert count == 0
assert "no duplicate memories found" in message.lower()
@pytest.mark.asyncio
async def test_update_memory_metadata(self, storage, sample_memory):
"""Test updating memory metadata."""
# Store the memory
await storage.store(sample_memory)
# Update metadata
updates = {
"tags": ["updated", "test"],
"memory_type": "reminder",
"metadata": {"priority": "high", "due_date": "2024-01-15"},
"status": "active"
}
success, message = await storage.update_memory_metadata(
content_hash=sample_memory.content_hash,
updates=updates
)
assert success
assert "updated fields" in message.lower()
# Verify updates
cursor = storage.conn.execute('''
SELECT tags, memory_type, metadata
FROM memories WHERE content_hash = ?
''', (sample_memory.content_hash,))
row = cursor.fetchone()
assert row is not None
tags_str, memory_type, metadata_str = row
metadata = json.loads(metadata_str)
assert tags_str == "updated,test"
assert memory_type == "reminder"
assert metadata["priority"] == "high"
assert metadata["due_date"] == "2024-01-15"
assert metadata["status"] == "active"
@pytest.mark.asyncio
async def test_update_nonexistent_memory(self, storage):
"""Test updating metadata for non-existent memory."""
nonexistent_hash = "nonexistent123456789"
success, message = await storage.update_memory_metadata(
content_hash=nonexistent_hash,
updates={"tags": ["test"]}
)
assert not success
assert "not found" in message.lower()
@pytest.mark.asyncio
async def test_update_memory_with_invalid_tags(self, storage, sample_memory):
"""Test updating memory with invalid tags format."""
await storage.store(sample_memory)
success, message = await storage.update_memory_metadata(
content_hash=sample_memory.content_hash,
updates={"tags": "not_a_list"}
)
assert not success
assert "list of strings" in message.lower()
@pytest.mark.asyncio
async def test_update_memory_with_invalid_metadata(self, storage, sample_memory):
"""Test updating memory with invalid metadata format."""
await storage.store(sample_memory)
success, message = await storage.update_memory_metadata(
content_hash=sample_memory.content_hash,
updates={"metadata": "not_a_dict"}
)
assert not success
assert "dictionary" in message.lower()
@pytest.mark.asyncio
async def test_update_memory_preserve_timestamps(self, storage, sample_memory):
"""Test updating memory while preserving timestamps."""
await storage.store(sample_memory)
# Get original timestamps
cursor = storage.conn.execute('''
SELECT created_at, created_at_iso FROM memories WHERE content_hash = ?
''', (sample_memory.content_hash,))
original_created_at, original_created_at_iso = cursor.fetchone()
# Wait a moment
await asyncio.sleep(0.1)
# Update with preserve_timestamps=True
success, message = await storage.update_memory_metadata(
content_hash=sample_memory.content_hash,
updates={"tags": ["updated"]},
preserve_timestamps=True
)
assert success
# Check timestamps
cursor = storage.conn.execute('''
SELECT created_at, created_at_iso, updated_at FROM memories WHERE content_hash = ?
''', (sample_memory.content_hash,))
created_at, created_at_iso, updated_at = cursor.fetchone()
# created_at should be preserved
assert abs(created_at - original_created_at) < 0.01
assert created_at_iso == original_created_at_iso
# updated_at should be newer
assert updated_at > original_created_at
@pytest.mark.asyncio
async def test_update_memory_reset_timestamps(self, storage, sample_memory):
"""Test updating memory with timestamp reset."""
await storage.store(sample_memory)
# Get original timestamps
cursor = storage.conn.execute('''
SELECT created_at FROM memories WHERE content_hash = ?
''', (sample_memory.content_hash,))
original_created_at = cursor.fetchone()[0]
# Wait a moment
await asyncio.sleep(0.1)
# Update with preserve_timestamps=False
success, message = await storage.update_memory_metadata(
content_hash=sample_memory.content_hash,
updates={"tags": ["updated"]},
preserve_timestamps=False
)
assert success
# Check timestamps
cursor = storage.conn.execute('''
SELECT created_at FROM memories WHERE content_hash = ?
''', (sample_memory.content_hash,))
created_at = cursor.fetchone()[0]
# created_at should be updated (newer)
assert created_at > original_created_at
def test_get_stats(self, storage):
"""Test getting storage statistics."""
stats = storage.get_stats()
assert isinstance(stats, dict)
assert stats["backend"] == "sqlite-vec"
assert "total_memories" in stats
assert "database_size_bytes" in stats
assert "embedding_model" in stats
assert "embedding_dimension" in stats
@pytest.mark.asyncio
async def test_get_stats_with_data(self, storage, sample_memory):
"""Test getting statistics with data."""
await storage.store(sample_memory)
stats = storage.get_stats()
assert stats["total_memories"] >= 1
assert stats["database_size_bytes"] > 0
assert stats["embedding_dimension"] == storage.embedding_dimension
def test_close_connection(self, storage):
"""Test closing the database connection."""
assert storage.conn is not None
storage.close()
assert storage.conn is None
@pytest.mark.asyncio
async def test_multiple_memories_retrieval(self, storage):
"""Test retrieving multiple memories with ranking."""
# Store multiple memories
memories = []
for i in range(5):
content = f"Test memory {i} with different content and keywords"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=[f"tag{i}"],
memory_type="note"
)
memories.append(memory)
await storage.store(memory)
# Retrieve memories
results = await storage.retrieve("test memory content", n_results=3)
assert len(results) <= 3
assert len(results) > 0
# Check that results are properly ranked (higher relevance first)
for i in range(len(results) - 1):
assert results[i].relevance_score >= results[i + 1].relevance_score
@pytest.mark.asyncio
async def test_embedding_generation(self, storage):
"""Test embedding generation functionality."""
test_text = "This is a test for embedding generation"
embedding = storage._generate_embedding(test_text)
assert isinstance(embedding, list)
assert len(embedding) == storage.embedding_dimension
assert all(isinstance(x, float) for x in embedding)
@pytest.mark.asyncio
async def test_memory_with_complex_metadata(self, storage):
"""Test storing and retrieving memory with complex metadata."""
content = "Memory with complex metadata"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["complex", "metadata", "test"],
memory_type="structured",
metadata={
"nested": {"level1": {"level2": "value"}},
"array": [1, 2, 3, "four"],
"boolean": True,
"null_value": None,
"unicode": "测试中文 🚀"
}
)
# Store the memory
success, message = await storage.store(memory)
assert success
# Retrieve and verify
results = await storage.retrieve("complex metadata", n_results=1)
assert len(results) == 1
retrieved_memory = results[0].memory
assert retrieved_memory.metadata["nested"]["level1"]["level2"] == "value"
assert retrieved_memory.metadata["array"] == [1, 2, 3, "four"]
assert retrieved_memory.metadata["boolean"] is True
assert retrieved_memory.metadata["unicode"] == "测试中文 🚀"
@pytest.mark.asyncio
async def test_concurrent_operations(self, storage):
"""Test concurrent storage operations."""
# Create multiple memories
memories = []
for i in range(10):
content = f"Concurrent test memory {i}"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=[f"concurrent{i}"]
)
memories.append(memory)
# Store memories concurrently
tasks = [storage.store(memory) for memory in memories]
results = await asyncio.gather(*tasks)
# All should succeed
assert all(success for success, _ in results)
# Verify all were stored
for memory in memories:
cursor = storage.conn.execute(
'SELECT content_hash FROM memories WHERE content_hash = ?',
(memory.content_hash,)
)
assert cursor.fetchone() is not None
@pytest.mark.asyncio
async def test_get_memories_by_time_range_basic(self, storage):
"""Test basic time range filtering."""
# Store memories at different times
now = time.time()
# Memory from 1 hour ago
memory1 = Memory(
content="Memory from 1 hour ago",
content_hash=generate_content_hash("Memory from 1 hour ago"),
tags=["timerange"],
created_at=now - 3600
)
# Memory from 30 minutes ago
memory2 = Memory(
content="Memory from 30 minutes ago",
content_hash=generate_content_hash("Memory from 30 minutes ago"),
tags=["timerange"],
created_at=now - 1800
)
# Memory from now
memory3 = Memory(
content="Memory from now",
content_hash=generate_content_hash("Memory from now"),
tags=["timerange"],
created_at=now
)
await storage.store(memory1)
await storage.store(memory2)
await storage.store(memory3)
# Get memories from last 45 minutes (should get memory2 and memory3)
results = await storage.get_memories_by_time_range(now - 2700, now + 100)
assert len(results) == 2
contents = [m.content for m in results]
assert "Memory from 30 minutes ago" in contents
assert "Memory from now" in contents
assert "Memory from 1 hour ago" not in contents
@pytest.mark.asyncio
async def test_get_memories_by_time_range_empty(self, storage):
"""Test time range with no matching memories."""
# Store one memory now
memory = Memory(
content="Current memory",
content_hash=generate_content_hash("Current memory"),
tags=["test"]
)
await storage.store(memory)
# Query for memories from far in the past
now = time.time()
results = await storage.get_memories_by_time_range(now - 86400, now - 7200)
assert len(results) == 0
@pytest.mark.asyncio
async def test_get_memories_by_time_range_boundaries(self, storage):
"""Test inclusive boundaries of time range."""
now = time.time()
# Memory exactly at start boundary
memory_start = Memory(
content="At start boundary",
content_hash=generate_content_hash("At start boundary"),
tags=["boundary"],
created_at=now - 1000
)
# Memory exactly at end boundary
memory_end = Memory(
content="At end boundary",
content_hash=generate_content_hash("At end boundary"),
tags=["boundary"],
created_at=now
)
# Memory just before start
memory_before = Memory(
content="Before start",
content_hash=generate_content_hash("Before start"),
tags=["boundary"],
created_at=now - 1001
)
# Memory just after end
memory_after = Memory(
content="After end",
content_hash=generate_content_hash("After end"),
tags=["boundary"],
created_at=now + 1
)
await storage.store(memory_start)
await storage.store(memory_end)
await storage.store(memory_before)
await storage.store(memory_after)
# Query with inclusive boundaries
results = await storage.get_memories_by_time_range(now - 1000, now)
assert len(results) == 2
contents = [m.content for m in results]
assert "At start boundary" in contents
assert "At end boundary" in contents
assert "Before start" not in contents
assert "After end" not in contents
@pytest.mark.asyncio
async def test_get_memories_by_time_range_ordering(self, storage):
"""Test that results are ordered by created_at DESC."""
now = time.time()
# Store three memories in random order
memory1 = Memory(
content="First",
content_hash=generate_content_hash("First"),
tags=["order"],
created_at=now - 300
)
memory2 = Memory(
content="Second",
content_hash=generate_content_hash("Second"),
tags=["order"],
created_at=now - 200
)
memory3 = Memory(
content="Third",
content_hash=generate_content_hash("Third"),
tags=["order"],
created_at=now - 100
)
await storage.store(memory3) # Store in non-chronological order
await storage.store(memory1)
await storage.store(memory2)
# Get all three
results = await storage.get_memories_by_time_range(now - 400, now)
assert len(results) == 3
# Should be ordered newest first (DESC)
assert results[0].content == "Third"
assert results[1].content == "Second"
assert results[2].content == "First"
class TestSqliteVecStorageWithoutEmbeddings:
"""Test SQLite-vec storage when sentence transformers is not available."""
@pytest.mark.asyncio
async def test_initialization_without_embeddings(self):
"""Test that storage can initialize without sentence transformers."""
temp_dir = tempfile.mkdtemp()
db_path = os.path.join(temp_dir, "test_no_embeddings.db")
try:
with patch('src.mcp_memory_service.storage.sqlite_vec.SENTENCE_TRANSFORMERS_AVAILABLE', False):
storage = SqliteVecMemoryStorage(db_path)
await storage.initialize()
assert storage.conn is not None
assert storage.embedding_model is None
storage.close()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
@pytest.mark.asyncio
async def test_operations_without_embeddings(self):
"""Test basic operations without embeddings."""
temp_dir = tempfile.mkdtemp()
db_path = os.path.join(temp_dir, "test_no_embeddings.db")
try:
with patch('src.mcp_memory_service.storage.sqlite_vec.SENTENCE_TRANSFORMERS_AVAILABLE', False):
storage = SqliteVecMemoryStorage(db_path)
await storage.initialize()
# Store should work (with zero embeddings)
content = "Test without embeddings"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["no-embeddings"]
)
success, message = await storage.store(memory)
assert success
# Tag search should work
results = await storage.search_by_tag(["no-embeddings"])
assert len(results) == 1
# Semantic search won't work well but shouldn't crash
results = await storage.retrieve("test", n_results=1)
# May or may not return results, but shouldn't crash
storage.close()
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
if __name__ == "__main__":
# Run basic tests when executed directly
async def run_basic_tests():
"""Run basic tests to verify functionality."""
if not SQLITE_VEC_AVAILABLE:
print("⚠️ sqlite-vec not available, skipping tests")
return
print("Running basic SQLite-vec storage tests...")
temp_dir = tempfile.mkdtemp()
db_path = os.path.join(temp_dir, "test_basic.db")
try:
# Test basic functionality
storage = SqliteVecMemoryStorage(db_path)
await storage.initialize()
# Store a memory
content = "Test memory for basic validation"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["test", "basic"]
)
success, message = await storage.store(memory)
print(f"Store: {success}, {message}")
# Retrieve the memory
results = await storage.retrieve("test memory", n_results=1)
print(f"Retrieve: Found {len(results)} results")
if results:
print(f"Content: {results[0].memory.content}")
print(f"Relevance: {results[0].relevance_score}")
# Search by tag
tag_results = await storage.search_by_tag(["test"])
print(f"Tag search: Found {len(tag_results)} results")
# Get stats
stats = storage.get_stats()
print(f"Stats: {stats['total_memories']} memories, {stats['database_size_mb']} MB")
storage.close()
print("✅ Basic tests passed!")
except Exception as e:
print(f"❌ Basic tests failed: {e}")
finally:
shutil.rmtree(temp_dir, ignore_errors=True)
# Run the basic tests
asyncio.run(run_basic_tests())
```
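To exercise only the time-range tests above without running the full suite, a quick sketch (assumes pytest and pytest-asyncio are installed and the command is run from the repository root):

```python
import sys

import pytest

# -k selects tests whose names contain the substring; -v prints one line per test.
sys.exit(pytest.main([
    "tests/test_sqlite_vec_storage.py",
    "-k", "time_range",
    "-v",
]))
```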
--------------------------------------------------------------------------------
/src/mcp_memory_service/utils/time_parser.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Natural language time expression parser for MCP Memory Service.
This module provides utilities to parse and understand various time expressions
for retrieving memories based on when they were stored.
"""
import re
import logging
from datetime import datetime, timedelta, date, time
from typing import Tuple, Optional, Dict, Any, List
logger = logging.getLogger(__name__)
# Named time periods and their approximate date ranges
NAMED_PERIODS = {
# Holidays (US/Western-centric, would need localization for global use)
"christmas": {"month": 12, "day": 25, "window": 3},
"new year": {"month": 1, "day": 1, "window": 3},
"valentine": {"month": 2, "day": 14, "window": 1},
"halloween": {"month": 10, "day": 31, "window": 3},
"thanksgiving": {"month": 11, "day": -1, "window": 3}, # -1 means fourth Thursday
# Seasons (Northern Hemisphere)
"spring": {"start_month": 3, "start_day": 20, "end_month": 6, "end_day": 20},
"summer": {"start_month": 6, "start_day": 21, "end_month": 9, "end_day": 22},
"fall": {"start_month": 9, "start_day": 23, "end_month": 12, "end_day": 20},
"autumn": {"start_month": 9, "start_day": 23, "end_month": 12, "end_day": 20},
"winter": {"start_month": 12, "start_day": 21, "end_month": 3, "end_day": 19},
}
# Time of day mappings (24-hour format)
TIME_OF_DAY = {
"morning": (5, 11), # 5:00 AM - 11:59 AM
"noon": (12, 12), # 12:00 PM
"afternoon": (13, 17), # 1:00 PM - 5:59 PM
"evening": (18, 21), # 6:00 PM - 9:59 PM
"night": (22, 4), # 10:00 PM - 4:59 AM (wraps around midnight)
"midnight": (0, 0), # 12:00 AM
}
# Regular expressions for various time patterns
PATTERNS = {
"relative_days": re.compile(r'(?:(\d+)\s+days?\s+ago)|(?:yesterday)|(?:today)'),
"relative_weeks": re.compile(r'(\d+)\s+weeks?\s+ago'),
"relative_months": re.compile(r'(\d+)\s+months?\s+ago'),
"relative_years": re.compile(r'(\d+)\s+years?\s+ago'),
"last_period": re.compile(r'last\s+(day|week|month|year|summer|spring|winter|fall|autumn)'),
"this_period": re.compile(r'this\s+(day|week|month|year|summer|spring|winter|fall|autumn)'),
"month_name": re.compile(r'(january|february|march|april|may|june|july|august|september|october|november|december)'),
"date_range": re.compile(r'between\s+(.+?)\s+and\s+(.+?)(?:\s|$)'),
"time_of_day": re.compile(r'(morning|afternoon|evening|night|noon|midnight)'),
"recent": re.compile(r'recent|lately|recently'),
"specific_date": re.compile(r'(\d{1,2})[/-](\d{1,2})(?:[/-](\d{2,4}))?'),
"full_date": re.compile(r'(\d{4})-(\d{1,2})-(\d{1,2})'),
"named_period": re.compile(r'(spring|summer|winter|fall|autumn|christmas|new\s*year|valentine|halloween|thanksgiving|spring\s*break|summer\s*break|winter\s*break)'), "half_year": re.compile(r'(first|second)\s+half\s+of\s+(\d{4})'),
"quarter": re.compile(r'(first|second|third|fourth|1st|2nd|3rd|4th)\s+quarter(?:\s+of\s+(\d{4}))?'),
}
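# Illustrative inputs each pattern is meant to catch (reference only):
#   "3 days ago", "yesterday", "today"   -> relative_days
#   "2 weeks ago" / "6 months ago"       -> relative_weeks / relative_months
#   "last summer", "this month"          -> last_period / this_period
#   "between 01/01 and 02/15"            -> date_range
#   "first half of 2024"                 -> half_year
#   "2nd quarter of 2023"                -> quarter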
def _calculate_season_date_range(
period: str,
season_info: Dict[str, int],
base_year: int,
current_month: Optional[int] = None
) -> Tuple[datetime, datetime]:
"""
Calculate start and end dates for a season, handling winter's year boundary.
Args:
period: Season name ("winter", "spring", "summer", "fall"/"autumn")
season_info: Dictionary with start_month, start_day, end_month, end_day
base_year: The year to use as reference for calculation
current_month: Current month (1-12) for context-aware winter calculation (optional)
Returns:
Tuple of (start_datetime, end_datetime) for the season
"""
if period == "winter":
# Winter spans year boundary (Dec -> Mar)
# Determine start year based on current month context
if current_month is not None and current_month <= 3:
# We're in Jan-Mar, so winter started the previous year
start_year = base_year - 1
end_year = base_year
else:
# We're in any other month, winter starts this year
start_year = base_year
end_year = base_year + 1
start_dt = datetime(start_year, season_info["start_month"], season_info["start_day"])
end_dt = datetime(end_year, season_info["end_month"], season_info["end_day"], 23, 59, 59)
else:
# All other seasons fall within a single calendar year
start_dt = datetime(base_year, season_info["start_month"], season_info["start_day"])
end_dt = datetime(base_year, season_info["end_month"], season_info["end_day"], 23, 59, 59)
return start_dt, end_dt
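# Example (illustrative): winter queried in February 2025 (current_month=2)
# resolves to Dec 21, 2024 .. Mar 19, 2025 23:59:59, because January-March
# belong to the winter that started the previous December.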
def parse_time_expression(query: str) -> Tuple[Optional[float], Optional[float]]:
"""
Parse a natural language time expression and return timestamp range.
Args:
query: A natural language query with time expressions
Returns:
Tuple of (start_timestamp, end_timestamp), either may be None
"""
query = query.lower().strip()
# Check for multiple patterns in a single query
try:
# First check for date ranges like "between X and Y"
date_range_match = PATTERNS["date_range"].search(query)
if date_range_match:
start_expr = date_range_match.group(1)
end_expr = date_range_match.group(2)
start_ts, _ = parse_time_expression(start_expr)
_, end_ts = parse_time_expression(end_expr)
return start_ts, end_ts
# Check for full ISO dates (YYYY-MM-DD) FIRST
full_date_match = PATTERNS["full_date"].search(query)
if full_date_match:
year, month, day = full_date_match.groups()
try:
specific_date = date(int(year), int(month), int(day))
start_dt = datetime.combine(specific_date, time.min)
end_dt = datetime.combine(specific_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
except ValueError as e:
logger.warning(f"Invalid date: {e}")
return None, None
# Check for specific dates (MM/DD/YYYY)
specific_date_match = PATTERNS["specific_date"].search(query)
if specific_date_match:
month, day, year = specific_date_match.groups()
month = int(month)
day = int(day)
current_year = datetime.now().year
year = int(year) if year else current_year
# Handle 2-digit years
if year and year < 100:
year = 2000 + year if year < 50 else 1900 + year
try:
specific_date = date(year, month, day)
start_dt = datetime.combine(specific_date, time.min)
end_dt = datetime.combine(specific_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
except ValueError as e:
logger.warning(f"Invalid date: {e}")
return None, None
# Relative days: "X days ago", "yesterday", "today"
days_ago_match = PATTERNS["relative_days"].search(query)
if days_ago_match:
if "yesterday" in query:
days = 1
elif "today" in query:
days = 0
else:
days = int(days_ago_match.group(1))
target_date = date.today() - timedelta(days=days)
# Check for time of day modifiers
time_of_day_match = PATTERNS["time_of_day"].search(query)
if time_of_day_match:
# Narrow the range based on time of day
return get_time_of_day_range(target_date, time_of_day_match.group(1))
else:
# Return the full day
start_dt = datetime.combine(target_date, time.min)
end_dt = datetime.combine(target_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
# Relative weeks: "X weeks ago"
weeks_ago_match = PATTERNS["relative_weeks"].search(query)
if weeks_ago_match:
weeks = int(weeks_ago_match.group(1))
target_date = date.today() - timedelta(weeks=weeks)
# Get the start of the week (Monday)
start_date = target_date - timedelta(days=target_date.weekday())
end_date = start_date + timedelta(days=6)
start_dt = datetime.combine(start_date, time.min)
end_dt = datetime.combine(end_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
# Relative months: "X months ago"
months_ago_match = PATTERNS["relative_months"].search(query)
if months_ago_match:
months = int(months_ago_match.group(1))
current = datetime.now()
# Calculate target month
year = current.year
month = current.month - months
# Adjust year if month goes negative
while month <= 0:
year -= 1
month += 12
# Get first and last day of the month
first_day = date(year, month, 1)
if month == 12:
last_day = date(year + 1, 1, 1) - timedelta(days=1)
else:
last_day = date(year, month + 1, 1) - timedelta(days=1)
start_dt = datetime.combine(first_day, time.min)
end_dt = datetime.combine(last_day, time.max)
return start_dt.timestamp(), end_dt.timestamp()
# Relative years: "X years ago"
years_ago_match = PATTERNS["relative_years"].search(query)
if years_ago_match:
years = int(years_ago_match.group(1))
current_year = datetime.now().year
target_year = current_year - years
start_dt = datetime(target_year, 1, 1, 0, 0, 0)
end_dt = datetime(target_year, 12, 31, 23, 59, 59)
return start_dt.timestamp(), end_dt.timestamp()
# "Last X" expressions
last_period_match = PATTERNS["last_period"].search(query)
if last_period_match:
period = last_period_match.group(1)
return get_last_period_range(period)
# "This X" expressions
this_period_match = PATTERNS["this_period"].search(query)
if this_period_match:
period = this_period_match.group(1)
return get_this_period_range(period)
# Month names
month_match = PATTERNS["month_name"].search(query)
if month_match:
month_name = month_match.group(1)
return get_month_range(month_name)
# Named periods (holidays, etc.)
named_period_match = PATTERNS["named_period"].search(query)
if named_period_match:
            period_name = named_period_match.group(1)  # use the matched text as-is
return get_named_period_range(period_name)
# Half year expressions
half_year_match = PATTERNS["half_year"].search(query)
if half_year_match:
half = half_year_match.group(1)
year_str = half_year_match.group(2)
year = int(year_str) if year_str else datetime.now().year
if half.lower() == "first":
start_dt = datetime(year, 1, 1, 0, 0, 0)
end_dt = datetime(year, 6, 30, 23, 59, 59)
else: # "second"
start_dt = datetime(year, 7, 1, 0, 0, 0)
end_dt = datetime(year, 12, 31, 23, 59, 59)
return start_dt.timestamp(), end_dt.timestamp()
# Quarter expressions
quarter_match = PATTERNS["quarter"].search(query)
if quarter_match:
quarter = quarter_match.group(1).lower()
year_str = quarter_match.group(2)
year = int(year_str) if year_str else datetime.now().year
# Map textual quarter to number
quarter_num = {"first": 1, "1st": 1, "second": 2, "2nd": 2,
"third": 3, "3rd": 3, "fourth": 4, "4th": 4}[quarter]
# Calculate quarter start and end dates
quarter_month = (quarter_num - 1) * 3 + 1
start_dt = datetime(year, quarter_month, 1, 0, 0, 0)
if quarter_month + 3 > 12:
end_dt = datetime(year + 1, 1, 1, 0, 0, 0) - timedelta(seconds=1)
else:
end_dt = datetime(year, quarter_month + 3, 1, 0, 0, 0) - timedelta(seconds=1)
return start_dt.timestamp(), end_dt.timestamp()
# Recent/fuzzy time expressions
recent_match = PATTERNS["recent"].search(query)
if recent_match:
# Default to last 7 days for "recent"
end_dt = datetime.now()
start_dt = end_dt - timedelta(days=7)
return start_dt.timestamp(), end_dt.timestamp()
# If no time expression is found, return None for both timestamps
return None, None
except Exception as e:
logger.error(f"Error parsing time expression: {e}")
return None, None
def get_time_of_day_range(target_date: date, time_period: str) -> Tuple[float, float]:
"""Get timestamp range for a specific time of day on a given date."""
if time_period in TIME_OF_DAY:
start_hour, end_hour = TIME_OF_DAY[time_period]
# Handle periods that wrap around midnight
        # Periods that wrap around midnight (e.g., "night" = (22, 4)) start on
        # the target date and end in the early hours of the following day
        if start_hour > end_hour:
            start_dt = datetime.combine(target_date, time(start_hour, 0))
            end_dt = datetime.combine(target_date + timedelta(days=1), time(end_hour, 59, 59))
        else:
            # Normal periods within a single day; single-hour periods such as
            # noon or midnight cover the full hour (e.g., 12:00-12:59:59)
            start_dt = datetime.combine(target_date, time(start_hour, 0))
            end_dt = datetime.combine(target_date, time(end_hour, 59, 59))
return start_dt.timestamp(), end_dt.timestamp()
else:
# Fallback to full day
start_dt = datetime.combine(target_date, time.min)
end_dt = datetime.combine(target_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
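# Example (illustrative): get_time_of_day_range(date(2025, 1, 10), "night")
# spans 22:00 on Jan 10 through 04:59:59 on Jan 11.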
def get_last_period_range(period: str) -> Tuple[float, float]:
"""Get timestamp range for 'last X' expressions."""
now = datetime.now()
today = date.today()
if period == "day":
# Last day = yesterday
yesterday = today - timedelta(days=1)
start_dt = datetime.combine(yesterday, time.min)
end_dt = datetime.combine(yesterday, time.max)
elif period == "week":
# Last week = previous calendar week (Mon-Sun)
# Find last Monday
last_monday = today - timedelta(days=today.weekday() + 7)
# Find last Sunday
last_sunday = last_monday + timedelta(days=6)
start_dt = datetime.combine(last_monday, time.min)
end_dt = datetime.combine(last_sunday, time.max)
elif period == "month":
# Last month = previous calendar month
first_of_this_month = date(today.year, today.month, 1)
if today.month == 1:
last_month = 12
last_month_year = today.year - 1
else:
last_month = today.month - 1
last_month_year = today.year
first_of_last_month = date(last_month_year, last_month, 1)
last_of_last_month = first_of_this_month - timedelta(days=1)
start_dt = datetime.combine(first_of_last_month, time.min)
end_dt = datetime.combine(last_of_last_month, time.max)
elif period == "year":
# Last year = previous calendar year
last_year = today.year - 1
start_dt = datetime(last_year, 1, 1, 0, 0, 0)
end_dt = datetime(last_year, 12, 31, 23, 59, 59)
elif period in ["summer", "spring", "winter", "fall", "autumn"]:
# Last season
season_info = NAMED_PERIODS[period]
current_year = today.year
# Determine if we're currently in this season
current_month = today.month
current_day = today.day
is_current_season = False
# Check if today falls within the season's date range
        if period == "winter":  # Winter spans the year boundary
            if (current_month >= season_info["start_month"] or
                (current_month <= season_info["end_month"] and
                 current_day <= season_info["end_day"])):
                is_current_season = True
        else:
            # Evaluate the edge-month day checks explicitly; an unparenthesized
            # chain of "x if c else True" binds surprisingly in Python
            after_start = (current_day >= season_info["start_day"]
                           if current_month == season_info["start_month"] else True)
            before_end = (current_day <= season_info["end_day"]
                          if current_month == season_info["end_month"] else True)
            if (season_info["start_month"] <= current_month <= season_info["end_month"]
                    and after_start and before_end):
                is_current_season = True
# If we're currently in the season, get last year's season
if is_current_season:
year = current_year - 1
else:
year = current_year
# Calculate season date range (handles winter's year boundary)
context_month = current_month if is_current_season else None
start_dt, end_dt = _calculate_season_date_range(period, season_info, year, context_month)
else:
# Fallback - last 24 hours
end_dt = now
start_dt = end_dt - timedelta(days=1)
return start_dt.timestamp(), end_dt.timestamp()
def get_this_period_range(period: str) -> Tuple[float, float]:
"""Get timestamp range for 'this X' expressions."""
now = datetime.now()
today = date.today()
if period == "day":
# This day = today
start_dt = datetime.combine(today, time.min)
end_dt = datetime.combine(today, time.max)
elif period == "week":
# This week = current calendar week (Mon-Sun)
# Find this Monday
monday = today - timedelta(days=today.weekday())
sunday = monday + timedelta(days=6)
start_dt = datetime.combine(monday, time.min)
end_dt = datetime.combine(sunday, time.max)
elif period == "month":
# This month = current calendar month
first_of_month = date(today.year, today.month, 1)
if today.month == 12:
first_of_next_month = date(today.year + 1, 1, 1)
else:
first_of_next_month = date(today.year, today.month + 1, 1)
last_of_month = first_of_next_month - timedelta(days=1)
start_dt = datetime.combine(first_of_month, time.min)
end_dt = datetime.combine(last_of_month, time.max)
elif period == "year":
# This year = current calendar year
start_dt = datetime(today.year, 1, 1, 0, 0, 0)
end_dt = datetime(today.year, 12, 31, 23, 59, 59)
elif period in ["summer", "spring", "winter", "fall", "autumn"]:
# This season
season_info = NAMED_PERIODS[period]
current_year = today.year
# Calculate season date range (handles winter's year boundary)
start_dt, end_dt = _calculate_season_date_range(period, season_info, current_year, today.month)
else:
# Fallback - current 24 hours
end_dt = now
start_dt = datetime.combine(today, time.min)
return start_dt.timestamp(), end_dt.timestamp()
def get_month_range(month_name: str) -> Tuple[float, float]:
"""Get timestamp range for a named month."""
# Map month name to number
month_map = {
"january": 1, "february": 2, "march": 3, "april": 4,
"may": 5, "june": 6, "july": 7, "august": 8,
"september": 9, "october": 10, "november": 11, "december": 12
}
if month_name in month_map:
month_num = month_map[month_name]
current_year = datetime.now().year
# If the month is in the future for this year, use last year
current_month = datetime.now().month
year = current_year if month_num <= current_month else current_year - 1
# Get first and last day of the month
first_day = date(year, month_num, 1)
if month_num == 12:
last_day = date(year + 1, 1, 1) - timedelta(days=1)
else:
last_day = date(year, month_num + 1, 1) - timedelta(days=1)
start_dt = datetime.combine(first_day, time.min)
end_dt = datetime.combine(last_day, time.max)
return start_dt.timestamp(), end_dt.timestamp()
else:
return None, None
def get_named_period_range(period_name: str) -> Tuple[Optional[float], Optional[float]]:
"""Get timestamp range for named periods like holidays."""
period_name = period_name.lower().replace("_", " ")
current_year = datetime.now().year
current_month = datetime.now().month
current_day = datetime.now().day
if period_name in NAMED_PERIODS:
info = NAMED_PERIODS[period_name]
# Found matching period
# Determine if the period is in the past or future for this year
if "month" in info and "day" in info:
# Simple fixed-date holiday
month = info["month"]
day = info["day"]
window = info.get("window", 1) # Default 1-day window
# Special case for Thanksgiving (fourth Thursday in November)
if day == -1 and month == 11: # Thanksgiving
# Find the fourth Thursday in November
first_day = date(current_year, 11, 1)
# Find first Thursday
first_thursday = first_day + timedelta(days=((3 - first_day.weekday()) % 7))
# Fourth Thursday is 3 weeks later
thanksgiving = first_thursday + timedelta(weeks=3)
day = thanksgiving.day
# Check if the holiday has passed this year
is_past = (current_month > month or
(current_month == month and current_day > day + window))
year = current_year if not is_past else current_year - 1
target_date = date(year, month, day)
# Create date range with window
start_date = target_date - timedelta(days=window)
end_date = target_date + timedelta(days=window)
start_dt = datetime.combine(start_date, time.min)
end_dt = datetime.combine(end_date, time.max)
return start_dt.timestamp(), end_dt.timestamp()
elif "start_month" in info and "end_month" in info:
# Season or date range
start_month = info["start_month"]
start_day = info["start_day"]
end_month = info["end_month"]
end_day = info["end_day"]
# Determine year based on current date
if start_month > end_month: # Period crosses year boundary
if current_month < end_month or (current_month == end_month and current_day <= end_day):
# We're in the end part of the period that started last year
start_dt = datetime(current_year - 1, start_month, start_day)
end_dt = datetime(current_year, end_month, end_day, 23, 59, 59)
else:
# The period is either coming up this year or happened earlier this year
if current_month > start_month or (current_month == start_month and current_day >= start_day):
# Period already started this year
start_dt = datetime(current_year, start_month, start_day)
end_dt = datetime(current_year + 1, end_month, end_day, 23, 59, 59)
else:
# Period from last year
start_dt = datetime(current_year - 1, start_month, start_day)
end_dt = datetime(current_year, end_month, end_day, 23, 59, 59)
else:
# Period within a single year
# Check if period has already occurred this year
if (current_month > end_month or
(current_month == end_month and current_day > end_day)):
# Period already passed this year
start_dt = datetime(current_year, start_month, start_day)
end_dt = datetime(current_year, end_month, end_day, 23, 59, 59)
else:
# Check if current date is within the period
is_within_period = (
(current_month > start_month or
(current_month == start_month and current_day >= start_day))
and
(current_month < end_month or
(current_month == end_month and current_day <= end_day))
)
if is_within_period:
# We're in the period this year
start_dt = datetime(current_year, start_month, start_day)
end_dt = datetime(current_year, end_month, end_day, 23, 59, 59)
else:
# Period from last year
start_dt = datetime(current_year - 1, start_month, start_day)
end_dt = datetime(current_year - 1, end_month, end_day, 23, 59, 59)
return start_dt.timestamp(), end_dt.timestamp()
# If no match found
return None, None
# Helper function to detect time expressions in a general query
def extract_time_expression(query: str) -> Tuple[str, Tuple[Optional[float], Optional[float]]]:
"""
Extract time-related expressions from a query and return the timestamps.
Args:
query: A natural language query that may contain time expressions
Returns:
Tuple of (cleaned_query, (start_timestamp, end_timestamp))
The cleaned_query has time expressions removed
"""
# Check for time expressions
time_expressions = [
r'\b\d+\s+days?\s+ago\b',
r'\byesterday\b',
r'\btoday\b',
r'\b\d+\s+weeks?\s+ago\b',
r'\b\d+\s+months?\s+ago\b',
r'\b\d+\s+years?\s+ago\b',
r'\blast\s+(day|week|month|year|summer|spring|winter|fall|autumn)\b',
r'\bthis\s+(day|week|month|year|summer|spring|winter|fall|autumn)\b',
r'\b(january|february|march|april|may|june|july|august|september|october|november|december)\b',
r'\bbetween\s+.+?\s+and\s+.+?(?:\s|$)',
r'\bin\s+the\s+(morning|afternoon|evening|night|noon|midnight)\b',
r'\brecent|lately|recently\b',
r'\b\d{1,2}[/-]\d{1,2}(?:[/-]\d{2,4})?\b',
r'\b\d{4}-\d{1,2}-\d{1,2}\b',
r'\b(spring|summer|winter|fall|autumn|christmas|new\s*year|valentine|halloween|thanksgiving|spring\s*break|summer\s*break|winter\s*break)\b',
r'\b(first|second)\s+half\s+of\s+\d{4}\b',
r'\b(first|second|third|fourth|1st|2nd|3rd|4th)\s+quarter(?:\s+of\s+\d{4})?\b',
r'\bfrom\s+.+\s+to\s+.+\b'
]
# Combine all patterns
combined_pattern = '|'.join(f'({expr})' for expr in time_expressions)
combined_regex = re.compile(combined_pattern, re.IGNORECASE)
# Find all matches
matches = list(combined_regex.finditer(query))
if not matches:
return query, (None, None)
    # Extract the matched time expressions (new name to avoid shadowing the
    # pattern list above)
    matched_expressions = []
    for match in matches:
        span = match.span()
        matched_expressions.append(query[span[0]:span[1]])
    # Parse time expressions to get timestamps
    full_time_expression = ' '.join(matched_expressions)
    start_ts, end_ts = parse_time_expression(full_time_expression)
    # Remove time expressions from the query
    cleaned_query = query
    for expr in matched_expressions:
        cleaned_query = cleaned_query.replace(expr, '')
# Clean up multiple spaces
cleaned_query = re.sub(r'\s+', ' ', cleaned_query).strip()
return cleaned_query, (start_ts, end_ts)
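# Usage sketch (illustrative; actual timestamps depend on the current date):
#   cleaned, (start_ts, end_ts) = extract_time_expression("deployment notes from 3 days ago")
#   # cleaned  -> "deployment notes from"
#   # start_ts -> beginning of that day, end_ts -> end of that day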
```
--------------------------------------------------------------------------------
/claude-hooks/utilities/memory-scorer.js:
--------------------------------------------------------------------------------
```javascript
/**
* Memory Relevance Scoring Utility
* Implements intelligent algorithms to score memories by relevance to current project context
* Phase 2: Enhanced with conversation context awareness for dynamic memory loading
*/
/**
* Calculate time decay factor for memory relevance
* More recent memories get higher scores
*/
function calculateTimeDecay(memoryDate, decayRate = 0.1) {
try {
const now = new Date();
// Handle both Unix timestamps (seconds) and ISO strings
let memoryTime;
if (typeof memoryDate === 'string') {
// ISO string format
memoryTime = new Date(memoryDate);
} else if (typeof memoryDate === 'number') {
// Unix timestamp in seconds, convert to milliseconds
memoryTime = new Date(memoryDate * 1000);
} else {
return 0.5; // Invalid format
}
if (isNaN(memoryTime.getTime())) {
return 0.5; // Default score for invalid dates
}
// Calculate days since memory creation
const daysDiff = (now - memoryTime) / (1000 * 60 * 60 * 24);
        // Exponential decay: score = e^(-decayRate * days)
        // With the default decayRate of 0.1:
        //   0-7 days:  ~1.00 down to ~0.50
        //   8-30 days: ~0.45 down to ~0.05
        //   30+ days:  below 0.05 (floored at 0.01 by the clamp below)
const decayScore = Math.exp(-decayRate * daysDiff);
// Ensure score is between 0 and 1
return Math.max(0.01, Math.min(1.0, decayScore));
} catch (error) {
// Silently fail with default score to avoid noise
return 0.5;
}
}
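// Worked example (illustrative): a memory from 14 days ago with the default
// decayRate of 0.1 scores e^(-0.1 * 14) ≈ 0.25.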
/**
* Calculate tag relevance score
* Memories with tags matching project context get higher scores
*/
function calculateTagRelevance(memoryTags = [], projectContext) {
try {
if (!Array.isArray(memoryTags) || memoryTags.length === 0) {
return 0.3; // Default score for memories without tags
}
const contextTags = [
projectContext.name?.toLowerCase(),
projectContext.language?.toLowerCase(),
...(projectContext.frameworks || []).map(f => f.toLowerCase()),
...(projectContext.tools || []).map(t => t.toLowerCase())
].filter(Boolean);
if (contextTags.length === 0) {
return 0.5; // No context to match against
}
// Calculate tag overlap (exact match only to prevent cross-project pollution)
const memoryTagsLower = memoryTags.map(tag => tag.toLowerCase());
const matchingTags = contextTags.filter(contextTag =>
memoryTagsLower.includes(contextTag)
);
// Score based on percentage of matching tags
const overlapScore = matchingTags.length / contextTags.length;
// Bonus for exact project name matches
const exactProjectMatch = memoryTagsLower.includes(projectContext.name?.toLowerCase());
const projectBonus = exactProjectMatch ? 0.3 : 0;
// Bonus for exact language matches
const exactLanguageMatch = memoryTagsLower.includes(projectContext.language?.toLowerCase());
const languageBonus = exactLanguageMatch ? 0.2 : 0;
// Bonus for framework matches
const frameworkMatches = (projectContext.frameworks || []).filter(framework =>
memoryTagsLower.some(tag => tag.includes(framework.toLowerCase()))
);
const frameworkBonus = frameworkMatches.length * 0.1;
const totalScore = Math.min(1.0, overlapScore + projectBonus + languageBonus + frameworkBonus);
return Math.max(0.1, totalScore);
} catch (error) {
// Silently fail with default score to avoid noise
return 0.3;
}
}
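// Worked example (illustrative, hypothetical tags): with context tags
// ['my-app', 'python'] and memory tags ['my-app', 'decision'], the overlap is
// 1/2 = 0.5 plus the 0.3 exact project-name bonus, giving min(1.0, 0.8) = 0.8.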
/**
* Calculate content quality score to penalize generic/empty content
*/
function calculateContentQuality(memoryContent = '') {
try {
if (!memoryContent || typeof memoryContent !== 'string') {
return 0.1;
}
const content = memoryContent.trim();
// Check for generic session summary patterns
const genericPatterns = [
/## 🎯 Topics Discussed\s*-\s*implementation\s*-\s*\.\.\.?$/m,
/Topics Discussed.*implementation.*\.\.\..*$/s,
/Session Summary.*implementation.*\.\.\..*$/s,
/^# Session Summary.*Date.*Project.*Topics Discussed.*implementation.*\.\.\..*$/s
];
const isGeneric = genericPatterns.some(pattern => pattern.test(content));
if (isGeneric) {
return 0.05; // Heavily penalize generic content
}
// Check content length and substance
if (content.length < 50) {
return 0.2; // Short content gets low score
}
// Check for meaningful content indicators
const meaningfulIndicators = [
'decided', 'implemented', 'changed', 'fixed', 'created', 'updated',
'because', 'reason', 'approach', 'solution', 'result', 'impact',
'learned', 'discovered', 'found', 'issue', 'problem', 'challenge'
];
const meaningfulMatches = meaningfulIndicators.filter(indicator =>
content.toLowerCase().includes(indicator)
).length;
// Calculate information density
const words = content.split(/\s+/).filter(w => w.length > 2);
const uniqueWords = new Set(words.map(w => w.toLowerCase()));
const diversityRatio = uniqueWords.size / Math.max(words.length, 1);
// Combine factors
const meaningfulnessScore = Math.min(0.4, meaningfulMatches * 0.08);
const diversityScore = Math.min(0.3, diversityRatio * 0.5);
const lengthScore = Math.min(0.3, content.length / 1000); // Longer content gets bonus
const qualityScore = meaningfulnessScore + diversityScore + lengthScore;
return Math.max(0.05, Math.min(1.0, qualityScore));
} catch (error) {
// Silently fail with default score to avoid noise
return 0.3;
}
}
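// Worked example (illustrative): content with 5 meaningful-indicator hits
// (min(0.4, 5 * 0.08) = 0.4), a 0.7 unique-word ratio (min(0.3, 0.35) = 0.3) and
// 1200 characters (min(0.3, 1.2) = 0.3) reaches the maximum quality score of 1.0.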
/**
* Calculate content relevance using simple text analysis
* Memories with content matching project keywords get higher scores
*/
function calculateContentRelevance(memoryContent = '', projectContext) {
try {
if (!memoryContent || typeof memoryContent !== 'string') {
return 0.3;
}
const content = memoryContent.toLowerCase();
const keywords = [
projectContext.name?.toLowerCase(),
projectContext.language?.toLowerCase(),
...(projectContext.frameworks || []).map(f => f.toLowerCase()),
...(projectContext.tools || []).map(t => t.toLowerCase()),
// Add common technical keywords
'architecture', 'decision', 'implementation', 'bug', 'fix',
'feature', 'config', 'setup', 'deployment', 'performance'
].filter(Boolean);
if (keywords.length === 0) {
return 0.5;
}
// Count keyword occurrences
let totalMatches = 0;
let keywordScore = 0;
keywords.forEach(keyword => {
            // Escape regex metacharacters so keywords like "c++" cannot break the pattern
            const escaped = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
            const occurrences = (content.match(new RegExp(escaped, 'g')) || []).length;
if (occurrences > 0) {
totalMatches++;
keywordScore += Math.log(1 + occurrences) * 0.1; // Logarithmic scoring
}
});
// Normalize score
const matchRatio = totalMatches / keywords.length;
const contentScore = Math.min(1.0, matchRatio + keywordScore);
return Math.max(0.1, contentScore);
} catch (error) {
// Silently fail with default score to avoid noise
return 0.3;
}
}
/**
* Calculate memory type bonus
* Certain memory types are more valuable for context injection
*/
function calculateTypeBonus(memoryType) {
const typeScores = {
'decision': 0.3, // Architectural decisions are highly valuable
'architecture': 0.3, // Architecture documentation is important
'reference': 0.2, // Reference materials are useful
'session': 0.15, // Session summaries provide good context
'insight': 0.2, // Insights are valuable for learning
'bug-fix': 0.15, // Bug fixes provide historical context
'feature': 0.1, // Feature descriptions are moderately useful
'note': 0.05, // General notes are less critical
'todo': 0.05, // TODOs are task-specific
'temporary': -0.1 // Temporary notes should be deprioritized
};
return typeScores[memoryType?.toLowerCase()] || 0;
}
/**
* Calculate recency bonus to prioritize very recent memories
* Provides explicit boost for memories created within specific time windows
*/
function calculateRecencyBonus(memoryDate) {
// Recency bonus tiers (days and corresponding bonus values)
const RECENCY_TIERS = [
{ days: 7, bonus: 0.15 }, // Strong boost for last week
{ days: 14, bonus: 0.10 }, // Moderate boost for last 2 weeks
{ days: 30, bonus: 0.05 } // Small boost for last month
];
try {
const now = new Date();
// Handle both Unix timestamps (seconds) and ISO strings
let memoryTime;
if (typeof memoryDate === 'string') {
// ISO string format
memoryTime = new Date(memoryDate);
} else if (typeof memoryDate === 'number') {
// Unix timestamp in seconds, convert to milliseconds
memoryTime = new Date(memoryDate * 1000);
} else {
return 0; // Invalid format
}
if (isNaN(memoryTime.getTime()) || memoryTime > now) {
return 0; // No bonus for invalid or future dates
}
const daysDiff = (now - memoryTime) / (1000 * 60 * 60 * 24);
// Find the appropriate tier for this memory's age
for (const tier of RECENCY_TIERS) {
if (daysDiff <= tier.days) {
return tier.bonus;
}
}
return 0; // No bonus for older memories
} catch (error) {
return 0;
}
}
/**
* Calculate conversation context relevance score (Phase 2)
* Matches memory content with current conversation topics and intent
*/
function calculateConversationRelevance(memory, conversationAnalysis) {
try {
if (!conversationAnalysis || !memory.content) {
return 0.3; // Default score when no conversation context
}
const memoryContent = memory.content.toLowerCase();
let relevanceScore = 0;
let factorCount = 0;
// Score based on topic matching
if (conversationAnalysis.topics && conversationAnalysis.topics.length > 0) {
conversationAnalysis.topics.forEach(topic => {
                const topicPattern = topic.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // escape regex metacharacters
                const topicMatches = (memoryContent.match(new RegExp(topicPattern, 'gi')) || []).length;
if (topicMatches > 0) {
relevanceScore += topic.confidence * Math.min(topicMatches * 0.2, 0.8);
factorCount++;
}
});
}
// Score based on entity matching
if (conversationAnalysis.entities && conversationAnalysis.entities.length > 0) {
conversationAnalysis.entities.forEach(entity => {
                const entityPattern = entity.name.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); // escape regex metacharacters
                const entityMatches = (memoryContent.match(new RegExp(entityPattern, 'gi')) || []).length;
if (entityMatches > 0) {
relevanceScore += entity.confidence * 0.3;
factorCount++;
}
});
}
// Score based on intent alignment
if (conversationAnalysis.intent) {
const intentKeywords = {
'learning': ['learn', 'understand', 'explain', 'how', 'tutorial', 'guide'],
'problem-solving': ['fix', 'error', 'debug', 'issue', 'problem', 'solve'],
'development': ['build', 'create', 'implement', 'develop', 'code', 'feature'],
'optimization': ['optimize', 'improve', 'performance', 'faster', 'better'],
'review': ['review', 'check', 'analyze', 'audit', 'validate'],
'planning': ['plan', 'design', 'architecture', 'approach', 'strategy']
};
const intentWords = intentKeywords[conversationAnalysis.intent.name] || [];
let intentMatches = 0;
intentWords.forEach(word => {
if (memoryContent.includes(word)) {
intentMatches++;
}
});
if (intentMatches > 0) {
relevanceScore += conversationAnalysis.intent.confidence * (intentMatches / intentWords.length);
factorCount++;
}
}
// Score based on code context if present
if (conversationAnalysis.codeContext && conversationAnalysis.codeContext.isCodeRelated) {
const codeIndicators = ['code', 'function', 'class', 'method', 'variable', 'api', 'library'];
let codeMatches = 0;
codeIndicators.forEach(indicator => {
if (memoryContent.includes(indicator)) {
codeMatches++;
}
});
if (codeMatches > 0) {
relevanceScore += 0.4 * (codeMatches / codeIndicators.length);
factorCount++;
}
}
// Normalize score
const normalizedScore = factorCount > 0 ? relevanceScore / factorCount : 0.3;
return Math.max(0.1, Math.min(1.0, normalizedScore));
} catch (error) {
// Silently fail with default score to avoid noise
return 0.3;
}
}
/**
* Calculate final relevance score for a memory (Enhanced with quality scoring)
*/
function calculateRelevanceScore(memory, projectContext, options = {}) {
try {
const {
weights = {},
timeDecayRate = 0.1, // Default decay rate
includeConversationContext = false,
conversationAnalysis = null
} = options;
// Default weights including content quality factor
const defaultWeights = includeConversationContext ? {
timeDecay: 0.20, // Reduced weight for time
tagRelevance: 0.30, // Tag matching remains important
contentRelevance: 0.15, // Content matching reduced
contentQuality: 0.25, // New quality factor
conversationRelevance: 0.25, // Conversation context factor
typeBonus: 0.05 // Memory type provides minor adjustment
} : {
timeDecay: 0.25, // Reduced time weight
tagRelevance: 0.35, // Tag matching important
contentRelevance: 0.15, // Content matching
contentQuality: 0.25, // Quality factor prioritized
typeBonus: 0.05 // Type bonus reduced
};
const w = { ...defaultWeights, ...weights };
// Calculate individual scores
const timeScore = calculateTimeDecay(memory.created_at || memory.created_at_iso, timeDecayRate);
const tagScore = calculateTagRelevance(memory.tags, projectContext);
const contentScore = calculateContentRelevance(memory.content, projectContext);
const qualityScore = calculateContentQuality(memory.content);
const typeBonus = calculateTypeBonus(memory.memory_type);
const recencyBonus = calculateRecencyBonus(memory.created_at || memory.created_at_iso);
let finalScore = (
(timeScore * w.timeDecay) +
(tagScore * w.tagRelevance) +
(contentScore * w.contentRelevance) +
(qualityScore * w.contentQuality) +
typeBonus + // Type bonus is not weighted, acts as adjustment
recencyBonus // Recency bonus provides explicit boost for very recent memories
);
const breakdown = {
timeDecay: timeScore,
tagRelevance: tagScore,
contentRelevance: contentScore,
contentQuality: qualityScore,
typeBonus: typeBonus,
recencyBonus: recencyBonus
};
// Add conversation context scoring if enabled (Phase 2)
if (includeConversationContext && conversationAnalysis) {
const conversationScore = calculateConversationRelevance(memory, conversationAnalysis);
finalScore += (conversationScore * (w.conversationRelevance || 0));
breakdown.conversationRelevance = conversationScore;
}
// Apply quality penalty for very low quality content (multiplicative)
if (qualityScore < 0.2) {
finalScore *= 0.5; // Heavily penalize low quality content
}
// Apply project affinity penalty - memories without project tag match get penalized
// This prevents cross-project memory pollution (e.g., Azure memories in Python project)
const memoryTags = (memory.tags || []).map(t => t.toLowerCase());
const memoryContent = (memory.content || '').toLowerCase();
const projectName = projectContext.name?.toLowerCase();
// Check for project name in tags OR content
const hasProjectTag = projectName && (
memoryTags.some(tag => tag === projectName || tag.includes(projectName)) ||
memoryContent.includes(projectName)
);
if (!hasProjectTag && tagScore < 0.3) {
// No project reference at all - definitely unrelated memory
// Hard filter: set score to 0 to exclude from results entirely
finalScore = 0;
breakdown.projectAffinity = 'none (filtered)';
} else if (!hasProjectTag) {
// Some tag relevance but no project tag - might be related
finalScore *= 0.5; // Moderate penalty
breakdown.projectAffinity = 'low';
} else {
breakdown.projectAffinity = 'high';
}
// Ensure score is between 0 and 1
const normalizedScore = Math.max(0, Math.min(1, finalScore));
return {
finalScore: normalizedScore,
breakdown: breakdown,
weights: w,
hasConversationContext: includeConversationContext
};
} catch (error) {
// Silently fail with default score to avoid noise
return {
finalScore: 0.1,
breakdown: { error: error.message },
weights: {},
hasConversationContext: false
};
}
}
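// Worked example (illustrative, default non-conversation weights): timeScore 0.8,
// tagScore 0.8, contentScore 0.5, qualityScore 0.6, typeBonus 0.3, recencyBonus 0.15
// give 0.8*0.25 + 0.8*0.35 + 0.5*0.15 + 0.6*0.25 + 0.3 + 0.15 = 1.155, clamped to
// 1.0 (assuming the project-affinity check passes).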
/**
* Score and sort memories by relevance
*/
function scoreMemoryRelevance(memories, projectContext, options = {}) {
    const { verbose = true } = options; // hoisted above try so the catch block can log
    try {
if (!Array.isArray(memories)) {
if (verbose) console.warn('[Memory Scorer] Invalid memories array');
return [];
}
if (verbose) {
console.log(`[Memory Scorer] Scoring ${memories.length} memories for project: ${projectContext.name}`);
}
// Score each memory
const scoredMemories = memories.map(memory => {
const scoreResult = calculateRelevanceScore(memory, projectContext, options);
return {
...memory,
relevanceScore: scoreResult.finalScore,
scoreBreakdown: scoreResult.breakdown,
hasConversationContext: scoreResult.hasConversationContext
};
});
// Sort by relevance score (highest first)
const sortedMemories = scoredMemories.sort((a, b) => b.relevanceScore - a.relevanceScore);
// Log scoring results for debugging
if (verbose) {
console.log('[Memory Scorer] Top scored memories:');
sortedMemories.slice(0, 3).forEach((memory, index) => {
console.log(` ${index + 1}. Score: ${memory.relevanceScore.toFixed(3)} - ${memory.content.substring(0, 60)}...`);
});
}
return sortedMemories;
} catch (error) {
if (verbose) console.error('[Memory Scorer] Error scoring memories:', error.message);
return memories || [];
}
}
/**
* Filter memories by minimum relevance threshold
*/
function filterByRelevance(memories, minScore = 0.3, options = {}) {
    const { verbose = true } = options; // hoisted above try so the catch block can log
    try {
const filtered = memories.filter(memory => memory.relevanceScore >= minScore);
if (verbose) {
console.log(`[Memory Scorer] Filtered ${filtered.length}/${memories.length} memories above threshold ${minScore}`);
}
return filtered;
} catch (error) {
if (verbose) console.warn('[Memory Scorer] Error filtering memories:', error.message);
return memories;
}
}
/**
* Analyze memory age distribution to detect staleness
* Returns statistics and recommended weight adjustments
*/
function analyzeMemoryAgeDistribution(memories, options = {}) {
    const { verbose = false } = options; // hoisted above try so the catch block can log
    try {
if (!Array.isArray(memories) || memories.length === 0) {
return {
avgAge: 0,
medianAge: 0,
p75Age: 0,
p90Age: 0,
recentCount: 0,
staleCount: 0,
isStale: false,
recommendedAdjustments: {}
};
}
const now = new Date();
// Calculate ages in days
const ages = memories.map(memory => {
// Handle both Unix timestamps (seconds) and ISO strings
let memoryTime;
if (memory.created_at_iso) {
memoryTime = new Date(memory.created_at_iso);
} else if (memory.created_at) {
// created_at is in seconds, convert to milliseconds
memoryTime = new Date(memory.created_at * 1000);
} else {
return 365; // Default to very old if no timestamp
}
if (isNaN(memoryTime.getTime())) return 365; // Default to very old
return (now - memoryTime) / (1000 * 60 * 60 * 24);
}).sort((a, b) => a - b);
// Calculate percentiles
const avgAge = ages.reduce((sum, age) => sum + age, 0) / ages.length;
const medianAge = ages[Math.floor(ages.length / 2)];
const p75Age = ages[Math.floor(ages.length * 0.75)];
const p90Age = ages[Math.floor(ages.length * 0.90)];
// Count recent vs stale
const recentCount = ages.filter(age => age <= 14).length; // Last 2 weeks
const staleCount = ages.filter(age => age > 30).length; // Older than 1 month
// Determine if memory set is stale
const isStale = medianAge > 30 || (recentCount / ages.length) < 0.2;
// Recommended adjustments based on staleness
const recommendedAdjustments = {};
if (isStale) {
// Memories are old - boost time decay weight, reduce tag relevance
recommendedAdjustments.timeDecay = 0.50; // Increase from default 0.25
recommendedAdjustments.tagRelevance = 0.20; // Decrease from default 0.35
recommendedAdjustments.recencyBonus = 0.25; // Increase bonus for any recent memories
recommendedAdjustments.reason = `Stale memory set detected (median: ${Math.round(medianAge)}d old, ${Math.round(recentCount/ages.length*100)}% recent)`;
} else if (avgAge < 14) {
// Memories are very recent - balanced approach
recommendedAdjustments.timeDecay = 0.30;
recommendedAdjustments.tagRelevance = 0.30;
recommendedAdjustments.reason = `Recent memory set (avg: ${Math.round(avgAge)}d old)`;
}
if (verbose) {
console.log('[Memory Age Analyzer]', {
avgAge: Math.round(avgAge),
medianAge: Math.round(medianAge),
p75Age: Math.round(p75Age),
recentPercent: Math.round(recentCount / ages.length * 100),
isStale,
adjustments: recommendedAdjustments.reason || 'No adjustments needed'
});
}
return {
avgAge,
medianAge,
p75Age,
p90Age,
recentCount,
staleCount,
totalCount: ages.length,
isStale,
recommendedAdjustments
};
} catch (error) {
if (verbose) console.error('[Memory Age Analyzer] Error:', error.message);
return {
avgAge: 0,
medianAge: 0,
p75Age: 0,
p90Age: 0,
recentCount: 0,
staleCount: 0,
isStale: false,
recommendedAdjustments: {}
};
}
}
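// Example (illustrative): ten memories with a median age of 45 days and only one
// newer than 14 days (10% recent) trip the staleness check (median > 30d or
// < 20% recent), boosting timeDecay to 0.50 and reducing tagRelevance to 0.20.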
/**
* Calculate adaptive git context weight based on memory age and git activity
* Prevents old git-related memories from dominating when recent development exists
*/
function calculateAdaptiveGitWeight(gitContext, memoryAgeAnalysis, configuredWeight = 1.2, options = {}) {
    const { verbose = false } = options; // hoisted above try so the catch block can log
    try {
// No git context or no recent commits - use configured weight
if (!gitContext || !gitContext.recentCommits || gitContext.recentCommits.length === 0) {
return { weight: configuredWeight, reason: 'No recent git activity' };
}
// Calculate days since most recent commit
const now = new Date();
const mostRecentCommit = new Date(gitContext.recentCommits[0].date);
const daysSinceLastCommit = (now - mostRecentCommit) / (1000 * 60 * 60 * 24);
// Scenario 1: Recent commits (< 7d) BUT stale memories (median > 30d)
// Problem: Git boost would amplify old git memories over potential recent work
if (daysSinceLastCommit <= 7 && memoryAgeAnalysis.medianAge > 30) {
const reducedWeight = Math.max(1.0, configuredWeight * 0.7); // Reduce by 30%
const reason = `Recent commits (${Math.round(daysSinceLastCommit)}d ago) but stale memories (median: ${Math.round(memoryAgeAnalysis.medianAge)}d) - reducing git boost`;
if (verbose) {
console.log(`[Adaptive Git Weight] ${reason}: ${configuredWeight.toFixed(1)} → ${reducedWeight.toFixed(1)}`);
}
return { weight: reducedWeight, reason, adjusted: true };
}
// Scenario 2: Both commits and memories are recent (< 14d)
// Safe to use configured weight, git context is relevant
if (daysSinceLastCommit <= 14 && memoryAgeAnalysis.avgAge <= 14) {
return {
weight: configuredWeight,
reason: `Recent commits and memories aligned (${Math.round(daysSinceLastCommit)}d commits, ${Math.round(memoryAgeAnalysis.avgAge)}d avg memory age)`,
adjusted: false
};
}
// Scenario 3: Old commits (> 14d) but recent memories exist
// Slightly reduce git weight to let recent non-git memories surface
if (daysSinceLastCommit > 14 && memoryAgeAnalysis.recentCount > 0) {
const reducedWeight = Math.max(1.0, configuredWeight * 0.85); // Reduce by 15%
const reason = `Older commits (${Math.round(daysSinceLastCommit)}d ago) with some recent memories - slightly reducing git boost`;
if (verbose) {
console.log(`[Adaptive Git Weight] ${reason}: ${configuredWeight.toFixed(1)} → ${reducedWeight.toFixed(1)}`);
}
return { weight: reducedWeight, reason, adjusted: true };
}
// Default: use configured weight
return { weight: configuredWeight, reason: 'Using configured weight', adjusted: false };
} catch (error) {
if (verbose) console.error('[Adaptive Git Weight] Error:', error.message);
return { weight: configuredWeight, reason: 'Error - using fallback', adjusted: false };
}
}
module.exports = {
scoreMemoryRelevance,
calculateRelevanceScore,
calculateTimeDecay,
calculateTagRelevance,
calculateContentRelevance,
calculateTypeBonus,
calculateRecencyBonus,
filterByRelevance,
analyzeMemoryAgeDistribution,
calculateAdaptiveGitWeight
};
// Direct execution support for testing
if (require.main === module) {
// Test with mock data
const mockProjectContext = {
name: 'mcp-memory-service',
language: 'JavaScript',
frameworks: ['Node.js'],
tools: ['npm']
};
const mockMemories = [
{
content: 'Decided to use SQLite-vec for better performance in MCP Memory Service',
tags: ['mcp-memory-service', 'decision', 'sqlite-vec'],
memory_type: 'decision',
created_at: '2025-08-19T10:00:00Z'
},
{
content: 'Fixed bug in JavaScript hook implementation for Claude Code integration',
tags: ['javascript', 'bug-fix', 'claude-code'],
memory_type: 'bug-fix',
created_at: '2025-08-18T15:30:00Z'
},
{
content: 'Random note about completely unrelated project',
tags: ['other-project', 'note'],
memory_type: 'note',
created_at: '2025-08-01T08:00:00Z'
}
];
console.log('\n=== MEMORY SCORING TEST ===');
const scored = scoreMemoryRelevance(mockMemories, mockProjectContext);
console.log('\n=== SCORED RESULTS ===');
scored.forEach((memory, index) => {
console.log(`${index + 1}. Score: ${memory.relevanceScore.toFixed(3)}`);
console.log(` Content: ${memory.content.substring(0, 80)}...`);
console.log(` Breakdown:`, memory.scoreBreakdown);
console.log('');
});
}
```
--------------------------------------------------------------------------------
/examples/http-mcp-bridge.js:
--------------------------------------------------------------------------------
```javascript
#!/usr/bin/env node
/**
* HTTP-to-MCP Bridge for MCP Memory Service
*
* This bridge allows MCP clients (like Claude Desktop) to connect to a remote
* MCP Memory Service HTTP server instead of running a local instance.
*
* Features:
* - Automatic service discovery via mDNS (Bonjour/Zeroconf)
* - Manual endpoint configuration fallback
* - HTTPS support with self-signed certificate handling
* - API key authentication
*
* Usage in Claude Desktop config:
*
* Option 1: Auto-discovery (recommended for local networks)
* {
* "mcpServers": {
* "memory": {
* "command": "node",
* "args": ["/path/to/http-mcp-bridge.js"],
* "env": {
* "MCP_MEMORY_AUTO_DISCOVER": "true",
* "MCP_MEMORY_PREFER_HTTPS": "true",
* "MCP_MEMORY_API_KEY": "your-api-key"
* }
* }
* }
* }
*
* Option 2: Manual configuration
* {
* "mcpServers": {
* "memory": {
* "command": "node",
* "args": ["/path/to/http-mcp-bridge.js"],
* "env": {
* "MCP_MEMORY_HTTP_ENDPOINT": "https://your-server:8000/api",
* "MCP_MEMORY_API_KEY": "your-api-key"
* }
* }
* }
* }
*/
const http = require('http');
const https = require('https');
const { URL } = require('url');
const dgram = require('dgram');
const dns = require('dns');
const tls = require('tls');
/**
* Simple mDNS service discovery implementation
*/
class MDNSDiscovery {
constructor() {
this.services = new Map();
}
/**
* Discover MCP Memory Services using mDNS
*/
async discoverServices(timeout = 5000) {
return new Promise((resolve) => {
const socket = dgram.createSocket('udp4');
const services = [];
// mDNS query for _mcp-memory._tcp.local
const query = this.createMDNSQuery('_mcp-memory._tcp.local');
socket.on('message', (msg, rinfo) => {
try {
const service = this.parseMDNSResponse(msg, rinfo);
if (service) {
services.push(service);
}
} catch (error) {
// Ignore parsing errors
}
});
socket.bind(() => {
socket.addMembership('224.0.0.251');
socket.send(query, 5353, '224.0.0.251');
});
setTimeout(() => {
socket.close();
resolve(services);
}, timeout);
});
}
createMDNSQuery(serviceName) {
// Simplified mDNS query creation
// This is a basic implementation - in production, use a proper mDNS library
const header = Buffer.alloc(12);
header.writeUInt16BE(0, 0); // Transaction ID
header.writeUInt16BE(0, 2); // Flags
header.writeUInt16BE(1, 4); // Questions
header.writeUInt16BE(0, 6); // Answer RRs
header.writeUInt16BE(0, 8); // Authority RRs
header.writeUInt16BE(0, 10); // Additional RRs
// Question section (simplified)
const nameLabels = serviceName.split('.');
let nameBuffer = Buffer.alloc(0);
for (const label of nameLabels) {
if (label) {
const labelBuffer = Buffer.alloc(1 + label.length);
labelBuffer.writeUInt8(label.length, 0);
labelBuffer.write(label, 1);
nameBuffer = Buffer.concat([nameBuffer, labelBuffer]);
}
}
const endBuffer = Buffer.alloc(5);
endBuffer.writeUInt8(0, 0); // End of name
endBuffer.writeUInt16BE(12, 1); // Type PTR
endBuffer.writeUInt16BE(1, 3); // Class IN
return Buffer.concat([header, nameBuffer, endBuffer]);
}
parseMDNSResponse(msg, rinfo) {
// Simplified mDNS response parsing
// This is a basic implementation - in production, use a proper mDNS library
try {
// Look for MCP Memory Service indicators in the response
const msgStr = msg.toString('ascii', 0, Math.min(msg.length, 512));
if (msgStr.includes('mcp-memory') || msgStr.includes('MCP Memory')) {
                // This simplified parser cannot recover the advertised port, so
                // assume the service default (8000); initialize() health-checks the
                // endpoint and falls back to alternatives if it is unreachable.
                return {
                    name: 'MCP Memory Service',
                    host: rinfo.address,
                    port: 8000,
                    https: false,
                    discovered: true
                };
}
} catch (error) {
// Ignore parsing errors
}
return null;
}
}
class HTTPMCPBridge {
constructor() {
this.endpoint = process.env.MCP_MEMORY_HTTP_ENDPOINT;
this.apiKey = process.env.MCP_MEMORY_API_KEY;
this.autoDiscover = process.env.MCP_MEMORY_AUTO_DISCOVER === 'true';
this.preferHttps = process.env.MCP_MEMORY_PREFER_HTTPS !== 'false';
this.requestId = 0;
this.discovery = new MDNSDiscovery();
this.discoveredEndpoint = null;
}
/**
* Initialize the bridge by discovering or configuring the endpoint
*/
async initialize() {
if (this.endpoint) {
// Manual configuration takes precedence
console.error(`Using manual endpoint: ${this.endpoint}`);
return true;
}
if (this.autoDiscover) {
console.error('Attempting to discover MCP Memory Service via mDNS...');
try {
const services = await this.discovery.discoverServices();
if (services.length > 0) {
// Sort services by preference (HTTPS first if preferred)
services.sort((a, b) => {
if (this.preferHttps) {
if (a.https !== b.https) return b.https - a.https;
}
return a.port - b.port; // Prefer standard ports
});
const service = services[0];
const protocol = service.https ? 'https' : 'http';
this.discoveredEndpoint = `${protocol}://${service.host}:${service.port}/api`;
this.endpoint = this.discoveredEndpoint;
console.error(`Discovered service: ${this.endpoint}`);
// Test the discovered endpoint
const healthy = await this.testEndpoint(this.endpoint);
if (!healthy) {
console.error('Discovered endpoint failed health check, trying alternatives...');
// Try other discovered services
for (let i = 1; i < services.length; i++) {
const altService = services[i];
const altProtocol = altService.https ? 'https' : 'http';
const altEndpoint = `${altProtocol}://${altService.host}:${altService.port}/api`;
if (await this.testEndpoint(altEndpoint)) {
this.endpoint = altEndpoint;
console.error(`Using alternative endpoint: ${this.endpoint}`);
return true;
}
}
console.error('No healthy services found');
return false;
}
return true;
} else {
console.error('No MCP Memory Services discovered');
return false;
}
} catch (error) {
console.error(`Discovery failed: ${error.message}`);
return false;
}
}
// Default fallback
this.endpoint = 'http://localhost:8000/api';
console.error(`Using default endpoint: ${this.endpoint}`);
return true;
}
/**
* Test if an endpoint is healthy
*/
async testEndpoint(endpoint) {
try {
            // The endpoint already ends in /api, so append only /health (matches checkHealth)
            const healthUrl = `${endpoint}/health`;
            const response = await this.makeRequestInternal(healthUrl, 'GET', null, 3000); // 3 second timeout
return response.statusCode === 200;
} catch (error) {
return false;
}
}
/**
* Make HTTP request to the MCP Memory Service with retry logic
*/
async makeRequest(path, method = 'GET', data = null, maxRetries = 3) {
let lastError;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
console.error(`Attempt ${attempt}/${maxRetries} for ${method} ${path}`);
const result = await this.makeRequestInternal(path, method, data);
if (attempt > 1) {
console.error(`Request succeeded on attempt ${attempt}`);
}
return result;
} catch (error) {
lastError = error;
console.error(`Attempt ${attempt} failed: ${error.message}`);
if (attempt < maxRetries) {
const delay = Math.min(1000 * Math.pow(2, attempt - 1), 5000); // Exponential backoff, max 5s
console.error(`Retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
} else {
console.error(`All ${maxRetries} attempts failed. Last error: ${error.message}`);
}
}
}
throw lastError;
}
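    // Retry timing with the default maxRetries = 3 (illustrative): 1s after the first
    // failure and 2s after the second; the 5s cap only matters for higher retry counts.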
/**
* Internal HTTP request method with timeout support and comprehensive logging
*/
async makeRequestInternal(path, method = 'GET', data = null, timeout = 10000) {
const startTime = Date.now();
const requestId = Math.random().toString(36).substr(2, 9);
console.error(`[${requestId}] Starting ${method} request to ${path}`);
return new Promise((resolve, reject) => {
// Use URL constructor's built-in path resolution to avoid duplicate base paths
// Ensure endpoint has trailing slash for proper relative path resolution
const baseUrl = this.endpoint.endsWith('/') ? this.endpoint : this.endpoint + '/';
const url = new URL(path, baseUrl);
const protocol = url.protocol === 'https:' ? https : http;
console.error(`[${requestId}] Full URL: ${url.toString()}`);
console.error(`[${requestId}] Using protocol: ${url.protocol}`);
const options = {
hostname: url.hostname,
port: url.port || (url.protocol === 'https:' ? 443 : 80),
path: url.pathname + url.search,
method: method,
headers: {
'Content-Type': 'application/json',
'User-Agent': 'MCP-HTTP-Bridge/2.0',
'Connection': 'close'
},
timeout: timeout,
keepAlive: false
};
            // For HTTPS, create a custom agent that accepts self-signed certificates (default TLS settings)
if (url.protocol === 'https:') {
const agent = new https.Agent({
rejectUnauthorized: false,
requestCert: false,
checkServerIdentity: () => undefined,
keepAlive: false
});
options.agent = agent;
console.error(`[${requestId}] Using custom HTTPS agent with default TLS settings`);
}
if (this.apiKey) {
options.headers['Authorization'] = `Bearer ${this.apiKey}`;
console.error(`[${requestId}] API key added to headers`);
}
if (data) {
const postData = JSON.stringify(data);
options.headers['Content-Length'] = Buffer.byteLength(postData);
console.error(`[${requestId}] Request body size: ${Buffer.byteLength(postData)} bytes`);
}
            // Redact the Authorization header before logging so the API key never reaches stderr
            const loggedOptions = { ...options, headers: { ...options.headers } };
            if (loggedOptions.headers['Authorization']) {
                loggedOptions.headers['Authorization'] = 'Bearer [REDACTED]';
            }
            console.error(`[${requestId}] Request options:`, JSON.stringify(loggedOptions, null, 2));
const req = protocol.request(options, (res) => {
const responseStartTime = Date.now();
console.error(`[${requestId}] Response received after ${responseStartTime - startTime}ms`);
console.error(`[${requestId}] Status code: ${res.statusCode}`);
console.error(`[${requestId}] Response headers:`, JSON.stringify(res.headers, null, 2));
let responseData = '';
res.on('data', (chunk) => {
responseData += chunk;
console.error(`[${requestId}] Received ${chunk.length} bytes`);
});
res.on('end', () => {
const endTime = Date.now();
console.error(`[${requestId}] Response completed after ${endTime - startTime}ms total`);
console.error(`[${requestId}] Response body: ${responseData}`);
try {
const result = JSON.parse(responseData);
resolve({ statusCode: res.statusCode, data: result });
} catch (error) {
console.error(`[${requestId}] JSON parse error: ${error.message}`);
reject(new Error(`Invalid JSON response: ${responseData}`));
}
});
});
req.on('error', (error) => {
const errorTime = Date.now();
console.error(`[${requestId}] Request error after ${errorTime - startTime}ms: ${error.message}`);
console.error(`[${requestId}] Error details:`, error);
reject(error);
});
req.on('timeout', () => {
const timeoutTime = Date.now();
console.error(`[${requestId}] Request timeout after ${timeoutTime - startTime}ms (limit: ${timeout}ms)`);
req.destroy();
reject(new Error(`Request timeout after ${timeout}ms`));
});
console.error(`[${requestId}] Sending request...`);
if (data) {
const postData = JSON.stringify(data);
console.error(`[${requestId}] Writing request body: ${postData}`);
req.write(postData);
}
req.end();
console.error(`[${requestId}] Request sent, waiting for response...`);
});
}
/**
* Handle MCP store_memory operation
*/
async storeMemory(params) {
try {
const response = await this.makeRequest('memories', 'POST', {
content: params.content,
tags: params.metadata?.tags || [],
memory_type: params.metadata?.type || 'note',
metadata: params.metadata || {}
});
if (response.statusCode === 200 || response.statusCode === 201) {
// Server returns 200 with success field indicating actual result
if (response.data.success) {
return { success: true, message: response.data.message || 'Memory stored successfully' };
} else {
return { success: false, message: response.data.message || response.data.detail || 'Failed to store memory' };
}
} else {
return { success: false, message: response.data.detail || 'Failed to store memory' };
}
} catch (error) {
return { success: false, message: error.message };
}
}
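    // Illustrative call: storeMemory({ content: 'note', metadata: { tags: ['demo'] } })
    // issues POST {endpoint}/memories with body { content, tags, memory_type, metadata }.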
/**
* Handle MCP retrieve_memory operation
*/
async retrieveMemory(params) {
try {
const queryParams = new URLSearchParams({
q: params.query,
n_results: params.n_results || 5
});
const response = await this.makeRequest(`search?${queryParams}`, 'GET');
if (response.statusCode === 200) {
return {
memories: response.data.results.map(result => ({
content: result.memory.content,
metadata: {
tags: result.memory.tags,
type: result.memory.memory_type,
created_at: result.memory.created_at_iso,
relevance_score: result.relevance_score
}
}))
};
} else {
return { memories: [] };
}
} catch (error) {
return { memories: [] };
}
}
/**
* Handle MCP search_by_tag operation
*/
async searchByTag(params) {
try {
const queryParams = new URLSearchParams();
if (Array.isArray(params.tags)) {
params.tags.forEach(tag => queryParams.append('tags', tag));
} else if (typeof params.tags === 'string') {
queryParams.append('tags', params.tags);
}
const response = await this.makeRequest(`memories/search/tags?${queryParams}`, 'GET');
if (response.statusCode === 200) {
return {
memories: response.data.memories.map(memory => ({
content: memory.content,
metadata: {
tags: memory.tags,
type: memory.memory_type,
created_at: memory.created_at_iso
}
}))
};
} else {
return { memories: [] };
}
} catch (error) {
return { memories: [] };
}
}
/**
* Handle MCP delete_memory operation
*/
async deleteMemory(params) {
try {
const response = await this.makeRequest(`memories/${params.content_hash}`, 'DELETE');
if (response.statusCode === 200) {
return { success: true, message: 'Memory deleted successfully' };
} else {
return { success: false, message: response.data.detail || 'Failed to delete memory' };
}
} catch (error) {
return { success: false, message: error.message };
}
}
/**
* Handle MCP check_database_health operation
*/
async checkHealth(params = {}) {
try {
const response = await this.makeRequest('health', 'GET');
if (response.statusCode === 200) {
return {
status: response.data.status,
backend: response.data.storage_type,
statistics: response.data.statistics || {}
};
} else {
return { status: 'unhealthy', backend: 'unknown', statistics: {} };
}
} catch (error) {
// Handle errors that may not have a message property (like ECONNREFUSED)
const errorMessage = error.message || error.code || error.toString() || 'Unknown error';
return { status: 'error', backend: 'unknown', statistics: {}, error: errorMessage };
}
}
/**
* Process MCP JSON-RPC request
*/
async processRequest(request) {
const { method, params, id } = request;
let result;
try {
switch (method) {
case 'initialize':
result = {
protocolVersion: "2024-11-05",
capabilities: {
tools: {
listChanged: false
}
},
serverInfo: {
name: "mcp-memory-service",
version: "2.0.0"
}
};
break;
case 'notifications/initialized':
// No response needed for notifications
return null;
case 'tools/list':
result = {
tools: [
{
name: "store_memory",
description: "Store a memory with content and optional metadata",
inputSchema: {
type: "object",
properties: {
content: { type: "string", description: "The content to store" },
metadata: {
type: "object",
properties: {
tags: { type: "array", items: { type: "string" } },
type: { type: "string" }
}
}
},
required: ["content"]
}
},
{
name: "retrieve_memory",
description: "Retrieve memories based on a query",
inputSchema: {
type: "object",
properties: {
query: { type: "string", description: "Search query" },
n_results: { type: "integer", description: "Number of results to return" }
},
required: ["query"]
}
},
{
name: "search_by_tag",
description: "Search memories by tags",
inputSchema: {
type: "object",
properties: {
tags: {
oneOf: [
{ type: "string" },
{ type: "array", items: { type: "string" } }
]
}
},
required: ["tags"]
}
},
{
name: "delete_memory",
description: "Delete a memory by content hash",
inputSchema: {
type: "object",
properties: {
content_hash: { type: "string", description: "Hash of the content to delete" }
},
required: ["content_hash"]
}
},
{
name: "check_database_health",
description: "Check the health of the memory database",
inputSchema: {
type: "object",
properties: {}
}
}
]
};
break;
case 'tools/call':
const toolName = params.name;
const toolParams = params.arguments || {};
console.error(`Processing tool call: ${toolName} with params:`, JSON.stringify(toolParams));
let toolResult;
switch (toolName) {
case 'store_memory':
toolResult = await this.storeMemory(toolParams);
break;
case 'retrieve_memory':
toolResult = await this.retrieveMemory(toolParams);
break;
case 'search_by_tag':
toolResult = await this.searchByTag(toolParams);
break;
case 'delete_memory':
toolResult = await this.deleteMemory(toolParams);
break;
case 'check_database_health':
toolResult = await this.checkHealth(toolParams);
break;
default:
throw new Error(`Unknown tool: ${toolName}`);
}
console.error(`Tool result:`, JSON.stringify(toolResult));
return {
jsonrpc: "2.0",
id: id,
result: {
content: [
{
type: "text",
text: JSON.stringify(toolResult, null, 2)
}
]
}
};
case 'store_memory':
result = await this.storeMemory(params);
break;
case 'retrieve_memory':
result = await this.retrieveMemory(params);
break;
case 'search_by_tag':
result = await this.searchByTag(params);
break;
case 'delete_memory':
result = await this.deleteMemory(params);
break;
case 'check_database_health':
result = await this.checkHealth(params);
break;
default:
throw new Error(`Unknown method: ${method}`);
}
return {
jsonrpc: "2.0",
id: id,
result: result
};
} catch (error) {
return {
jsonrpc: "2.0",
id: id,
error: {
code: -32000,
message: error.message
}
};
}
}
/**
* Start the bridge server
*/
async start() {
console.error(`MCP HTTP Bridge starting...`);
// Initialize the bridge (discovery or manual config)
const initialized = await this.initialize();
if (!initialized) {
console.error('Failed to initialize bridge - no endpoint available');
process.exit(1);
}
console.error(`Endpoint: ${this.endpoint}`);
console.error(`API Key: ${this.apiKey ? '[SET]' : '[NOT SET]'}`);
console.error(`Auto-discovery: ${this.autoDiscover ? 'ENABLED' : 'DISABLED'}`);
console.error(`Prefer HTTPS: ${this.preferHttps ? 'YES' : 'NO'}`);
if (this.discoveredEndpoint) {
console.error(`Service discovered automatically via mDNS`);
}
let buffer = '';
process.stdin.on('data', async (chunk) => {
buffer += chunk.toString();
// Process complete JSON-RPC messages
let newlineIndex;
while ((newlineIndex = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, newlineIndex).trim();
buffer = buffer.slice(newlineIndex + 1);
if (line) {
try {
const request = JSON.parse(line);
                        const response = await this.processRequest(request);
                        // Notifications return null and must not emit a response on stdout
                        if (response !== null) {
                            console.log(JSON.stringify(response));
                        }
} catch (error) {
console.error(`Error processing request: ${error.message}`);
console.log(JSON.stringify({
jsonrpc: "2.0",
id: null,
error: {
code: -32700,
message: "Parse error"
}
}));
}
}
}
});
process.stdin.on('end', () => {
process.exit(0);
});
// Handle graceful shutdown
process.on('SIGINT', () => {
console.error('Shutting down HTTP Bridge...');
process.exit(0);
});
process.on('SIGTERM', () => {
console.error('Shutting down HTTP Bridge...');
process.exit(0);
});
}
}
// Start the bridge if this file is run directly
if (require.main === module) {
const bridge = new HTTPMCPBridge();
bridge.start().catch(error => {
console.error(`Failed to start bridge: ${error.message}`);
process.exit(1);
});
}
module.exports = HTTPMCPBridge;
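// Example stdio exchange (illustrative):
//   client sends:  {"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"check_database_health","arguments":{}}}
//   bridge replies: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"{ ...health JSON... }"}]}}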
```