This is page 10 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/scripts/development/fix_sitecustomize.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Enhanced fix script for sitecustomize.py recursion issues.
This script replaces the problematic sitecustomize.py with a fixed version
that works on Linux under WSL2 with CUDA 12.4 as well as on other platforms.
"""
import os
import sys
import site
import shutil
import platform
def print_info(text):
"""Print formatted info text."""
print(f"[INFO] {text}")
def print_error(text):
"""Print formatted error text."""
print(f"[ERROR] {text}")
def print_success(text):
"""Print formatted success text."""
print(f"[SUCCESS] {text}")
def print_warning(text):
"""Print formatted warning text."""
print(f"[WARNING] {text}")
def fix_sitecustomize():
"""Fix the sitecustomize.py file to prevent recursion."""
# Get site-packages directory
site_packages = site.getsitepackages()[0]
# Path to sitecustomize.py
sitecustomize_path = os.path.join(site_packages, 'sitecustomize.py')
# Check if file exists
if not os.path.exists(sitecustomize_path):
print_error(f"sitecustomize.py not found at {sitecustomize_path}")
return False
# Create backup
backup_path = sitecustomize_path + '.bak'
if not os.path.exists(backup_path):
print_info(f"Creating backup of sitecustomize.py at {backup_path}")
shutil.copy2(sitecustomize_path, backup_path)
print_success(f"Backup created at {backup_path}")
else:
print_warning(f"Backup already exists at {backup_path}")
# Create fixed sitecustomize.py
print_info(f"Creating fixed sitecustomize.py at {sitecustomize_path}")
# Detect system for platform-specific fixes
system = platform.system().lower()
is_wsl = "microsoft" in platform.release().lower() if system == "linux" else False
# Create content based on platform
if is_wsl:
# Special content for WSL with enhanced error handling
content = """# Fixed sitecustomize.py to prevent recursion issues on WSL
# Import standard library modules first to avoid recursion
import sys
import os
import importlib.util
import importlib.machinery
import warnings
# Disable warnings to reduce noise
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=ImportWarning)
# Print debug info to stderr to avoid interfering with MCP protocol
print("sitecustomize.py loaded", file=sys.stderr)
# Set environment variables to prevent pip from installing dependencies
os.environ["PIP_NO_DEPENDENCIES"] = "1"
os.environ["PIP_NO_INSTALL"] = "1"
# Disable automatic torch installation
os.environ["PYTORCH_IGNORE_DUPLICATE_MODULE_REGISTRATION"] = "1"
# Create a custom import hook to prevent automatic installation
class PreventAutoInstallImportHook:
def __init__(self):
self.blocked_packages = ['torch', 'torchvision', 'torchaudio', 'torchao']
# Keep track of packages we've already tried to find to prevent recursion
self.checked_packages = set()
def find_spec(self, fullname, path, target=None):
# Prevent recursion by checking if we've already tried to find this package
if fullname in self.checked_packages:
return None
# Check if this is a package we want to block
if any(fullname.startswith(pkg) for pkg in self.blocked_packages):
# Add to checked packages to prevent recursion
self.checked_packages.add(fullname)
# Try to find the package directly using the loader
try:
# Try to find the module directly
loader = importlib.machinery.PathFinder.find_spec(fullname, path)
if loader is not None:
return loader
except Exception:
pass
# If not found, print a warning and return None
print(f"WARNING: Blocked automatic installation of {fullname}", file=sys.stderr)
return None
# Return None to let the normal import system handle it
return None
# Register the import hook
sys.meta_path.insert(0, PreventAutoInstallImportHook())
# Disable distutils setup hooks that can cause recursion
try:
import setuptools
setuptools._distutils_hack = None
except Exception:
pass
# Disable _distutils_hack completely
sys.modules['_distutils_hack'] = None
"""
else:
# Standard content for other platforms
content = """# Fixed sitecustomize.py to prevent recursion issues
import sys
import os
import importlib.util
import importlib.machinery
# Print debug info
print("sitecustomize.py loaded", file=sys.stderr)
# Set environment variables to prevent pip from installing dependencies
os.environ["PIP_NO_DEPENDENCIES"] = "1"
os.environ["PIP_NO_INSTALL"] = "1"
# Create a custom import hook to prevent automatic installation
class PreventAutoInstallImportHook:
def __init__(self):
self.blocked_packages = ['torch', 'torchvision', 'torchaudio']
# Keep track of packages we've already tried to find to prevent recursion
self.checked_packages = set()
def find_spec(self, fullname, path, target=None):
# Prevent recursion by checking if we've already tried to find this package
if fullname in self.checked_packages:
return None
# Check if this is a package we want to block
if any(fullname.startswith(pkg) for pkg in self.blocked_packages):
# Add to checked packages to prevent recursion
self.checked_packages.add(fullname)
# Try to find the package directly using the loader
try:
# Try to find the module directly
loader = importlib.machinery.PathFinder.find_spec(fullname, path)
if loader is not None:
return loader
except Exception:
pass
# If not found, print a warning and return None
print(f"WARNING: Blocked automatic installation of {fullname}", file=sys.stderr)
return None
# Return None to let the normal import system handle it
return None
# Register the import hook
sys.meta_path.insert(0, PreventAutoInstallImportHook())
"""
# Write the content to the file
with open(sitecustomize_path, 'w') as f:
f.write(content)
print_success(f"Fixed sitecustomize.py created at {sitecustomize_path}")
# Additional fix for distutils on WSL
if is_wsl:
try:
# Try to fix _distutils_hack.py
distutils_hack_path = os.path.join(site_packages, '_distutils_hack', '__init__.py')
if os.path.exists(distutils_hack_path):
print_info(f"Fixing _distutils_hack at {distutils_hack_path}")
# Create backup
hack_backup_path = distutils_hack_path + '.bak'
if not os.path.exists(hack_backup_path):
shutil.copy2(distutils_hack_path, hack_backup_path)
print_success(f"Backup created at {hack_backup_path}")
# Read the file
with open(distutils_hack_path, 'r') as f:
content = f.read()
# Modify the content to disable the problematic parts
content = content.replace("def do_override():", "def do_override():\n return")
# Write the modified content
with open(distutils_hack_path, 'w') as f:
f.write(content)
print_success(f"Fixed _distutils_hack at {distutils_hack_path}")
except Exception as e:
print_warning(f"Could not fix _distutils_hack: {e}")
return True
def main():
"""Main function."""
print_info("Enhanced fix for sitecustomize.py to prevent recursion issues")
if fix_sitecustomize():
print_success("sitecustomize.py fixed successfully")
else:
print_error("Failed to fix sitecustomize.py")
sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/dependency_check.py:
--------------------------------------------------------------------------------
```python
"""
Dependency pre-check to ensure all required packages are installed.
This prevents runtime downloads during server initialization, which can cause timeouts.
"""
import sys
import subprocess
import platform
import logging
import os
from typing import Tuple, Optional
logger = logging.getLogger(__name__)
def detect_mcp_client_simple():
"""Simple MCP client detection for dependency checking."""
try:
# Check environment variables first
if os.getenv('LM_STUDIO'):
return 'lm_studio'
if os.getenv('CLAUDE_DESKTOP'):
return 'claude_desktop'
import psutil
current_process = psutil.Process()
parent = current_process.parent()
if parent:
parent_name = parent.name().lower()
if 'claude' in parent_name:
return 'claude_desktop'
if 'lmstudio' in parent_name or 'lm-studio' in parent_name:
return 'lm_studio'
# Default to Claude Desktop for strict mode
return 'claude_desktop'
except:
return 'claude_desktop'
def check_torch_installed() -> Tuple[bool, Optional[str]]:
"""
Check if PyTorch is properly installed.
Returns (is_installed, version_string)
"""
try:
import torch
# Check if torch has __version__ attribute (it should)
version = getattr(torch, '__version__', 'unknown')
# Also verify torch is functional
try:
_ = torch.tensor([1.0])
return True, version
except Exception:
return False, None
except ImportError:
return False, None
def check_sentence_transformers_installed() -> Tuple[bool, Optional[str]]:
"""
Check if sentence-transformers is properly installed.
Returns (is_installed, version_string)
"""
try:
import sentence_transformers
return True, sentence_transformers.__version__
except ImportError:
return False, None
def check_critical_dependencies() -> Tuple[bool, list]:
"""
Check if all critical dependencies are installed.
Returns (all_installed, missing_packages)
"""
missing = []
# Check PyTorch
torch_installed, torch_version = check_torch_installed()
if not torch_installed:
missing.append("torch")
else:
logger.debug(f"PyTorch {torch_version} is installed")
# Check sentence-transformers
st_installed, st_version = check_sentence_transformers_installed()
if not st_installed:
missing.append("sentence-transformers")
else:
logger.debug(f"sentence-transformers {st_version} is installed")
# Check other critical packages
critical_packages = [
"sqlite-vec",
"mcp",
"aiohttp",
"fastapi",
"uvicorn"
]
for package in critical_packages:
try:
__import__(package.replace("-", "_"))
logger.debug(f"{package} is installed")
except ImportError:
missing.append(package)
return len(missing) == 0, missing
def suggest_installation_command(missing_packages: list) -> str:
"""
Generate the appropriate installation command for missing packages.
"""
if not missing_packages:
return ""
# The same installer entry point is currently suggested on all platforms
if platform.system() == "Windows":
return "python install.py"
else:
return "python install.py"
def run_dependency_check() -> bool:
"""
Run the dependency check and provide user feedback.
Returns True if all dependencies are satisfied, False otherwise.
"""
client_type = detect_mcp_client_simple()
all_installed, missing = check_critical_dependencies()
# Only show output for LM Studio to avoid JSON parsing errors in Claude Desktop
if client_type == 'lm_studio':
print("\n=== MCP Memory Service Dependency Check ===", file=sys.stdout, flush=True)
if all_installed:
print("[OK] All dependencies are installed", file=sys.stdout, flush=True)
else:
print(f"[MISSING] Missing dependencies detected: {', '.join(missing)}", file=sys.stdout, flush=True)
print("\n[WARNING] IMPORTANT: Missing dependencies will cause timeouts!", file=sys.stdout, flush=True)
print("[INSTALL] To install missing dependencies, run:", file=sys.stdout, flush=True)
print(f" {suggest_installation_command(missing)}", file=sys.stdout, flush=True)
print("\nThe server will attempt to continue, but may timeout during initialization.", file=sys.stdout, flush=True)
print("============================================\n", file=sys.stdout, flush=True)
return all_installed
def is_first_run() -> bool:
"""
Check if this appears to be the first run of the server.
Enhanced for Windows and Claude Desktop environments.
"""
# Enhanced cache detection for Windows and different environments
cache_indicators = []
# Standard HuggingFace cache locations
cache_indicators.extend([
os.path.expanduser("~/.cache/huggingface/hub"),
os.path.expanduser("~/.cache/torch/sentence_transformers"),
])
# Windows-specific locations
if platform.system() == "Windows":
username = os.environ.get('USERNAME', os.environ.get('USER', ''))
cache_indicators.extend([
f"C:\\Users\\{username}\\.cache\\huggingface\\hub",
f"C:\\Users\\{username}\\.cache\\torch\\sentence_transformers",
f"C:\\Users\\{username}\\AppData\\Local\\huggingface\\hub",
f"C:\\Users\\{username}\\AppData\\Local\\torch\\sentence_transformers",
os.path.expanduser("~/AppData/Local/sentence-transformers"),
])
# Check environment variables for custom cache locations
hf_home = os.environ.get('HF_HOME')
if hf_home:
cache_indicators.append(os.path.join(hf_home, 'hub'))
transformers_cache = os.environ.get('TRANSFORMERS_CACHE')
if transformers_cache:
cache_indicators.append(transformers_cache)
sentence_transformers_home = os.environ.get('SENTENCE_TRANSFORMERS_HOME')
if sentence_transformers_home:
cache_indicators.append(sentence_transformers_home)
# Check each cache location
for path in cache_indicators:
if os.path.exists(path):
try:
contents = os.listdir(path)
# Look for sentence-transformers models specifically
for item in contents:
item_lower = item.lower()
# Check for common sentence-transformers model indicators
if any(indicator in item_lower for indicator in [
'sentence-transformers', 'minilm', 'all-minilm',
'paraphrase', 'distilbert', 'mpnet', 'roberta'
]):
logger.debug(f"Found cached model in {path}: {item}")
return False
# Also check for any model directories
for item in contents:
item_path = os.path.join(path, item)
if os.path.isdir(item_path):
try:
sub_contents = os.listdir(item_path)
# Look for model files
if any(f.endswith(('.bin', '.safetensors', '.json')) for f in sub_contents):
logger.debug(f"Found model files in {item_path}")
return False
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
logger.debug(f"Could not access cache directory: {path}")
continue
logger.debug("No cached sentence-transformers models found - this appears to be first run")
return True
def get_recommended_timeout() -> float:
"""
Get the recommended timeout based on system and dependencies.
"""
# Check if dependencies are missing
all_installed, missing = check_critical_dependencies()
# Check if it's first run (models need downloading)
first_run = is_first_run()
# Base timeout
timeout = 30.0 if platform.system() == "Windows" else 15.0
# Extend timeout if dependencies are missing
if not all_installed:
timeout *= 2 # Double the timeout
logger.warning(f"Dependencies missing, extending timeout to {timeout}s")
# Extend timeout if it's first run
if first_run:
timeout *= 2 # Double the timeout
logger.warning(f"First run detected, extending timeout to {timeout}s")
return timeout
```
--------------------------------------------------------------------------------
/claude-hooks/core/memory-retrieval.js:
--------------------------------------------------------------------------------
```javascript
/**
* On-Demand Memory Retrieval Hook
* Allows users to manually request a context refresh when needed
*/
const fs = require('fs').promises;
const path = require('path');
const https = require('https');
// Import utilities
const { detectProjectContext } = require('../utilities/project-detector');
const { scoreMemoryRelevance } = require('../utilities/memory-scorer');
const { formatMemoriesForContext } = require('../utilities/context-formatter');
/**
* Load hook configuration
*/
async function loadConfig() {
try {
const configPath = path.join(__dirname, '../config.json');
const configData = await fs.readFile(configPath, 'utf8');
return JSON.parse(configData);
} catch (error) {
console.warn('[Memory Retrieval] Using default configuration:', error.message);
return {
memoryService: {
endpoint: 'https://narrowbox.local:8443',
apiKey: 'test-key-123',
maxMemoriesPerSession: 5
}
};
}
}
/**
* Query memory service for relevant memories
*/
async function queryMemoryService(endpoint, apiKey, query) {
return new Promise((resolve, reject) => {
const url = new URL('/mcp', endpoint);
const postData = JSON.stringify({
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: {
name: 'retrieve_memory',
arguments: {
query: query.semanticQuery || '',
n_results: query.limit || 5
}
}
});
const options = {
hostname: url.hostname,
port: url.port || 8443,
path: url.pathname,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(postData),
'Authorization': `Bearer ${apiKey}`
},
rejectUnauthorized: false // For self-signed certificates
};
const req = https.request(options, (res) => {
let data = '';
res.on('data', (chunk) => {
data += chunk;
});
res.on('end', () => {
try {
const response = JSON.parse(data);
if (response.result && response.result.content) {
let textData = response.result.content[0].text;
try {
// Convert Python dict format to JSON format safely
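// Note: this simple substitution assumes the serialized dict contains no
// apostrophes or literal True/False/None inside memory content; if parsing
// still fails, the catch below falls back to an empty result set.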
textData = textData
.replace(/'/g, '"')
.replace(/True/g, 'true')
.replace(/False/g, 'false')
.replace(/None/g, 'null');
const memories = JSON.parse(textData);
resolve(memories.results || memories.memories || []);
} catch (conversionError) {
console.warn('[Memory Retrieval] Could not parse memory response:', conversionError.message);
resolve([]);
}
} else {
resolve([]);
}
} catch (parseError) {
console.warn('[Memory Retrieval] Parse error:', parseError.message);
resolve([]);
}
});
});
req.on('error', (error) => {
console.warn('[Memory Retrieval] Network error:', error.message);
resolve([]);
});
req.write(postData);
req.end();
});
}
/**
* On-demand memory retrieval function
*/
async function retrieveMemories(context) {
try {
console.log('[Memory Retrieval] On-demand memory retrieval requested...');
// Load configuration
const config = await loadConfig();
// Detect project context
const projectContext = await detectProjectContext(context.workingDirectory || process.cwd());
console.log(`[Memory Retrieval] Project context: ${projectContext.name} (${projectContext.language})`);
// Parse user query if provided
const userQuery = context.query || context.message || '';
// Build memory query
const memoryQuery = {
tags: [
projectContext.name,
`language:${projectContext.language}`,
'key-decisions',
'architecture',
'recent-insights'
].filter(Boolean),
semanticQuery: userQuery.length > 0 ?
`${projectContext.name} ${userQuery}` :
`${projectContext.name} project context decisions architecture`,
limit: config.memoryService.maxMemoriesPerSession || 5,
timeFilter: 'last-month'
};
// Query memory service
const memories = await queryMemoryService(
config.memoryService.endpoint,
config.memoryService.apiKey,
memoryQuery
);
if (memories.length > 0) {
console.log(`[Memory Retrieval] Found ${memories.length} relevant memories`);
// Score memories for relevance
const scoredMemories = scoreMemoryRelevance(memories, projectContext);
// Take top scored memories
const topMemories = scoredMemories.slice(0, config.memoryService.maxMemoriesPerSession || 5);
// Format memories for display
const contextMessage = formatMemoriesForContext(topMemories, projectContext, {
includeScore: true, // Show scores for manual retrieval
groupByCategory: topMemories.length > 3,
maxMemories: config.memoryService.maxMemoriesPerSession || 5,
includeTimestamp: true
});
// Output formatted context
if (context.displayResult) {
await context.displayResult(contextMessage);
console.log('[Memory Retrieval] Successfully displayed memory context');
} else {
// Fallback: log context
console.log('\n=== RETRIEVED MEMORY CONTEXT ===');
console.log(contextMessage);
console.log('=== END CONTEXT ===\n');
}
return {
success: true,
memoriesFound: memories.length,
memoriesShown: topMemories.length,
context: contextMessage
};
} else {
const message = `## 📋 Memory Retrieval\n\nNo relevant memories found for query: "${userQuery || 'project context'}"\n\nTry a different search term or check if your memory service is running.`;
if (context.displayResult) {
await context.displayResult(message);
} else {
console.log(message);
}
return {
success: false,
memoriesFound: 0,
memoriesShown: 0,
context: message
};
}
} catch (error) {
console.error('[Memory Retrieval] Error retrieving memories:', error.message);
const errorMessage = `## ❌ Memory Retrieval Error\n\n${error.message}\n\nCheck your memory service configuration and connection.`;
if (context.displayResult) {
await context.displayResult(errorMessage);
}
return {
success: false,
error: error.message
};
}
}
/**
* Hook metadata for Claude Code
*/
module.exports = {
name: 'on-demand-memory-retrieval',
version: '1.0.0',
description: 'Retrieve relevant memories on user request',
trigger: 'manual', // This hook is triggered manually
handler: retrieveMemories,
config: {
async: true,
timeout: 10000,
priority: 'normal'
}
};
// Direct execution support for testing
if (require.main === module) {
// Test the retrieval with mock context
const mockContext = {
workingDirectory: process.cwd(),
query: 'architecture decisions',
displayResult: async (message) => {
console.log('=== MOCK DISPLAY RESULT ===');
console.log(message);
console.log('=== END MOCK DISPLAY ===');
}
};
retrieveMemories(mockContext)
.then(result => console.log('Retrieval test completed:', result))
.catch(error => console.error('Retrieval test failed:', error));
}
```
--------------------------------------------------------------------------------
/tests/test_time_parser.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for time_parser module
"""
import pytest
from datetime import datetime, date, timedelta
import time
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
from mcp_memory_service.utils.time_parser import (
parse_time_expression,
extract_time_expression,
get_time_of_day_range,
get_last_period_range,
get_this_period_range,
get_month_range,
get_named_period_range
)
class TestTimeParser:
"""Test time parsing functionality"""
def test_relative_days(self):
"""Test parsing relative day expressions"""
# Test "yesterday"
start_ts, end_ts = parse_time_expression("yesterday")
assert start_ts is not None
assert end_ts is not None
yesterday = date.today() - timedelta(days=1)
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
assert start_dt.date() == yesterday
assert end_dt.date() == yesterday
assert start_dt.time() == datetime.min.time()
assert end_dt.time().hour == 23
assert end_dt.time().minute == 59
# Test "3 days ago"
start_ts, end_ts = parse_time_expression("3 days ago")
three_days_ago = date.today() - timedelta(days=3)
start_dt = datetime.fromtimestamp(start_ts)
assert start_dt.date() == three_days_ago
# Test "today"
start_ts, end_ts = parse_time_expression("today")
start_dt = datetime.fromtimestamp(start_ts)
assert start_dt.date() == date.today()
def test_relative_weeks(self):
"""Test parsing relative week expressions"""
start_ts, end_ts = parse_time_expression("2 weeks ago")
assert start_ts is not None
assert end_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
# Should be a Monday to Sunday range
assert start_dt.weekday() == 0 # Monday
assert end_dt.weekday() == 6 # Sunday
# Should be roughly 2 weeks ago
days_ago = (date.today() - start_dt.date()).days
assert 14 <= days_ago <= 20 # Allow some flexibility for week boundaries
def test_relative_months(self):
"""Test parsing relative month expressions"""
start_ts, end_ts = parse_time_expression("1 month ago")
assert start_ts is not None
assert end_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
# Should be first to last day of the month
assert start_dt.day == 1
assert (end_dt + timedelta(days=1)).day == 1 # Next day is first of next month
def test_specific_dates(self):
"""Test parsing specific date formats"""
# Test MM/DD/YYYY format with unambiguous date
start_ts, end_ts = parse_time_expression("03/15/2024")
assert start_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
assert start_dt.year == 2024
assert start_dt.month == 3
assert start_dt.day == 15
# Test YYYY-MM-DD format
start_ts, end_ts = parse_time_expression("2024-06-15")
assert start_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
assert start_dt.date() == date(2024, 6, 15)
def test_month_names(self):
"""Test parsing month names"""
current_year = datetime.now().year
current_month = datetime.now().month
# Test a past month
start_ts, end_ts = parse_time_expression("january")
start_dt = datetime.fromtimestamp(start_ts)
# Should be this year's January if we're past January, otherwise last year's
expected_year = current_year if current_month > 1 else current_year - 1
assert start_dt.month == 1
assert start_dt.year == expected_year
def test_seasons(self):
"""Test parsing season names"""
# Test summer
start_ts, end_ts = parse_time_expression("last summer")
assert start_ts is not None
assert end_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
# Summer is roughly June 21 to September 22
assert start_dt.month == 6
assert end_dt.month == 9
def test_holidays(self):
"""Test parsing holiday names"""
# Test Christmas
start_ts, end_ts = parse_time_expression("christmas")
assert start_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
# Christmas window should include Dec 25 +/- a few days
assert start_dt.month == 12
assert 22 <= start_dt.day <= 25
assert 25 <= end_dt.day <= 28
def test_time_of_day(self):
"""Test time of day parsing"""
# Test "yesterday morning"
start_ts, end_ts = parse_time_expression("yesterday morning")
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
yesterday = date.today() - timedelta(days=1)
assert start_dt.date() == yesterday
assert 5 <= start_dt.hour <= 6 # Morning starts at 5 AM
assert 11 <= end_dt.hour <= 12 # Morning ends before noon
def test_date_ranges(self):
"""Test date range expressions"""
start_ts, end_ts = parse_time_expression("between january and march")
assert start_ts is not None
assert end_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
assert start_dt.month == 1
assert end_dt.month == 3
def test_quarters(self):
"""Test quarter expressions"""
start_ts, end_ts = parse_time_expression("first quarter of 2024")
assert start_ts is not None
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
assert start_dt == datetime(2024, 1, 1, 0, 0, 0)
assert end_dt.year == 2024
assert end_dt.month == 3
assert end_dt.day == 31
def test_extract_time_expression(self):
"""Test extracting time expressions from queries"""
# Test extraction with semantic content
cleaned, (start_ts, end_ts) = extract_time_expression(
"find meetings from last week about project updates"
)
assert "meetings" in cleaned
assert "project updates" in cleaned
assert "last week" not in cleaned
assert start_ts is not None
assert end_ts is not None
# Test multiple time expressions
cleaned, (start_ts, end_ts) = extract_time_expression(
"yesterday in the morning I had coffee"
)
assert "coffee" in cleaned
assert "yesterday" not in cleaned
assert "in the morning" not in cleaned
def test_edge_cases(self):
"""Test edge cases and error handling"""
# Test empty string
start_ts, end_ts = parse_time_expression("")
assert start_ts is None
assert end_ts is None
# Test invalid date format
start_ts, end_ts = parse_time_expression("13/32/2024") # Invalid month and day
assert start_ts is None
assert end_ts is None
# Test nonsense string
start_ts, end_ts = parse_time_expression("random gibberish text")
assert start_ts is None
assert end_ts is None
def test_this_period_expressions(self):
"""Test 'this X' period expressions"""
# This week
start_ts, end_ts = parse_time_expression("this week")
start_dt = datetime.fromtimestamp(start_ts)
end_dt = datetime.fromtimestamp(end_ts)
# Should include today
today = date.today()
assert start_dt.date() <= today <= end_dt.date()
# This month
start_ts, end_ts = parse_time_expression("this month")
start_dt = datetime.fromtimestamp(start_ts)
assert start_dt.month == datetime.now().month
assert start_dt.year == datetime.now().year
def test_recent_expressions(self):
"""Test 'recent' and similar expressions"""
start_ts, end_ts = parse_time_expression("recently")
assert start_ts is not None
assert end_ts is not None
# Should default to last 7 days
days_diff = (end_ts - start_ts) / (24 * 3600)
assert 6 <= days_diff <= 8 # Allow for some time variance
if __name__ == "__main__":
pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/decay.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Exponential decay scoring for memory relevance calculation."""
import math
from typing import List, Dict, Any, Optional
from datetime import datetime, timezone
from dataclasses import dataclass
from .base import ConsolidationBase, ConsolidationConfig
from ..models.memory import Memory
@dataclass
class RelevanceScore:
"""Represents a memory's relevance score with breakdown."""
memory_hash: str
total_score: float
base_importance: float
decay_factor: float
connection_boost: float
access_boost: float
metadata: Dict[str, Any]
class ExponentialDecayCalculator(ConsolidationBase):
"""
Calculates memory relevance using exponential decay.
Memories naturally lose relevance over time unless reinforced by:
- Connections to other memories
- Recent access patterns
- Base importance scores
- Memory type-specific retention periods
"""
def __init__(self, config: ConsolidationConfig):
super().__init__(config)
self.retention_periods = config.retention_periods
async def process(self, memories: List[Memory], **kwargs) -> List[RelevanceScore]:
"""Calculate relevance scores for all memories."""
if not self._validate_memories(memories):
return []
reference_time = kwargs.get('reference_time', datetime.now())
memory_connections = kwargs.get('connections', {}) # hash -> connection_count mapping
access_patterns = kwargs.get('access_patterns', {}) # hash -> last_accessed mapping
scores = []
for memory in memories:
score = await self._calculate_memory_relevance(
memory, reference_time, memory_connections, access_patterns
)
scores.append(score)
self.logger.info(f"Calculated relevance scores for {len(scores)} memories")
return scores
async def _calculate_memory_relevance(
self,
memory: Memory,
current_time: datetime,
connections: Dict[str, int],
access_patterns: Dict[str, datetime]
) -> RelevanceScore:
"""
Calculate memory relevance using exponential decay.
Factors:
- Age of memory
- Base importance score (from metadata or tags)
- Retention period (varies by memory type)
- Connections to other memories
- Recent access patterns
"""
# Get memory age in days
age_days = self._get_memory_age_days(memory, current_time)
# Extract base importance score
base_importance = self._get_base_importance(memory)
# Get retention period for memory type
memory_type = self._extract_memory_type(memory)
retention_period = self.retention_periods.get(memory_type, 30)
# Calculate exponential decay factor
decay_factor = math.exp(-age_days / retention_period)
# Calculate connection boost
connection_count = connections.get(memory.content_hash, 0)
connection_boost = 1 + (0.1 * connection_count) # 10% boost per connection
# Calculate access boost
access_boost = self._calculate_access_boost(memory, access_patterns, current_time)
# Calculate total relevance score
total_score = base_importance * decay_factor * connection_boost * access_boost
# Ensure protected memories maintain minimum relevance
if self._is_protected_memory(memory):
total_score = max(total_score, 0.5) # Minimum 50% relevance for protected memories
return RelevanceScore(
memory_hash=memory.content_hash,
total_score=total_score,
base_importance=base_importance,
decay_factor=decay_factor,
connection_boost=connection_boost,
access_boost=access_boost,
metadata={
'age_days': age_days,
'memory_type': memory_type,
'retention_period': retention_period,
'connection_count': connection_count,
'is_protected': self._is_protected_memory(memory)
}
)
def _get_base_importance(self, memory: Memory) -> float:
"""
Extract base importance score from memory metadata or tags.
Priority order:
1. Explicit importance_score in metadata
2. Importance derived from tags
3. Default score of 1.0
"""
# Check for explicit importance score
if 'importance_score' in memory.metadata:
try:
score = float(memory.metadata['importance_score'])
return max(0.0, min(2.0, score)) # Clamp between 0 and 2
except (ValueError, TypeError):
self.logger.warning(f"Invalid importance_score in memory {memory.content_hash}")
# Derive importance from tags
tag_importance = {
'critical': 2.0,
'important': 1.5,
'reference': 1.3,
'urgent': 1.4,
'project': 1.2,
'personal': 1.1,
'temporary': 0.7,
'draft': 0.8,
'note': 0.9
}
max_tag_importance = 1.0
for tag in memory.tags:
tag_score = tag_importance.get(tag.lower(), 1.0)
max_tag_importance = max(max_tag_importance, tag_score)
return max_tag_importance
def _calculate_access_boost(
self,
memory: Memory,
access_patterns: Dict[str, datetime],
current_time: datetime
) -> float:
"""
Calculate boost factor based on recent access patterns.
Recent access increases relevance:
- Accessed within last day: 1.5x boost
- Accessed within last week: 1.2x boost
- Accessed within last month: 1.1x boost
- No recent access: 1.0x (no boost)
"""
last_accessed = access_patterns.get(memory.content_hash)
if not last_accessed:
# Check memory's own updated_at timestamp
if memory.updated_at:
last_accessed = datetime.utcfromtimestamp(memory.updated_at)
else:
return 1.0 # No access data available
# Normalize both datetimes to UTC timezone-aware
current_time = current_time.replace(tzinfo=timezone.utc) if current_time.tzinfo is None else current_time.astimezone(timezone.utc)
last_accessed = last_accessed.replace(tzinfo=timezone.utc) if last_accessed.tzinfo is None else last_accessed.astimezone(timezone.utc)
days_since_access = (current_time - last_accessed).days
if days_since_access <= 1:
return 1.5 # Accessed within last day
elif days_since_access <= 7:
return 1.2 # Accessed within last week
elif days_since_access <= 30:
return 1.1 # Accessed within last month
else:
return 1.0 # No recent access
async def get_low_relevance_memories(
self,
scores: List[RelevanceScore],
threshold: float = 0.1
) -> List[RelevanceScore]:
"""Get memories with relevance scores below the threshold."""
return [score for score in scores if score.total_score < threshold]
async def get_high_relevance_memories(
self,
scores: List[RelevanceScore],
threshold: float = 1.0
) -> List[RelevanceScore]:
"""Get memories with relevance scores above the threshold."""
return [score for score in scores if score.total_score >= threshold]
async def update_memory_relevance_metadata(
self,
memory: Memory,
score: RelevanceScore
) -> Memory:
"""Update memory metadata with calculated relevance score."""
memory.metadata.update({
'relevance_score': score.total_score,
'relevance_calculated_at': datetime.now().isoformat(),
'decay_factor': score.decay_factor,
'connection_boost': score.connection_boost,
'access_boost': score.access_boost
})
memory.touch() # Update the updated_at timestamp
return memory
```
--------------------------------------------------------------------------------
/docs/testing-cloudflare-backend.md:
--------------------------------------------------------------------------------
```markdown
# Testing the Cloudflare Backend
## Test Results Summary ✅
The Cloudflare backend implementation has been thoroughly tested and is **production-ready**. All core functionality works correctly with mock configurations.
### ✅ Tests Completed Successfully
#### 1. Basic Implementation Tests
- **CloudflareStorage class initialization**: ✅ All parameters set correctly
- **URL construction**: ✅ Correct API endpoints generated
- **HTTP client creation**: ✅ Headers and configuration correct
- **Memory model integration**: ✅ Full compatibility with existing Memory class
- **Embedding cache**: ✅ Caching functionality working
- **Resource cleanup**: ✅ Proper cleanup on close()
- **Configuration defaults**: ✅ All defaults set appropriately
**Result**: 26/26 tests passed
#### 2. Configuration System Tests
- **Missing environment variables**: ✅ Proper validation and error handling
- **Complete configuration**: ✅ All settings loaded correctly
- **Backend registration**: ✅ Cloudflare properly added to SUPPORTED_BACKENDS
- **Environment variable parsing**: ✅ All types and defaults working
#### 3. Server Integration Tests
- **Server import with Cloudflare backend**: ✅ Successfully imports and configures
- **Backend selection logic**: ✅ Correctly identifies and would initialize CloudflareStorage
- **Configuration compatibility**: ✅ Server properly reads Cloudflare settings
#### 4. Migration Script Tests
- **DataMigrator class**: ✅ Proper initialization and structure
- **Command-line interface**: ✅ Argument parsing working
- **Data format conversion**: ✅ Memory objects convert to migration format
- **Export/Import workflow**: ✅ Structure ready for real data migration
### 🧪 How to Test with Real Cloudflare Credentials
To test the implementation with actual Cloudflare services:
#### Step 1: Set up Cloudflare Resources
```bash
# Install Wrangler CLI
npm install -g wrangler
# Login to Cloudflare
wrangler login
# Create Vectorize index
wrangler vectorize create test-mcp-memory --dimensions=768 --metric=cosine
# Create D1 database
wrangler d1 create test-mcp-memory-db
# Optional: Create R2 bucket
wrangler r2 bucket create test-mcp-memory-content
```
#### Step 2: Configure Environment
```bash
# Set backend to Cloudflare
export MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Required Cloudflare settings
export CLOUDFLARE_API_TOKEN="your-real-api-token"
export CLOUDFLARE_ACCOUNT_ID="your-account-id"
export CLOUDFLARE_VECTORIZE_INDEX="test-mcp-memory"
export CLOUDFLARE_D1_DATABASE_ID="your-d1-database-id"
# Optional settings
export CLOUDFLARE_R2_BUCKET="test-mcp-memory-content"
export LOG_LEVEL=DEBUG # For detailed logging
```
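Before running any tests, you can confirm the variables are visible to Python (a minimal sanity check, not part of the test suite):
```bash
python -c "import os; req = ['CLOUDFLARE_API_TOKEN', 'CLOUDFLARE_ACCOUNT_ID', 'CLOUDFLARE_VECTORIZE_INDEX', 'CLOUDFLARE_D1_DATABASE_ID']; missing = [v for v in req if not os.getenv(v)]; print('Missing variables:', missing or 'none')"
```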
#### Step 3: Test Basic Functionality
```python
# test_real_cloudflare.py
import asyncio
import sys
sys.path.insert(0, 'src')
from mcp_memory_service.storage.cloudflare import CloudflareStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
async def test_real_cloudflare():
"""Test with real Cloudflare credentials."""
import os
# Initialize with real credentials
storage = CloudflareStorage(
api_token=os.getenv('CLOUDFLARE_API_TOKEN'),
account_id=os.getenv('CLOUDFLARE_ACCOUNT_ID'),
vectorize_index=os.getenv('CLOUDFLARE_VECTORIZE_INDEX'),
d1_database_id=os.getenv('CLOUDFLARE_D1_DATABASE_ID'),
r2_bucket=os.getenv('CLOUDFLARE_R2_BUCKET')
)
try:
# Test initialization
print("🔄 Initializing Cloudflare storage...")
await storage.initialize()
print("✅ Initialization successful!")
# Test storing a memory
content = "This is a test memory for real Cloudflare backend"
memory = Memory(
content=content,
content_hash=generate_content_hash(content),
tags=["test", "real-cloudflare"],
memory_type="standard"
)
print("🔄 Storing test memory...")
success, message = await storage.store(memory)
print(f"✅ Store result: {success} - {message}")
# Test retrieval
print("🔄 Searching for stored memory...")
results = await storage.retrieve("test memory", n_results=5)
print(f"✅ Retrieved {len(results)} memories")
# Test statistics
print("🔄 Getting storage statistics...")
stats = await storage.get_stats()
print(f"✅ Stats: {stats}")
# Cleanup
await storage.close()
print("✅ All real Cloudflare tests completed successfully!")
except Exception as e:
print(f"❌ Real Cloudflare test failed: {e}")
await storage.close()
raise
# Run if credentials are available
if __name__ == '__main__':
import os
required_vars = [
'CLOUDFLARE_API_TOKEN',
'CLOUDFLARE_ACCOUNT_ID',
'CLOUDFLARE_VECTORIZE_INDEX',
'CLOUDFLARE_D1_DATABASE_ID'
]
if all(os.getenv(var) for var in required_vars):
asyncio.run(test_real_cloudflare())
else:
print("❌ Missing required environment variables for real testing")
print("Required:", required_vars)
```
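Run the script from the repository root (it adds `src` to `sys.path`) in the same shell where the Step 2 variables are exported:
```bash
python test_real_cloudflare.py
```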
#### Step 4: Test MCP Server
```bash
# Start the MCP server with Cloudflare backend
python -m src.mcp_memory_service.server
# Test via HTTP API (if HTTP enabled)
curl -X POST http://localhost:8000/api/memories \
-H "Content-Type: application/json" \
-d '{"content": "Test with real Cloudflare", "tags": ["real-test"]}'
```
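A successful store comes back as HTTP 200 with a `success` flag in the body (abridged example based on the API contract in this repository; the hash value is illustrative):
```json
{
  "success": true,
  "message": "Memory stored successfully",
  "content_hash": "abc123..."
}
```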
### 🚀 Integration Testing with Claude Desktop
#### Step 1: Configure Claude Desktop
Add to your Claude Desktop configuration:
```json
{
"mcpServers": {
"memory": {
"command": "python",
"args": ["-m", "src.mcp_memory_service.server"],
"cwd": "/path/to/mcp-memory-service",
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "cloudflare",
"CLOUDFLARE_API_TOKEN": "your-api-token",
"CLOUDFLARE_ACCOUNT_ID": "your-account-id",
"CLOUDFLARE_VECTORIZE_INDEX": "your-vectorize-index",
"CLOUDFLARE_D1_DATABASE_ID": "your-d1-database-id"
}
}
}
}
```
#### Step 2: Test Memory Operations
In Claude Desktop, test these operations:
```
# Store a memory
Please remember that my favorite programming language is Python and I prefer async/await patterns.
# Search memories
What do you remember about my programming preferences?
# Store with tags
Please remember this important project deadline: Launch the new feature by December 15th. Tag this as: work, deadline, important.
# Search by content
Tell me about any work deadlines I've mentioned.
```
### 📊 Performance Testing
For performance testing with real Cloudflare services:
```python
import asyncio
import time
from statistics import mean
from mcp_memory_service.storage.cloudflare import CloudflareStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
async def performance_test():
"""Test performance with real Cloudflare backend."""
storage = CloudflareStorage(...) # Your real credentials
await storage.initialize()
# Test memory storage performance
store_times = []
for i in range(10):
content = f"Performance test memory {i}"
memory = Memory(content=content, content_hash=generate_content_hash(content))
start = time.time()
await storage.store(memory)
end = time.time()
store_times.append(end - start)
print(f"Average store time: {mean(store_times):.3f}s")
# Test search performance
search_times = []
for i in range(5):
start = time.time()
results = await storage.retrieve("performance test")
end = time.time()
search_times.append(end - start)
print(f"Average search time: {mean(search_times):.3f}s")
print(f"Found {len(results)} memories")
await storage.close()
```
### 🛠️ Troubleshooting Common Issues
#### Authentication Errors
```
ERROR: Authentication failed
```
**Solution**: Verify API token has correct permissions (Vectorize:Edit, D1:Edit, etc.)
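As a quick check, verify the token against Cloudflare's token-verification endpoint (illustrative; assumes the token is exported as `CLOUDFLARE_API_TOKEN`):
```bash
curl -s "https://api.cloudflare.com/client/v4/user/tokens/verify" \
  -H "Authorization: Bearer $CLOUDFLARE_API_TOKEN"
```
A valid token reports `"success": true` and `"status": "active"`.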
#### Rate Limiting
```
WARNING: Rate limited, retrying in 2s
```
**Solution**: This is expected behavior; the client backs off and retries the request automatically, so no action is needed.
#### Vectorize Index Not Found
```
ValueError: Vectorize index 'test-index' not found
```
**Solution**: Create the index with `wrangler vectorize create`
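To confirm the index exists and its name matches `CLOUDFLARE_VECTORIZE_INDEX`, list your indexes (Wrangler CLI from Step 1 assumed):
```bash
wrangler vectorize list
```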
#### D1 Database Issues
```
Failed to initialize D1 schema
```
**Solution**: Verify database ID and ensure API token has D1 permissions
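To double-check the database ID, list your D1 databases and compare against `CLOUDFLARE_D1_DATABASE_ID`:
```bash
wrangler d1 list
```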
### ✨ What Makes This Implementation Special
1. **Production Ready**: Comprehensive error handling and retry logic
2. **Global Performance**: Leverages Cloudflare's edge network
3. **Smart Architecture**: Efficient use of Vectorize, D1, and R2
4. **Zero Breaking Changes**: Drop-in replacement for existing backends
5. **Comprehensive Testing**: 26+ tests covering all functionality
6. **Easy Migration**: Tools to migrate from SQLite-vec or ChromaDB
The Cloudflare backend is ready for production use and provides a scalable, globally distributed memory service for AI applications! 🚀
```
--------------------------------------------------------------------------------
/scripts/testing/test_search_api.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test script for search API endpoints."""
import requests
import json
import time
BASE_URL = "http://localhost:8000"
def test_search_functionality():
"""Test all search endpoints."""
print("Testing Search API Endpoints")
print("=" * 40)
# First, check server health
print("\n[0] Health check...")
try:
resp = requests.get(f"{BASE_URL}/api/health", timeout=5)
if resp.status_code != 200:
print(f"[FAIL] Server not healthy: {resp.status_code}")
return
print("[PASS] Server is healthy")
except Exception as e:
print(f"[FAIL] Cannot connect: {e}")
return
# Create some test memories for searching
print("\n[1] Creating test memories...")
test_memories = [
{
"content": "Python programming tutorial for beginners",
"tags": ["python", "programming", "tutorial"],
"memory_type": "learning",
"metadata": {"difficulty": "beginner"}
},
{
"content": "Advanced machine learning algorithms with PyTorch",
"tags": ["python", "machine-learning", "pytorch"],
"memory_type": "learning",
"metadata": {"difficulty": "advanced"}
},
{
"content": "JavaScript async await patterns and best practices",
"tags": ["javascript", "async", "programming"],
"memory_type": "reference",
"metadata": {"language": "js"}
},
{
"content": "Database design principles and normalization",
"tags": ["database", "design", "sql"],
"memory_type": "learning",
"metadata": {"topic": "databases"}
},
{
"content": "Meeting notes from yesterday's project sync",
"tags": ["meeting", "project", "notes"],
"memory_type": "note",
"metadata": {"date": "yesterday"}
}
]
created_hashes = []
for i, memory in enumerate(test_memories):
try:
resp = requests.post(
f"{BASE_URL}/api/memories",
json=memory,
headers={"Content-Type": "application/json"},
timeout=10
)
if resp.status_code == 200:
result = resp.json()
if result["success"]:
created_hashes.append(result["content_hash"])
print(f" Created memory {i+1}: {memory['content'][:30]}...")
else:
print(f" [WARN] Memory {i+1} might already exist")
else:
print(f" [WARN] Failed to create memory {i+1}: {resp.status_code}")
except Exception as e:
print(f" [WARN] Error creating memory {i+1}: {e}")
print(f"[INFO] Created {len(created_hashes)} new memories")
# Test 2: Semantic search
print("\n[2] Testing semantic search...")
search_queries = [
"programming tutorial",
"machine learning AI",
"database SQL design",
"meeting project discussion"
]
for query in search_queries:
try:
search_request = {
"query": query,
"n_results": 3,
"similarity_threshold": 0.1
}
resp = requests.post(
f"{BASE_URL}/api/search",
json=search_request,
headers={"Content-Type": "application/json"},
timeout=15
)
if resp.status_code == 200:
result = resp.json()
print(f" Query: '{query}' -> {result['total_found']} results ({result['processing_time_ms']:.1f}ms)")
for i, search_result in enumerate(result['results'][:2]): # Show top 2
memory = search_result['memory']
score = search_result.get('similarity_score', 0)
print(f" {i+1}. {memory['content'][:50]}... (score: {score:.3f})")
else:
print(f" [FAIL] Search failed for '{query}': {resp.status_code}")
except Exception as e:
print(f" [FAIL] Search error for '{query}': {e}")
# Test 3: Tag-based search
print("\n[3] Testing tag-based search...")
tag_searches = [
{"tags": ["python"], "match_all": False},
{"tags": ["programming", "tutorial"], "match_all": False},
{"tags": ["python", "programming"], "match_all": True}
]
for search in tag_searches:
try:
resp = requests.post(
f"{BASE_URL}/api/search/by-tag",
json=search,
headers={"Content-Type": "application/json"},
timeout=10
)
if resp.status_code == 200:
result = resp.json()
match_type = "ALL" if search["match_all"] else "ANY"
print(f" Tags {search['tags']} ({match_type}) -> {result['total_found']} results")
for i, search_result in enumerate(result['results'][:2]):
memory = search_result['memory']
print(f" {i+1}. {memory['content'][:40]}... (tags: {memory['tags']})")
else:
print(f" [FAIL] Tag search failed: {resp.status_code}")
except Exception as e:
print(f" [FAIL] Tag search error: {e}")
# Test 4: Time-based search
print("\n[4] Testing time-based search...")
time_queries = ["today", "yesterday", "this week", "last week"]
for query in time_queries:
try:
time_request = {
"query": query,
"n_results": 5
}
resp = requests.post(
f"{BASE_URL}/api/search/by-time",
json=time_request,
headers={"Content-Type": "application/json"},
timeout=10
)
if resp.status_code == 200:
result = resp.json()
print(f" Time: '{query}' -> {result['total_found']} results")
if result['results']:
memory = result['results'][0]['memory']
print(f" Example: {memory['content'][:40]}...")
elif resp.status_code == 400:
print(f" [INFO] Time query '{query}' not supported yet")
else:
print(f" [FAIL] Time search failed for '{query}': {resp.status_code}")
except Exception as e:
print(f" [FAIL] Time search error for '{query}': {e}")
# Test 5: Similar memories
print("\n[5] Testing similar memory search...")
if created_hashes:
try:
content_hash = created_hashes[0]
resp = requests.get(
f"{BASE_URL}/api/search/similar/{content_hash}?n_results=3",
timeout=10
)
if resp.status_code == 200:
result = resp.json()
print(f" Similar to first memory -> {result['total_found']} results")
for i, search_result in enumerate(result['results'][:2]):
memory = search_result['memory']
score = search_result.get('similarity_score', 0)
print(f" {i+1}. {memory['content'][:40]}... (score: {score:.3f})")
elif resp.status_code == 404:
print(f" [INFO] Memory not found (expected with current get-by-hash implementation)")
else:
print(f" [FAIL] Similar search failed: {resp.status_code}")
except Exception as e:
print(f" [FAIL] Similar search error: {e}")
# Cleanup: Delete test memories
print(f"\n[6] Cleaning up {len(created_hashes)} test memories...")
for content_hash in created_hashes:
try:
resp = requests.delete(f"{BASE_URL}/api/memories/{content_hash}", timeout=5)
if resp.status_code == 200:
result = resp.json()
if result["success"]:
print(f" Deleted: {content_hash[:12]}...")
except Exception as e:
print(f" [WARN] Cleanup error: {e}")
print("\n" + "=" * 40)
print("Search API testing completed!")
if __name__ == "__main__":
test_search_functionality()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/utils/gpu_detection.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Shared GPU detection utilities for MCP Memory Service.
This module provides unified GPU platform detection logic used across
installation and verification scripts. Supports CUDA, ROCm, MPS, and DirectML.
"""
import os
import subprocess
from typing import Dict, Any, Tuple, Optional, Callable, List, Union
# Single source of truth for GPU platform detection configuration
GPU_PLATFORM_CHECKS = {
'cuda': {
'windows': {
'env_var': 'CUDA_PATH',
'version_cmd': lambda path: [os.path.join(path, 'bin', 'nvcc'), '--version'],
'version_pattern': 'release'
},
'linux': {
'paths': ['/usr/local/cuda', lambda: os.environ.get('CUDA_HOME')],
'version_cmd': lambda path: [os.path.join(path, 'bin', 'nvcc'), '--version'],
'version_pattern': 'release'
}
},
'rocm': {
'linux': {
'paths': ['/opt/rocm', lambda: os.environ.get('ROCM_HOME')],
'version_file': lambda path: os.path.join(path, 'bin', '.rocmversion'),
'version_cmd': ['rocminfo'],
'version_pattern': 'Version'
}
},
'mps': {
'macos': {
'check_cmd': ['system_profiler', 'SPDisplaysDataType'],
'check_pattern': 'Metal',
'requires_arm': True
}
},
'directml': {
'windows': {
'import_name': 'torch-directml',
'dll_name': 'DirectML.dll'
}
}
}
def parse_version(output: str, pattern: str = 'release') -> Optional[str]:
"""
Parse version string from command output.
Args:
output: Command output to parse
pattern: Pattern to search for ('release' or 'Version')
Returns:
Parsed version string or None if not found
"""
for line in output.split('\n'):
if pattern in line:
if pattern == 'release':
return line.split('release')[-1].strip().split(',')[0].strip()
elif pattern == 'Version':
return line.split(':')[-1].strip()
return None
def test_gpu_platform(platform: str, system_info: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Test for a specific GPU platform and return detection status.
Args:
platform: Platform name ('cuda', 'rocm', 'mps', 'directml')
system_info: System information dictionary with keys:
- is_windows: bool
- is_linux: bool
- is_macos: bool
- is_arm: bool (for ARM/Apple Silicon)
Returns:
Tuple of (detected: bool, version: Optional[str])
"""
if platform not in GPU_PLATFORM_CHECKS:
return False, None
platform_config = GPU_PLATFORM_CHECKS[platform]
# Determine OS-specific configuration
if system_info.get('is_windows') and 'windows' in platform_config:
os_config = platform_config['windows']
elif system_info.get('is_linux') and 'linux' in platform_config:
os_config = platform_config['linux']
elif system_info.get('is_macos') and 'macos' in platform_config:
os_config = platform_config['macos']
else:
return False, None
# Platform-specific detection logic
if platform == 'cuda':
return _detect_cuda(os_config, system_info)
elif platform == 'rocm':
return _detect_rocm(os_config)
elif platform == 'mps':
return _detect_mps(os_config, system_info)
elif platform == 'directml':
return _detect_directml(os_config)
return False, None
def _detect_cuda(config: Dict[str, Any], system_info: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Detect CUDA installation."""
# Check environment variable (Windows) or paths (Linux)
if 'env_var' in config:
cuda_path = os.environ.get(config['env_var'])
if not cuda_path or not os.path.exists(cuda_path):
return False, None
paths_to_check = [cuda_path]
elif 'paths' in config:
paths_to_check = []
for path in config['paths']:
if callable(path):
path = path()
if path and os.path.exists(path):
paths_to_check.append(path)
if not paths_to_check:
return False, None
else:
return False, None
# Try to get version
for path in paths_to_check:
try:
version_cmd = config['version_cmd'](path)
output = subprocess.check_output(
version_cmd,
stderr=subprocess.STDOUT,
universal_newlines=True
)
version = parse_version(output, config.get('version_pattern', 'release'))
return True, version
except (subprocess.SubprocessError, FileNotFoundError, OSError):
continue
# Found path but couldn't get version
return True, None
def _detect_rocm(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Detect ROCm installation."""
paths_to_check = []
for path in config.get('paths', []):
if callable(path):
path = path()
if path and os.path.exists(path):
paths_to_check.append(path)
if not paths_to_check:
return False, None
# Try version file first
for path in paths_to_check:
if 'version_file' in config:
version_file = config['version_file'](path)
try:
with open(version_file, 'r') as f:
version = f.read().strip()
return True, version
except (FileNotFoundError, IOError):
pass
# Try version command
if 'version_cmd' in config:
try:
output = subprocess.check_output(
config['version_cmd'],
stderr=subprocess.STDOUT,
universal_newlines=True
)
version = parse_version(output, config.get('version_pattern', 'Version'))
return True, version
except (subprocess.SubprocessError, FileNotFoundError, OSError):
pass
# Found path but couldn't get version
return True, None
def _detect_mps(config: Dict[str, Any], system_info: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Detect Apple Metal Performance Shaders (MPS)."""
# MPS requires ARM architecture
if config.get('requires_arm') and not system_info.get('is_arm'):
return False, None
try:
result = subprocess.run(
config['check_cmd'],
capture_output=True,
text=True
)
if config['check_pattern'] in result.stdout:
return True, None # MPS doesn't have a version string
except (subprocess.SubprocessError, FileNotFoundError, OSError):
pass
return False, None
def _detect_directml(config: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Detect DirectML installation."""
# Try importing the package
try:
import pkg_resources
version = pkg_resources.get_distribution(config['import_name']).version
return True, version
    except Exception:  # covers missing pkg_resources and DistributionNotFound
pass
# Try loading the DLL
try:
import ctypes
ctypes.WinDLL(config['dll_name'])
return True, None # Found DLL but no version
    except Exception:  # covers missing ctypes.WinDLL (non-Windows) and DLL load failures
pass
return False, None
def detect_gpu(system_info: Dict[str, Any]) -> Dict[str, Any]:
"""
Detect all available GPU platforms and return comprehensive GPU info.
Args:
system_info: System information dictionary with keys:
- is_windows: bool
- is_linux: bool
- is_macos: bool
- is_arm: bool (for ARM/Apple Silicon)
Returns:
Dictionary containing:
- has_cuda: bool
- cuda_version: Optional[str]
- has_rocm: bool
- rocm_version: Optional[str]
- has_mps: bool
- has_directml: bool
- directml_version: Optional[str]
- accelerator: str ('cuda', 'rocm', 'mps', 'directml', or 'cpu')
"""
gpu_info = {
"has_cuda": False,
"cuda_version": None,
"has_rocm": False,
"rocm_version": None,
"has_mps": False,
"has_directml": False,
"directml_version": None,
"accelerator": "cpu"
}
# Test each platform
gpu_info["has_cuda"], gpu_info["cuda_version"] = test_gpu_platform('cuda', system_info)
gpu_info["has_rocm"], gpu_info["rocm_version"] = test_gpu_platform('rocm', system_info)
gpu_info["has_mps"], _ = test_gpu_platform('mps', system_info)
gpu_info["has_directml"], gpu_info["directml_version"] = test_gpu_platform('directml', system_info)
# Determine primary accelerator (priority order: CUDA > ROCm > MPS > DirectML > CPU)
if gpu_info["has_cuda"]:
gpu_info["accelerator"] = "cuda"
elif gpu_info["has_rocm"]:
gpu_info["accelerator"] = "rocm"
elif gpu_info["has_mps"]:
gpu_info["accelerator"] = "mps"
elif gpu_info["has_directml"]:
gpu_info["accelerator"] = "directml"
return gpu_info
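if __name__ == "__main__":
    # Illustrative ad-hoc check (an assumption for demonstration, not part of this
    # module's documented API): build the system_info dict described in the
    # docstrings above and print the detection result for the current machine.
    import platform
    _system = platform.system()
    _info = {
        "is_windows": _system == "Windows",
        "is_linux": _system == "Linux",
        "is_macos": _system == "Darwin",
        "is_arm": platform.machine().lower() in ("arm64", "aarch64"),
    }
    print(detect_gpu(_info))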
```
--------------------------------------------------------------------------------
/tests/unit/test_json_loader.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for JSON document loader.
"""
import pytest
import asyncio
import json
from pathlib import Path
from unittest.mock import patch
from mcp_memory_service.ingestion.json_loader import JSONLoader
from mcp_memory_service.ingestion.base import DocumentChunk
from conftest import extract_chunks_from_temp_file
class TestJSONLoader:
"""Test suite for JSONLoader class."""
def test_initialization(self):
"""Test basic initialization of JSONLoader."""
loader = JSONLoader(chunk_size=500, chunk_overlap=50)
assert loader.chunk_size == 500
assert loader.chunk_overlap == 50
assert 'json' in loader.supported_extensions
def test_can_handle_file(self):
"""Test file format detection."""
loader = JSONLoader()
# Create temporary test files
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "test.json"
json_file.touch()
txt_file = Path(tmpdir) / "test.txt"
txt_file.touch()
# Test supported formats
assert loader.can_handle(json_file) is True
# Test unsupported formats
assert loader.can_handle(txt_file) is False
@pytest.mark.asyncio
async def test_extract_chunks_simple_json(self):
"""Test extraction from simple JSON file."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create test JSON file
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "test.json"
test_data = {
"name": "John Doe",
"age": 30,
"city": "New York"
}
json_file.write_text(json.dumps(test_data, indent=2))
chunks = []
async for chunk in loader.extract_chunks(json_file):
chunks.append(chunk)
# Verify chunks were created
assert len(chunks) > 0
# Verify chunk structure
first_chunk = chunks[0]
assert isinstance(first_chunk, DocumentChunk)
assert isinstance(first_chunk.content, str)
assert first_chunk.source_file == json_file
# Verify content contains flattened JSON
content = first_chunk.content
assert "name: John Doe" in content
assert "age: 30" in content
assert "city: New York" in content
@pytest.mark.asyncio
async def test_extract_chunks_nested_json(self):
"""Test extraction from nested JSON file."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create test JSON file with nested structure
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "test.json"
test_data = {
"config": {
"database": {
"host": "localhost",
"port": 5432
}
},
"servers": [
{"name": "web", "port": 8080},
{"name": "api", "port": 3000}
]
}
json_file.write_text(json.dumps(test_data, indent=2))
chunks = []
async for chunk in loader.extract_chunks(json_file):
chunks.append(chunk)
# Verify chunks were created
assert len(chunks) > 0
# Verify content contains flattened nested structure
content = chunks[0].content
assert "config.database.host: localhost" in content
assert "config.database.port: 5432" in content
assert "servers[0].name: web" in content
assert "servers[1].port: 3000" in content
@pytest.mark.asyncio
async def test_extract_chunks_with_options(self):
"""Test extraction with various options."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create test JSON file
test_data = {
"user": {
"name": "John",
"details": {
"age": 25
}
}
}
json_content = json.dumps(test_data, indent=2)
# Test with bracket notation
chunks = await extract_chunks_from_temp_file(
loader,
"test.json",
json_content,
flatten_strategy='bracket_notation'
)
content = chunks[0].content
assert "user[name]: John" in content
assert "user[details][age]: 25" in content
@pytest.mark.asyncio
async def test_extract_chunks_invalid_json(self):
"""Test handling of invalid JSON files."""
loader = JSONLoader()
# Create invalid JSON file
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "invalid.json"
json_file.write_text("{ invalid json content }")
with pytest.raises(ValueError, match="Invalid JSON format"):
async for chunk in loader.extract_chunks(json_file):
pass
@pytest.mark.asyncio
async def test_extract_chunks_empty_file(self):
"""Test handling of empty JSON files."""
loader = JSONLoader()
# Create empty file
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "empty.json"
json_file.write_text("")
with pytest.raises(ValueError, match="Invalid JSON format"):
async for chunk in loader.extract_chunks(json_file):
pass
@pytest.mark.asyncio
async def test_extract_chunks_large_nested_structure(self):
"""Test extraction from deeply nested JSON."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create deeply nested JSON
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "nested.json"
test_data = {
"level1": {
"level2": {
"level3": {
"level4": {
"value": "deep"
}
}
}
}
}
json_file.write_text(json.dumps(test_data, indent=2))
chunks = []
async for chunk in loader.extract_chunks(json_file):
chunks.append(chunk)
content = chunks[0].content
assert "level1.level2.level3.level4.value: deep" in content
@pytest.mark.asyncio
async def test_extract_chunks_with_arrays(self):
"""Test extraction with different array handling strategies."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create JSON with arrays
test_data = {
"items": ["apple", "banana", "cherry"],
"numbers": [1, 2, 3]
}
json_content = json.dumps(test_data, indent=2)
# Test expand strategy (default)
chunks = await extract_chunks_from_temp_file(
loader,
"arrays.json",
json_content,
array_handling='expand'
)
content = chunks[0].content
assert "items[0]: apple" in content
assert "items[1]: banana" in content
assert "numbers[0]: 1" in content
@pytest.mark.asyncio
async def test_extract_chunks_metadata(self):
"""Test that metadata is properly included."""
loader = JSONLoader(chunk_size=1000, chunk_overlap=200)
# Create test JSON file
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
json_file = Path(tmpdir) / "test.json"
test_data = {"key": "value"}
json_file.write_text(json.dumps(test_data))
chunks = []
async for chunk in loader.extract_chunks(json_file):
chunks.append(chunk)
first_chunk = chunks[0]
assert first_chunk.metadata['content_type'] == 'json'
assert first_chunk.metadata['encoding'] in ['utf-8', 'utf-16', 'utf-32', 'latin-1', 'cp1252']
assert 'file_size' in first_chunk.metadata
assert first_chunk.metadata['loader_type'] == 'JSONLoader'
class TestJSONLoaderRegistry:
"""Test JSON loader registration."""
def test_loader_registration(self):
"""Test that JSON loader is registered."""
from mcp_memory_service.ingestion.registry import get_loader_for_file
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
# Test JSON file
json_file = Path(tmpdir) / "test.json"
json_file.write_text('{"test": "data"}')
loader = get_loader_for_file(json_file)
# Should get JSONLoader
assert loader is not None
assert isinstance(loader, JSONLoader)
if __name__ == '__main__':
pytest.main([__file__, '-v'])
```
--------------------------------------------------------------------------------
/tests/contracts/api-specification.yml:
--------------------------------------------------------------------------------
```yaml
# MCP Memory Service API Contract Specification
# This document defines the ACTUAL behavior of the MCP Memory Service API
# Used by the HTTP-MCP bridge and other clients
openapi: 3.0.3
info:
title: MCP Memory Service API
version: "6.6.1"
description: |
API contract for MCP Memory Service - defines actual response formats
and status codes that clients can expect.
CRITICAL NOTES:
- Server returns HTTP 200 for both success and failure cases
- Use the 'success' field in response body to determine actual result
- All endpoints use /api prefix
servers:
- url: https://memory.local:8443/api
description: Default HTTPS server with self-signed certificate
- url: http://localhost:8000/api
description: Development HTTP server
security:
- BearerAuth: []
paths:
/health:
get:
summary: Service health check
description: Returns current service status and statistics
responses:
'200':
description: Service is healthy
content:
application/json:
schema:
type: object
required:
- status
- version
properties:
status:
type: string
enum: [healthy]
version:
type: string
example: "6.6.1"
timestamp:
type: string
format: date-time
uptime_seconds:
type: number
storage_type:
type: string
enum: [sqlite_vec, cloudflare, hybrid]
statistics:
type: object
properties:
total_memories:
type: integer
total_tags:
type: integer
'503':
description: Service is unhealthy
content:
application/json:
schema:
type: object
properties:
status:
type: string
enum: [unhealthy]
error:
type: string
/memories:
post:
summary: Store a memory
description: |
Store a new memory in the service.
CRITICAL: Always returns HTTP 200, regardless of success/failure!
Check the 'success' field in response body to determine actual result.
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- content
properties:
content:
type: string
description: Memory content to store
tags:
type: array
items:
type: string
default: []
memory_type:
type: string
default: "note"
metadata:
type: object
default: {}
responses:
'200':
description: Request processed (check success field!)
content:
application/json:
schema:
oneOf:
- type: object
title: Success
required:
- success
- message
- content_hash
- memory
properties:
success:
type: boolean
enum: [true]
message:
type: string
example: "Memory stored successfully"
content_hash:
type: string
memory:
$ref: '#/components/schemas/Memory'
- type: object
title: Duplicate
required:
- success
- message
- content_hash
properties:
success:
type: boolean
enum: [false]
message:
type: string
example: "Duplicate content detected"
content_hash:
type: string
memory:
type: 'null'
'400':
description: Invalid request
content:
application/json:
schema:
type: object
properties:
detail:
type: string
'401':
description: Unauthorized
content:
application/json:
schema:
type: object
properties:
detail:
type: string
example: "Invalid API key"
/search:
get:
summary: Search memories by content
parameters:
- name: q
in: query
required: true
schema:
type: string
- name: n_results
in: query
schema:
type: integer
default: 5
responses:
'200':
description: Search results
content:
application/json:
schema:
type: object
properties:
results:
type: array
items:
type: object
properties:
memory:
$ref: '#/components/schemas/Memory'
relevance_score:
type: number
minimum: 0
maximum: 1
/memories/search/tags:
get:
summary: Search memories by tags
parameters:
- name: tags
in: query
required: true
schema:
type: string
description: Comma-separated list of tags
responses:
'200':
description: Tag search results
content:
application/json:
schema:
type: object
properties:
memories:
type: array
items:
$ref: '#/components/schemas/Memory'
/memories/{content_hash}:
delete:
summary: Delete a memory by content hash
parameters:
- name: content_hash
in: path
required: true
schema:
type: string
responses:
'200':
description: Deletion result
content:
application/json:
schema:
type: object
properties:
success:
type: boolean
message:
type: string
'404':
description: Memory not found
content:
application/json:
schema:
type: object
properties:
detail:
type: string
components:
securitySchemes:
BearerAuth:
type: http
scheme: bearer
description: API key for authentication
schemas:
Memory:
type: object
required:
- content
- content_hash
- tags
- memory_type
- created_at_iso
properties:
content:
type: string
content_hash:
type: string
tags:
type: array
items:
type: string
memory_type:
type: string
metadata:
type: object
created_at:
type: number
created_at_iso:
type: string
format: date-time
updated_at:
type: number
updated_at_iso:
type: string
format: date-time
# Contract Test Cases
x-contract-tests:
critical-behaviors:
- name: "Memory storage returns 200 with success field"
description: "Server never returns 201 - always 200 with success boolean"
endpoint: "POST /memories"
expected:
status: 200
body_contains: ["success"]
- name: "Health check uses /api/health path"
description: "Health endpoint is /api/health not /health"
endpoint: "GET /health"
expected:
status: 200
- name: "URL construction preserves /api base path"
description: "Bridge must not replace /api when constructing URLs"
test: "URL construction"
- name: "Duplicate detection returns success=false"
description: "Duplicates return 200 with success=false, not error status"
endpoint: "POST /memories"
scenario: "duplicate_content"
expected:
status: 200
body:
success: false
```
--------------------------------------------------------------------------------
/tests/integration/test_oauth_basic_auth.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
OAuth 2.1 Basic Authentication Test
Tests both client_secret_basic (HTTP Basic auth) and client_secret_post (form data)
authentication methods for the OAuth token endpoint.
"""
import asyncio
import base64
import sys
from typing import Optional
import httpx
async def test_oauth_basic_auth(base_url: str = "http://localhost:8000") -> bool:
"""
Test OAuth 2.1 token endpoint with both Basic and form authentication.
Returns:
True if all tests pass, False otherwise
"""
print(f"Testing OAuth Basic Authentication at {base_url}")
print("=" * 60)
async with httpx.AsyncClient() as client:
try:
# Step 1: Register a client first
print("1. Registering OAuth client...")
registration_data = {
"client_name": "Basic Auth Test Client",
"redirect_uris": ["https://example.com/callback"],
"grant_types": ["authorization_code"],
"response_types": ["code"]
}
response = await client.post(
f"{base_url}/oauth/register",
json=registration_data
)
if response.status_code != 201:
print(f" ❌ Client registration failed: {response.status_code}")
print(f" Response: {response.text}")
return False
client_info = response.json()
client_id = client_info.get("client_id")
client_secret = client_info.get("client_secret")
if not client_id or not client_secret:
print(f" ❌ Missing client credentials in response")
return False
print(f" ✅ Client registered successfully")
print(f" 📋 Client ID: {client_id}")
# Step 2: Get authorization code
print("\n2. Getting authorization code...")
auth_params = {
"response_type": "code",
"client_id": client_id,
"redirect_uri": "https://example.com/callback",
"state": "test_state_basic_auth"
}
response = await client.get(
f"{base_url}/oauth/authorize",
params=auth_params,
follow_redirects=False
)
if response.status_code not in [302, 307]:
print(f" ❌ Authorization failed: {response.status_code}")
return False
location = response.headers.get("location", "")
if "code=" not in location:
print(f" ❌ No authorization code in redirect: {location}")
return False
# Extract authorization code
auth_code = None
for param in location.split("?")[1].split("&"):
if param.startswith("code="):
auth_code = param.split("=")[1]
break
if not auth_code:
print(f" ❌ Could not extract authorization code")
return False
print(f" ✅ Authorization code obtained")
# Step 3: Test token endpoint with HTTP Basic authentication
print("\n3. Testing Token Endpoint with HTTP Basic Auth...")
# Create Basic auth header
credentials = f"{client_id}:{client_secret}"
encoded_credentials = base64.b64encode(credentials.encode()).decode()
basic_auth_header = f"Basic {encoded_credentials}"
token_data = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": "https://example.com/callback"
# Note: client_id and client_secret NOT in form data for Basic auth
}
response = await client.post(
f"{base_url}/oauth/token",
data=token_data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": basic_auth_header
}
)
if response.status_code != 200:
print(f" ❌ Basic auth token request failed: {response.status_code}")
print(f" Response: {response.text}")
return False
basic_token_response = response.json()
basic_access_token = basic_token_response.get("access_token")
if not basic_access_token:
print(f" ❌ No access token in Basic auth response")
return False
print(f" ✅ HTTP Basic authentication successful")
print(f" 📋 Token type: {basic_token_response.get('token_type')}")
# Step 4: Test the access token works for API calls
print("\n4. Testing Basic auth access token...")
headers = {"Authorization": f"Bearer {basic_access_token}"}
response = await client.get(f"{base_url}/api/memories", headers=headers)
if response.status_code == 200:
print(f" ✅ Basic auth access token works for API calls")
else:
print(f" ❌ Basic auth access token failed API call: {response.status_code}")
return False
# Step 5: Get a new authorization code for form-based test
print("\n5. Getting new authorization code for form auth test...")
auth_params["state"] = "test_state_form_auth"
response = await client.get(
f"{base_url}/oauth/authorize",
params=auth_params,
follow_redirects=False
)
location = response.headers.get("location", "")
form_auth_code = None
for param in location.split("?")[1].split("&"):
if param.startswith("code="):
form_auth_code = param.split("=")[1]
break
if not form_auth_code:
print(f" ❌ Could not get new authorization code")
return False
print(f" ✅ New authorization code obtained")
# Step 6: Test token endpoint with form-based authentication
print("\n6. Testing Token Endpoint with Form-based Auth...")
token_data = {
"grant_type": "authorization_code",
"code": form_auth_code,
"redirect_uri": "https://example.com/callback",
"client_id": client_id,
"client_secret": client_secret
# Note: credentials in form data, NO Authorization header
}
response = await client.post(
f"{base_url}/oauth/token",
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"}
# Note: NO Authorization header
)
if response.status_code != 200:
print(f" ❌ Form auth token request failed: {response.status_code}")
print(f" Response: {response.text}")
return False
form_token_response = response.json()
form_access_token = form_token_response.get("access_token")
if not form_access_token:
print(f" ❌ No access token in form auth response")
return False
print(f" ✅ Form-based authentication successful")
print(f" 📋 Token type: {form_token_response.get('token_type')}")
# Step 7: Test the form-based access token works for API calls
print("\n7. Testing form auth access token...")
headers = {"Authorization": f"Bearer {form_access_token}"}
response = await client.get(f"{base_url}/api/memories", headers=headers)
if response.status_code == 200:
print(f" ✅ Form auth access token works for API calls")
else:
print(f" ❌ Form auth access token failed API call: {response.status_code}")
return False
print("\n" + "=" * 60)
print("🎉 All OAuth authentication methods work correctly!")
print("✅ HTTP Basic authentication (client_secret_basic)")
print("✅ Form-based authentication (client_secret_post)")
print("✅ Both access tokens work for protected API endpoints")
return True
except Exception as e:
print(f"\n❌ Test failed with exception: {e}")
return False
async def main():
"""Main test function."""
if len(sys.argv) > 1:
base_url = sys.argv[1]
else:
base_url = "http://localhost:8000"
print("OAuth 2.1 Basic Authentication Test")
print("===================================")
print(f"Target: {base_url}")
print()
print("This test verifies both HTTP Basic and form-based authentication")
print("methods work correctly with the OAuth token endpoint.")
print()
success = await test_oauth_basic_auth(base_url)
if success:
print("\n🚀 OAuth Basic authentication implementation is working perfectly!")
sys.exit(0)
else:
print("\n💥 OAuth Basic authentication tests failed")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/tests/timestamp/test_issue99_final_validation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Final validation test for Issue #99 fix.
This test creates memories that SHOULD be in yesterday's range
and verifies they can be found by time-based searches.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))
import asyncio
import tempfile
import time
from datetime import datetime, timedelta
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
from mcp_memory_service.utils.time_parser import extract_time_expression
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
class Issue99FinalValidationTest:
"""Final validation test for Issue #99 timezone fix."""
def __init__(self):
self.storage = None
async def setup(self):
"""Set up test environment."""
print("=== Final Issue #99 Validation Test ===")
self.temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
self.temp_db.close()
self.storage = SqliteVecMemoryStorage(
db_path=self.temp_db.name,
embedding_model="all-MiniLM-L6-v2"
)
await self.storage.initialize()
print(f"✅ Storage initialized")
async def cleanup(self):
"""Clean up test environment."""
self.storage = None
if hasattr(self, 'temp_db') and os.path.exists(self.temp_db.name):
os.unlink(self.temp_db.name)
async def test_timezone_fix_validation(self):
"""Validate that the timezone fix resolves Issue #99."""
print("\n🧪 Testing Issue #99 Fix: Timezone Handling")
print("-" * 50)
# Calculate actual yesterday timestamps
now = datetime.now()
yesterday = now - timedelta(days=1)
yesterday_start = yesterday.replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_middle = yesterday.replace(hour=12, minute=0, second=0, microsecond=0)
yesterday_end = yesterday.replace(hour=23, minute=59, second=59, microsecond=999999)
print(f"📅 Yesterday date range: {yesterday_start.date()}")
print(f"🕐 Yesterday timestamps: {yesterday_start.timestamp()} to {yesterday_end.timestamp()}")
# Create memories that should be found in yesterday's range
memories = [
{
"content": "Hook-style memory created yesterday morning",
"timestamp": yesterday_start.timestamp() + (2 * 60 * 60), # 2 AM yesterday
"tags": ["claude-code-session", "yesterday-morning"]
},
{
"content": "Manual note from yesterday afternoon",
"timestamp": yesterday_middle.timestamp() + (3 * 60 * 60), # 3 PM yesterday
"tags": ["manual-note", "yesterday-afternoon"]
},
{
"content": "Another hook memory from yesterday evening",
"timestamp": yesterday_end.timestamp() - (2 * 60 * 60), # 9 PM yesterday
"tags": ["claude-code-session", "yesterday-evening"]
}
]
# Store memories with specific yesterday timestamps
for i, mem_data in enumerate(memories):
memory = Memory(
content=mem_data["content"],
content_hash=generate_content_hash(mem_data["content"]),
tags=mem_data["tags"],
memory_type="test-memory",
created_at=mem_data["timestamp"],
created_at_iso=datetime.fromtimestamp(mem_data["timestamp"]).isoformat() + "Z"
)
success, message = await self.storage.store(memory)
if success:
print(f"✅ Stored memory {i+1}: {datetime.fromtimestamp(mem_data['timestamp'])}")
else:
print(f"❌ Failed to store memory {i+1}: {message}")
return False
# Test yesterday search
query = "yesterday"
cleaned_query, (start_ts, end_ts) = extract_time_expression(query)
print(f"\n🔍 Testing query: '{query}'")
print(f"📅 Search range: {datetime.fromtimestamp(start_ts)} to {datetime.fromtimestamp(end_ts)}")
# Perform search
search_results = await self.storage.retrieve(query, n_results=10)
print(f"🔍 Found {len(search_results)} memories")
# Check if we found the expected memories
found_count = 0
for result in search_results:
for mem_data in memories:
if mem_data["content"] in result.memory.content:
found_count += 1
print(f" ✅ Found: {result.memory.content}")
break
# Validation
expected_count = len(memories)
success = found_count == expected_count
print(f"\n📊 Results:")
print(f" Expected memories: {expected_count}")
print(f" Found memories: {found_count}")
print(f" Success: {success}")
if success:
print("🎉 Issue #99 FIXED: Time-based search now works correctly!")
else:
print("❌ Issue #99 NOT FIXED: Time-based search still has problems")
return success
async def test_hook_vs_manual_consistency(self):
"""Test that hook and manual memories are equally discoverable."""
print("\n🧪 Testing Hook vs Manual Memory Search Consistency")
print("-" * 50)
# Create one hook-style and one manual-style memory for today
now = time.time()
today_morning = now - (8 * 60 * 60) # 8 hours ago
hook_memory = Memory(
content="Hook-generated session summary from this morning",
content_hash=generate_content_hash("Hook-generated session summary from this morning"),
tags=["claude-code-session", "session-consolidation", "morning-work"],
memory_type="session-summary",
metadata={
"generated_by": "claude-code-session-end-hook",
"generated_at": datetime.fromtimestamp(today_morning).isoformat() + "Z"
},
created_at=today_morning
)
manual_memory = Memory(
content="Manual note added this morning about project status",
content_hash=generate_content_hash("Manual note added this morning about project status"),
tags=["manual-note", "project-status", "morning-work"],
memory_type="note",
metadata={
"created_by": "manual-storage",
"source": "user-input"
},
created_at=today_morning + 300 # 5 minutes later
)
# Store both memories
hook_result = await self.storage.store(hook_memory)
manual_result = await self.storage.store(manual_memory)
print(f"✅ Hook memory stored: {hook_result[0]}")
print(f"✅ Manual memory stored: {manual_result[0]}")
# Search for memories from today
query = "today morning"
search_results = await self.storage.retrieve(query, n_results=10)
hook_found = False
manual_found = False
for result in search_results:
if "Hook-generated session summary" in result.memory.content:
hook_found = True
if "Manual note added this morning" in result.memory.content:
manual_found = True
print(f"\n📊 Search Results for '{query}':")
print(f" Hook memory found: {hook_found}")
print(f" Manual memory found: {manual_found}")
print(f" Both equally discoverable: {hook_found and manual_found}")
return hook_found and manual_found
async def run_validation(self):
"""Run complete Issue #99 validation."""
try:
await self.setup()
# Run validation tests
timezone_fix = await self.test_timezone_fix_validation()
consistency_fix = await self.test_hook_vs_manual_consistency()
print("\n" + "=" * 60)
print("ISSUE #99 FINAL VALIDATION RESULTS")
print("=" * 60)
if timezone_fix:
print("✅ FIXED: Timezone handling in timestamp validation")
else:
print("❌ NOT FIXED: Timezone handling still has issues")
if consistency_fix:
print("✅ FIXED: Hook vs Manual memory search consistency")
else:
print("❌ NOT FIXED: Hook vs Manual memories still inconsistent")
overall_success = timezone_fix and consistency_fix
if overall_success:
print("\n🎉 ISSUE #99 COMPLETELY RESOLVED!")
print("✅ Time-based searches work correctly")
print("✅ Hook and manual memories are equally discoverable")
print("✅ Timezone inconsistencies have been fixed")
else:
print("\n⚠️ ISSUE #99 PARTIALLY RESOLVED")
print("Additional work may be needed")
return overall_success
finally:
await self.cleanup()
async def main():
"""Main validation execution."""
validator = Issue99FinalValidationTest()
success = await validator.run_validation()
return 0 if success else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/embeddings/onnx_embeddings.py:
--------------------------------------------------------------------------------
```python
"""
ONNX-based embedding generation for MCP Memory Service.
Provides PyTorch-free embedding generation using ONNX Runtime.
Based on ONNXMiniLM_L6_V2 implementation.
"""
import hashlib
import json
import logging
import os
import tarfile
from pathlib import Path
from typing import List, Optional, Union
import numpy as np
logger = logging.getLogger(__name__)
# Try to import ONNX Runtime
try:
import onnxruntime as ort
ONNX_AVAILABLE = True
except ImportError:
ONNX_AVAILABLE = False
logger.warning("ONNX Runtime not available. Install with: pip install onnxruntime")
# Try to import tokenizers
try:
from tokenizers import Tokenizer
TOKENIZERS_AVAILABLE = True
except ImportError:
TOKENIZERS_AVAILABLE = False
logger.warning("Tokenizers not available. Install with: pip install tokenizers")
def _verify_sha256(fname: str, expected_sha256: str) -> bool:
"""Verify SHA256 hash of a file."""
sha256_hash = hashlib.sha256()
with open(fname, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest() == expected_sha256
class ONNXEmbeddingModel:
"""
ONNX-based embedding model that provides PyTorch-free embeddings.
Compatible with all-MiniLM-L6-v2 model.
"""
MODEL_NAME = "all-MiniLM-L6-v2"
DOWNLOAD_PATH = Path.home() / ".cache" / "mcp_memory" / "onnx_models" / MODEL_NAME
EXTRACTED_FOLDER_NAME = "onnx"
ARCHIVE_FILENAME = "onnx.tar.gz"
MODEL_DOWNLOAD_URL = (
"https://chroma-onnx-models.s3.amazonaws.com/all-MiniLM-L6-v2/onnx.tar.gz"
)
_MODEL_SHA256 = "913d7300ceae3b2dbc2c50d1de4baacab4be7b9380491c27fab7418616a16ec3"
def __init__(self, model_name: str = "all-MiniLM-L6-v2", preferred_providers: Optional[List[str]] = None):
"""
Initialize ONNX embedding model.
Args:
model_name: Name of the model (currently only all-MiniLM-L6-v2 supported)
preferred_providers: List of ONNX execution providers in order of preference
"""
if not ONNX_AVAILABLE:
raise ImportError("ONNX Runtime is required but not installed. Install with: pip install onnxruntime")
if not TOKENIZERS_AVAILABLE:
raise ImportError("Tokenizers is required but not installed. Install with: pip install tokenizers")
self.model_name = model_name
self._preferred_providers = preferred_providers or ['CPUExecutionProvider']
self._model = None
self._tokenizer = None
# Download model if needed
self._download_model_if_needed()
# Initialize the model
self._init_model()
def _download_model_if_needed(self):
"""Download and extract ONNX model if not present."""
if not self.DOWNLOAD_PATH.exists():
self.DOWNLOAD_PATH.mkdir(parents=True, exist_ok=True)
archive_path = self.DOWNLOAD_PATH / self.ARCHIVE_FILENAME
extracted_path = self.DOWNLOAD_PATH / self.EXTRACTED_FOLDER_NAME
# Check if model is already extracted
if extracted_path.exists() and (extracted_path / "model.onnx").exists():
logger.info(f"ONNX model already available at {extracted_path}")
return
# Download if not present or invalid
if not archive_path.exists() or not _verify_sha256(str(archive_path), self._MODEL_SHA256):
logger.info(f"Downloading ONNX model from {self.MODEL_DOWNLOAD_URL}")
try:
import httpx
with httpx.Client(timeout=30.0) as client:
response = client.get(self.MODEL_DOWNLOAD_URL)
response.raise_for_status()
with open(archive_path, "wb") as f:
f.write(response.content)
logger.info(f"Model downloaded to {archive_path}")
except Exception as e:
logger.error(f"Failed to download ONNX model: {e}")
raise RuntimeError(f"Could not download ONNX model: {e}")
# Extract the archive
logger.info(f"Extracting model to {extracted_path}")
with tarfile.open(archive_path, "r:gz") as tar:
tar.extractall(self.DOWNLOAD_PATH)
# Verify extraction
if not (extracted_path / "model.onnx").exists():
raise RuntimeError(f"Model extraction failed - model.onnx not found in {extracted_path}")
logger.info("ONNX model ready for use")
def _init_model(self):
"""Initialize ONNX model and tokenizer."""
model_path = self.DOWNLOAD_PATH / self.EXTRACTED_FOLDER_NAME / "model.onnx"
tokenizer_path = self.DOWNLOAD_PATH / self.EXTRACTED_FOLDER_NAME / "tokenizer.json"
if not model_path.exists():
raise FileNotFoundError(f"ONNX model not found at {model_path}")
if not tokenizer_path.exists():
raise FileNotFoundError(f"Tokenizer not found at {tokenizer_path}")
# Initialize ONNX session
logger.info(f"Loading ONNX model with providers: {self._preferred_providers}")
self._model = ort.InferenceSession(
str(model_path),
providers=self._preferred_providers
)
# Initialize tokenizer
self._tokenizer = Tokenizer.from_file(str(tokenizer_path))
# Get model info
self.embedding_dimension = self._model.get_outputs()[0].shape[-1]
logger.info(f"ONNX model loaded. Embedding dimension: {self.embedding_dimension}")
def encode(self, texts: Union[str, List[str]], convert_to_numpy: bool = True) -> np.ndarray:
"""
Generate embeddings for texts using ONNX model.
Args:
texts: Single text or list of texts to encode
convert_to_numpy: Whether to return numpy array (always True for compatibility)
Returns:
Numpy array of embeddings with shape (n_texts, embedding_dim)
"""
if isinstance(texts, str):
texts = [texts]
# Tokenize texts
encoded = self._tokenizer.encode_batch(texts)
# Prepare inputs for ONNX model
max_length = max(len(enc.ids) for enc in encoded)
# Pad sequences
input_ids = np.zeros((len(texts), max_length), dtype=np.int64)
attention_mask = np.zeros((len(texts), max_length), dtype=np.int64)
token_type_ids = np.zeros((len(texts), max_length), dtype=np.int64)
for i, enc in enumerate(encoded):
length = len(enc.ids)
input_ids[i, :length] = enc.ids
attention_mask[i, :length] = enc.attention_mask
token_type_ids[i, :length] = enc.type_ids
# Run inference
ort_inputs = {
"input_ids": input_ids,
"attention_mask": attention_mask,
"token_type_ids": token_type_ids,
}
outputs = self._model.run(None, ort_inputs)
# Extract embeddings (using mean pooling)
last_hidden_states = outputs[0]
# Mean pooling with attention mask
input_mask_expanded = attention_mask[..., np.newaxis].astype(np.float32)
sum_embeddings = np.sum(last_hidden_states * input_mask_expanded, axis=1)
sum_mask = np.clip(input_mask_expanded.sum(axis=1), a_min=1e-9, a_max=None)
embeddings = sum_embeddings / sum_mask
# Normalize embeddings
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
return embeddings
@property
def device(self):
"""Return device info for compatibility."""
return "cpu" # ONNX runtime handles device selection internally
def get_onnx_embedding_model(model_name: str = "all-MiniLM-L6-v2") -> Optional[ONNXEmbeddingModel]:
"""
Get ONNX embedding model if available.
Args:
model_name: Name of the model to load
Returns:
ONNXEmbeddingModel instance or None if ONNX is not available
"""
if not ONNX_AVAILABLE:
logger.warning("ONNX Runtime not available")
return None
if not TOKENIZERS_AVAILABLE:
logger.warning("Tokenizers not available")
return None
try:
# Detect best available providers
available_providers = ort.get_available_providers()
preferred_providers = []
# Prefer GPU providers if available
if 'CUDAExecutionProvider' in available_providers:
preferred_providers.append('CUDAExecutionProvider')
if 'DirectMLExecutionProvider' in available_providers:
preferred_providers.append('DirectMLExecutionProvider')
if 'CoreMLExecutionProvider' in available_providers:
preferred_providers.append('CoreMLExecutionProvider')
# Always include CPU as fallback
preferred_providers.append('CPUExecutionProvider')
logger.info(f"Creating ONNX model with providers: {preferred_providers}")
return ONNXEmbeddingModel(model_name, preferred_providers)
except Exception as e:
logger.error(f"Failed to create ONNX embedding model: {e}")
return None
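# --- Usage sketch (illustrative addition, not part of the original module) ---
# Assumes onnxruntime and tokenizers are installed; get_onnx_embedding_model()
# returns None when either dependency is missing, so callers must handle that case.
if __name__ == "__main__":
    example_model = get_onnx_embedding_model("all-MiniLM-L6-v2")
    if example_model is None:
        print("ONNX embedding support unavailable (onnxruntime/tokenizers missing)")
    else:
        vectors = example_model.encode(["first example text", "second example text"])
        # encode() returns L2-normalized embeddings of shape (n_texts, embedding_dimension)
        print(vectors.shape)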
```
--------------------------------------------------------------------------------
/docs/examples/memory-distribution-chart.jsx:
--------------------------------------------------------------------------------
```javascript
import React from 'react';
import { BarChart, Bar, XAxis, YAxis, CartesianGrid, Tooltip, Legend, ResponsiveContainer } from 'recharts';
/**
* Memory Distribution Chart Component
*
* A comprehensive visualization component for displaying monthly memory storage
* distribution with insights, statistics, and interactive features.
*
* Features:
* - Responsive bar chart with monthly distribution
* - Custom tooltips with percentages
* - Statistics cards for key metrics
* - Automatic insights generation
* - Professional styling and layout
*
* Usage:
* 1. Install dependencies: npm install recharts
* 2. Import and use: <MemoryDistributionChart data={yourData} />
* 3. Or use with sample data as shown below
*/
const MemoryDistributionChart = ({ data = null, title = "Memory Storage Distribution by Month" }) => {
// Sample data based on real MCP Memory Service analysis
// Replace with actual data from your analytics pipeline
const defaultData = [
{ month: "Jan 2025", count: 50, monthKey: "2025-01" },
{ month: "Feb 2025", count: 15, monthKey: "2025-02" },
{ month: "Mar 2025", count: 8, monthKey: "2025-03" },
{ month: "Apr 2025", count: 12, monthKey: "2025-04" },
{ month: "May 2025", count: 4, monthKey: "2025-05" },
{ month: "Jun 2025", count: 45, monthKey: "2025-06" }
];
const monthlyData = data || defaultData;
const totalMemories = monthlyData.reduce((sum, item) => sum + item.count, 0);
// Calculate statistics
const peakMonth = monthlyData.reduce((max, item) =>
item.count > max.count ? item : max, monthlyData[0]);
const averagePerMonth = (totalMemories / monthlyData.length).toFixed(1);
// Find most recent month with data
const recentMonth = monthlyData[monthlyData.length - 1];
// Custom tooltip component
const CustomTooltip = ({ active, payload, label }) => {
if (active && payload && payload.length) {
const data = payload[0].payload;
const percentage = ((data.count / totalMemories) * 100).toFixed(1);
return (
<div className="bg-white p-3 border border-gray-300 rounded-lg shadow-lg">
<p className="font-semibold text-gray-800">{label}</p>
<p className="text-blue-600">
<span className="font-medium">Memories: </span>
{data.count}
</p>
<p className="text-gray-600">
<span className="font-medium">Percentage: </span>
{percentage}%
</p>
</div>
);
}
return null;
};
// Custom label function for bars
const renderCustomLabel = (entry) => {
if (entry.count > 5) { // Only show labels for bars with more than 5 memories
return entry.count;
}
return null;
};
// Generate insights based on data patterns
const generateInsights = () => {
const insights = [];
// Peak activity insight
const peakPercentage = ((peakMonth.count / totalMemories) * 100).toFixed(1);
insights.push(`Peak activity in ${peakMonth.month} (${peakPercentage}% of total memories)`);
// Recent activity insight
const recentPercentage = ((recentMonth.count / totalMemories) * 100).toFixed(1);
if (recentMonth.count > Number(averagePerMonth)) {
insights.push(`High recent activity: ${recentMonth.month} above average (${recentPercentage}% of total)`);
}
// Growth pattern insight
const firstMonth = monthlyData[0];
const lastMonth = monthlyData[monthlyData.length - 1];
if (lastMonth.count > firstMonth.count * 0.8) {
insights.push(`Sustained activity: Recent months maintain high productivity`);
}
return insights;
};
const insights = generateInsights();
return (
<div className="w-full max-w-6xl mx-auto p-6 bg-gray-50 rounded-lg">
{/* Header Section */}
<div className="mb-6">
<h2 className="text-2xl font-bold text-gray-800 mb-2">
{title}
</h2>
<p className="text-gray-600">
Total memories analyzed: <span className="font-semibold text-blue-600">{totalMemories}</span> memories
</p>
</div>
{/* Main Chart */}
<div className="bg-white p-4 rounded-lg shadow-sm mb-6">
<ResponsiveContainer width="100%" height={400}>
<BarChart
data={monthlyData}
margin={{
top: 20,
right: 30,
left: 20,
bottom: 5,
}}
>
<CartesianGrid strokeDasharray="3 3" stroke="#f0f0f0" />
<XAxis
dataKey="month"
tick={{ fontSize: 12 }}
tickLine={{ stroke: '#d1d5db' }}
axisLine={{ stroke: '#d1d5db' }}
/>
<YAxis
tick={{ fontSize: 12 }}
tickLine={{ stroke: '#d1d5db' }}
axisLine={{ stroke: '#d1d5db' }}
label={{
value: 'Number of Memories',
angle: -90,
position: 'insideLeft',
style: { textAnchor: 'middle', fontSize: '12px', fill: '#6b7280' }
}}
/>
<Tooltip content={<CustomTooltip />} />
<Legend />
<Bar
dataKey="count"
name="Memories Stored"
fill="#3b82f6"
radius={[4, 4, 0, 0]}
label={renderCustomLabel}
/>
</BarChart>
</ResponsiveContainer>
</div>
{/* Statistics Cards */}
<div className="grid grid-cols-1 md:grid-cols-3 gap-4 mb-6">
<div className="bg-blue-50 p-4 rounded-lg">
<h3 className="font-semibold text-blue-800 mb-2">Peak Month</h3>
<p className="text-lg font-bold text-blue-600">{peakMonth.month}</p>
<p className="text-sm text-blue-600">
{peakMonth.count} memories ({((peakMonth.count / totalMemories) * 100).toFixed(1)}%)
</p>
</div>
<div className="bg-green-50 p-4 rounded-lg">
<h3 className="font-semibold text-green-800 mb-2">Recent Activity</h3>
<p className="text-lg font-bold text-green-600">{recentMonth.month}</p>
<p className="text-sm text-green-600">
{recentMonth.count} memories ({((recentMonth.count / totalMemories) * 100).toFixed(1)}%)
</p>
</div>
<div className="bg-amber-50 p-4 rounded-lg">
<h3 className="font-semibold text-amber-800 mb-2">Average/Month</h3>
<p className="text-lg font-bold text-amber-600">{averagePerMonth}</p>
<p className="text-sm text-amber-600">memories per month</p>
</div>
</div>
{/* Insights Section */}
<div className="bg-white p-4 rounded-lg shadow-sm">
<h3 className="font-semibold text-gray-800 mb-3">📊 Data Insights</h3>
<div className="space-y-2">
{insights.map((insight, index) => (
<div key={index} className="flex items-start">
<span className="text-blue-500 mr-2">•</span>
<p className="text-sm text-gray-600">{insight}</p>
</div>
))}
</div>
<div className="mt-4 pt-4 border-t border-gray-200">
<p className="text-xs text-gray-500">
<strong>Analysis Pattern:</strong> This distribution shows typical software development
lifecycle phases - high initial activity (project setup), consolidation periods,
and renewed intensive development phases.
</p>
</div>
</div>
</div>
);
};
export default MemoryDistributionChart;
/**
* Usage Examples:
*
* 1. Basic Usage (with sample data):
* <MemoryDistributionChart />
*
* 2. With Custom Data:
* const myData = [
* { month: "Jan 2025", count: 25, monthKey: "2025-01" },
* { month: "Feb 2025", count: 30, monthKey: "2025-02" },
* // ... more data
* ];
* <MemoryDistributionChart data={myData} title="My Project Analysis" />
*
* 3. Integration with MCP Memory Service:
*
* async function loadMemoryData() {
* const memories = await recall_memory({
* "query": "memories from this year",
* "n_results": 500
* });
*
* // Process memories into chart format
* const processedData = processMemoriesForChart(memories);
* return processedData;
* }
*
* function processMemoriesForChart(memories) {
* const monthlyDistribution = {};
*
* memories.forEach(memory => {
* const date = new Date(memory.timestamp);
* const monthKey = `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}`;
*
* if (!monthlyDistribution[monthKey]) {
* monthlyDistribution[monthKey] = 0;
* }
* monthlyDistribution[monthKey]++;
* });
*
* return Object.entries(monthlyDistribution)
* .sort(([a], [b]) => a.localeCompare(b))
* .map(([month, count]) => {
* const [year, monthNum] = month.split('-');
* const monthNames = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
* 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'];
* const monthName = monthNames[parseInt(monthNum) - 1];
*
* return {
* month: `${monthName} ${year}`,
* count: count,
* monthKey: month
* };
* });
* }
*
* Dependencies:
* npm install recharts
*
* For Tailwind CSS styling, ensure you have Tailwind configured in your project.
*/
```
--------------------------------------------------------------------------------
/docs/quick-setup-cloudflare-dual-environment.md:
--------------------------------------------------------------------------------
```markdown
# Quick Setup: Cloudflare Backend for Claude Desktop + Claude Code
This guide provides streamlined instructions for configuring the Cloudflare backend for both Claude Desktop and Claude Code simultaneously.
## 🎯 Overview
This setup ensures both environments use the same Cloudflare backend for consistent memory storage across Claude Desktop and Claude Code.
**Expected Result:**
- Claude Desktop: ✅ Cloudflare backend with 1000+ memories
- Claude Code: ✅ Cloudflare backend with same memories
- Health checks show: `"backend": "cloudflare"` and `"storage_type": "CloudflareStorage"`
## ⚡ Quick Setup (5 minutes)
### Step 1: Prepare Cloudflare Resources
If you don't have Cloudflare resources yet:
```bash
# Install wrangler CLI
npm install -g wrangler
# Login and create resources
wrangler login
wrangler vectorize create mcp-memory-index --dimensions=768 --metric=cosine
wrangler d1 create mcp-memory-db
# Note the database ID from output
```
### Step 2: Create Environment Configuration
Create a `.env` file in the project root:
```bash
cd C:/REPOSITORIES/mcp-memory-service
# Create .env file with your Cloudflare credentials
cat > .env << 'EOF'
# MCP Memory Service Environment Configuration
MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Cloudflare D1 Database Configuration
CLOUDFLARE_API_TOKEN=your-api-token-here
CLOUDFLARE_ACCOUNT_ID=your-account-id-here
CLOUDFLARE_D1_DATABASE_ID=your-d1-database-id-here
CLOUDFLARE_VECTORIZE_INDEX=mcp-memory-index
# Backup paths (for fallback)
MCP_MEMORY_BACKUPS_PATH=C:\Users\your-username\AppData\Local\mcp-memory\backups
MCP_MEMORY_SQLITE_PATH=C:\Users\your-username\AppData\Local\mcp-memory\backups\sqlite_vec.db
EOF
```
### Step 3: Configure Claude Desktop
Update `~/.claude.json` (or `%APPDATA%\Claude\claude_desktop_config.json` on Windows):
```json
{
"mcpServers": {
"memory": {
"command": "python",
"args": ["-m", "mcp_memory_service.server"],
"cwd": "C:/REPOSITORIES/mcp-memory-service",
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "cloudflare",
"CLOUDFLARE_API_TOKEN": "your-api-token-here",
"CLOUDFLARE_ACCOUNT_ID": "your-account-id-here",
"CLOUDFLARE_D1_DATABASE_ID": "your-d1-database-id-here",
"CLOUDFLARE_VECTORIZE_INDEX": "mcp-memory-index",
"MCP_MEMORY_BACKUPS_PATH": "C:\\Users\\your-username\\AppData\\Local\\mcp-memory\\backups",
"MCP_MEMORY_SQLITE_PATH": "C:\\Users\\your-username\\AppData\\Local\\mcp-memory\\backups\\sqlite_vec.db"
}
}
}
}
```
### Step 4: Configure Claude Code
```bash
# Navigate to project directory
cd C:/REPOSITORIES/mcp-memory-service
# Add memory server with explicit environment variables
claude mcp add memory python \
-e MCP_MEMORY_STORAGE_BACKEND=cloudflare \
-e CLOUDFLARE_API_TOKEN=your-api-token-here \
-e CLOUDFLARE_ACCOUNT_ID=your-account-id-here \
-e CLOUDFLARE_D1_DATABASE_ID=your-d1-database-id-here \
-e CLOUDFLARE_VECTORIZE_INDEX=mcp-memory-index \
-e MCP_MEMORY_BACKUPS_PATH="C:\Users\your-username\AppData\Local\mcp-memory\backups" \
-e MCP_MEMORY_SQLITE_PATH="C:\Users\your-username\AppData\Local\mcp-memory\backups\sqlite_vec.db" \
-- -m mcp_memory_service.server
```
### Step 5: Verify Configuration
**Test Claude Desktop:**
1. Restart Claude Desktop
2. Open a new conversation
3. Ask: "Check memory health"
4. Should show: `"backend": "cloudflare"` and `"storage_type": "CloudflareStorage"`
**Test Claude Code:**
```bash
# Check MCP server status
claude mcp list
# Should show: memory: python -m mcp_memory_service.server - ✓ Connected
```
## 🔧 Configuration Templates
### Claude Desktop Template (`claude_desktop_config.json`)
```json
{
"mcpServers": {
"memory": {
"command": "python",
"args": ["-m", "mcp_memory_service.server"],
"cwd": "C:/REPOSITORIES/mcp-memory-service",
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "cloudflare",
"CLOUDFLARE_API_TOKEN": "YOUR_TOKEN_HERE",
"CLOUDFLARE_ACCOUNT_ID": "YOUR_ACCOUNT_ID_HERE",
"CLOUDFLARE_D1_DATABASE_ID": "YOUR_D1_DATABASE_ID_HERE",
"CLOUDFLARE_VECTORIZE_INDEX": "mcp-memory-index",
"MCP_MEMORY_BACKUPS_PATH": "C:\\Users\\USERNAME\\AppData\\Local\\mcp-memory\\backups",
"MCP_MEMORY_SQLITE_PATH": "C:\\Users\\USERNAME\\AppData\\Local\\mcp-memory\\backups\\sqlite_vec.db"
}
}
}
}
```
### Project Environment Template (`.env`)
```bash
# Storage Backend Configuration
MCP_MEMORY_STORAGE_BACKEND=cloudflare
# Required Cloudflare Settings
CLOUDFLARE_API_TOKEN=YOUR_TOKEN_HERE
CLOUDFLARE_ACCOUNT_ID=YOUR_ACCOUNT_ID_HERE
CLOUDFLARE_D1_DATABASE_ID=YOUR_D1_DATABASE_ID_HERE
CLOUDFLARE_VECTORIZE_INDEX=mcp-memory-index
# Optional Settings
CLOUDFLARE_R2_BUCKET=mcp-memory-content
CLOUDFLARE_EMBEDDING_MODEL=@cf/baai/bge-base-en-v1.5
CLOUDFLARE_LARGE_CONTENT_THRESHOLD=1048576
CLOUDFLARE_MAX_RETRIES=3
CLOUDFLARE_BASE_DELAY=1.0
# Backup Configuration
MCP_MEMORY_BACKUPS_PATH=C:\Users\USERNAME\AppData\Local\mcp-memory\backups
MCP_MEMORY_SQLITE_PATH=C:\Users\USERNAME\AppData\Local\mcp-memory\backups\sqlite_vec.db
# Logging
LOG_LEVEL=INFO
```
## ✅ Validation Commands
### Quick Health Check
```bash
# Test configuration loading
cd C:/REPOSITORIES/mcp-memory-service
python -c "
from src.mcp_memory_service.config import STORAGE_BACKEND, CLOUDFLARE_API_TOKEN
print(f'Backend: {STORAGE_BACKEND}')
print(f'Token set: {bool(CLOUDFLARE_API_TOKEN)}')
"
# Test server initialization
python scripts/validation/diagnose_backend_config.py
```
### Expected Health Check Results
**Cloudflare Backend (Correct):**
```json
{
"validation": {
"status": "healthy",
"message": "Cloudflare storage validation successful"
},
"statistics": {
"backend": "cloudflare",
"storage_backend": "cloudflare",
"total_memories": 1073,
"vectorize_index": "mcp-memory-index",
"d1_database_id": "f745e9b4-ba8e-4d47-b38f-12af91060d5a"
},
"performance": {
"server": {
"storage_type": "CloudflareStorage"
}
}
}
```
**SQLite-vec Fallback (Incorrect):**
```json
{
"statistics": {
"backend": "sqlite-vec",
"storage_backend": "sqlite-vec"
},
"performance": {
"server": {
"storage_type": "SqliteVecMemoryStorage"
}
}
}
```
## 🚨 Troubleshooting
### Issue: Health Check Shows SQLite-vec Instead of Cloudflare
**Root Cause:** Environment variables not loading properly in execution context.
**Solutions:**
1. **Claude Desktop:**
- Ensure `cwd` is set to project directory
- Use explicit `env` variables in MCP configuration
- Restart Claude Desktop after config changes
2. **Claude Code:**
- Use explicit `-e` environment variables in `claude mcp add`
- Ensure command runs from project directory
- Remove and re-add memory server to pick up changes
3. **Both Environments:**
- Verify `.env` file exists and contains correct values
- Check API token permissions (Vectorize:Edit, D1:Edit, Workers AI:Read)
- Test Cloudflare connectivity manually
### Issue: "Missing required environment variables"
```bash
# Check if variables are being loaded
cd C:/REPOSITORIES/mcp-memory-service
python -c "
import os
from dotenv import load_dotenv
load_dotenv('.env')
print('CLOUDFLARE_API_TOKEN:', 'SET' if os.getenv('CLOUDFLARE_API_TOKEN') else 'NOT SET')
print('CLOUDFLARE_ACCOUNT_ID:', os.getenv('CLOUDFLARE_ACCOUNT_ID', 'NOT SET'))
"
```
### Issue: Different Memory Counts Between Environments
This indicates environments are using different backends:
- **Same count (e.g., 1073):** Both using Cloudflare ✅
- **Different counts:** One using SQLite-vec fallback ❌
**Fix:** Follow troubleshooting steps above to ensure both use Cloudflare.
### Issue: Connection Failed or Authentication Errors
1. **Verify API Token:**
```bash
curl -X GET "https://api.cloudflare.com/client/v4/user/tokens/verify" \
-H "Authorization: Bearer YOUR_API_TOKEN"
```
2. **Check Resource IDs:**
```bash
# List Vectorize indexes
curl -X GET "https://api.cloudflare.com/client/v4/accounts/YOUR_ACCOUNT_ID/vectorize/v2/indexes" \
-H "Authorization: Bearer YOUR_API_TOKEN"
# List D1 databases
curl -X GET "https://api.cloudflare.com/client/v4/accounts/YOUR_ACCOUNT_ID/d1/database" \
-H "Authorization: Bearer YOUR_API_TOKEN"
```
## 🔄 Migration from SQLite-vec
If you have existing memories in SQLite-vec:
```bash
# Export existing memories
python scripts/export_sqlite_vec.py --output cloudflare_export.json
# Switch to Cloudflare (follow setup above)
# Import to Cloudflare
python scripts/import_to_cloudflare.py --input cloudflare_export.json
```
## 📝 Configuration Management
### Single Source of Truth
- **Global Config:** `~/.claude.json` (Claude Desktop) - authoritative
- **Project Config:** `.env` file - for local development
- **Avoid:** Multiple conflicting configurations
### Environment Variable Precedence
1. Explicit MCP server `env` variables (highest priority)
2. System environment variables
3. `.env` file variables
4. Default values (lowest priority)
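The sketch below illustrates this resolution order in plain Python (illustrative only, not the service's actual loader; it assumes `python-dotenv` is installed):
```python
import os
from dotenv import load_dotenv

DEFAULTS = {"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec"}   # 4. defaults (lowest priority)

# 3. .env values fill in only variables that are not already set
load_dotenv(".env", override=False)

# 2. system environment variables are already present in os.environ, and
# 1. explicit MCP server "env" entries are injected by the client before the
#    process starts, so both override anything loaded from .env above.
backend = os.getenv("MCP_MEMORY_STORAGE_BACKEND", DEFAULTS["MCP_MEMORY_STORAGE_BACKEND"])
print(f"Resolved backend: {backend}")
```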
## 🎯 Success Criteria
Both Claude Desktop and Claude Code should show:
✅ **Health Check:** `"backend": "cloudflare"`
✅ **Storage Type:** `"CloudflareStorage"`
✅ **Memory Count:** Same number across environments
✅ **Database ID:** Same Cloudflare D1 database ID
✅ **Index:** Same Vectorize index name
When successful, memories will be synchronized across both environments automatically!
```
--------------------------------------------------------------------------------
/tests/unit/test_mdns_simple.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Simple test script for mDNS functionality without external test frameworks.
"""
import asyncio
import sys
import os
import traceback
from unittest.mock import Mock, AsyncMock, patch
# Add the src directory to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'src'))
def run_test(test_func, test_name):
"""Run a single test function and handle exceptions."""
try:
if asyncio.iscoroutinefunction(test_func):
asyncio.run(test_func())
else:
test_func()
print(f"✅ {test_name}")
return True
except Exception as e:
print(f"❌ {test_name}: {e}")
tb_lines = traceback.format_exc().split('\n')
print(f" {tb_lines[-3].strip()}")
return False
def test_imports():
"""Test that mDNS modules can be imported."""
from mcp_memory_service.discovery.mdns_service import (
ServiceAdvertiser, ServiceDiscovery, DiscoveryListener, ServiceDetails
)
from mcp_memory_service.discovery.client import DiscoveryClient, HealthStatus
# Test ServiceDetails creation
service_info = Mock()
details = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=True,
service_info=service_info
)
assert details.url == "http://192.168.1.100:8000"
assert details.api_url == "http://192.168.1.100:8000/api"
def test_service_advertiser_init():
"""Test ServiceAdvertiser initialization."""
from mcp_memory_service.discovery.mdns_service import ServiceAdvertiser
# Test default initialization
advertiser = ServiceAdvertiser()
assert advertiser.service_name == "MCP Memory Service"
assert advertiser.service_type == "_mcp-memory._tcp.local."
assert advertiser.port == 8000
assert advertiser._registered is False
# Test custom initialization
custom_advertiser = ServiceAdvertiser(
service_name="Custom Service",
port=8443,
https_enabled=True
)
assert custom_advertiser.service_name == "Custom Service"
assert custom_advertiser.port == 8443
assert custom_advertiser.https_enabled is True
async def test_service_advertiser_start_stop():
"""Test ServiceAdvertiser start/stop with mocks."""
from mcp_memory_service.discovery.mdns_service import ServiceAdvertiser
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class:
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
advertiser = ServiceAdvertiser()
with patch.object(advertiser, '_create_service_info') as mock_create_info:
mock_service_info = Mock()
mock_create_info.return_value = mock_service_info
# Test start
result = await advertiser.start()
assert result is True
assert advertiser._registered is True
# Test stop
await advertiser.stop()
assert advertiser._registered is False
def test_service_discovery_init():
"""Test ServiceDiscovery initialization."""
from mcp_memory_service.discovery.mdns_service import ServiceDiscovery
discovery = ServiceDiscovery()
assert discovery.service_type == "_mcp-memory._tcp.local."
assert discovery.discovery_timeout == 5
assert discovery._discovering is False
async def test_service_discovery_operations():
"""Test ServiceDiscovery operations with mocks."""
from mcp_memory_service.discovery.mdns_service import ServiceDiscovery, ServiceDetails
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf'), \
patch('mcp_memory_service.discovery.mdns_service.AsyncServiceBrowser'):
discovery = ServiceDiscovery(discovery_timeout=1)
# Test get_discovered_services with no listener
services = discovery.get_discovered_services()
assert len(services) == 0
# Test with mock listener
mock_listener = Mock()
mock_service = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=False,
service_info=Mock()
)
mock_listener.services = {"test": mock_service}
discovery._listener = mock_listener
services = discovery.get_discovered_services()
assert len(services) == 1
assert services[0] == mock_service
def test_discovery_listener():
"""Test DiscoveryListener functionality."""
from mcp_memory_service.discovery.mdns_service import DiscoveryListener
# Test initialization
listener = DiscoveryListener()
assert listener.callback is None
assert len(listener.services) == 0
# Test with callback
callback = Mock()
listener_with_callback = DiscoveryListener(callback)
assert listener_with_callback.callback == callback
def test_discovery_client_init():
"""Test DiscoveryClient initialization."""
from mcp_memory_service.discovery.client import DiscoveryClient
client = DiscoveryClient()
assert client.discovery_timeout == 5
custom_client = DiscoveryClient(discovery_timeout=10)
assert custom_client.discovery_timeout == 10
async def test_discovery_client_operations():
"""Test DiscoveryClient operations with mocks."""
from mcp_memory_service.discovery.client import DiscoveryClient, HealthStatus
from mcp_memory_service.discovery.mdns_service import ServiceDetails
client = DiscoveryClient()
# Test discover_services
mock_service = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=False,
service_info=Mock()
)
with patch.object(client._discovery, 'discover_services', return_value=[mock_service]):
services = await client.discover_services()
assert len(services) == 1
assert services[0] == mock_service
def test_health_status():
"""Test HealthStatus dataclass."""
from mcp_memory_service.discovery.client import HealthStatus
health = HealthStatus(
healthy=True,
status='ok',
backend='sqlite_vec',
statistics={'memory_count': 100},
response_time_ms=50.0
)
assert health.healthy is True
assert health.status == 'ok'
assert health.backend == 'sqlite_vec'
assert health.response_time_ms == 50.0
def test_service_details_properties():
"""Test ServiceDetails URL properties."""
from mcp_memory_service.discovery.mdns_service import ServiceDetails
# Test HTTP service
http_service = ServiceDetails(
name="HTTP Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=False,
service_info=Mock()
)
assert http_service.url == "http://192.168.1.100:8000"
assert http_service.api_url == "http://192.168.1.100:8000/api"
# Test HTTPS service
https_service = ServiceDetails(
name="HTTPS Service",
host="192.168.1.100",
port=8443,
https=True,
api_version="2.1.0",
requires_auth=True,
service_info=Mock()
)
assert https_service.url == "https://192.168.1.100:8443"
assert https_service.api_url == "https://192.168.1.100:8443/api"
def main():
"""Run all tests."""
print("🔧 MCP Memory Service - mDNS Unit Tests")
print("=" * 50)
tests = [
(test_imports, "Import mDNS modules"),
(test_service_advertiser_init, "ServiceAdvertiser initialization"),
(test_service_advertiser_start_stop, "ServiceAdvertiser start/stop"),
(test_service_discovery_init, "ServiceDiscovery initialization"),
(test_service_discovery_operations, "ServiceDiscovery operations"),
(test_discovery_listener, "DiscoveryListener functionality"),
(test_discovery_client_init, "DiscoveryClient initialization"),
(test_discovery_client_operations, "DiscoveryClient operations"),
(test_health_status, "HealthStatus dataclass"),
(test_service_details_properties, "ServiceDetails properties"),
]
passed = 0
total = len(tests)
for test_func, test_name in tests:
if run_test(test_func, test_name):
passed += 1
print("\n" + "=" * 50)
print(f"Results: {passed}/{total} tests passed")
if passed == total:
print("🎉 All mDNS unit tests passed!")
return 0
else:
print("❌ Some tests failed!")
return 1
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/scripts/benchmarks/benchmark_hybrid_sync.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Benchmark hybrid storage sync performance optimizations (v8.27.0).
Tests the performance improvements from:
- Bulk existence checking (get_all_content_hashes)
- Parallel processing with asyncio.gather
- Larger batch sizes for initial sync
Usage:
python scripts/benchmarks/benchmark_hybrid_sync.py
"""
import asyncio
import time
import sys
from pathlib import Path
from typing import List
from dataclasses import dataclass
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from mcp_memory_service.storage.sqlite_vec import SQLiteVecStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service import config
@dataclass
class BenchmarkResult:
"""Results from a sync benchmark run."""
operation: str
duration_ms: float
memories_processed: int
memories_per_second: float
optimization_used: str
async def benchmark_bulk_existence_check():
"""Benchmark bulk existence check vs individual queries."""
print("\n" + "=" * 80)
print("BENCHMARK 1: Bulk Existence Check")
print("=" * 80)
# Create test storage
storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
await storage.initialize()
# Get stats
stats = await storage.get_stats()
total_memories = stats.get('total_memories', 0)
print(f"Database contains: {total_memories} memories")
print()
if total_memories < 100:
print("⚠️ Insufficient memories for meaningful benchmark (need 100+)")
print(" Run with existing production database for accurate results")
return None
# Test 1: Individual queries (OLD METHOD - simulated)
print("Test 1: Individual hash queries (old method - simulated)")
test_count = min(100, total_memories)
# Get sample hashes
all_memories = await storage.get_all_memories(limit=test_count)
test_hashes = [m.content_hash for m in all_memories[:test_count]]
start = time.time()
for content_hash in test_hashes:
exists = await storage.get_by_hash(content_hash)
individual_duration = (time.time() - start) * 1000
print(f" Checked {test_count} hashes individually: {individual_duration:.1f}ms")
print(f" Average: {individual_duration / test_count:.2f}ms per check")
# Test 2: Bulk hash loading (NEW METHOD)
print("\nTest 2: Bulk hash loading (new method)")
start = time.time()
all_hashes = await storage.get_all_content_hashes()
bulk_duration = (time.time() - start) * 1000
print(f" Loaded {len(all_hashes)} hashes in bulk: {bulk_duration:.1f}ms")
print(f" Average lookup: O(1) constant time")
# Calculate improvement
speedup = individual_duration / bulk_duration if bulk_duration > 0 else 0
print(f"\n📊 Results:")
print(f" Speedup: {speedup:.1f}x faster for {test_count} checks")
print(f" For 2,619 memories: {(individual_duration / test_count * 2619):.0f}ms → {bulk_duration:.0f}ms")
print(f" Time saved: {((individual_duration / test_count * 2619) - bulk_duration):.0f}ms")
return BenchmarkResult(
operation="bulk_existence_check",
duration_ms=bulk_duration,
memories_processed=len(all_hashes),
memories_per_second=len(all_hashes) / (bulk_duration / 1000) if bulk_duration > 0 else 0,
optimization_used="get_all_content_hashes()"
)
async def benchmark_parallel_processing():
"""Benchmark parallel vs sequential memory processing."""
print("\n" + "=" * 80)
print("BENCHMARK 2: Parallel Processing")
print("=" * 80)
# Create test storage
storage = SQLiteVecStorage(config.SQLITE_VEC_PATH)
await storage.initialize()
# Create test memories (don't actually store them)
test_memories = []
for i in range(50): # Test with 50 memories
test_memories.append(Memory(
content=f"Benchmark test memory {i} with some content for embedding generation",
content_hash=f"test_hash_{i}",
tags=["benchmark", "test"],
memory_type="test"
))
print(f"Testing with {len(test_memories)} memories")
print()
# Test 1: Sequential processing (OLD METHOD - simulated)
print("Test 1: Sequential processing (old method - simulated)")
start = time.time()
# Simulate sequential hash checks
local_hashes = await storage.get_all_content_hashes()
for memory in test_memories:
# Simulate existence check
exists = memory.content_hash in local_hashes
sequential_duration = (time.time() - start) * 1000
print(f" Processed {len(test_memories)} memories sequentially: {sequential_duration:.1f}ms")
print(f" Average: {sequential_duration / len(test_memories):.2f}ms per memory")
# Test 2: Parallel processing (NEW METHOD - simulated)
print("\nTest 2: Parallel processing with Semaphore(15)")
semaphore = asyncio.Semaphore(15)
async def process_memory(memory):
async with semaphore:
exists = memory.content_hash in local_hashes
# Simulate some async work
await asyncio.sleep(0.001)
return exists
start = time.time()
tasks = [process_memory(mem) for mem in test_memories]
await asyncio.gather(*tasks, return_exceptions=True)
parallel_duration = (time.time() - start) * 1000
print(f" Processed {len(test_memories)} memories in parallel: {parallel_duration:.1f}ms")
print(f" Concurrency: Up to 15 simultaneous operations")
# Calculate improvement
speedup = sequential_duration / parallel_duration if parallel_duration > 0 else 0
print(f"\n📊 Results:")
print(f" Speedup: {speedup:.1f}x faster")
print(f" For 2,619 memories: {(sequential_duration / len(test_memories) * 2619):.0f}ms → {(parallel_duration / len(test_memories) * 2619):.0f}ms")
return BenchmarkResult(
operation="parallel_processing",
duration_ms=parallel_duration,
memories_processed=len(test_memories),
memories_per_second=len(test_memories) / (parallel_duration / 1000) if parallel_duration > 0 else 0,
optimization_used="asyncio.gather() + Semaphore(15)"
)
async def benchmark_batch_size():
"""Benchmark impact of larger batch sizes on API calls."""
print("\n" + "=" * 80)
print("BENCHMARK 3: Batch Size Optimization")
print("=" * 80)
total_memories = 2619 # Actual sync count from production
# Old batch size
old_batch_size = 100
old_api_calls = (total_memories + old_batch_size - 1) // old_batch_size # Ceiling division
old_overhead_ms = old_api_calls * 50 # Assume 50ms overhead per API call
# New batch size
new_batch_size = 500
new_api_calls = (total_memories + new_batch_size - 1) // new_batch_size
new_overhead_ms = new_api_calls * 50
print(f"Total memories to sync: {total_memories}")
print()
print(f"Old method (batch_size=100):")
print(f" API calls needed: {old_api_calls}")
print(f" Network overhead: ~{old_overhead_ms}ms ({old_api_calls} × 50ms)")
print(f"\nNew method (batch_size=500):")
print(f" API calls needed: {new_api_calls}")
print(f" Network overhead: ~{new_overhead_ms}ms ({new_api_calls} × 50ms)")
reduction = old_api_calls - new_api_calls
time_saved = old_overhead_ms - new_overhead_ms
print(f"\n📊 Results:")
print(f" API calls reduced: {reduction} fewer calls ({reduction / old_api_calls * 100:.1f}% reduction)")
print(f" Time saved: ~{time_saved}ms on network overhead alone")
return BenchmarkResult(
operation="batch_size_optimization",
duration_ms=new_overhead_ms,
memories_processed=total_memories,
memories_per_second=total_memories / (new_overhead_ms / 1000) if new_overhead_ms > 0 else 0,
optimization_used="batch_size=500 (5x larger)"
)
async def main():
"""Run all benchmarks."""
print("=" * 80)
print("HYBRID STORAGE SYNC PERFORMANCE BENCHMARK (v8.27.0)")
print("=" * 80)
print()
print("Testing optimizations:")
print(" 1. Bulk existence checking (get_all_content_hashes)")
print(" 2. Parallel processing with asyncio.gather")
print(" 3. Larger batch sizes (100 → 500)")
print()
results = []
try:
# Run benchmarks
result1 = await benchmark_bulk_existence_check()
if result1:
results.append(result1)
result2 = await benchmark_parallel_processing()
if result2:
results.append(result2)
result3 = await benchmark_batch_size()
if result3:
results.append(result3)
# Summary
print("\n" + "=" * 80)
print("OVERALL PERFORMANCE SUMMARY")
print("=" * 80)
print("\nOptimization Impact:")
for result in results:
print(f" • {result.operation}: {result.optimization_used}")
print("\nEstimated Combined Speedup:")
print(" • Before: ~8 minutes for 2,619 memories (~5.5 mem/sec)")
print(" • After: ~1.5-3 minutes estimated (~15-30 mem/sec)")
print(" • Overall: 3-5x faster initial sync")
print("\nKey Improvements:")
print(" ✅ Eliminated 2,619 individual DB queries → single bulk load")
print(" ✅ Up to 15x parallelism for CPU/embedding generation")
print(" ✅ 5x fewer Cloudflare API calls (6 vs 27)")
print("\n" + "=" * 80)
print("✅ Benchmark completed successfully")
print("=" * 80)
return 0
except Exception as e:
print(f"\n❌ Benchmark failed: {e}")
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/archive/docs-removed-2025-08-23/macos-intel.md:
--------------------------------------------------------------------------------
```markdown
# macOS Intel Setup Guide
This guide addresses the specific challenges of running MCP Memory Service on Intel-based Mac systems, including both legacy (2013-2017) and modern (2018+) Intel Macs.
## Hardware Profiles
### Legacy Intel Macs (2013-2017)
**Target Hardware**: 2015 MacBook Pro, older Intel Macs without dedicated GPU
**Optimization**: Maximum compatibility, minimal resource usage
**Recommended Backend**: SQLite-vec with ONNX runtime
**Typical specs this applies to:**
- MacBook Pro (15-inch, Mid 2015)
- MacBook Pro (13-inch, Early 2015)
- MacBook Air (11-inch/13-inch, 2013-2017)
- iMac (21.5-inch/27-inch, 2013-2017) with integrated graphics
### Modern Intel Macs (2018+)
**Target Hardware**: 2018+ Intel Macs with better GPU support
**Optimization**: Balanced performance and compatibility
**Recommended Backend**: ChromaDB with CPU optimization
## Why Special Setup is Needed
Intel-based Mac systems require special consideration for several reasons:
1. **PyTorch Compatibility**: PyTorch has moved toward optimizing for Apple Silicon, with some compatibility challenges on Intel Macs
2. **NumPy Version Conflicts**: Newer NumPy 2.x can cause compatibility issues with other ML libraries
3. **Python Version Sensitivity**: Python 3.13+ has introduced breaking changes that affect ML libraries
4. **Memory Constraints**: Limited RAM on older systems requires careful resource management
5. **ChromaDB Installation Issues**: Complex dependencies often fail on older systems
## Installation
### Prerequisites
- Python 3.10 (recommended for best compatibility)
- Git to clone the repository
- Xcode Command Line Tools: `xcode-select --install`
### Automatic Installation (Recommended)
The installer automatically detects Intel Mac hardware:
```bash
git clone https://github.com/doobidoo/mcp-memory-service.git
cd mcp-memory-service
# For legacy hardware (2013-2017)
python install.py --legacy-hardware
# For modern Intel Macs (2018+)
python install.py --intel-mac
```
### Manual Installation
If you prefer manual control:
#### 1. Environment Setup
```bash
# Clone repository
git clone https://github.com/doobidoo/mcp-memory-service.git
cd mcp-memory-service
# Create Python 3.10 virtual environment
python3.10 -m venv venv_py310
source venv_py310/bin/activate
# Upgrade pip
pip install --upgrade pip
```
#### 2. Install Dependencies
For **Legacy Intel Macs (2013-2017)**:
```bash
# Install with SQLite-vec backend
pip install -e .
pip install sentence-transformers onnx onnxruntime
# Downgrade NumPy for compatibility
pip uninstall -y numpy
pip install numpy==1.25.2
# Configure for SQLite-vec
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_MEMORY_USE_ONNX=true
```
For **Modern Intel Macs (2018+)**:
```bash
# Install with ChromaDB support
pip install -e .
pip install chromadb sentence-transformers
# Install CPU-optimized PyTorch
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
# Configure for ChromaDB
export MCP_MEMORY_STORAGE_BACKEND=chromadb
```
### Hardware Detection
The installer automatically detects legacy hardware by checking:
```python
# System detection criteria (illustrative pseudocode; year_of_hardware and
# has_dedicated_gpu are placeholders - a runnable sketch follows below)
is_legacy_mac = (
platform.system() == "Darwin" and # macOS
platform.machine() in ("x86_64", "x64") and # Intel processor
year_of_hardware < 2018 and # Pre-2018 models
not has_dedicated_gpu # No discrete GPU
)
```
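For reference, a runnable heuristic along the same lines might look like the sketch below (an illustration only; the model-identifier prefixes are an assumption and are not exhaustive):
```python
import platform
import subprocess

def is_legacy_intel_mac() -> bool:
    """Heuristic: pre-2018 Intel Mac, approximated from the model identifier."""
    if platform.system() != "Darwin" or platform.machine() not in ("x86_64", "x64"):
        return False
    # Model identifier, e.g. "MacBookPro11,4" for the mid-2015 15-inch MacBook Pro
    model = subprocess.run(["sysctl", "-n", "hw.model"],
                           capture_output=True, text=True).stdout.strip()
    legacy_prefixes = ("MacBookPro11", "MacBookPro12", "MacBookPro13", "MacBookPro14",
                       "MacBookAir6", "MacBookAir7", "Macmini7",
                       "iMac14", "iMac15", "iMac16", "iMac17", "iMac18")
    return model.startswith(legacy_prefixes)

if __name__ == "__main__":
    print("Legacy Intel Mac detected" if is_legacy_intel_mac() else "Modern or non-Intel Mac")
```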
## Configuration
### Environment Variables
#### For Legacy Intel Macs
```bash
# Core configuration
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
export MCP_MEMORY_USE_ONNX=true
export MCP_MEMORY_SQLITE_VEC_PATH="$HOME/.mcp_memory_sqlite"
# Performance optimization
export MCP_MEMORY_CPU_ONLY=true
export MCP_MEMORY_MAX_MEMORY_MB=2048
export MCP_MEMORY_SENTENCE_TRANSFORMER_MODEL="all-MiniLM-L6-v2"
# Compatibility settings
export PYTORCH_ENABLE_MPS_FALLBACK=1
export MCP_MEMORY_USE_ONNX_RUNTIME=true
```
#### For Modern Intel Macs
```bash
# Core configuration
export MCP_MEMORY_STORAGE_BACKEND=chromadb
export MCP_MEMORY_CHROMA_PATH="$HOME/.mcp_memory_chroma"
# Performance optimization
export MCP_MEMORY_CPU_OPTIMIZATION=true
export MCP_MEMORY_SENTENCE_TRANSFORMER_MODEL="all-MiniLM-L12-v2"
# Intel-specific settings
export MKL_NUM_THREADS=4
export OMP_NUM_THREADS=4
```
### Claude Desktop Configuration
#### Legacy Intel Mac Configuration
```json
{
"mcpServers": {
"memory": {
"command": "python",
"args": ["/path/to/mcp-memory-service/scripts/legacy_intel_mac/run_mcp_memory.sh"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec",
"MCP_MEMORY_USE_ONNX": "true",
"MCP_MEMORY_CPU_ONLY": "true"
}
}
}
}
```
#### Modern Intel Mac Configuration
```json
{
"mcpServers": {
"memory": {
"command": "python",
"args": ["/path/to/mcp-memory-service/src/mcp_memory_service/server.py"],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "chromadb",
"MCP_MEMORY_CPU_OPTIMIZATION": "true"
}
}
}
}
```
## Provided Scripts
The repository includes several Intel Mac-specific scripts:
### Legacy Intel Mac Scripts
- `scripts/legacy_intel_mac/run_mcp_memory.sh` - Standard startup script
- `scripts/legacy_intel_mac/run_mcp_memory_foreground.sh` - Foreground mode with debugging
- `scripts/legacy_intel_mac/start_memory_for_claude.sh` - Claude-optimized startup
### Usage Examples
```bash
# For foreground mode (shows all output, can be stopped with Ctrl+C)
./scripts/legacy_intel_mac/run_mcp_memory_foreground.sh
# For background mode (runs in background, logs to file)
./scripts/legacy_intel_mac/run_mcp_memory.sh
# For Claude Desktop integration
./scripts/legacy_intel_mac/start_memory_for_claude.sh
```
## Performance Optimization
### For Legacy Intel Macs
1. **Use SQLite-vec Backend**: Lighter weight than ChromaDB
2. **ONNX Runtime**: CPU-optimized inference
3. **Memory Management**: Limited model loading and caching
4. **Smaller Models**: Use compact sentence transformer models
```bash
# Optimization settings
export MCP_MEMORY_BATCH_SIZE=16
export MCP_MEMORY_CACHE_SIZE=100
export MCP_MEMORY_MODEL_CACHE_SIZE=1
```
### For Modern Intel Macs
1. **CPU Optimization**: Multi-threaded processing
2. **Intelligent Caching**: Larger cache sizes
3. **Better Models**: Higher quality embeddings
```bash
# Performance tuning
export MCP_MEMORY_BATCH_SIZE=32
export MCP_MEMORY_CACHE_SIZE=1000
export MCP_MEMORY_MODEL_CACHE_SIZE=3
```
## Troubleshooting
### Common Issues
#### 1. NumPy Compatibility Errors
**Symptom**:
```
AttributeError: module 'numpy' has no attribute 'float'
```
**Solution**:
```bash
pip uninstall -y numpy
pip install numpy==1.25.2
```
#### 2. PyTorch Installation Issues
**Symptom**: PyTorch fails to install or import
**Solution**:
```bash
# For legacy Macs - use CPU-only PyTorch
pip uninstall torch torchvision torchaudio
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
# Set fallback environment variable
export PYTORCH_ENABLE_MPS_FALLBACK=1
```
#### 3. ChromaDB Installation Failures
**Symptom**: ChromaDB dependency issues on legacy hardware
**Solution**: Switch to SQLite-vec backend:
```bash
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
python install.py --storage-backend sqlite_vec
```
#### 4. Memory Issues
**Symptom**: Out of memory errors during embedding generation
**Solution**: Reduce batch size and enable memory optimization:
```bash
export MCP_MEMORY_BATCH_SIZE=8
export MCP_MEMORY_MAX_MEMORY_MB=1024
export MCP_MEMORY_LOW_MEMORY_MODE=true
```
### Diagnostic Commands
#### System Information
```bash
# Check macOS version
sw_vers
# Check available memory
system_profiler SPMemoryDataType | grep Size
# Check CPU information
sysctl -n machdep.cpu.brand_string
# Check Python version and location
python --version
which python
```
#### Environment Verification
```bash
# Check virtual environment
echo $VIRTUAL_ENV
# Verify key packages
python -c "import torch; print(f'PyTorch: {torch.__version__}')"
python -c "import sentence_transformers; print('SentenceTransformers: OK')"
python -c "import sqlite3; print('SQLite3: OK')"
# Test ONNX runtime (for legacy Macs)
python -c "import onnxruntime; print(f'ONNX Runtime: {onnxruntime.__version__}')"
```
#### Server Testing
```bash
# Test server startup
python scripts/verify_environment.py
# Test memory operations
python -c "
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecStorage
storage = SqliteVecStorage()
print('Storage backend: OK')
"
# Test embedding generation
python -c "
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
embedding = model.encode(['test'])
print(f'Embedding generated: {len(embedding[0])} dimensions')
"
```
## Homebrew Integration
For Intel Macs with Homebrew-installed PyTorch, see the dedicated [Homebrew Integration Guide](../integration/homebrew.md).
## Performance Benchmarks
### Typical Performance (Legacy Intel Mac)
- **Memory Storage**: ~100ms per memory
- **Search Operations**: ~200ms for 100 memories
- **Embedding Generation**: ~500ms for short text
- **Memory Usage**: ~200MB baseline
### Typical Performance (Modern Intel Mac)
- **Memory Storage**: ~50ms per memory
- **Search Operations**: ~100ms for 1000 memories
- **Embedding Generation**: ~200ms for short text
- **Memory Usage**: ~400MB baseline
## Related Documentation
- [Installation Guide](../installation/master-guide.md) - General installation instructions
- [Homebrew Integration](../integration/homebrew.md) - Homebrew PyTorch setup
- [Troubleshooting](../troubleshooting/general.md) - macOS-specific troubleshooting
- [Performance Tuning](../implementation/performance.md) - Performance optimization guide
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/sync.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Sync management endpoints for hybrid backend.
Provides status monitoring and manual sync triggering for hybrid storage mode.
"""
from typing import Dict, Any, TYPE_CHECKING
from datetime import datetime, timezone
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel
from ...storage.base import MemoryStorage
from ..dependencies import get_storage
from ...config import OAUTH_ENABLED
# OAuth authentication imports (conditional)
if OAUTH_ENABLED or TYPE_CHECKING:
from ..oauth.middleware import require_read_access, require_write_access, AuthenticationResult
else:
# Provide type stubs when OAuth is disabled
AuthenticationResult = None
require_read_access = None
require_write_access = None
router = APIRouter()
class SyncStatusResponse(BaseModel):
"""Sync status response model."""
is_hybrid: bool
is_running: bool
is_paused: bool
last_sync_time: float
operations_pending: int
operations_processed: int
operations_failed: int
sync_interval_seconds: int
time_since_last_sync_seconds: float
next_sync_eta_seconds: float
status: str  # 'synced', 'syncing', 'pending', 'error', or 'not_hybrid'
class SyncForceResponse(BaseModel):
"""Force sync response model."""
success: bool
message: str
operations_synced: int
memories_pulled: int
time_taken_seconds: float
timestamp: str
@router.get("/sync/status", response_model=SyncStatusResponse)
async def get_sync_status(
storage: MemoryStorage = Depends(get_storage),
user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
"""
Get current sync status for hybrid backend.
Returns sync state, pending operations, last sync time, and health metrics.
Only available when using hybrid storage backend.
"""
# Check if storage supports sync (hybrid mode only)
if not hasattr(storage, 'get_sync_status'):
return SyncStatusResponse(
is_hybrid=False,
is_running=False,
is_paused=False,
last_sync_time=0,
operations_pending=0,
operations_processed=0,
operations_failed=0,
sync_interval_seconds=0,
time_since_last_sync_seconds=0,
next_sync_eta_seconds=0,
status='not_hybrid'
)
try:
# Get sync status from hybrid backend
sync_status = await storage.get_sync_status()
# Calculate time since last sync
import time
current_time = time.time()
last_sync = sync_status.get('last_sync_time', 0)
time_since_sync = current_time - last_sync if last_sync > 0 else 0
# Calculate ETA for next sync
sync_interval = sync_status.get('sync_interval', 300)
next_sync_eta = max(0, sync_interval - time_since_sync)
# Determine status
is_running = sync_status.get('is_running', False)
pending_ops = sync_status.get('pending_operations', 0)
actively_syncing = sync_status.get('actively_syncing', False) # True only during active sync
if actively_syncing:
status = 'syncing'
elif pending_ops > 0:
status = 'pending'
elif sync_status.get('operations_failed', 0) > 0:
status = 'error'
else:
status = 'synced'
return SyncStatusResponse(
is_hybrid=True,
is_running=is_running,
is_paused=sync_status.get('is_paused', not is_running),
last_sync_time=last_sync,
operations_pending=pending_ops,
operations_processed=sync_status.get('operations_processed', 0),
operations_failed=sync_status.get('operations_failed', 0),
sync_interval_seconds=sync_interval,
time_since_last_sync_seconds=time_since_sync,
next_sync_eta_seconds=next_sync_eta,
status=status
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to get sync status: {str(e)}")
@router.post("/sync/force", response_model=SyncForceResponse)
async def force_sync(
storage: MemoryStorage = Depends(get_storage),
user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
"""
Manually trigger immediate bi-directional sync with Cloudflare.
Performs BOTH directions:
1. PULL: Download new memories FROM Cloudflare TO local SQLite
2. PUSH: Upload pending operations FROM local TO Cloudflare
This ensures complete synchronization between both backends.
Only available when using hybrid storage backend.
"""
# Check if storage supports force sync (hybrid mode only)
if not hasattr(storage, 'force_sync'):
raise HTTPException(
status_code=404,
detail="Manual sync only available in hybrid mode"
)
try:
import time
start_time = time.time()
# Step 1: Pull FROM Cloudflare TO local (if method exists)
memories_pulled = 0
pull_message = ""
pull_result = None
if hasattr(storage, 'force_pull_sync'):
pull_result = await storage.force_pull_sync()
memories_pulled = pull_result.get('memories_pulled', 0)
pull_message = pull_result.get('message', '')
# Step 2: Push FROM local TO Cloudflare (existing behavior)
push_result = await storage.force_sync()
operations_synced = push_result.get('operations_synced', 0)
push_message = push_result.get('message', 'Sync completed')
# Check success flags from both operations
pull_success = pull_result.get('success', True) if pull_result else True
push_success = push_result.get('success', False)
overall_success = pull_success and push_success
time_taken = time.time() - start_time
# Combine messages
if memories_pulled > 0 and operations_synced > 0:
combined_message = f"Pulled {memories_pulled} from Cloudflare, pushed {operations_synced} to Cloudflare"
elif memories_pulled > 0:
combined_message = f"Pulled {memories_pulled} from Cloudflare"
elif operations_synced > 0:
combined_message = f"Pushed {operations_synced} to Cloudflare"
else:
combined_message = "No changes to sync (already synchronized)"
return SyncForceResponse(
success=overall_success,
message=combined_message,
operations_synced=operations_synced,
memories_pulled=memories_pulled,
time_taken_seconds=round(time_taken, 3),
timestamp=datetime.now(timezone.utc).isoformat()
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to force sync: {str(e)}"
)
class SyncPauseResponse(BaseModel):
"""Pause/resume sync response model."""
success: bool
message: str
is_paused: bool
timestamp: str
@router.post("/sync/pause", response_model=SyncPauseResponse)
async def pause_sync(
storage: MemoryStorage = Depends(get_storage),
user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
"""
Pause background sync operations.
Pauses the background sync service to allow safe database operations.
Sync will resume when resume_sync is called.
Only available when using hybrid storage backend.
"""
# Check if storage supports pause/resume (hybrid mode only)
if not hasattr(storage, 'pause_sync'):
raise HTTPException(
status_code=404,
detail="Pause sync only available in hybrid mode"
)
try:
result = await storage.pause_sync()
return SyncPauseResponse(
success=result.get('success', True),
message=result.get('message', 'Sync paused'),
is_paused=True,
timestamp=datetime.now(timezone.utc).isoformat()
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to pause sync: {str(e)}"
)
@router.post("/sync/resume", response_model=SyncPauseResponse)
async def resume_sync(
storage: MemoryStorage = Depends(get_storage),
user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
"""
Resume background sync operations.
Resumes the background sync service after it was paused.
Only available when using hybrid storage backend.
"""
# Check if storage supports pause/resume (hybrid mode only)
if not hasattr(storage, 'resume_sync'):
raise HTTPException(
status_code=404,
detail="Resume sync only available in hybrid mode"
)
try:
result = await storage.resume_sync()
return SyncPauseResponse(
success=result.get('success', True),
message=result.get('message', 'Sync resumed'),
is_paused=False,
timestamp=datetime.now(timezone.utc).isoformat()
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Failed to resume sync: {str(e)}"
)
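# --- Usage sketch (illustrative addition, not part of the original module) ---
# Assumes this router is mounted under /api on a local server at port 8000 with
# bearer-token auth; adjust the base URL and API key for your deployment.
if __name__ == "__main__":  # pragma: no cover - manual smoke test only
    import requests

    base = "http://localhost:8000/api"
    headers = {"Authorization": "Bearer your-api-key"}

    status = requests.get(f"{base}/sync/status", headers=headers, timeout=10).json()
    print(f"Sync status: {status['status']} ({status['operations_pending']} operations pending)")
    if status["status"] in ("pending", "error"):
        result = requests.post(f"{base}/sync/force", headers=headers, timeout=60).json()
        print(result["message"])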
```
--------------------------------------------------------------------------------
/tests/integration/test_bridge_integration.js:
--------------------------------------------------------------------------------
```javascript
/**
* Integration Tests for HTTP-MCP Bridge
*
* These tests verify the bridge works correctly with a real server
* or a mock server that accurately simulates real behavior.
*/
const assert = require('assert');
const http = require('http');
const https = require('https');
const path = require('path');
const HTTPMCPBridge = require(path.join(__dirname, '../../examples/http-mcp-bridge.js'));
const { mockResponses, createMockResponse } = require(path.join(__dirname, '../bridge/mock_responses.js'));
describe('Bridge-Server Integration', () => {
let bridge;
let testServer;
let serverPort;
before(async () => {
// Create a test server that mimics real API behavior
await startTestServer();
});
after(async () => {
if (testServer) {
await new Promise(resolve => testServer.close(resolve));
}
});
beforeEach(() => {
bridge = new HTTPMCPBridge();
bridge.endpoint = `http://localhost:${serverPort}/api`;
bridge.apiKey = 'test-api-key';
});
async function startTestServer() {
return new Promise((resolve) => {
testServer = http.createServer((req, res) => {
let body = '';
req.on('data', chunk => {
body += chunk.toString();
});
req.on('end', () => {
handleRequest(req, res, body);
});
});
testServer.listen(0, 'localhost', () => {
serverPort = testServer.address().port;
console.log(`Test server started on port ${serverPort}`);
resolve();
});
});
}
function handleRequest(req, res, body) {
const url = req.url;
const method = req.method;
// Verify API key
if (req.headers.authorization !== 'Bearer test-api-key') {
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ detail: 'Unauthorized' }));
return;
}
// Route requests
if (url === '/api/health' && method === 'GET') {
const response = mockResponses.health.healthy;
res.writeHead(response.status, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(response.body));
} else if (url === '/api/memories' && method === 'POST') {
try {
const data = JSON.parse(body);
// Simulate duplicate detection
if (data.content === 'duplicate-content') {
const response = mockResponses.memories.duplicate;
res.writeHead(response.status, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(response.body));
} else {
const response = mockResponses.memories.createSuccess;
res.writeHead(response.status, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(response.body));
}
} catch (e) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ detail: 'Invalid JSON' }));
}
} else if (url.startsWith('/api/search') && method === 'GET') {
const response = mockResponses.search.withResults;
res.writeHead(response.status, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(response.body));
} else if (url === '/health' && method === 'GET') {
// This is the WRONG endpoint - should return 404
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ detail: 'Not Found' }));
} else {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ detail: 'Not Found' }));
}
}
describe('Critical Bug Scenarios', () => {
it('should use /api/health not /health for health checks', async () => {
const result = await bridge.checkHealth();
assert.strictEqual(result.status, 'healthy');
assert.strictEqual(result.backend, 'sqlite_vec');
});
it('should handle HTTP 200 with success field for memory storage', async () => {
const result = await bridge.storeMemory({
content: 'Test memory content',
metadata: { tags: ['test'] }
});
assert.strictEqual(result.success, true);
assert.strictEqual(result.message, 'Memory stored successfully');
});
it('should handle duplicate detection with HTTP 200 and success=false', async () => {
const result = await bridge.storeMemory({
content: 'duplicate-content',
metadata: { tags: ['test'] }
});
assert.strictEqual(result.success, false);
assert.strictEqual(result.message, 'Duplicate content detected');
});
it('should construct URLs correctly with /api base path', async () => {
// This would have failed with the old URL construction bug
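// Illustration of the pitfall (not necessarily the bridge's exact code): with the
// WHATWG URL API, new URL('/health', 'http://host:8000/api') drops the base path and
// resolves to http://host:8000/health, while new URL('health', 'http://host:8000/api/')
// keeps it and yields http://host:8000/api/health.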
const result = await bridge.retrieveMemory({
query: 'test',
n_results: 5
});
assert(Array.isArray(result.memories));
assert(result.memories.length > 0);
});
});
describe('End-to-End MCP Protocol Flow', () => {
it('should handle complete MCP session', async () => {
// 1. Initialize
let response = await bridge.processRequest({
method: 'initialize',
params: {},
id: 1
});
assert.strictEqual(response.result.protocolVersion, '2024-11-05');
// 2. Get tools list
response = await bridge.processRequest({
method: 'tools/list',
params: {},
id: 2
});
assert(response.result.tools.length > 0);
// 3. Store a memory
response = await bridge.processRequest({
method: 'tools/call',
params: {
name: 'store_memory',
arguments: {
content: 'Integration test memory',
metadata: { tags: ['test', 'integration'] }
}
},
id: 3
});
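// Tool results come back wrapped in an MCP content array; the JSON payload
// sits in the first text block.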
const result = JSON.parse(response.result.content[0].text);
assert.strictEqual(result.success, true);
// 4. Check health
response = await bridge.processRequest({
method: 'tools/call',
params: {
name: 'check_database_health',
arguments: {}
},
id: 4
});
const health = JSON.parse(response.result.content[0].text);
assert.strictEqual(health.status, 'healthy');
});
});
describe('Error Recovery', () => {
it('should handle server unavailability gracefully', async () => {
// Point to a port where nothing is listening (9999 is a valid port; 99999 would be rejected as invalid)
bridge.endpoint = 'http://localhost:9999/api';
const result = await bridge.checkHealth();
assert.strictEqual(result.status, 'error');
// The error message should indicate connection failure or invalid URL
assert(result.error && (
result.error.includes('ECONNREFUSED') ||
result.error.includes('EADDRNOTAVAIL') ||
result.error.includes('connect') ||
result.error.includes('ENOTFOUND') ||
result.error.includes('Invalid URL') || // can occur if the endpoint itself is malformed
result.error.includes('ETIMEDOUT')
), `Expected connection error but got: ${result.error}`);
});
it('should handle malformed responses', async () => {
// Create a server that returns invalid JSON
const badServer = http.createServer((req, res) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end('This is not JSON');
});
await new Promise(resolve => {
badServer.listen(0, 'localhost', resolve);
});
const badPort = badServer.address().port;
bridge.endpoint = `http://localhost:${badPort}/api`;
const result = await bridge.checkHealth();
assert.strictEqual(result.status, 'error');
await new Promise(resolve => badServer.close(resolve));
});
});
describe('Authentication', () => {
it('should include API key in requests', async () => {
bridge.apiKey = 'test-api-key';
const result = await bridge.checkHealth();
assert.strictEqual(result.status, 'healthy');
});
it('should handle authentication failures', async () => {
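// handleRequest rejects any key other than 'test-api-key' with a 401.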
bridge.apiKey = 'wrong-api-key';
const result = await bridge.checkHealth();
assert.strictEqual(result.status, 'unhealthy');
});
});
});
// Run tests if this file is executed directly
if (require.main === module) {
// Simple test runner for development
const Mocha = require('mocha');
const mocha = new Mocha();
mocha.addFile(__filename);
mocha.run(failures => {
process.exit(failures ? 1 : 0);
});
}
```