This is page 12 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/docs/guides/migration.md:
--------------------------------------------------------------------------------
```markdown
# ChromaDB to SQLite-vec Migration Guide
This guide walks you through migrating your existing ChromaDB memories to the new SQLite-vec backend.
> **⚠️ Important Update (v5.0.1):** We've identified and fixed critical migration issues. If you experienced problems with v5.0.0 migration, please use the enhanced migration script or update to v5.0.1.
## Why Migrate?
SQLite-vec offers several advantages over ChromaDB for the MCP Memory Service:
- **Lightweight**: Single file database, no external dependencies
- **Faster startup**: No collection initialization overhead
- **Better performance**: Optimized for small to medium datasets
- **Simpler deployment**: No persistence directory management
- **Cross-platform**: Works consistently across all platforms
- **HTTP/SSE support**: New web interface only works with SQLite-vec
## Migration Methods
### Method 1: Automated Migration Script (Recommended)
Use the provided migration script for a safe, automated migration:
```bash
# Run the migration script
python scripts/migrate_chroma_to_sqlite.py
```
The script will:
- ✅ Check your existing ChromaDB data
- ✅ Count all memories to migrate
- ✅ Ask for confirmation before proceeding
- ✅ Migrate memories in batches with progress tracking
- ✅ Skip duplicates if running multiple times
- ✅ Verify migration completed successfully
- ✅ Provide next steps
### Method 2: Manual Configuration Switch
If you want to start fresh with SQLite-vec (losing existing memories):
```bash
# Set the storage backend to SQLite-vec
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
# Optionally set custom database path
export MCP_MEMORY_SQLITE_PATH=/path/to/your/memory.db
# Restart MCP Memory Service
```
## Step-by-Step Migration
### 1. Backup Your Data (Optional but Recommended)
```bash
# Create a backup of your ChromaDB data
cp -r ~/.mcp_memory_chroma ~/.mcp_memory_chroma_backup
```
### 2. Run Migration Script
```bash
cd /path/to/mcp-memory-service
python scripts/migrate_chroma_to_sqlite.py
```
**Example Output:**
```
🚀 MCP Memory Service - ChromaDB to SQLite-vec Migration
============================================================
📂 ChromaDB source: /Users/you/.mcp_memory_chroma
📂 SQLite-vec destination: /Users/you/.mcp_memory/memory_migrated.db
🔍 Checking ChromaDB data...
✅ Found 1,247 memories in ChromaDB
⚠️ About to migrate 1,247 memories from ChromaDB to SQLite-vec
📝 Destination file: /Users/you/.mcp_memory/memory_migrated.db
Proceed with migration? (y/N): y
🔌 Connecting to ChromaDB...
🔌 Connecting to SQLite-vec...
📥 Fetching all memories from ChromaDB...
🔄 Processing batch 1/25 (50 memories)...
✅ Batch 1 complete. Progress: 50/1,247
... (migration progress) ...
🎉 Migration completed successfully!
📊 MIGRATION SUMMARY
====================================
Total memories found: 1,247
Successfully migrated: 1,247
Duplicates skipped: 0
Failed migrations: 0
Migration duration: 45.32 seconds
```
### 3. Update Configuration
After successful migration, update your environment:
```bash
# Switch to SQLite-vec backend
export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec
# Set the database path (use the path shown in migration output)
export MCP_MEMORY_SQLITE_PATH=/path/to/memory_migrated.db
```
**For permanent configuration, add to your shell profile:**
```bash
# Add to ~/.bashrc, ~/.zshrc, or ~/.profile
echo 'export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec' >> ~/.bashrc
echo 'export MCP_MEMORY_SQLITE_PATH=/path/to/memory_migrated.db' >> ~/.bashrc
```
### 4. Restart and Test
```bash
# If using Claude Desktop, restart Claude Desktop application
# If using MCP server directly, restart the server
# Test that migration worked
python scripts/verify_environment.py
```
### 5. Enable HTTP/SSE Interface (Optional)
To use the new web interface:
```bash
# Enable HTTP server
export MCP_HTTP_ENABLED=true
export MCP_HTTP_PORT=8000
# Start HTTP server
python scripts/run_http_server.py
# Open browser to http://localhost:8000
```
## Configuration Reference
### Environment Variables
| Variable | Description | Default |
|----------|-------------|---------|
| `MCP_MEMORY_STORAGE_BACKEND` | Storage backend (`chroma` or `sqlite_vec`) | `chroma` |
| `MCP_MEMORY_SQLITE_PATH` | SQLite-vec database file path | `~/.mcp_memory/sqlite_vec.db` |
| `MCP_HTTP_ENABLED` | Enable HTTP/SSE interface | `false` |
| `MCP_HTTP_PORT` | HTTP server port | `8000` |
### Claude Desktop Configuration
Update your `claude_desktop_config.json`:
```json
{
"mcpServers": {
"memory": {
"command": "uv",
"args": [
"--directory",
"/path/to/mcp-memory-service",
"run",
"memory"
],
"env": {
"MCP_MEMORY_STORAGE_BACKEND": "sqlite_vec",
"MCP_MEMORY_SQLITE_PATH": "/path/to/memory_migrated.db"
}
}
}
}
```
## Troubleshooting
### Common Migration Issues (v5.0.0)
> **If you're experiencing issues with v5.0.0 migration, please use the enhanced migration script:**
> ```bash
> python scripts/migrate_v5_enhanced.py --help
> ```
#### Issue 1: Custom Data Locations Not Recognized
**Problem:** Migration script uses hardcoded paths and ignores custom ChromaDB locations.
**Solution:**
```bash
# Specify custom paths explicitly
python scripts/migrate_chroma_to_sqlite.py \
--chroma-path /your/custom/chroma/path \
--sqlite-path /your/custom/sqlite.db
# Or use environment variables
export MCP_MEMORY_CHROMA_PATH=/your/custom/chroma/path
export MCP_MEMORY_SQLITE_PATH=/your/custom/sqlite.db
python scripts/migrate_chroma_to_sqlite.py
```
#### Issue 2: Content Hash Errors
**Problem:** Migration fails with "NOT NULL constraint failed: memories.content_hash"
**Solution:** This has been fixed in v5.0.1. The migration script now generates proper SHA256 hashes. If you encounter this:
1. Update to latest version: `git pull`
2. Use the enhanced migration script: `python scripts/migrate_v5_enhanced.py`
#### Issue 3: Malformed Tags (60% Corruption)
**Problem:** Tags become corrupted during migration, appearing as `['tag1', 'tag2']` instead of `tag1,tag2`
**Solution:** The enhanced migration script includes tag validation and correction:
```bash
# Validate existing migration
python scripts/validate_migration.py /path/to/sqlite.db
# Re-migrate with fix
python scripts/migrate_v5_enhanced.py --force
```
#### Issue 4: Migration Hangs
**Problem:** Migration appears to hang with no progress indication
**Solution:** Use verbose mode and batch size control:
```bash
# Run with progress indicators
pip install tqdm # For progress bars
python scripts/migrate_v5_enhanced.py --verbose --batch-size 10
```
#### Issue 5: Dependency Conflicts
**Problem:** SSL certificate errors, version conflicts with ChromaDB/sentence-transformers
**Solution:**
```bash
# Clean install dependencies
pip uninstall chromadb sentence-transformers -y
pip install --upgrade chromadb sentence-transformers
# If SSL issues persist
export REQUESTS_CA_BUNDLE=""
export SSL_CERT_FILE=""
```
### Validation and Recovery
#### Validate Your Migration
After migration, always validate the data:
```bash
# Basic validation
python scripts/validate_migration.py
# Compare with original ChromaDB
python scripts/validate_migration.py --compare --chroma-path ~/.mcp_memory_chroma
```
#### Recovery Options
If migration failed or corrupted data:
1. **Restore from backup:**
```bash
# If you created a backup
python scripts/restore_memories.py migration_backup.json
```
2. **Rollback to ChromaDB:**
```bash
# Temporarily switch back
export MCP_MEMORY_STORAGE_BACKEND=chroma
# Your ChromaDB data is unchanged
```
3. **Re-migrate with enhanced script:**
```bash
# Clean the target database
rm /path/to/sqlite_vec.db
# Use enhanced migration
python scripts/migrate_v5_enhanced.py \
--chroma-path /path/to/chroma \
--sqlite-path /path/to/new.db \
--backup backup.json
```
### Getting Help
If you continue to experience issues:
1. **Check logs:** Add `--verbose` flag for detailed output
2. **Validate data:** Use `scripts/validate_migration.py`
3. **Report issues:** [GitHub Issues](https://github.com/doobidoo/mcp-memory-service/issues)
4. **Emergency rollback:** Your ChromaDB data remains untouched
### Migration Best Practices
1. **Always backup first:**
```bash
cp -r ~/.mcp_memory_chroma ~/.mcp_memory_chroma_backup
```
2. **Test with dry-run:**
```bash
python scripts/migrate_v5_enhanced.py --dry-run
```
3. **Validate after migration:**
```bash
python scripts/validate_migration.py
```
4. **Keep ChromaDB data until confirmed:**
- Don't delete ChromaDB data immediately
- Test the migrated database thoroughly
- Keep backups for at least a week
### Common Error Messages
**"Migration verification failed"**
- Some memories may have failed to migrate
- Check error summary in migration output
- Consider re-running migration
### Runtime Issues
**"Storage backend not found"**
- Ensure `MCP_MEMORY_STORAGE_BACKEND=sqlite_vec`
- Check that SQLite-vec dependencies are installed
**"Database file not found"**
- Verify `MCP_MEMORY_SQLITE_PATH` points to migrated database
- Check file permissions
### Performance Comparison
| Aspect | ChromaDB | SQLite-vec |
|--------|----------|------------|
| Startup time | ~2-3 seconds | ~0.5 seconds |
| Memory usage | ~100-200MB | ~20-50MB |
| Storage | Directory + files | Single file |
| Dependencies | chromadb, sqlite | sqlite-vec only |
| Scalability | Better for >10k memories | Optimal for <10k memories |
## Rollback Plan
If you need to switch back to ChromaDB:
```bash
# Switch back to ChromaDB
export MCP_MEMORY_STORAGE_BACKEND=chroma
unset MCP_MEMORY_SQLITE_PATH
# Restart MCP Memory Service
```
Your original ChromaDB data remains unchanged during migration.
## Next Steps
After successful migration:
1. ✅ Test memory operations (store, retrieve, search)
2. ✅ Try the HTTP/SSE interface for real-time updates
3. ✅ Update any scripts or tools that reference storage paths
4. ✅ Consider backing up your new SQLite-vec database regularly
5. ✅ Remove old ChromaDB data after confirming migration success
## Support
If you encounter issues:
1. Check the migration output and error messages
2. Verify environment variables are set correctly
3. Test with a small subset of data first
4. Review logs for detailed error information
The migration preserves all your data including:
- Memory content and metadata
- Tags and timestamps
- Content hashes (for deduplication)
- Semantic embeddings (regenerated with same model)
```
--------------------------------------------------------------------------------
/archive/docs-removed-2025-08-23/claude-code-integration.md:
--------------------------------------------------------------------------------
```markdown
# Using MCP Memory Service with Claude Code
This guide explains how to integrate the MCP Memory Service with Claude Code, providing two powerful approaches for using persistent memory capabilities in the Claude CLI environment.
## Prerequisites
Before you begin, ensure you have:
1. Installed [Claude Code](https://www.anthropic.com/news/introducing-claude-code) CLI tool
2. Set up the MCP Memory Service on your machine
3. Basic familiarity with command-line interfaces
## Integration Approaches
The MCP Memory Service offers **two integration methods** with Claude Code:
### 🎯 Method 1: Conversational Commands (Recommended)
**New in v2.2.0** - Direct command syntax following the CCPlugins pattern
### 🔧 Method 2: MCP Server Registration
Traditional MCP server approach for deep integration
---
## Method 1: Conversational Commands (v2.2.0)
The **conversational commands approach** provides direct memory operations through Claude Code commands that follow the CCPlugins pattern. This is the **recommended approach** for most users as it provides immediate access to memory operations without MCP server configuration.
### Installation
The commands can be installed during the main MCP Memory Service installation:
```bash
# Install with commands (will prompt if Claude Code CLI is detected)
python install.py
# Force install commands without prompting
python install.py --install-claude-commands
# Skip command installation prompt
python install.py --skip-claude-commands-prompt
```
Or install them manually:
```bash
# Install commands directly
python scripts/claude_commands_utils.py
# Test installation prerequisites
python scripts/claude_commands_utils.py --test
# Uninstall commands
python scripts/claude_commands_utils.py --uninstall
```
### Available Commands
#### `/memory-store` - Store Information with Context
Store information in your memory service with automatic context detection and smart tagging.
```bash
claude /memory-store "Important architectural decision about database backend"
claude /memory-store --tags "decision,architecture" "We chose SQLite-vec for performance"
claude /memory-store --type "note" "Remember to update Docker configuration"
```
**Features:**
- Automatic project context detection from current directory
- Smart tag generation based on file types and git repository
- Session context integration
- Metadata enrichment with timestamps and paths
#### `/memory-recall` - Time-based Memory Retrieval
Retrieve memories using natural language time expressions.
```bash
claude /memory-recall "what did we decide about the database last week?"
claude /memory-recall "yesterday's architectural discussions"
claude /memory-recall --project "mcp-memory-service" "last month's progress"
```
**Features:**
- Natural language time parsing ("yesterday", "last Tuesday", "two months ago")
- Context-aware filtering based on current project
- Temporal relevance scoring
- Seasonal and event-based queries
#### `/memory-search` - Tag and Content Search
Search through stored memories using tags, content keywords, and semantic similarity.
```bash
claude /memory-search --tags "architecture,database"
claude /memory-search "SQLite performance optimization"
claude /memory-search --project "current" --type "decision"
```
**Features:**
- Tag-based filtering with partial matching
- Semantic content search using embeddings
- Combined query support (tags + content)
- Relevance scoring and ranking
#### `/memory-context` - Session Context Integration
Capture the current conversation and project context as a memory.
```bash
claude /memory-context
claude /memory-context --summary "Architecture planning session"
claude /memory-context --include-files --include-commits
```
**Features:**
- Automatic session analysis and summarization
- Git repository state capture
- File change detection and inclusion
- Key decision and insight extraction
#### `/memory-health` - Service Health and Diagnostics
Check the health and status of your MCP Memory Service.
```bash
claude /memory-health
claude /memory-health --detailed
claude /memory-health --test-operations
```
**Features:**
- Service connectivity verification
- Database health and statistics
- Performance metrics and diagnostics
- Auto-discovery testing and troubleshooting
### Command Features
- **Auto-Discovery**: Commands automatically locate your MCP Memory Service via mDNS
- **Context Awareness**: Understand current project, git repository, and session state
- **Error Recovery**: Graceful handling when memory service is unavailable
- **Cross-Platform**: Full support for Windows, macOS, and Linux
- **Backend Agnostic**: Works with both ChromaDB and SQLite-vec storage backends
### Example Workflow
```bash
# Start a development session
claude /memory-context --summary "Starting work on mDNS integration"
# Store important decisions during development
claude /memory-store --tags "mDNS,architecture" "Decided to use zeroconf library for service discovery"
# Continue development work...
# Later, recall what was decided
claude /memory-recall "what did we decide about mDNS last week?"
# Search for related technical information
claude /memory-search --tags "mDNS,zeroconf"
# Check if everything is working correctly
claude /memory-health
```
---
## Method 2: MCP Server Registration
For users who prefer the traditional MCP server approach or need deeper integration, you can register the MCP Memory Service directly with Claude Code.
### Registering the Memory Service with Claude Code
You can register the MCP Memory Service to work with Claude Code using the `claude mcp add` command.
### Check Existing MCP Servers
To see which MCP servers are already registered with Claude:
```bash
claude mcp list
```
### Add the Memory Service
To add the memory service that's running on your local machine:
```bash
claude mcp add memory-service spawn -- /path/to/your/command
```
For example, if you've installed the memory service using UV (recommended):
```bash
claude mcp add memory-service spawn -- /opt/homebrew/bin/uv --directory /Users/yourusername/path/to/mcp-memory-service run memory
```
Replace the path elements with the actual paths on your system.
## Example Configuration
Here's a real-world example of adding the memory service to Claude Code:
```bash
claude mcp add memory-service spawn -- /opt/homebrew/bin/uv --directory /Users/yourusername/Documents/GitHub/mcp-memory-service run memory
```
This command:
1. Registers a new MCP server named "memory-service"
2. Uses the "spawn" transport method, which runs the command when needed
3. Specifies the full path to the UV command
4. Sets the working directory to your mcp-memory-service location
5. Runs the "memory" module
After running this command, you should see a confirmation message like:
```
Added stdio MCP server memory-service with command: spawn /opt/homebrew/bin/uv --directory /Users/yourusername/Documents/GitHub/mcp-memory-service run memory to local config
```
## Using Memory Functions in Claude Code
Once registered, you can use the memory service directly in your conversations with Claude Code. The memory functions available include:
- Storing memories
- Retrieving memories based on semantic search
- Recalling information from specific time periods
- Searching by tags
- And many more
---
## Troubleshooting
### For Conversational Commands (Method 1)
If you encounter issues with the commands:
1. **Commands Not Available**:
- Verify Claude Code CLI is installed: `claude --version`
- Check commands are installed: `ls ~/.claude/commands/memory-*.md`
- Reinstall commands: `python scripts/claude_commands_utils.py`
2. **MCP Service Connection Issues**:
- Verify MCP Memory Service is running: `memory --help`
- Check service health: `claude /memory-health`
- Ensure service is discoverable via mDNS
3. **Permission Issues**:
- Check commands directory permissions: `ls -la ~/.claude/commands/`
- Ensure write access to the commands directory
### For MCP Server Registration (Method 2)
If you encounter issues with MCP server registration:
1. Verify the memory service is running properly as a standalone application
2. Check that the paths in your `claude mcp add` command are correct
3. Ensure you have the necessary permissions to execute the specified commands
4. Try running `claude mcp list` to verify the server was added correctly
## Which Method Should You Use?
### Choose **Conversational Commands (Method 1)** if:
- ✅ You want quick setup with minimal configuration
- ✅ You prefer direct command syntax (`claude /memory-store`)
- ✅ You want automatic service discovery and context awareness
- ✅ You're new to MCP and want the simplest approach
- ✅ You want CCPlugins-compatible command integration
### Choose **MCP Server Registration (Method 2)** if:
- ✅ You need deep integration with Claude Code's MCP system
- ✅ You want to use the service alongside other MCP servers
- ✅ You prefer traditional MCP tool-based interactions
- ✅ You need maximum control over the server configuration
- ✅ You're building complex multi-server MCP workflows
## Benefits of Claude Code Integration
Both integration methods provide powerful advantages:
### Core Benefits
1. **Persistent Memory**: Your conversations and stored information persist across sessions
2. **Semantic Search**: Claude can retrieve relevant information even when not phrased exactly the same way
3. **Temporal Recall**: Ask about information from specific time periods (e.g., "last week", "yesterday")
4. **Organized Knowledge**: Use tags to categorize and later retrieve information by category
5. **Project Context**: Commands understand your current project and development context
### Method 1 (Commands) Additional Benefits
- **Immediate Access**: Direct command syntax without MCP server complexity
- **Context Integration**: Automatic project and git repository detection
- **Error Recovery**: Graceful fallback when service is unavailable
- **User-Friendly**: Conversational interface following established patterns
### Method 2 (MCP Server) Additional Benefits
- **Deep Integration**: Full MCP protocol support with rich tool interactions
- **Flexible Configuration**: Advanced server configuration options
- **Multi-Server Support**: Seamless integration with other MCP servers
- **Protocol Compliance**: Standard MCP tool-based interactions
This integration transforms Claude Code into a powerful knowledge management system with long-term memory capabilities, making your development workflow more efficient and context-aware.
```
--------------------------------------------------------------------------------
/tests/test_content_splitting.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for content splitting and backend-specific length limits.
Tests cover:
- Content splitting utility functions
- Backend limit enforcement
- Automatic chunking with metadata
- Boundary preservation (sentences, paragraphs, code blocks)
- Overlap between chunks for context preservation
"""
import pytest
from src.mcp_memory_service.utils.content_splitter import (
split_content,
estimate_chunks_needed,
validate_chunk_lengths,
_find_best_split_point
)
class TestContentSplitter:
    """Unit tests for the content_splitter utility module."""

    def test_split_short_content(self):
        """Content shorter than max_length should come back as a single chunk."""
        text = "This is a short sentence."
        result = split_content(text, max_length=100)
        assert len(result) == 1
        assert result[0] == text

    def test_split_long_content_character_mode(self):
        """Character-based splitting without boundary preservation."""
        text = "a" * 500
        result = split_content(text, max_length=100, preserve_boundaries=False, overlap=10)
        # Splitting must have occurred.
        assert len(result) > 1
        # No chunk may exceed the limit.
        for piece in result:
            assert len(piece) <= 100
        # The second chunk must carry over the tail of the first.
        assert result[1].startswith(result[0][-10:])

    def test_split_preserves_paragraphs(self):
        """Paragraph boundaries should be the preferred split points."""
        text = "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."
        result = split_content(text, max_length=30, preserve_boundaries=True)
        assert len(result) >= 2
        # Every chunk but the last should end cleanly (no mid-paragraph cut).
        for piece in result[:-1]:
            assert piece.strip().endswith('.') or '\n\n' in piece

    def test_split_preserves_sentences(self):
        """Sentence boundaries are used when whole paragraphs do not fit."""
        text = "First sentence. Second sentence. Third sentence. Fourth sentence."
        result = split_content(text, max_length=40, preserve_boundaries=True)
        assert len(result) >= 2
        # Almost every chunk should terminate with a period.
        period_count = sum(1 for piece in result if piece.strip().endswith('.'))
        assert period_count >= len(result) - 1

    def test_split_preserves_words(self):
        """Word boundaries are used when whole sentences do not fit."""
        text = "word1 word2 word3 word4 word5 word6 word7 word8"
        result = split_content(text, max_length=25, preserve_boundaries=True)
        assert len(result) >= 2
        # No chunk (except possibly the last) should end mid-word.
        for piece in result[:-1]:
            assert piece.endswith(' ') or piece == result[-1]

    def test_split_overlap(self):
        """Consecutive chunks should share overlapping context."""
        text = "The quick brown fox jumps over the lazy dog. " * 10
        result = split_content(text, max_length=100, preserve_boundaries=True, overlap=20)
        assert len(result) > 1
        # Each following chunk must contain words from the previous chunk's tail.
        for current, following in zip(result, result[1:]):
            tail_words = current[-20:].split()[:3]
            assert any(word in following for word in tail_words)

    def test_estimate_chunks_needed(self):
        """Chunk-count estimation, with and without overlap."""
        # Without overlap: plain ceiling division.
        assert estimate_chunks_needed(0, 100) == 0
        assert estimate_chunks_needed(100, 100) == 1
        assert estimate_chunks_needed(200, 100) == 2
        assert estimate_chunks_needed(250, 100) == 3
        # With overlap the effective advance per chunk shrinks.
        assert estimate_chunks_needed(100, 100, overlap=10) == 1  # fits in one chunk
        assert estimate_chunks_needed(150, 100, overlap=10) == 2  # second chunk covers remainder
        assert estimate_chunks_needed(200, 100, overlap=50) == 3  # effective step is 50
        # Degenerate overlap values fall back to simple division.
        assert estimate_chunks_needed(100, 100, overlap=100) == 1
        assert estimate_chunks_needed(100, 100, overlap=150) == 1

    def test_validate_chunk_lengths(self):
        """Validation must flag any chunk exceeding the limit."""
        within_limit = ["short", "also short", "still short"]
        over_limit = ["short", "this is way too long for the limit", "short"]
        assert validate_chunk_lengths(within_limit, max_length=50) is True
        assert validate_chunk_lengths(over_limit, max_length=20) is False

    def test_find_best_split_point_paragraph(self):
        """Paragraph breaks take priority as split points."""
        sample = "First para.\n\nSecond para.\n\nThird para."
        cut = _find_best_split_point(sample, max_length=25)
        # The split should land immediately after the first paragraph break.
        assert sample[cut - 2:cut] == '\n\n'

    def test_find_best_split_point_sentence(self):
        """Sentence boundaries are used when there are no paragraph breaks."""
        sample = "First sentence. Second sentence. Third sentence."
        cut = _find_best_split_point(sample, max_length=30)
        assert '. ' in sample[:cut]

    def test_split_empty_content(self):
        """Empty input yields no chunks."""
        assert split_content("", max_length=100) == []

    def test_split_exact_length(self):
        """Content exactly at max_length stays unsplit."""
        text = "a" * 100
        result = split_content(text, max_length=100)
        assert len(result) == 1
        assert result[0] == text

    def test_split_code_blocks(self):
        """Code-like content splits at blank-line (function) boundaries."""
        text = """def function_one():
    return True
def function_two():
    return False
def function_three():
    return None"""
        result = split_content(text, max_length=60, preserve_boundaries=True)
        assert len(result) >= 2
        # Ideally each chunk holds complete functions: any `def` should be
        # paired with its `return` (except possibly in the final chunk).
        for piece in result:
            if 'def ' in piece:
                assert 'return' in piece or piece == result[-1]
class TestBackendLimits:
    """Verify each storage backend exposes the configured content limit."""

    def test_cloudflare_limit(self):
        """Cloudflare backend's class constant must mirror the config value."""
        from src.mcp_memory_service.storage.cloudflare import CloudflareStorage
        from src.mcp_memory_service.config import CLOUDFLARE_MAX_CONTENT_LENGTH
        assert CloudflareStorage._MAX_CONTENT_LENGTH == CLOUDFLARE_MAX_CONTENT_LENGTH

    def test_sqlitevec_unlimited(self):
        """SQLite-vec backend reports the configured (default: unlimited) limit."""
        from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
        from src.mcp_memory_service.config import SQLITEVEC_MAX_CONTENT_LENGTH
        import os
        import tempfile
        with tempfile.TemporaryDirectory() as workdir:
            storage = SqliteVecMemoryStorage(db_path=os.path.join(workdir, "test.db"))
            # Default is None, i.e. unlimited content length.
            assert storage.max_content_length == SQLITEVEC_MAX_CONTENT_LENGTH
            assert storage.supports_chunking is True

    def test_hybrid_follows_config(self):
        """Hybrid backend reports the configured hybrid limit."""
        from src.mcp_memory_service.storage.hybrid import HybridMemoryStorage
        from src.mcp_memory_service.config import HYBRID_MAX_CONTENT_LENGTH
        import os
        import tempfile
        with tempfile.TemporaryDirectory() as workdir:
            storage = HybridMemoryStorage(
                sqlite_db_path=os.path.join(workdir, "test.db"),
                cloudflare_config=None,  # no cloud sync needed for this test
            )
            assert storage.max_content_length == HYBRID_MAX_CONTENT_LENGTH
            assert storage.supports_chunking is True
class TestConfigurationConstants:
    """Sanity checks on the content-limit configuration constants."""

    def test_config_constants_exist(self):
        """All content-limit constants must be importable with expected values."""
        from src.mcp_memory_service.config import (
            CLOUDFLARE_MAX_CONTENT_LENGTH,
            SQLITEVEC_MAX_CONTENT_LENGTH,
            HYBRID_MAX_CONTENT_LENGTH,
            ENABLE_AUTO_SPLIT,
            CONTENT_SPLIT_OVERLAP,
            CONTENT_PRESERVE_BOUNDARIES,
        )
        assert CLOUDFLARE_MAX_CONTENT_LENGTH == 800
        assert SQLITEVEC_MAX_CONTENT_LENGTH is None  # unlimited by default
        assert HYBRID_MAX_CONTENT_LENGTH == CLOUDFLARE_MAX_CONTENT_LENGTH
        assert isinstance(ENABLE_AUTO_SPLIT, bool)
        assert isinstance(CONTENT_SPLIT_OVERLAP, int)
        assert isinstance(CONTENT_PRESERVE_BOUNDARIES, bool)

    def test_config_validation(self):
        """Configured values must fall in sensible ranges."""
        from src.mcp_memory_service.config import (
            CLOUDFLARE_MAX_CONTENT_LENGTH,
            CONTENT_SPLIT_OVERLAP,
        )
        assert CLOUDFLARE_MAX_CONTENT_LENGTH > 0      # limit must be positive
        assert 0 <= CONTENT_SPLIT_OVERLAP <= 500      # overlap kept reasonable
if __name__ == "__main__":
    # Propagate pytest's exit status; the previous call discarded the return
    # value, so a direct `python tests/test_content_splitting.py` run always
    # exited 0 even when tests failed (masking failures in CI scripts).
    raise SystemExit(pytest.main([__file__, "-v"]))
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/api/client.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Storage client wrapper for code execution interface.
Provides global storage instance management with lazy initialization
and automatic connection reuse for optimal performance.
Design Goals:
- Single storage instance per process (avoid redundant connections)
- Lazy initialization (create on first use)
- Thread-safe access to global instance
- Automatic cleanup on process exit
- Graceful error handling with fallbacks
Performance:
- First call: ~50ms (initialization + connection)
- Subsequent calls: ~0ms (reuses connection)
- Memory overhead: ~10MB for embedding model cache
"""
import asyncio
import logging
import os
from typing import Optional
from ..storage.base import MemoryStorage
from ..storage.factory import create_storage_instance
from ..config import DATABASE_PATH, get_base_directory
logger = logging.getLogger(__name__)
# Global storage instance (module-level singleton); created lazily by
# _get_storage_async() on first use and reused by every subsequent call.
_storage_instance: Optional[MemoryStorage] = None
# Serializes first-time initialization across concurrent coroutines.
_initialization_lock = asyncio.Lock()
# Global consolidation instances (set by the HTTP server via
# set_consolidator()/set_scheduler(); remain None when no server is running).
_consolidator_instance: Optional["DreamInspiredConsolidator"] = None
_scheduler_instance: Optional["ConsolidationScheduler"] = None
async def _get_storage_async() -> MemoryStorage:
    """
    Return the process-wide storage backend, creating it on first use.

    Implements double-checked locking around lazy initialization: the fast
    path returns the cached instance without taking the lock; the slow path
    acquires the lock, re-checks, then builds and caches the backend so the
    connection is reused by all later calls.

    Thread Safety:
        An asyncio.Lock prevents races between concurrently-initializing
        coroutines.

    Returns:
        Initialized MemoryStorage instance

    Raises:
        RuntimeError: If storage initialization fails
    """
    global _storage_instance

    # Fast path: a previous call already built the instance.
    if _storage_instance is not None:
        return _storage_instance

    async with _initialization_lock:
        # Re-check under the lock: another coroutine may have won the race.
        if _storage_instance is not None:
            return _storage_instance

        try:
            logger.info("Initializing storage backend for code execution API...")

            # Resolve the SQLite database path, falling back to the
            # cross-platform default base directory when unconfigured.
            db_path = DATABASE_PATH
            if not db_path:
                db_path = os.path.join(get_base_directory(), "sqlite_vec.db")
                logger.warning(f"DATABASE_PATH not configured, using default: {db_path}")

            # Make sure the parent directory exists before connecting.
            parent_dir = os.path.dirname(db_path)
            if parent_dir and not os.path.exists(parent_dir):
                os.makedirs(parent_dir, exist_ok=True)
                logger.info(f"Created database directory: {parent_dir}")

            # Build, initialize, and cache the storage backend.
            _storage_instance = await create_storage_instance(db_path)
            logger.info(f"Storage backend initialized: {type(_storage_instance).__name__}")
            return _storage_instance
        except Exception as e:
            logger.error(f"Failed to initialize storage backend: {e}")
            raise RuntimeError(f"Storage initialization failed: {e}") from e
async def get_storage_async() -> MemoryStorage:
    """
    Public async accessor for the storage backend.

    Prefer this inside async contexts; synchronous callers should use
    get_storage(), which manages event-loop setup itself.

    Returns:
        Initialized MemoryStorage instance

    Raises:
        RuntimeError: If storage initialization fails
    """
    return await _get_storage_async()
def get_storage() -> MemoryStorage:
    """
    Get storage backend instance (synchronous wrapper).

    This is the primary entry point for code execution API operations from
    non-async code. It wraps the async initialization in a synchronous
    interface and reuses the process-wide connection.

    Connection Reuse:
        - First call: ~50ms (initialization)
        - Subsequent calls: ~0ms (returns cached instance)

    Returns:
        Initialized MemoryStorage instance

    Raises:
        RuntimeError: If called while an event loop is already running
            (use get_storage_async() instead), or if storage
            initialization fails.

    Example:
        >>> storage = get_storage()
        >>> # Use storage for operations
        >>> results = await storage.retrieve("query", n_results=5)
    """
    global _storage_instance

    # Fast path: if already initialized, return immediately.
    if _storage_instance is not None:
        return _storage_instance

    try:
        # Refuse to run from inside a running event loop: run_until_complete()
        # below would fail. BUG FIX: the guard's RuntimeError used to be
        # raised *inside* the try whose `except RuntimeError` detects "no
        # running loop", so it was silently swallowed and execution fell
        # through to run_until_complete() on the running loop. Raising from
        # the `else` branch keeps the error visible to the caller.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No running loop: safe to drive one synchronously below.
        else:
            logger.error("get_storage() called from async context - use get_storage_async() instead")
            raise RuntimeError("get_storage() cannot be called from async context")

        # Obtain a usable event loop, creating a fresh one if necessary.
        try:
            loop = asyncio.get_event_loop()
            if loop.is_closed():
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Run async initialization to completion on the chosen loop.
        return loop.run_until_complete(_get_storage_async())
    except Exception as e:
        logger.error(f"Error getting storage instance: {e}")
        raise
def close() -> None:
    """
    Close and clean up storage resources (synchronous, best-effort).

    Drops the module-level reference so the next get_storage() call builds
    a fresh connection. Backends with an async close() method are NOT
    awaited here; use close_async() for proper async teardown.

    Example:
        >>> from mcp_memory_service.api import close
        >>> close()  # Cleanup resources
        >>> # Next get_storage() call will create new connection
    """
    global _storage_instance

    # Nothing cached: nothing to do.
    if _storage_instance is None:
        return

    try:
        logger.info("Closing storage instance")
        # Only the reference is dropped here; any async teardown happens
        # via atexit or an explicit close_async() call.
    except Exception as e:
        logger.warning(f"Error closing storage instance: {e}")
    finally:
        _storage_instance = None
async def close_async() -> None:
    """
    Close and clean up storage resources (async version).

    Awaits the backend's close() coroutine when one is provided, then
    clears the cached instance. This is the correct teardown path for
    backends with async cleanup; use it from async contexts.

    Example:
        >>> from mcp_memory_service.api import close_async
        >>> await close_async()  # Proper async cleanup
    """
    global _storage_instance

    # Nothing cached: nothing to do.
    if _storage_instance is None:
        return

    try:
        logger.info("Closing storage instance (async)")
        closer = getattr(_storage_instance, 'close', None)
        if callable(closer):
            result = closer()
            # Await only when close() returned a coroutine/awaitable.
            if hasattr(result, '__await__'):
                await result
    except Exception as e:
        logger.warning(f"Error closing storage instance: {e}")
    finally:
        _storage_instance = None
def reset_storage() -> None:
    """
    Reset the global storage instance.

    Useful in tests, or after configuration changes that require the
    backend to be rebuilt; the next get_storage() call reconnects.

    Warning:
        The current connection reference is dropped immediately.

    Example:
        >>> reset_storage() # Close current connection
        >>> storage = get_storage() # Creates new connection
    """
    # Delegate to close() so reset and close share a single code path.
    close()
# Cleanup on module exit
import atexit


def _cleanup_storage():
    """Drop the cached storage reference when the process exits."""
    global _storage_instance
    if _storage_instance is None:
        return
    logger.info("Cleaning up storage instance on exit")
    _storage_instance = None


atexit.register(_cleanup_storage)
def set_consolidator(consolidator: "DreamInspiredConsolidator") -> None:
    """
    Register the global consolidator instance (called by the HTTP server).

    Exposes the consolidator — whose lifecycle the HTTP server owns — to
    code execution API callers.

    Args:
        consolidator: DreamInspiredConsolidator instance
    """
    global _consolidator_instance
    _consolidator_instance = consolidator
    logger.info("Global consolidator instance set")
def set_scheduler(scheduler: "ConsolidationScheduler") -> None:
    """
    Register the global scheduler instance (called by the HTTP server).

    Exposes the scheduler — whose lifecycle the HTTP server owns — to
    code execution API callers.

    Args:
        scheduler: ConsolidationScheduler instance
    """
    global _scheduler_instance
    _scheduler_instance = scheduler
    logger.info("Global scheduler instance set")
def get_consolidator() -> Optional["DreamInspiredConsolidator"]:
    """
    Fetch the globally registered consolidator, if any.

    Returns:
        DreamInspiredConsolidator instance, or None when no HTTP server
        has registered one via set_consolidator().
    """
    return _consolidator_instance
def get_scheduler() -> Optional["ConsolidationScheduler"]:
    """
    Fetch the globally registered scheduler, if any.

    Returns:
        ConsolidationScheduler instance, or None when no HTTP server
        has registered one via set_scheduler().
    """
    return _scheduler_instance
```
--------------------------------------------------------------------------------
/claude-hooks/memory-mode-controller.js:
--------------------------------------------------------------------------------
```javascript
#!/usr/bin/env node
/**
* Memory Mode Controller
* Command-line utility for managing memory hook performance profiles
*/
const fs = require('fs').promises;
const path = require('path');
class MemoryModeController {
    /**
     * @param {string|null} configPath Path to config.json; defaults to
     *     `config.json` next to this script.
     */
    constructor(configPath = null) {
        this.configPath = configPath || path.join(__dirname, 'config.json');
    }

    /**
     * Load current configuration from disk.
     * @returns {Promise<object>} Parsed configuration.
     * @throws {Error} When the file cannot be read or is not valid JSON.
     */
    async loadConfig() {
        try {
            const configData = await fs.readFile(this.configPath, 'utf8');
            return JSON.parse(configData);
        } catch (error) {
            throw new Error(`Failed to load config: ${error.message}`);
        }
    }

    /**
     * Save configuration back to disk (pretty-printed, 2-space indent).
     * @param {object} config Configuration object to persist.
     */
    async saveConfig(config) {
        try {
            await fs.writeFile(this.configPath, JSON.stringify(config, null, 2));
        } catch (error) {
            throw new Error(`Failed to save config: ${error.message}`);
        }
    }

    /**
     * Switch to a performance profile and persist the change.
     * @param {string} profileName Key under `performance.profiles`.
     * @returns {Promise<object>} The activated profile.
     * @throws {Error} When the profile does not exist in the config.
     */
    async switchProfile(profileName) {
        const config = await this.loadConfig();
        // `?.[` guards both a missing `performance` object and a missing
        // `profiles` map; `profiles[...]` alone would throw a TypeError
        // when `performance` exists but `profiles` is undefined.
        if (!config.performance?.profiles?.[profileName]) {
            throw new Error(`Unknown profile: ${profileName}. Available profiles: ${Object.keys(config.performance?.profiles || {}).join(', ')}`);
        }
        config.performance.defaultProfile = profileName;
        await this.saveConfig(config);
        const profile = config.performance.profiles[profileName];
        console.log(`✅ Switched to profile: ${profileName}`);
        console.log(`📊 Description: ${profile.description}`);
        console.log(`⚡ Max Latency: ${profile.maxLatency || 'adaptive'}ms`);
        console.log(`🎯 Enabled Tiers: ${profile.enabledTiers?.join(', ') || 'adaptive'}`);
        console.log(`🔄 Background Processing: ${profile.backgroundProcessing ? 'enabled' : 'disabled'}`);
        return profile;
    }

    /**
     * Print current configuration and status to the console.
     * @returns {Promise<object>} Summary of the active profile and trigger settings.
     */
    async getStatus() {
        const config = await this.loadConfig();
        const currentProfile = config.performance?.defaultProfile || 'balanced';
        // Same `?.[` guard as switchProfile: tolerate a config with no
        // `profiles` map instead of throwing.
        const profile = config.performance?.profiles?.[currentProfile];
        console.log('📊 Memory Hook Status');
        console.log('═'.repeat(50));
        console.log(`Current Profile: ${currentProfile}`);
        console.log(`Description: ${profile?.description || 'No description'}`);
        console.log(`Natural Triggers: ${config.naturalTriggers?.enabled ? 'enabled' : 'disabled'}`);
        console.log(`Sensitivity: ${config.patternDetector?.sensitivity || 0.7}`);
        console.log(`Trigger Threshold: ${config.naturalTriggers?.triggerThreshold || 0.6}`);
        console.log(`Cooldown Period: ${(config.naturalTriggers?.cooldownPeriod || 30000) / 1000}s`);
        if (profile) {
            console.log('\n🎯 Performance Settings');
            console.log('─'.repeat(30));
            console.log(`Max Latency: ${profile.maxLatency || 'adaptive'}ms`);
            console.log(`Enabled Tiers: ${profile.enabledTiers?.join(', ') || 'adaptive'}`);
            console.log(`Background Processing: ${profile.backgroundProcessing ? 'enabled' : 'disabled'}`);
            console.log(`Degrade Threshold: ${profile.degradeThreshold || 'adaptive'}ms`);
        }
        console.log('\n🔧 Available Profiles');
        console.log('─'.repeat(30));
        for (const [name, prof] of Object.entries(config.performance?.profiles || {})) {
            const current = name === currentProfile ? ' (current)' : '';
            console.log(`${name}${current}: ${prof.description}`);
        }
        return {
            currentProfile,
            config: config.performance,
            naturalTriggers: config.naturalTriggers
        };
    }

    /**
     * Update pattern-detection sensitivity and persist it.
     * @param {number} sensitivity Value in [0, 1].
     * @throws {Error} When the value is NaN/Infinity or out of range.
     */
    async updateSensitivity(sensitivity) {
        const config = await this.loadConfig();
        // Number.isFinite also rejects NaN, which `< 0 || > 1` alone lets
        // through (NaN comparisons are always false) — otherwise
        // `parseFloat("abc")` would silently write NaN into the config.
        if (!Number.isFinite(sensitivity) || sensitivity < 0 || sensitivity > 1) {
            throw new Error('Sensitivity must be between 0 and 1');
        }
        if (!config.patternDetector) {
            config.patternDetector = {};
        }
        config.patternDetector.sensitivity = sensitivity;
        await this.saveConfig(config);
        console.log(`✅ Updated sensitivity to ${sensitivity}`);
        return sensitivity;
    }

    /**
     * Update the natural-trigger threshold and persist it.
     * @param {number} threshold Value in [0, 1].
     * @throws {Error} When the value is NaN/Infinity or out of range.
     */
    async updateThreshold(threshold) {
        const config = await this.loadConfig();
        // Same NaN guard as updateSensitivity.
        if (!Number.isFinite(threshold) || threshold < 0 || threshold > 1) {
            throw new Error('Threshold must be between 0 and 1');
        }
        if (!config.naturalTriggers) {
            config.naturalTriggers = {};
        }
        config.naturalTriggers.triggerThreshold = threshold;
        await this.saveConfig(config);
        console.log(`✅ Updated trigger threshold to ${threshold}`);
        return threshold;
    }

    /**
     * Enable or disable natural triggers; toggles when called without an argument.
     * @param {boolean|null} enabled Desired state, or null to flip the current one.
     * @returns {Promise<boolean>} The new state.
     */
    async toggleNaturalTriggers(enabled = null) {
        const config = await this.loadConfig();
        if (!config.naturalTriggers) {
            config.naturalTriggers = {};
        }
        if (enabled === null) {
            enabled = !config.naturalTriggers.enabled;
        }
        config.naturalTriggers.enabled = enabled;
        await this.saveConfig(config);
        console.log(`✅ Natural triggers ${enabled ? 'enabled' : 'disabled'}`);
        return enabled;
    }

    /**
     * Reset trigger and detector settings to their defaults and persist them.
     * @returns {Promise<object>} The updated configuration.
     */
    async resetToDefaults() {
        const config = await this.loadConfig();
        // Guard: a config without a `performance` section would otherwise
        // throw on the property assignment below.
        if (!config.performance) {
            config.performance = {};
        }
        config.performance.defaultProfile = 'balanced';
        config.naturalTriggers = {
            enabled: true,
            triggerThreshold: 0.6,
            cooldownPeriod: 30000,
            maxMemoriesPerTrigger: 5
        };
        // Pattern detector defaults
        if (!config.patternDetector) {
            config.patternDetector = {};
        }
        config.patternDetector.sensitivity = 0.7;
        config.patternDetector.adaptiveLearning = true;
        await this.saveConfig(config);
        console.log('✅ Reset to default configuration');
        return config;
    }

    /**
     * Print every configured performance profile.
     * @returns {Promise<object>} Map of profile name to profile definition.
     */
    async listProfiles() {
        const config = await this.loadConfig();
        const profiles = config.performance?.profiles || {};
        console.log('📋 Available Performance Profiles');
        console.log('═'.repeat(60));
        for (const [name, profile] of Object.entries(profiles)) {
            const current = name === config.performance?.defaultProfile ? ' ⭐' : '';
            console.log(`\n${name}${current}`);
            console.log(`  Description: ${profile.description}`);
            console.log(`  Max Latency: ${profile.maxLatency || 'adaptive'}ms`);
            console.log(`  Enabled Tiers: ${profile.enabledTiers?.join(', ') || 'adaptive'}`);
            console.log(`  Background Processing: ${profile.backgroundProcessing ? 'yes' : 'no'}`);
        }
        return profiles;
    }
}
/**
* Command-line interface
*/
/**
 * Command-line entry point: parses argv and dispatches to the controller.
 * Exits the process with code 1 on usage errors or controller failures.
 */
async function main() {
    const argv = process.argv.slice(2);
    const controller = new MemoryModeController();
    try {
        // No arguments (or explicit "status") just prints the current state.
        if (argv.length === 0 || argv[0] === 'status') {
            await controller.getStatus();
            return;
        }
        const cmd = argv[0];
        const arg = argv[1];
        switch (cmd) {
            case 'switch':
            case 'profile': {
                if (!arg) {
                    console.error('❌ Please specify a profile name');
                    console.log('Available profiles: speed_focused, balanced, memory_aware, adaptive');
                    process.exit(1);
                }
                await controller.switchProfile(arg);
                break;
            }
            case 'sensitivity': {
                if (!arg) {
                    console.error('❌ Please specify sensitivity value (0-1)');
                    process.exit(1);
                }
                await controller.updateSensitivity(parseFloat(arg));
                break;
            }
            case 'threshold': {
                if (!arg) {
                    console.error('❌ Please specify threshold value (0-1)');
                    process.exit(1);
                }
                await controller.updateThreshold(parseFloat(arg));
                break;
            }
            case 'enable':
                await controller.toggleNaturalTriggers(true);
                break;
            case 'disable':
                await controller.toggleNaturalTriggers(false);
                break;
            case 'toggle':
                await controller.toggleNaturalTriggers();
                break;
            case 'reset':
                await controller.resetToDefaults();
                break;
            case 'list':
            case 'profiles':
                await controller.listProfiles();
                break;
            case 'help':
            case '-h':
            case '--help':
                showHelp();
                break;
            default:
                console.error(`❌ Unknown command: ${cmd}`);
                showHelp();
                process.exit(1);
        }
    } catch (error) {
        console.error(`❌ Error: ${error.message}`);
        process.exit(1);
    }
}
/**
 * Print usage information for the CLI.
 * NOTE(review): the template literal below lost its column alignment in a
 * whitespace-mangling pass — the command/description columns presumably
 * lined up originally; confirm against the repository copy before reflowing.
 */
function showHelp() {
console.log(`
🧠 Memory Mode Controller
Usage: node memory-mode-controller.js <command> [options]
Commands:
status Show current configuration and status
profile <name> Switch to performance profile
sensitivity <0-1> Set pattern detection sensitivity
threshold <0-1> Set trigger threshold
enable Enable natural triggers
disable Disable natural triggers
toggle Toggle natural triggers on/off
reset Reset to default configuration
list List available profiles
help Show this help message
Performance Profiles:
speed_focused Fastest response, minimal memory (< 100ms)
balanced Moderate latency, smart triggers (< 200ms)
memory_aware Full awareness, accept latency (< 500ms)
adaptive Auto-adjust based on usage patterns
Examples:
node memory-mode-controller.js status
node memory-mode-controller.js profile balanced
node memory-mode-controller.js sensitivity 0.8
node memory-mode-controller.js disable
`);
}
// Run if called directly (i.e. `node memory-mode-controller.js ...`);
// when require()'d, only the class export below is evaluated.
if (require.main === module) {
    main().catch(error => {
        console.error('❌ Fatal error:', error.message);
        process.exit(1);
    });
}
// Public API for programmatic use (e.g. by the hook runtime or tests).
module.exports = { MemoryModeController };
```
--------------------------------------------------------------------------------
/tests/unit/test_semtools_loader.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for Semtools document loader.
"""
import pytest
import asyncio
from pathlib import Path
from unittest.mock import Mock, patch, AsyncMock, MagicMock
import shutil
from mcp_memory_service.ingestion.semtools_loader import SemtoolsLoader
from mcp_memory_service.ingestion.base import DocumentChunk
class TestSemtoolsLoader:
    """Test suite for SemtoolsLoader class.

    All tests stub out the external ``semtools`` binary (via ``shutil.which``
    and/or ``asyncio.create_subprocess_exec``) so no real parsing happens.
    NOTE: with stacked ``@patch`` decorators, the *bottom-most* decorator maps
    to the *first* mock parameter after ``self``.
    """
    def test_initialization(self):
        """Constructor stores chunking parameters and advertises rich formats."""
        loader = SemtoolsLoader(chunk_size=500, chunk_overlap=50)
        assert loader.chunk_size == 500
        assert loader.chunk_overlap == 50
        assert 'pdf' in loader.supported_extensions
        assert 'docx' in loader.supported_extensions
        assert 'pptx' in loader.supported_extensions
    @patch('shutil.which')
    def test_semtools_availability_check(self, mock_which):
        """Availability flag mirrors whether the binary is found on PATH."""
        # Test when semtools is available
        mock_which.return_value = '/usr/local/bin/semtools'
        loader = SemtoolsLoader()
        assert loader._semtools_available is True
        # Test when semtools is not available
        mock_which.return_value = None
        loader = SemtoolsLoader()
        assert loader._semtools_available is False
    @patch('shutil.which')
    def test_can_handle_file(self, mock_which):
        """can_handle accepts pdf/docx and rejects plain text."""
        mock_which.return_value = '/usr/local/bin/semtools'
        loader = SemtoolsLoader()
        # Create temporary test files
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.touch()
            docx_file = Path(tmpdir) / "test.docx"
            docx_file.touch()
            txt_file = Path(tmpdir) / "test.txt"
            txt_file.touch()
            # Test supported formats
            assert loader.can_handle(pdf_file) is True
            assert loader.can_handle(docx_file) is True
            # Test unsupported formats
            assert loader.can_handle(txt_file) is False
    @patch('shutil.which')
    def test_can_handle_returns_false_when_semtools_unavailable(self, mock_which):
        """Test that can_handle returns False when semtools is not installed."""
        mock_which.return_value = None
        loader = SemtoolsLoader()
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.touch()
            # Should return False even for supported format when semtools unavailable
            assert loader.can_handle(pdf_file) is False
    @pytest.mark.asyncio
    @patch('shutil.which')
    async def test_extract_chunks_semtools_unavailable(self, mock_which):
        """Test that extract_chunks raises error when semtools is unavailable."""
        mock_which.return_value = None
        loader = SemtoolsLoader()
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            # When semtools is unavailable, validate_file will fail first
            with pytest.raises(ValueError, match="File format not supported"):
                async for chunk in loader.extract_chunks(pdf_file):
                    pass
    @pytest.mark.asyncio
    @patch('mcp_memory_service.ingestion.semtools_loader.SemtoolsLoader._check_semtools_availability')
    @patch('asyncio.create_subprocess_exec')
    async def test_extract_chunks_success(self, mock_subprocess, mock_check_semtools):
        """Happy path: subprocess output is chunked into DocumentChunk objects."""
        # Force semtools to be "available"
        mock_check_semtools.return_value = True
        # Mock successful semtools execution with sufficient content to create chunks
        mock_content = b"# Document Title\n\n" + b"This is a test document with enough content to create chunks. " * 10
        mock_process = AsyncMock()
        mock_process.returncode = 0
        mock_process.communicate = AsyncMock(
            return_value=(mock_content, b"")
        )
        mock_subprocess.return_value = mock_process
        loader = SemtoolsLoader(chunk_size=200, chunk_overlap=50)
        loader._semtools_available = True  # Override
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            chunks = []
            async for chunk in loader.extract_chunks(pdf_file):
                chunks.append(chunk)
            # Verify chunks were created
            assert len(chunks) > 0
            # Verify chunk structure
            first_chunk = chunks[0]
            assert isinstance(first_chunk, DocumentChunk)
            assert isinstance(first_chunk.content, str)
            assert first_chunk.metadata['extraction_method'] == 'semtools'
            assert first_chunk.metadata['parser_backend'] == 'llamaparse'
            assert first_chunk.source_file == pdf_file
    @pytest.mark.asyncio
    @patch('mcp_memory_service.ingestion.semtools_loader.SemtoolsLoader._check_semtools_availability')
    @patch('asyncio.create_subprocess_exec')
    @patch.dict('os.environ', {'LLAMAPARSE_API_KEY': 'test-api-key'})
    async def test_extract_chunks_with_api_key(self, mock_subprocess, mock_check_semtools):
        """Test that API key is passed to semtools when available."""
        mock_check_semtools.return_value = True
        # Mock with sufficient content to create chunks
        mock_content = b"# Content\n\n" + b"This document has enough content to create chunks. " * 10
        mock_process = AsyncMock()
        mock_process.returncode = 0
        mock_process.communicate = AsyncMock(
            return_value=(mock_content, b"")
        )
        mock_subprocess.return_value = mock_process
        # Create loader with API key
        loader = SemtoolsLoader()
        loader._semtools_available = True  # Override
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            chunks = []
            async for chunk in loader.extract_chunks(pdf_file):
                chunks.append(chunk)
            # Verify chunks were created and API key was recognized
            assert len(chunks) > 0
            assert chunks[0].metadata['has_api_key'] is True
    @pytest.mark.asyncio
    @patch('shutil.which')
    @patch('asyncio.create_subprocess_exec')
    async def test_extract_chunks_semtools_error(self, mock_subprocess, mock_which):
        """Non-zero subprocess exit surfaces as ValueError."""
        mock_which.return_value = '/usr/local/bin/semtools'
        # Mock failed semtools execution
        mock_process = AsyncMock()
        mock_process.returncode = 1
        mock_process.communicate = AsyncMock(
            return_value=(b"", b"Error: Failed to parse document")
        )
        mock_subprocess.return_value = mock_process
        loader = SemtoolsLoader()
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            with pytest.raises(ValueError, match="Failed to parse document"):
                async for chunk in loader.extract_chunks(pdf_file):
                    pass
    @pytest.mark.asyncio
    @patch('shutil.which')
    @patch('asyncio.create_subprocess_exec')
    @patch('asyncio.wait_for')
    async def test_extract_chunks_timeout(self, mock_wait_for, mock_subprocess, mock_which):
        """Test handling of semtools timeout."""
        mock_which.return_value = '/usr/local/bin/semtools'
        # Mock timeout scenario
        mock_wait_for.side_effect = asyncio.TimeoutError()
        loader = SemtoolsLoader()
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            with pytest.raises(ValueError, match="timed out|Failed to parse"):
                async for chunk in loader.extract_chunks(pdf_file):
                    pass
    @pytest.mark.asyncio
    @patch('shutil.which')
    @patch('asyncio.create_subprocess_exec')
    async def test_extract_chunks_empty_content(self, mock_subprocess, mock_which):
        """Test handling of empty content from semtools."""
        mock_which.return_value = '/usr/local/bin/semtools'
        # Mock empty output
        mock_process = AsyncMock()
        mock_process.returncode = 0
        mock_process.communicate = AsyncMock(
            return_value=(b"", b"")  # Empty stdout
        )
        mock_subprocess.return_value = mock_process
        loader = SemtoolsLoader()
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            pdf_file = Path(tmpdir) / "test.pdf"
            pdf_file.write_text("dummy content")
            with pytest.raises(ValueError, match="empty content|Failed to parse"):
                async for chunk in loader.extract_chunks(pdf_file):
                    pass
class TestSemtoolsLoaderRegistry:
    """Registry behaviour for semtools-backed formats."""

    @patch('shutil.which')
    def test_loader_registration_with_semtools(self, mock_which):
        """Registry hands back a SemtoolsLoader for DOCX when the binary exists."""
        mock_which.return_value = '/usr/local/bin/semtools'
        from mcp_memory_service.ingestion.registry import get_loader_for_file
        import tempfile
        with tempfile.TemporaryDirectory() as workdir:
            # DOCX is a semtools-only format.
            sample = Path(workdir) / "test.docx"
            sample.touch()
            resolved = get_loader_for_file(sample)
            assert resolved is not None
            assert isinstance(resolved, SemtoolsLoader)

    @patch('shutil.which')
    def test_loader_registration_without_semtools(self, mock_which):
        """Registry yields no loader for DOCX when the binary is missing."""
        mock_which.return_value = None
        from mcp_memory_service.ingestion.registry import get_loader_for_file
        import tempfile
        with tempfile.TemporaryDirectory() as workdir:
            sample = Path(workdir) / "test.docx"
            sample.touch()
            # No semtools on PATH -> nothing can handle DOCX.
            assert get_loader_for_file(sample) is None
# Allow running this module directly: delegate to pytest's own CLI.
if __name__ == '__main__':
    pytest.main([__file__, '-v'])
```
--------------------------------------------------------------------------------
/scripts/database/db_health_check.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Comprehensive Database Health Check for MCP Memory Service SQLite-vec Backend
"""
import asyncio
import os
import sys
import tempfile
import time
from pathlib import Path
# Add src to path
sys.path.insert(0, 'src')
# Set environment for sqlite-vec
os.environ['MCP_MEMORY_STORAGE_BACKEND'] = 'sqlite_vec'
class HealthChecker:
    """Runs individual health-check probes and records their outcomes.

    Each probe returns True/False; ``test()`` wraps a probe with timing and
    exception capture. Outcomes accumulate in ``results`` as
    ``(name, status, duration)`` tuples and raised errors in ``errors``.
    """

    def __init__(self):
        self.results = []  # (name, "PASS" | "FAIL" | "ERROR", seconds)
        self.errors = []   # (name, error message) for probes that raised

    @staticmethod
    def _cleanup(storage, temp_dir: str, db_path: str) -> None:
        """Best-effort teardown of a temporary database and its directory.

        Runs on success *and* failure paths; the previous implementation only
        cleaned up on success and leaked temp files whenever a probe failed.
        """
        if storage is not None:
            try:
                storage.close()
            except Exception:
                pass  # closing a broken connection must not mask the probe result
        try:
            os.remove(db_path)
        except OSError:
            pass
        try:
            os.rmdir(temp_dir)
        except OSError:
            pass  # e.g. leftover -wal/-shm files; directory is in tmp anyway

    async def test(self, name: str, func):
        """Run *func* (sync or async), print its outcome, and record it."""
        print(f"🔍 {name}...")
        # Assigned before the try block so the except branch can always
        # compute a duration, even if the very first statement raises.
        start_time = time.time()
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func()
            else:
                result = func()
            duration = time.time() - start_time
            if result:
                print(f" ✅ PASS ({duration:.2f}s)")
                self.results.append((name, "PASS", duration))
            else:
                print(f" ❌ FAIL ({duration:.2f}s)")
                self.results.append((name, "FAIL", duration))
        except Exception as e:
            duration = time.time() - start_time
            print(f" ❌ ERROR ({duration:.2f}s): {str(e)}")
            self.results.append((name, "ERROR", duration))
            self.errors.append((name, str(e)))

    def test_imports(self):
        """Verify that the backend modules and helpers are importable."""
        try:
            import sqlite_vec
            from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            from src.mcp_memory_service.models.memory import Memory
            from src.mcp_memory_service.utils.hashing import generate_content_hash
            return True
        except ImportError as e:
            print(f" Import error: {e}")
            return False

    async def test_database_creation(self):
        """Create a fresh database and verify the expected tables exist."""
        temp_dir = tempfile.mkdtemp()
        db_path = os.path.join(temp_dir, "health_check.db")
        storage = None
        try:
            from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            storage = SqliteVecMemoryStorage(db_path)
            await storage.initialize()
            # Check tables exist
            cursor = storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cursor.fetchall()]
            for table in ('memories', 'memory_embeddings'):
                if table not in tables:
                    print(f" Missing table: {table}")
                    return False
            return True
        except Exception as e:
            print(f" Database creation error: {e}")
            return False
        finally:
            self._cleanup(storage, temp_dir, db_path)

    async def test_memory_operations(self):
        """Exercise store / retrieve / tag-search / delete on a throwaway DB."""
        temp_dir = tempfile.mkdtemp()
        db_path = os.path.join(temp_dir, "operations_test.db")
        storage = None
        try:
            from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            from src.mcp_memory_service.models.memory import Memory
            from src.mcp_memory_service.utils.hashing import generate_content_hash
            storage = SqliteVecMemoryStorage(db_path)
            await storage.initialize()
            # Test store
            content = "Health check test memory"
            memory = Memory(
                content=content,
                content_hash=generate_content_hash(content),
                tags=["health", "check"],
                memory_type="test"
            )
            success, message = await storage.store(memory)
            if not success:
                print(f" Store failed: {message}")
                return False
            # Test retrieve
            results = await storage.retrieve("health check", n_results=1)
            if not results:
                print(" Retrieve failed: No results")
                return False
            # Test tag search
            tag_results = await storage.search_by_tag(["health"])
            if not tag_results:
                print(" Tag search failed: No results")
                return False
            # Test delete
            success, message = await storage.delete(memory.content_hash)
            if not success:
                print(f" Delete failed: {message}")
                return False
            return True
        except Exception as e:
            print(f" Memory operations error: {e}")
            return False
        finally:
            self._cleanup(storage, temp_dir, db_path)

    async def test_vector_search(self):
        """Store several memories and verify semantic retrieval works."""
        temp_dir = tempfile.mkdtemp()
        db_path = os.path.join(temp_dir, "vector_test.db")
        storage = None
        try:
            from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            from src.mcp_memory_service.models.memory import Memory
            from src.mcp_memory_service.utils.hashing import generate_content_hash
            storage = SqliteVecMemoryStorage(db_path)
            await storage.initialize()
            # Store multiple memories
            test_memories = [
                "Python programming is fun",
                "JavaScript development tools",
                "Database design patterns"
            ]
            for content in test_memories:
                memory = Memory(
                    content=content,
                    content_hash=generate_content_hash(content),
                    tags=["programming"],
                    memory_type="test"
                )
                await storage.store(memory)
            # Test vector search
            results = await storage.retrieve("programming languages", n_results=3)
            if len(results) != 3:
                print(f" Expected 3 results, got {len(results)}")
                return False
            # Check relevance scores
            for result in results:
                if result.relevance_score < 0:
                    print(f" Invalid relevance score: {result.relevance_score}")
                    return False
            return True
        except Exception as e:
            print(f" Vector search error: {e}")
            return False
        finally:
            self._cleanup(storage, temp_dir, db_path)

    def test_environment(self):
        """Check that required environment variables hold the expected values."""
        required_vars = {
            'MCP_MEMORY_STORAGE_BACKEND': 'sqlite_vec'
        }
        for var, expected in required_vars.items():
            actual = os.getenv(var)
            if actual != expected:
                print(f" Environment variable {var}: expected '{expected}', got '{actual}'")
                return False
        return True

    def test_dependencies(self):
        """Check that third-party dependencies import cleanly."""
        try:
            import sqlite_vec
            version = getattr(sqlite_vec, '__version__', 'unknown')
            print(f" sqlite-vec version: {version}")
            import sentence_transformers
            print(" sentence-transformers available")
            import torch
            print(" torch available")
            return True
        except ImportError as e:
            print(f" Dependency error: {e}")
            return False

    def test_database_path(self):
        """Verify the default database directory exists and is writable."""
        home = str(Path.home())
        db_path = os.path.join(home, '.local', 'share', 'mcp-memory', 'sqlite_vec.db')
        db_dir = os.path.dirname(db_path)
        try:
            # Ensure directory exists
            os.makedirs(db_dir, exist_ok=True)
            # Test write permission with a throwaway file
            test_file = os.path.join(db_dir, '.write_test')
            with open(test_file, 'w') as f:
                f.write('test')
            os.remove(test_file)
            print(f" Database path: {db_path}")
            print(" Directory writable: ✅")
            return True
        except Exception as e:
            print(f" Database path error: {e}")
            return False
async def main():
    """Main health check function."""
    banner = "=" * 60
    print(banner)
    print("🏥 MCP Memory Service - SQLite-vec Database Health Check")
    print(banner)
    checker = HealthChecker()
    # Run all tests, table-driven.
    suite = (
        ("Environment Configuration", checker.test_environment),
        ("Dependencies Check", checker.test_dependencies),
        ("Import Tests", checker.test_imports),
        ("Database Path", checker.test_database_path),
        ("Database Creation", checker.test_database_creation),
        ("Memory Operations", checker.test_memory_operations),
        ("Vector Search", checker.test_vector_search),
    )
    for label, probe in suite:
        await checker.test(label, probe)
    # Summary
    print("\n" + banner)
    print("📊 Health Check Summary")
    print(banner)
    passed = sum(status == "PASS" for _, status, _ in checker.results)
    total = len(checker.results)
    for name, status, duration in checker.results:
        icon = "✅" if status == "PASS" else "❌"
        print(f"{icon} {name}: {status} ({duration:.2f}s)")
    print(f"\nResults: {passed}/{total} tests passed")
    if checker.errors:
        print("\n❌ Errors Found:")
        for name, error in checker.errors:
            print(f" • {name}: {error}")
    if passed == total:
        print("\n🎉 Database Health Check: PASSED")
        print(" SQLite-vec backend is fully functional and ready for production use!")
        print("\n🚀 Ready for Claude Code integration:")
        print(" - Start server: python -m src.mcp_memory_service.server")
        print(" - Database: ~/.local/share/mcp-memory/sqlite_vec.db")
        print(" - 75% memory reduction vs ChromaDB")
        return 0
    print("\n💥 Database Health Check: FAILED")
    print(" Please resolve the issues above before using the service.")
    return 1
# Script entry point: process exit code is 0 when all checks pass, 1 otherwise.
if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
```
--------------------------------------------------------------------------------
/tests/integration/test_concurrent_clients.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for concurrent MCP client access to SQLite-vec backend.
"""
import pytest
import pytest_asyncio
import asyncio
import tempfile
import os
import time
from typing import List
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
class TestConcurrentClients:
    """Test suite for concurrent client access scenarios.

    Each "client" is an independent SqliteVecMemoryStorage instance opened on
    the same database file; tests run clients concurrently via asyncio.gather
    to exercise SQLite's multi-connection behaviour.
    """

    @pytest_asyncio.fixture
    async def db_path(self):
        """Create a temporary database path; remove db + WAL/SHM files afterwards."""
        with tempfile.NamedTemporaryFile(delete=False, suffix=".db") as tmp:
            path = tmp.name
        yield path
        # Cleanup — SQLite in WAL mode may leave -wal/-shm sidecar files.
        # Narrowed from a bare `except:` so only filesystem errors (e.g. the
        # file never existed) are swallowed, not KeyboardInterrupt etc.
        for ext in ["", "-wal", "-shm"]:
            try:
                os.unlink(path + ext)
            except OSError:
                pass

    async def create_client(self, db_path: str, client_id: str) -> SqliteVecMemoryStorage:
        """Create and initialize a storage client instance for *db_path*."""
        storage = SqliteVecMemoryStorage(db_path)
        await storage.initialize()
        return storage

    async def client_writer(self, db_path: str, client_id: str, num_memories: int) -> List[tuple]:
        """Simulate a client writing *num_memories* memories.

        Returns a list of (success, message, content_hash) tuples.
        """
        storage = await self.create_client(db_path, client_id)
        results = []
        try:
            for i in range(num_memories):
                content = f"Memory from {client_id} - {i}"
                memory = Memory(
                    content=content,
                    content_hash=generate_content_hash(content),
                    tags=[client_id, "concurrent"],
                    memory_type="test",
                    metadata={"client_id": client_id, "index": i}
                )
                success, msg = await storage.store(memory)
                results.append((success, msg, memory.content_hash))
                # Small delay to simulate real-world timing
                await asyncio.sleep(0.01)
        finally:
            storage.close()
        return results

    async def client_reader(self, db_path: str, client_id: str, num_reads: int) -> List[int]:
        """Simulate a client reading memories; returns the count seen per read."""
        storage = await self.create_client(db_path, client_id)
        counts = []
        try:
            for i in range(num_reads):
                # Get all memories with tag "concurrent"
                memories = await storage.search_by_tag(["concurrent"])
                counts.append(len(memories))
                # Small delay between reads
                await asyncio.sleep(0.02)
        finally:
            storage.close()
        return counts

    @pytest.mark.asyncio
    async def test_two_clients_concurrent_write(self, db_path):
        """Test two clients writing memories concurrently."""
        # Run two clients concurrently
        results = await asyncio.gather(
            self.client_writer(db_path, "client1", 10),
            self.client_writer(db_path, "client2", 10),
            return_exceptions=True
        )
        # Check results
        assert len(results) == 2
        assert not isinstance(results[0], Exception), f"Client 1 failed: {results[0]}"
        assert not isinstance(results[1], Exception), f"Client 2 failed: {results[1]}"
        client1_results, client2_results = results
        # Count successful writes
        client1_success = sum(1 for success, _, _ in client1_results if success)
        client2_success = sum(1 for success, _, _ in client2_results if success)
        # Both clients should have written their memories
        assert client1_success == 10, f"Client 1 only wrote {client1_success}/10 memories"
        assert client2_success == 10, f"Client 2 only wrote {client2_success}/10 memories"
        # Verify total memories in database
        storage = await self.create_client(db_path, "verifier")
        try:
            all_memories = await storage.search_by_tag(["concurrent"])
            assert len(all_memories) == 20, f"Expected 20 memories, found {len(all_memories)}"
        finally:
            storage.close()

    @pytest.mark.asyncio
    async def test_reader_writer_concurrent(self, db_path):
        """Test one client reading while another writes."""
        # Start with some initial data
        initial_storage = await self.create_client(db_path, "initial")
        for i in range(5):
            content = f"Initial memory {i}"
            memory = Memory(
                content=content,
                content_hash=generate_content_hash(content),
                tags=["concurrent", "initial"],
                memory_type="test"
            )
            await initial_storage.store(memory)
        initial_storage.close()
        # Run reader and writer concurrently
        results = await asyncio.gather(
            self.client_reader(db_path, "reader", 10),
            self.client_writer(db_path, "writer", 5),
            return_exceptions=True
        )
        assert not isinstance(results[0], Exception), f"Reader failed: {results[0]}"
        assert not isinstance(results[1], Exception), f"Writer failed: {results[1]}"
        read_counts, write_results = results
        # Reader should see increasing counts as writer adds memories
        assert read_counts[0] >= 5, "Reader should see initial memories"
        assert read_counts[-1] >= read_counts[0], "Read count should not decrease"
        # Writer should succeed
        write_success = sum(1 for success, _, _ in write_results if success)
        assert write_success == 5, f"Writer only wrote {write_success}/5 memories"

    @pytest.mark.asyncio
    async def test_multiple_readers_one_writer(self, db_path):
        """Test multiple readers accessing while one writer updates."""
        # Create writer and multiple readers
        async def run_scenario():
            tasks = [
                self.client_writer(db_path, "writer", 10),
                self.client_reader(db_path, "reader1", 5),
                self.client_reader(db_path, "reader2", 5),
                self.client_reader(db_path, "reader3", 5),
            ]
            return await asyncio.gather(*tasks, return_exceptions=True)
        results = await run_scenario()
        # Check all operations completed without exceptions
        for i, result in enumerate(results):
            assert not isinstance(result, Exception), f"Task {i} failed: {result}"
        write_results = results[0]
        write_success = sum(1 for success, _, _ in write_results if success)
        assert write_success == 10, f"Writer only wrote {write_success}/10 memories"
        # All readers should have successfully read
        for reader_counts in results[1:]:
            assert len(reader_counts) == 5, "Reader should complete all reads"
            assert all(isinstance(count, int) for count in reader_counts)

    @pytest.mark.asyncio
    async def test_rapid_concurrent_access(self, db_path):
        """Test rapid concurrent access with minimal delays."""
        async def rapid_writer(client_id: str):
            # Writes 20 memories back-to-back with no pacing delay.
            storage = await self.create_client(db_path, client_id)
            try:
                for i in range(20):
                    content = f"Rapid {client_id}-{i}"
                    memory = Memory(
                        content=content,
                        content_hash=generate_content_hash(content),
                        tags=["rapid"],
                        memory_type="test"
                    )
                    await storage.store(memory)
                    # No delay - rapid fire
            finally:
                storage.close()
        async def rapid_reader(client_id: str):
            # Issues 20 back-to-back tag searches.
            storage = await self.create_client(db_path, client_id)
            try:
                for i in range(20):
                    await storage.search_by_tag(["rapid"])
                    # No delay - rapid fire
            finally:
                storage.close()
        # Run multiple clients with rapid access
        tasks = [
            rapid_writer("w1"),
            rapid_writer("w2"),
            rapid_reader("r1"),
            rapid_reader("r2"),
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Check no exceptions occurred
        exceptions = [r for r in results if isinstance(r, Exception)]
        assert len(exceptions) == 0, f"Exceptions occurred: {exceptions}"

    @pytest.mark.asyncio
    async def test_database_consistency(self, db_path):
        """Test that database remains consistent under concurrent access."""
        # Write unique memories from multiple clients
        async def write_unique_memories(client_id: str, start_idx: int):
            storage = await self.create_client(db_path, client_id)
            hashes = []
            try:
                for i in range(10):
                    content = f"Unique memory {start_idx + i}"
                    memory = Memory(
                        content=content,
                        content_hash=generate_content_hash(content),
                        tags=["consistency", client_id],
                        memory_type="test"
                    )
                    success, msg = await storage.store(memory)
                    if success:
                        hashes.append(memory.content_hash)
            finally:
                storage.close()
            return hashes
        # Run concurrent writes; start indices are disjoint so contents are unique.
        results = await asyncio.gather(
            write_unique_memories("client1", 0),
            write_unique_memories("client2", 100),
            write_unique_memories("client3", 200),
        )
        all_hashes = []
        for client_hashes in results:
            all_hashes.extend(client_hashes)
        # Verify all memories are in database and no duplicates
        storage = await self.create_client(db_path, "verifier")
        try:
            memories = await storage.search_by_tag(["consistency"])
            # Check count matches
            assert len(memories) == len(all_hashes), f"Memory count mismatch: {len(memories)} vs {len(all_hashes)}"
            # Check no duplicates
            db_hashes = {m.content_hash for m in memories}
            assert len(db_hashes) == len(memories), "Duplicate memories found"
            # Check all written memories are present
            for hash_val in all_hashes:
                assert hash_val in db_hashes, f"Memory {hash_val} not found in database"
        finally:
            storage.close()
# Allow running this module directly: delegate to pytest's own CLI.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/scripts/maintenance/repair_sqlite_vec_embeddings.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Repair script to fix existing SQLite-vec databases without full migration.
This script attempts to repair the database in-place by:
1. Checking the current state
2. Regenerating missing embeddings
3. Fixing dimension mismatches if possible
"""
import asyncio
import os
import sys
import sqlite3
import logging
from typing import List, Tuple
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
import sqlite_vec
from sqlite_vec import serialize_float32
SQLITE_VEC_AVAILABLE = True
except ImportError:
SQLITE_VEC_AVAILABLE = False
try:
from sentence_transformers import SentenceTransformer
SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
SENTENCE_TRANSFORMERS_AVAILABLE = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SqliteVecRepair:
    """Repair SQLite-vec database embeddings.

    In-place repair workflow: connect to the database, analyze the
    memories/memory_embeddings tables, regenerate any missing embeddings
    with a sentence-transformers model, and verify that vector search
    still works. Dimension mismatches are flagged but only repaired after
    interactive confirmation.
    """
    def __init__(self, db_path: str, model_name: str = "all-MiniLM-L6-v2"):
        # Path to the sqlite-vec database file and the model used to
        # regenerate embeddings.
        self.db_path = db_path
        self.model_name = model_name
        self.conn = None   # sqlite3 connection, set by connect_database()
        self.model = None  # SentenceTransformer, set by load_model()
        self.embedding_dimension = 384  # Default for all-MiniLM-L6-v2
    def check_dependencies(self):
        """Check required dependencies.

        Returns:
            True when both sqlite-vec and sentence-transformers imported
            successfully at module load time.
        """
        print("Checking dependencies...")
        if not SQLITE_VEC_AVAILABLE:
            print("❌ sqlite-vec not installed. Run: pip install sqlite-vec")
            return False
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            print("❌ sentence-transformers not installed. Run: pip install sentence-transformers torch")
            return False
        print("✅ All dependencies available")
        return True
    def connect_database(self):
        """Connect to the database.

        Raises:
            FileNotFoundError: If the database file does not exist.
        """
        print(f"\nConnecting to database: {self.db_path}")
        if not os.path.exists(self.db_path):
            raise FileNotFoundError(f"Database not found: {self.db_path}")
        self.conn = sqlite3.connect(self.db_path)
        # Enable extension loading only long enough to load sqlite-vec.
        self.conn.enable_load_extension(True)
        sqlite_vec.load(self.conn)
        self.conn.enable_load_extension(False)
        print("✅ Connected to database")
    def analyze_database(self) -> dict:
        """Analyze the current state of the database.

        Returns:
            dict with memory/embedding counts, the number of memories that
            lack an embedding row, the embedding dimension declared in the
            table DDL (or None), and a list of human-readable issues.
        """
        print("\nAnalyzing database...")
        analysis = {
            "memory_count": 0,
            "embedding_count": 0,
            "missing_embeddings": 0,
            "embedding_dimension": None,
            "issues": []
        }
        # Count memories
        cursor = self.conn.execute("SELECT COUNT(*) FROM memories")
        analysis["memory_count"] = cursor.fetchone()[0]
        # Count embeddings
        cursor = self.conn.execute("SELECT COUNT(*) FROM memory_embeddings")
        analysis["embedding_count"] = cursor.fetchone()[0]
        # Check embedding dimension by parsing the stored CREATE TABLE DDL.
        cursor = self.conn.execute("""
            SELECT sql FROM sqlite_master
            WHERE type='table' AND name='memory_embeddings'
        """)
        schema = cursor.fetchone()
        if schema:
            # Extract dimension from schema
            import re
            match = re.search(r'FLOAT\[(\d+)\]', schema[0])
            if match:
                analysis["embedding_dimension"] = int(match.group(1))
        # Find memories without embeddings. NOTE(review): this assumes the
        # embeddings table's rowid mirrors memories.id — confirm against the
        # storage backend's schema.
        cursor = self.conn.execute("""
            SELECT COUNT(*) FROM memories m
            WHERE NOT EXISTS (
                SELECT 1 FROM memory_embeddings e WHERE e.rowid = m.id
            )
        """)
        analysis["missing_embeddings"] = cursor.fetchone()[0]
        # Identify issues
        if analysis["memory_count"] != analysis["embedding_count"]:
            analysis["issues"].append(
                f"Mismatch: {analysis['memory_count']} memories vs {analysis['embedding_count']} embeddings"
            )
        if analysis["missing_embeddings"] > 0:
            analysis["issues"].append(
                f"Missing embeddings: {analysis['missing_embeddings']} memories have no embeddings"
            )
        print(f" Memories: {analysis['memory_count']}")
        print(f" Embeddings: {analysis['embedding_count']}")
        print(f" Missing embeddings: {analysis['missing_embeddings']}")
        print(f" Embedding dimension: {analysis['embedding_dimension']}")
        if analysis["issues"]:
            print("\n⚠️ Issues found:")
            for issue in analysis["issues"]:
                print(f" - {issue}")
        else:
            print("\n✅ No issues found")
        return analysis
    def load_model(self):
        """Load the embedding model."""
        print(f"\nLoading embedding model: {self.model_name}")
        self.model = SentenceTransformer(self.model_name)
        # Probe the model with a dummy sentence to learn its true output
        # dimension, overriding the constructor default.
        test_embedding = self.model.encode(["test"], convert_to_numpy=True)
        self.embedding_dimension = test_embedding.shape[1]
        print(f"✅ Model loaded (dimension: {self.embedding_dimension})")
    def generate_missing_embeddings(self, analysis: dict) -> int:
        """Generate embeddings for memories that don't have them.

        Args:
            analysis: Result of analyze_database().

        Returns:
            Number of embeddings successfully generated and inserted.
        """
        if analysis["missing_embeddings"] == 0:
            return 0
        print(f"\nGenerating {analysis['missing_embeddings']} missing embeddings...")
        # Check if dimensions match; mismatched inserts may fail, so ask
        # the operator before proceeding.
        if analysis["embedding_dimension"] and analysis["embedding_dimension"] != self.embedding_dimension:
            print(f"⚠️ WARNING: Database expects dimension {analysis['embedding_dimension']}, "
                  f"but model produces dimension {self.embedding_dimension}")
            print(" This may cause errors. Consider full migration instead.")
            response = input("\nContinue anyway? (y/N): ").strip().lower()
            if response != 'y':
                return 0
        # Find memories without embeddings
        cursor = self.conn.execute("""
            SELECT m.id, m.content FROM memories m
            WHERE NOT EXISTS (
                SELECT 1 FROM memory_embeddings e WHERE e.rowid = m.id
            )
        """)
        memories_to_fix = cursor.fetchall()
        fixed_count = 0
        for memory_id, content in memories_to_fix:
            try:
                # Generate embedding
                embedding = self.model.encode([content], convert_to_numpy=True)[0]
                # Insert embedding
                self.conn.execute(
                    "INSERT INTO memory_embeddings (rowid, content_embedding) VALUES (?, ?)",
                    (memory_id, serialize_float32(embedding))
                )
                fixed_count += 1
                # Show progress
                if fixed_count % 10 == 0:
                    print(f" ... {fixed_count}/{len(memories_to_fix)} embeddings generated")
            except Exception as e:
                # Best-effort: one bad row should not abort the whole repair.
                logger.error(f"Failed to generate embedding for memory {memory_id}: {e}")
        self.conn.commit()
        print(f"✅ Generated {fixed_count} embeddings")
        return fixed_count
    def verify_search(self) -> bool:
        """Test if semantic search works.

        Returns:
            True when a vector MATCH query executes without raising.
        """
        print("\nTesting semantic search...")
        try:
            # Generate a test query embedding
            test_query = "test query"
            query_embedding = self.model.encode([test_query], convert_to_numpy=True)[0]
            # Try to search
            cursor = self.conn.execute("""
                SELECT COUNT(*) FROM memory_embeddings
                WHERE content_embedding MATCH ?
                LIMIT 1
            """, (serialize_float32(query_embedding),))
            cursor.fetchone()
            print("✅ Semantic search is working")
            return True
        except Exception as e:
            print(f"❌ Semantic search failed: {e}")
            return False
    def run_repair(self):
        """Run the repair process end-to-end, printing a summary."""
        print("\n" + "="*60)
        print("SQLite-vec Embedding Repair Tool")
        print("="*60)
        try:
            # Check dependencies
            if not self.check_dependencies():
                return
            # Connect to database
            self.connect_database()
            # Analyze current state
            analysis = self.analyze_database()
            if not analysis["issues"]:
                print("\n✅ Database appears healthy, no repair needed")
                return
            # Load model
            self.load_model()
            # Fix missing embeddings
            fixed = self.generate_missing_embeddings(analysis)
            # Verify search works
            self.verify_search()
            # Re-analyze to confirm the repair actually resolved the issues.
            print("\nRe-analyzing database after repair...")
            new_analysis = self.analyze_database()
            print("\n" + "="*60)
            print("Repair Summary")
            print("="*60)
            print(f"Fixed {fixed} missing embeddings")
            if new_analysis["issues"]:
                print("\n⚠️ Some issues remain:")
                for issue in new_analysis["issues"]:
                    print(f" - {issue}")
                print("\nConsider running the full migration script instead.")
            else:
                print("\n✅ All issues resolved!")
        except Exception as e:
            print(f"\n❌ Repair failed: {e}")
            logger.exception("Repair failed")
        finally:
            # Always release the connection, even on failure.
            if self.conn:
                self.conn.close()
def main():
    """Run the repair tool.

    Expects exactly one positional argument: the database path. Exits with
    status 1 and prints usage when it is missing.
    """
    if len(sys.argv) < 2:
        print("Usage: python repair_sqlite_vec_embeddings.py <database_path>")
        print("\nExample:")
        print(" python repair_sqlite_vec_embeddings.py ~/.mcp_memory/sqlite_vec.db")
        print("\nThis tool will:")
        print(" - Check for missing embeddings")
        print(" - Generate embeddings for memories that don't have them")
        print(" - Verify semantic search functionality")
        print("\nFor more complex issues (dimension mismatches, schema problems),")
        print("use migrate_sqlite_vec_embeddings.py instead.")
        sys.exit(1)
    db_path = sys.argv[1]
    repair = SqliteVecRepair(db_path)
    repair.run_repair()
if __name__ == "__main__":
    # Script entry point.
    main()
```
--------------------------------------------------------------------------------
/scripts/maintenance/repair_malformed_tags.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Script to repair malformed tags in the memory database.
Detects and fixes tags that contain JSON serialization artifacts like:
- Tags with quotes: ["ai" or "bug-fix"
- Tags with brackets: ["important"] or ["note"]
- Double-serialized JSON arrays
Usage:
python scripts/maintenance/repair_malformed_tags.py --dry-run # Preview changes
python scripts/maintenance/repair_malformed_tags.py # Apply fixes
"""
import sys
import os
import json
import re
import sqlite3
import argparse
import logging
from pathlib import Path
from datetime import datetime
from typing import List, Tuple, Set, Dict
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from src.mcp_memory_service.config import SQLITE_VEC_PATH
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
def parse_malformed_tag(tag: str) -> List[str]:
    """
    Normalize a tag that may carry JSON serialization artifacts.

    A well-formed tag passes through unchanged; tags that look like
    (fragments of) JSON arrays are decoded or scrubbed of quotes and
    brackets.

    Examples:
        'ai'              -> ['ai']
        '["ai'            -> ['ai']
        '"bug-fix"'       -> ['bug-fix']
        '["ai","bug"]'    -> ['ai', 'bug']

    Returns:
        List of clean tag strings (possibly empty).
    """
    tag = tag.strip()
    # Fast path: no JSON-ish characters at all — the tag is already clean.
    if not any(marker in tag for marker in ('"', '[', ']')):
        return [tag] if tag else []
    # First choice: a strict JSON-array decode, flattening recursively so
    # double-serialized arrays are unwrapped all the way down.
    if tag.startswith('[') and tag.endswith(']'):
        try:
            decoded = json.loads(tag)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, list):
            flattened: List[str] = []
            for element in decoded:
                flattened.extend(parse_malformed_tag(str(element)))
            return flattened
    # Escaped-quote fragments such as [\"ai\" or "bug-fix": strip all
    # quote/bracket junk and keep whatever text remains.
    if '\\"' in tag or tag.startswith('["') or tag.startswith('"'):
        candidate = tag
        for junk in ('\\"', '"', '[', ']'):
            candidate = candidate.replace(junk, '')
        candidate = candidate.strip()
        if candidate:
            return [candidate]
    # Fragments like ["ai with a missing closing bracket/quote.
    if tag.startswith('["') or tag.startswith('"['):
        candidate = tag.replace('[', '').replace('"', '').strip()
        if candidate:
            return [candidate]
    # Last resort: drop every quote and bracket and keep the residue.
    candidate = tag.replace('"', '').replace('[', '').replace(']', '').strip()
    return [candidate] if candidate else []
def is_malformed_tag(tag: str) -> bool:
    """Return True when *tag* contains JSON serialization artifacts
    (quotes, brackets, or backslash escapes)."""
    return not set('"[]\\').isdisjoint(tag)
def analyze_tags(db_path: str) -> Tuple[int, int, Set[str], Dict[str, int]]:
    """
    Scan every tagged memory and collect malformed-tag statistics.

    Returns:
        (total_memories, malformed_count, malformed_tags_set, tag_frequency)
        where tag_frequency counts occurrences of each malformed tag.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT content_hash, tags FROM memories WHERE tags IS NOT NULL AND tags != ''"
        )
        rows = cursor.fetchall()
        malformed_memories = 0
        bad_tags: Set[str] = set()
        frequency: Dict[str, int] = {}
        for _content_hash, raw_tags in rows:
            # Tags are stored as a comma-separated string.
            found_bad = False
            for candidate in (part.strip() for part in raw_tags.split(",")):
                if candidate and is_malformed_tag(candidate):
                    found_bad = True
                    bad_tags.add(candidate)
                    frequency[candidate] = frequency.get(candidate, 0) + 1
            if found_bad:
                malformed_memories += 1
        return len(rows), malformed_memories, bad_tags, frequency
    finally:
        conn.close()
def repair_tags(db_path: str, dry_run: bool = False) -> Tuple[int, int, Dict[str, List[str]]]:
    """
    Rewrite malformed tags in place, deduplicating the result per memory.

    Args:
        db_path: SQLite database file to repair.
        dry_run: When True, log the would-be changes without writing.

    Returns:
        (memories_updated, tags_fixed, replacements_dict) where
        replacements_dict maps each malformed tag to its cleaned form(s).
    """
    conn = sqlite3.connect(db_path)
    updated = 0
    fixed = 0
    replacements: Dict[str, List[str]] = {}  # old_tag -> [new_tags]
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT content_hash, tags FROM memories WHERE tags IS NOT NULL AND tags != ''"
        )
        for content_hash, tags_str in cursor.fetchall():
            original_tags = [part.strip() for part in tags_str.split(",") if part.strip()]
            # Skip rows whose tags are all clean.
            if not any(is_malformed_tag(entry) for entry in original_tags):
                continue
            repaired: List[str] = []
            for entry in original_tags:
                if not is_malformed_tag(entry):
                    repaired.append(entry)
                    continue
                clean = parse_malformed_tag(entry)
                if clean:
                    repaired.extend(clean)
                    replacements[entry] = clean
                    fixed += 1
            # Deduplicate while preserving first-seen order.
            unique_tags = list(dict.fromkeys(entry for entry in repaired if entry))
            new_tags_str = ",".join(unique_tags)
            if dry_run:
                logger.info(f"[DRY RUN] Would update {content_hash[:8]}...")
                logger.info(f" Old: {tags_str}")
                logger.info(f" New: {new_tags_str}")
            else:
                cursor.execute(
                    "UPDATE memories SET tags = ? WHERE content_hash = ?",
                    (new_tags_str, content_hash)
                )
            updated += 1
        if not dry_run:
            conn.commit()
            logger.info("✅ Database updated successfully")
        return updated, fixed, replacements
    finally:
        conn.close()
def create_backup(db_path: str) -> str:
    """Copy the database aside before modification; return the backup path."""
    import shutil
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = f"{db_path}.backup-{stamp}"
    shutil.copy2(db_path, backup_path)
    logger.info(f"✅ Backup created: {backup_path}")
    return backup_path
def main():
    """CLI entry point: analyze malformed tags, then preview or repair them."""
    parser = argparse.ArgumentParser(
        description="Repair malformed tags in the memory database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Preview changes without modifying database
  python repair_malformed_tags.py --dry-run
  # Apply fixes (creates backup automatically)
  python repair_malformed_tags.py
  # Verbose output with detailed logging
  python repair_malformed_tags.py --verbose
"""
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview changes without modifying the database'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )
    parser.add_argument(
        '--db-path',
        type=str,
        default=SQLITE_VEC_PATH,
        help=f'Path to SQLite database (default: {SQLITE_VEC_PATH})'
    )
    args = parser.parse_args()
    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    # Check if database exists
    if not os.path.exists(args.db_path):
        logger.error(f"❌ Database not found: {args.db_path}")
        sys.exit(1)
    logger.info("=" * 60)
    logger.info("🔧 Malformed Tag Repair Tool")
    logger.info("=" * 60)
    logger.info(f"Database: {args.db_path}")
    logger.info(f"Mode: {'DRY RUN (no changes)' if args.dry_run else 'REPAIR (will modify database)'}")
    logger.info("")
    # Analyze tags first so a clean database exits without touching anything.
    logger.info("📊 Analyzing tags...")
    total_memories, malformed_count, malformed_tags, tag_frequency = analyze_tags(args.db_path)
    logger.info(f"Total memories: {total_memories}")
    logger.info(f"Memories with malformed tags: {malformed_count}")
    logger.info(f"Unique malformed tags: {len(malformed_tags)}")
    logger.info("")
    if malformed_count == 0:
        logger.info("✅ No malformed tags found! Database is clean.")
        return
    # Show most common malformed tags (top 10) and what each will become.
    logger.info("🔍 Most common malformed tags:")
    sorted_tags = sorted(tag_frequency.items(), key=lambda x: x[1], reverse=True)
    for tag, count in sorted_tags[:10]:
        logger.info(f" {tag!r} -> appears {count} times")
        parsed = parse_malformed_tag(tag)
        logger.info(f" Will become: {parsed}")
    logger.info("")
    # Create backup if not dry-run. Note: backup_path is only bound here;
    # the dry-run branch below never reads it.
    if not args.dry_run:
        logger.info("💾 Creating backup...")
        backup_path = create_backup(args.db_path)
        logger.info("")
    # Repair tags
    logger.info("🔧 Repairing tags...")
    memories_updated, tags_fixed, replacements = repair_tags(args.db_path, dry_run=args.dry_run)
    logger.info("")
    logger.info("=" * 60)
    logger.info("📈 Summary")
    logger.info("=" * 60)
    logger.info(f"Memories updated: {memories_updated}")
    logger.info(f"Tags fixed: {tags_fixed}")
    logger.info("")
    if replacements:
        logger.info("🔄 Tag replacements:")
        for old_tag, new_tags in list(replacements.items())[:10]:
            logger.info(f" {old_tag!r} -> {new_tags}")
        if len(replacements) > 10:
            logger.info(f" ... and {len(replacements) - 10} more")
        logger.info("")
    if args.dry_run:
        logger.info("⚠️ This was a DRY RUN - no changes were made")
        logger.info(" Run without --dry-run to apply fixes")
    else:
        logger.info("✅ Repair completed successfully!")
        logger.info(f" Backup saved to: {backup_path}")
    logger.info("=" * 60)
if __name__ == "__main__":
    # Script entry point.
    main()
```
--------------------------------------------------------------------------------
/tests/api/test_compact_types.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Tests for compact data types.
Validates token efficiency, immutability, and type safety of compact types.
"""
import pytest
import time
from mcp_memory_service.api.types import (
CompactMemory,
CompactSearchResult,
CompactHealthInfo
)
class TestCompactMemory:
    """Unit tests for the CompactMemory named tuple."""

    def test_compact_memory_creation(self):
        """Constructor fields are stored verbatim."""
        created_at = time.time()
        mem = CompactMemory(
            hash='abc12345',
            preview='Test content preview',
            tags=('test', 'example'),
            created=created_at,
            score=0.95,
        )
        assert mem.hash == 'abc12345'
        assert mem.preview == 'Test content preview'
        assert mem.tags == ('test', 'example')
        assert mem.score == 0.95
        assert isinstance(mem.created, float)

    def test_compact_memory_immutability(self):
        """Field assignment on the NamedTuple must raise."""
        mem = CompactMemory(
            hash='abc12345',
            preview='Test content',
            tags=('test',),
            created=time.time(),
            score=0.85,
        )
        with pytest.raises(AttributeError):
            mem.hash = 'new_hash'  # type: ignore

    def test_compact_memory_tuple_behavior(self):
        """The value unpacks positionally like a plain tuple."""
        mem = CompactMemory(
            hash='abc12345',
            preview='Test content',
            tags=('test',),
            created=1730928000.0,
            score=0.85,
        )
        got_hash, got_preview, got_tags, got_created, got_score = mem
        assert got_hash == 'abc12345'
        assert got_preview == 'Test content'
        assert got_tags == ('test',)
        assert got_created == 1730928000.0
        assert got_score == 0.85

    def test_compact_memory_field_access(self):
        """Named attribute access works alongside tuple indexing."""
        mem = CompactMemory(
            hash='test123',
            preview='Preview text',
            tags=('tag1', 'tag2'),
            created=1730928000.0,
            score=0.75,
        )
        assert mem.hash == 'test123'
        assert mem.tags[0] == 'tag1'
        assert mem.tags[1] == 'tag2'

    def test_compact_memory_token_size(self):
        """A typical CompactMemory stays well under the token budget."""
        mem = CompactMemory(
            hash='abc12345',
            preview='A' * 200,  # 200-char preview
            tags=('tag1', 'tag2', 'tag3'),
            created=1730928000.0,
            score=0.85,
        )
        # The string repr approximates the serialized size; the heuristic
        # is roughly 1 token per 4 characters. A full Memory object is
        # ~820 tokens; the compact form targets ~73, so <150 is a safe bound.
        estimated_tokens = len(str(mem)) / 4
        assert estimated_tokens < 150, \
            f"CompactMemory too large: {estimated_tokens} tokens (target: <150)"
class TestCompactSearchResult:
    """Unit tests for the CompactSearchResult container."""

    def test_compact_search_result_creation(self):
        """Memories, total, and query are stored as given."""
        now = time.time()
        hits = (
            CompactMemory('hash1', 'preview1', ('tag1',), now, 0.95),
            CompactMemory('hash2', 'preview2', ('tag2',), now, 0.85),
        )
        outcome = CompactSearchResult(memories=hits, total=2, query='test query')
        assert len(outcome.memories) == 2
        assert outcome.total == 2
        assert outcome.query == 'test query'

    def test_compact_search_result_repr(self):
        """repr() reports both found and shown counts."""
        hits = tuple(
            CompactMemory(f'hash{n}', f'preview{n}', (f'tag{n}',), time.time(), score)
            for n, score in ((1, 0.95), (2, 0.85), (3, 0.75))
        )
        outcome = CompactSearchResult(memories=hits, total=3, query='architecture')
        text = repr(outcome)
        assert 'found=3' in text
        assert 'shown=3' in text

    def test_compact_search_result_empty(self):
        """An empty result still has a stable, compact repr."""
        outcome = CompactSearchResult(memories=(), total=0, query='nonexistent query')
        assert len(outcome.memories) == 0
        assert outcome.total == 0
        assert repr(outcome) == 'SearchResult(found=0, shown=0)'

    def test_compact_search_result_iteration(self):
        """Results iterate in insertion order."""
        hits = tuple(
            CompactMemory(f'hash{n}', f'preview{n}', ('tag',), time.time(), 0.9 - n * 0.1)
            for n in range(5)
        )
        outcome = CompactSearchResult(memories=hits, total=5, query='test')
        for position, hit in enumerate(outcome.memories):
            assert hit.hash == f'hash{position}'

    def test_compact_search_result_token_size(self):
        """Five results stay within the compact token budget."""
        hits = tuple(
            CompactMemory(
                f'hash{n:04d}',
                'A' * 200,  # 200-char preview
                ('tag1', 'tag2'),
                time.time(),
                0.9 - n * 0.05,
            )
            for n in range(5)
        )
        outcome = CompactSearchResult(
            memories=hits,
            total=5,
            query='architecture decisions',
        )
        # Heuristic: ~1 token per 4 characters of repr. Target is ~385
        # tokens for 5 results (vs ~2,625 with full Memory objects).
        estimated_tokens = len(str(outcome.memories)) / 4
        assert estimated_tokens < 800, \
            f"CompactSearchResult too large: {estimated_tokens} tokens (target: <800 for 5 results)"
class TestCompactHealthInfo:
    """Unit tests for the CompactHealthInfo named tuple."""

    def test_compact_health_info_creation(self):
        """Constructor fields round-trip unchanged."""
        report = CompactHealthInfo(status='healthy', count=1247, backend='sqlite_vec')
        assert report.status == 'healthy'
        assert report.count == 1247
        assert report.backend == 'sqlite_vec'

    def test_compact_health_info_status_values(self):
        """Every documented status value is accepted."""
        for state in ('healthy', 'degraded', 'error'):
            report = CompactHealthInfo(status=state, count=100, backend='cloudflare')
            assert report.status == state

    def test_compact_health_info_backends(self):
        """Every supported backend identifier is accepted."""
        for store in ('sqlite_vec', 'cloudflare', 'hybrid'):
            report = CompactHealthInfo(status='healthy', count=500, backend=store)
            assert report.backend == store

    def test_compact_health_info_token_size(self):
        """A health report stays within the compact token budget."""
        report = CompactHealthInfo(status='healthy', count=1247, backend='sqlite_vec')
        # Heuristic: ~1 token per 4 characters; target ~20 tokens
        # (vs ~125 for a full health check payload).
        estimated_tokens = len(str(report)) / 4
        assert estimated_tokens < 50, \
            f"CompactHealthInfo too large: {estimated_tokens} tokens (target: <50)"
class TestTokenEfficiency:
    """Integration tests for overall token efficiency.

    These tests compare repr-length estimates of compact types against the
    full Memory representation to validate the advertised token savings.
    """
    def test_memory_size_comparison(self):
        """Compare CompactMemory size to full Memory object."""
        from mcp_memory_service.models.memory import Memory
        # Create full Memory object with a realistic payload.
        full_memory = Memory(
            content='A' * 1000,  # Long content
            content_hash='abc123def456' * 5,
            tags=['tag1', 'tag2', 'tag3'],
            memory_type='note',
            metadata={'key': 'value'},
            embedding=[0.1] * 768,  # Full embedding vector
        )
        # Create compact version
        compact = CompactMemory(
            hash='abc12345',
            preview='A' * 200,  # First 200 chars only
            tags=('tag1', 'tag2', 'tag3'),
            created=time.time(),
            score=0.95
        )
        # Compare sizes via string representations.
        full_repr = str(full_memory.to_dict())
        compact_repr = str(compact)
        # Compact should be significantly smaller.
        # Note: String representation is not exact token count, allow some margin
        size_ratio = len(compact_repr) / len(full_repr)
        assert size_ratio < 0.30, \
            f"CompactMemory not small enough: {size_ratio:.2%} of full size (target: <30%)"
    def test_search_result_size_reduction(self):
        """Validate 85%+ token reduction for search results."""
        # Create 5 compact memories (typical search result size).
        memories = tuple(
            CompactMemory(
                f'hash{i:04d}',
                'A' * 200,
                ('tag1', 'tag2'),
                time.time(),
                0.9 - i*0.05
            )
            for i in range(5)
        )
        result = CompactSearchResult(
            memories=memories,
            total=5,
            query='test'
        )
        # Estimate tokens: roughly 1 token per 4 characters of repr.
        repr_str = str(result)
        estimated_tokens = len(repr_str) / 4
        # Target: 85% reduction, from ~2,625 tokens down to ~385 tokens.
        # Allow some margin: should be under 600 tokens
        assert estimated_tokens < 600, \
            f"Search result not efficient enough: {estimated_tokens} tokens (target: <600)"
        # Verify we're achieving significant reduction
        # Original would be ~2,625 tokens, we should be well under 1000
        reduction_vs_original = 1 - (estimated_tokens / 2625)
        assert reduction_vs_original >= 0.75, \
            f"Token reduction insufficient: {reduction_vs_original:.1%} (target: >=75%)"
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/ingestion/pdf_loader.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PDF document loader for extracting text content from PDF files.
"""
import logging
from pathlib import Path
from typing import AsyncGenerator, Dict, Any, Optional
import asyncio
from .base import DocumentLoader, DocumentChunk
from .chunker import TextChunker, ChunkingStrategy
logger = logging.getLogger(__name__)
# Try to import PDF processing library
try:
import PyPDF2
HAS_PYPDF2 = True
except ImportError:
HAS_PYPDF2 = False
logger.warning("PyPDF2 not available. PDF support will be limited.")
try:
import pdfplumber
HAS_PDFPLUMBER = True
except ImportError:
HAS_PDFPLUMBER = False
logger.debug("pdfplumber not available, falling back to PyPDF2")
class PDFLoader(DocumentLoader):
    """
    Document loader for PDF files.

    Supports multiple PDF processing backends:
    - pdfplumber (preferred): better text extraction, table support
    - PyPDF2 (fallback): basic text extraction

    The synchronous PDF libraries run inside a thread-pool executor so the
    event loop is never blocked during extraction.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize PDF loader.

        Args:
            chunk_size: Target size for text chunks in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        super().__init__(chunk_size, chunk_overlap)
        self.supported_extensions = ['pdf']
        self.chunker = TextChunker(ChunkingStrategy(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            respect_paragraph_boundaries=True
        ))
        # Pick the best available backend once, at construction time.
        if HAS_PDFPLUMBER:
            self.backend = 'pdfplumber'
        elif HAS_PYPDF2:
            self.backend = 'pypdf2'
        else:
            self.backend = None
            logger.error("No PDF processing library available. Install pypdf2 or pdfplumber.")

    def can_handle(self, file_path: Path) -> bool:
        """
        Check if this loader can handle the given PDF file.

        Args:
            file_path: Path to the file to check

        Returns:
            True if this loader can process the PDF file
        """
        if self.backend is None:
            return False
        return (file_path.suffix.lower() == '.pdf' and
                file_path.exists() and
                file_path.is_file())

    async def extract_chunks(self, file_path: Path, **kwargs) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract text chunks from a PDF file.

        Args:
            file_path: Path to the PDF file
            **kwargs: Additional options:
                - extract_images: Whether to extract image descriptions (default: False)
                - extract_tables: Whether to extract table content (default: False)
                - page_range: Tuple of (start, end) pages to extract (1-indexed)

        Yields:
            DocumentChunk objects containing extracted text and metadata

        Raises:
            FileNotFoundError: If the PDF file doesn't exist
            ValueError: If the PDF file is invalid or can't be processed
        """
        await self.validate_file(file_path)
        if self.backend is None:
            raise ValueError("No PDF processing backend available")
        extract_images = kwargs.get('extract_images', False)
        extract_tables = kwargs.get('extract_tables', False)
        page_range = kwargs.get('page_range', None)
        logger.info(f"Extracting chunks from PDF: {file_path} using {self.backend}")
        try:
            if self.backend == 'pdfplumber':
                async for chunk in self._extract_with_pdfplumber(
                    file_path, extract_images, extract_tables, page_range
                ):
                    yield chunk
            else:
                async for chunk in self._extract_with_pypdf2(
                    file_path, page_range
                ):
                    yield chunk
        except Exception as e:
            logger.error(f"Error extracting from PDF {file_path}: {str(e)}")
            raise ValueError(f"Failed to extract PDF content: {str(e)}") from e

    @staticmethod
    def _page_bounds(page_range: Optional[tuple], total_pages: int) -> tuple:
        """
        Convert a 1-indexed inclusive (start, end) page range into
        0-indexed half-open bounds, clamped to the document length.
        """
        start_page = page_range[0] - 1 if page_range else 0
        end_page = min(page_range[1], total_pages) if page_range else total_pages
        return start_page, end_page

    async def _chunk_pages(
        self,
        file_path: Path,
        pages: list,
        extraction_method: str
    ) -> AsyncGenerator[DocumentChunk, None]:
        """
        Turn extracted page texts into DocumentChunk objects.

        Shared tail of both extraction backends: attaches page metadata and
        maintains a single file-wide chunk_index across all pages.

        Args:
            file_path: Source PDF (for base metadata)
            pages: List of (page_number, page_text) tuples (1-indexed pages)
            extraction_method: Backend name recorded in chunk metadata

        Yields:
            DocumentChunk objects
        """
        base_metadata = self.get_base_metadata(file_path)
        chunk_index = 0
        for page_num, page_text in pages:
            page_metadata = base_metadata.copy()
            page_metadata.update({
                'page_number': page_num,
                'extraction_method': extraction_method,
                'content_type': 'pdf_page'
            })
            # Chunk the page text
            for chunk_text, chunk_metadata in self.chunker.chunk_text(page_text, page_metadata):
                yield DocumentChunk(
                    content=chunk_text,
                    metadata=chunk_metadata,
                    chunk_index=chunk_index,
                    source_file=file_path
                )
                chunk_index += 1

    async def _extract_with_pdfplumber(
        self,
        file_path: Path,
        extract_images: bool,
        extract_tables: bool,
        page_range: Optional[tuple]
    ) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract text using the pdfplumber backend.

        Args:
            file_path: Path to PDF file
            extract_images: Whether to note image counts per page
            extract_tables: Whether to append extracted table content
            page_range: Optional 1-indexed (start, end) page range

        Yields:
            DocumentChunk objects
        """
        def _extract_sync():
            """Synchronous extraction; runs in a worker thread."""
            pages = []
            with pdfplumber.open(file_path) as pdf:
                start_page, end_page = self._page_bounds(page_range, len(pdf.pages))
                for page_num in range(start_page, end_page):
                    page = pdf.pages[page_num]
                    # extract_text() may return None for image-only pages.
                    text = page.extract_text() or ""
                    # Extract table content if requested
                    if extract_tables:
                        for table in page.extract_tables() or []:
                            table_text = self._format_table(table)
                            text += f"\n\n[TABLE]\n{table_text}\n[/TABLE]"
                    # Note: full image extraction would require additional
                    # processing; we only record how many images were found.
                    if extract_images:
                        images = page.images
                        if images:
                            text += f"\n\n[IMAGES: {len(images)} images found on this page]"
                    if text.strip():
                        pages.append((page_num + 1, text.strip()))
            return pages
        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated in this context since Python 3.10.
        loop = asyncio.get_running_loop()
        pages = await loop.run_in_executor(None, _extract_sync)
        async for chunk in self._chunk_pages(file_path, pages, 'pdfplumber'):
            yield chunk

    async def _extract_with_pypdf2(
        self,
        file_path: Path,
        page_range: Optional[tuple]
    ) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract text using the PyPDF2 backend.

        Args:
            file_path: Path to PDF file
            page_range: Optional 1-indexed (start, end) page range

        Yields:
            DocumentChunk objects
        """
        def _extract_sync():
            """Synchronous extraction; runs in a worker thread."""
            pages = []
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                start_page, end_page = self._page_bounds(page_range, len(pdf_reader.pages))
                for page_num in range(start_page, end_page):
                    page = pdf_reader.pages[page_num]
                    # Guard against None from extract_text() on empty or
                    # image-only pages.
                    text = page.extract_text() or ""
                    if text.strip():
                        pages.append((page_num + 1, text.strip()))
            return pages
        loop = asyncio.get_running_loop()
        pages = await loop.run_in_executor(None, _extract_sync)
        async for chunk in self._chunk_pages(file_path, pages, 'pypdf2'):
            yield chunk

    def _format_table(self, table: list) -> str:
        """
        Format extracted table data as pipe-separated text.

        Args:
            table: Table data as list of rows

        Returns:
            Formatted table text (one " | "-joined line per non-empty row)
        """
        if not table:
            return ""
        # Simple table formatting
        formatted_rows = []
        for row in table:
            if row:  # Skip empty rows
                cleaned_row = [str(cell).strip() if cell else "" for cell in row]
                formatted_rows.append(" | ".join(cleaned_row))
        return "\n".join(formatted_rows)
# Register the PDF loader with the shared loader registry at import time.
def _register_pdf_loader():
    """Register PDF loader with the registry.

    Best-effort: if the registry module cannot be imported (e.g. during
    partial package initialization), registration is silently skipped and
    only logged at debug level.
    """
    try:
        from .registry import register_loader
        register_loader(PDFLoader, ['pdf'])
        logger.debug("PDF loader registered successfully")
    except ImportError:
        logger.debug("Registry not available during import")


# Auto-register when module is imported
_register_pdf_loader()
```
--------------------------------------------------------------------------------
/claude-hooks/utilities/mcp-client.js:
--------------------------------------------------------------------------------
```javascript
/**
* MCP Client for Memory Hook Integration
* Provides MCP protocol communication for memory operations
*/
const { spawn } = require('child_process');
const { EventEmitter } = require('events');
const fs = require('fs');
const path = require('path');
class MCPClient extends EventEmitter {
    /**
     * @param {string[]} serverCommand - Command and args used to spawn the MCP server.
     * @param {object} [options]
     * @param {string} [options.workingDir] - Server working directory (default: process.cwd()).
     * @param {number} [options.connectionTimeout] - ms to wait for initialize (default 5000).
     * @param {number} [options.toolCallTimeout] - ms to wait per request (default 10000).
     */
    constructor(serverCommand, options = {}) {
        super();
        this.serverCommand = serverCommand;
        this.serverWorkingDir = options.workingDir || process.cwd();
        this.connectionTimeout = options.connectionTimeout || 5000;
        this.toolCallTimeout = options.toolCallTimeout || 10000;

        this.serverProcess = null;
        this.messageId = 0;
        this.pendingRequests = new Map();
        this.connected = false;
        this.buffer = '';

        // Load environment variables from .env file
        this.loadEnvironment();
    }

    /**
     * Load environment variables from the server's .env file.
     * Existing process.env values are never overwritten.
     */
    loadEnvironment() {
        const envPath = path.join(this.serverWorkingDir, '.env');
        try {
            if (fs.existsSync(envPath)) {
                const envContent = fs.readFileSync(envPath, 'utf8');
                const lines = envContent.split('\n');
                for (const line of lines) {
                    // Skip comments/blank lines; capture KEY=VALUE pairs.
                    const match = line.match(/^([^#\s][^=]*?)=(.*)$/);
                    if (match) {
                        const [, key, value] = match;
                        if (!process.env[key]) {
                            // Strip surrounding quotes from the value.
                            process.env[key] = value.replace(/^["']|["']$/g, '');
                        }
                    }
                }
            }
        } catch (error) {
            // Ignore .env loading errors
        }
    }

    /**
     * Start MCP server and establish connection.
     * Resolves once the MCP initialize handshake completes.
     */
    async connect() {
        return new Promise((resolve, reject) => {
            try {
                // Start MCP server process
                this.serverProcess = spawn(this.serverCommand[0], this.serverCommand.slice(1), {
                    cwd: this.serverWorkingDir,
                    stdio: ['pipe', 'pipe', 'pipe'],
                    env: { ...process.env }
                });

                // Handle server output (newline-delimited JSON-RPC)
                this.serverProcess.stdout.on('data', (data) => {
                    this.buffer += data.toString();
                    this.processMessages();
                });

                this.serverProcess.stderr.on('data', (data) => {
                    const error = data.toString();
                    // Only emit critical errors, ignore warnings and debug info
                    if (error.includes('FATAL') || error.includes('ExceptionGroup')) {
                        this.emit('error', new Error(`Server error: ${error.substring(0, 200)}...`));
                    }
                });

                this.serverProcess.on('error', (error) => {
                    if (!this.connected) {
                        reject(new Error(`Server process error: ${error.message}`));
                    }
                });

                this.serverProcess.on('exit', (code) => {
                    // Capture the pre-exit state BEFORE clearing it; the previous
                    // code tested `!this.connected` right after setting it false,
                    // which was always true and could reject a settled promise.
                    const wasConnected = this.connected;
                    this.connected = false;
                    if (code !== 0 && !wasConnected) {
                        reject(new Error(`Server failed to start (exit code ${code})`));
                    }
                });

                // Connection timeout; cleared once the handshake settles so it
                // cannot fire late or keep the event loop busy.
                const connectTimer = setTimeout(() => {
                    if (!this.connected) {
                        reject(new Error('Connection timeout'));
                    }
                }, this.connectionTimeout);

                // Initialize MCP connection
                this.sendInitialize()
                    .then(() => {
                        clearTimeout(connectTimer);
                        this.connected = true;
                        resolve();
                    })
                    .catch((error) => {
                        clearTimeout(connectTimer);
                        reject(error);
                    });
            } catch (error) {
                reject(error);
            }
        });
    }

    /**
     * Process incoming newline-delimited messages from the server.
     */
    processMessages() {
        const lines = this.buffer.split('\n');
        this.buffer = lines.pop() || ''; // Keep incomplete line in buffer

        for (const line of lines) {
            if (line.trim()) {
                try {
                    const message = JSON.parse(line);
                    this.handleMessage(message);
                } catch (error) {
                    // Ignore malformed messages
                }
            }
        }
    }

    /**
     * Handle an incoming MCP message, settling its pending request if any.
     */
    handleMessage(message) {
        if (message.id && this.pendingRequests.has(message.id)) {
            const { resolve, reject, timer } = this.pendingRequests.get(message.id);
            this.pendingRequests.delete(message.id);
            // Cancel the per-request timeout now that the response arrived.
            clearTimeout(timer);

            if (message.error) {
                reject(new Error(message.error.message || 'MCP Error'));
            } else {
                resolve(message.result);
            }
        }
    }

    /**
     * Send MCP initialize request.
     */
    async sendInitialize() {
        const message = {
            jsonrpc: '2.0',
            id: ++this.messageId,
            method: 'initialize',
            params: {
                protocolVersion: '2024-11-05',
                capabilities: {
                    tools: {}
                },
                clientInfo: {
                    name: 'claude-hooks-mcp-client',
                    version: '1.0.0'
                }
            }
        };
        return this.sendMessage(message);
    }

    /**
     * Send a JSON-RPC message to the MCP server.
     * Resolves with the response for id'd requests, immediately otherwise.
     */
    async sendMessage(message) {
        return new Promise((resolve, reject) => {
            if (!this.serverProcess || this.serverProcess.exitCode !== null) {
                reject(new Error('Server not running'));
                return;
            }

            if (message.id) {
                // Per-request timeout; the handle is stored so it can be
                // cancelled on response arrival or on disconnect.
                const timer = setTimeout(() => {
                    if (this.pendingRequests.has(message.id)) {
                        this.pendingRequests.delete(message.id);
                        reject(new Error('Request timeout'));
                    }
                }, this.toolCallTimeout);
                this.pendingRequests.set(message.id, { resolve, reject, timer });
            }

            try {
                const messageStr = JSON.stringify(message) + '\n';
                this.serverProcess.stdin.write(messageStr);

                if (!message.id) {
                    resolve(); // No response expected
                }
            } catch (error) {
                if (message.id && this.pendingRequests.has(message.id)) {
                    clearTimeout(this.pendingRequests.get(message.id).timer);
                    this.pendingRequests.delete(message.id);
                }
                reject(error);
            }
        });
    }

    /**
     * Call a tool via MCP.
     */
    async callTool(toolName, args = {}) {
        const message = {
            jsonrpc: '2.0',
            id: ++this.messageId,
            method: 'tools/call',
            params: {
                name: toolName,
                arguments: args
            }
        };
        return this.sendMessage(message);
    }

    /**
     * Get memory service health status.
     */
    async getHealthStatus() {
        try {
            const result = await this.callTool('check_database_health');
            return {
                success: true,
                data: result.content ? this.parseToolResponse(result.content) : result
            };
        } catch (error) {
            return {
                success: false,
                error: error.message,
                fallback: true
            };
        }
    }

    /**
     * Query memories using semantic search.
     */
    async queryMemories(query, limit = 10) {
        try {
            const result = await this.callTool('retrieve_memory', {
                query: query,
                n_results: limit
            });
            return this.parseToolResponse(result.content);
        } catch (error) {
            console.warn('[MCP Client] Memory query error:', error.message);
            return [];
        }
    }

    /**
     * Query memories by time range.
     */
    async queryMemoriesByTime(timeQuery, limit = 10) {
        try {
            const result = await this.callTool('recall_memory', {
                query: timeQuery,
                n_results: limit
            });
            return this.parseToolResponse(result.content);
        } catch (error) {
            console.warn('[MCP Client] Time-based memory query error:', error.message);
            return [];
        }
    }

    /**
     * Parse tool response content into an array of memory results.
     */
    parseToolResponse(content) {
        if (!content) return [];

        // Handle array of content objects
        if (Array.isArray(content)) {
            const textContent = content.find(c => c.type === 'text')?.text || '';
            return this.parseMemoryResults(textContent);
        }

        // Handle direct text content
        if (typeof content === 'string') {
            return this.parseMemoryResults(content);
        }

        // Handle object with text property
        if (content.text) {
            return this.parseMemoryResults(content.text);
        }

        return [];
    }

    /**
     * Parse memory results from a text response.
     * NOTE(review): the Python-dict-to-JSON conversion below is a heuristic;
     * blind quote replacement can corrupt values containing apostrophes or
     * the words True/False/None. Acceptable as a fallback, not a parser.
     */
    parseMemoryResults(textData) {
        try {
            // Handle Python dict format conversion to JSON
            let cleanText = textData
                .replace(/'/g, '"')
                .replace(/True/g, 'true')
                .replace(/False/g, 'false')
                .replace(/None/g, 'null');

            const parsed = JSON.parse(cleanText);
            return parsed.results || parsed.memories || parsed || [];
        } catch (error) {
            // Try to extract JSON from text
            const jsonMatch = textData.match(/\{[\s\S]*\}/);
            if (jsonMatch) {
                try {
                    const extracted = JSON.parse(jsonMatch[0]);
                    return extracted.results || extracted.memories || [extracted];
                } catch {}
            }
            console.warn('[MCP Client] Could not parse memory results:', error.message);
            return [];
        }
    }

    /**
     * Disconnect and clean up the server process and pending requests.
     */
    async disconnect() {
        this.connected = false;

        // Reject all pending requests and cancel their timeout timers.
        for (const [, pending] of this.pendingRequests) {
            clearTimeout(pending.timer);
            pending.reject(new Error('Connection closed'));
        }
        this.pendingRequests.clear();

        // Terminate server process
        if (this.serverProcess && this.serverProcess.exitCode === null) {
            this.serverProcess.kill('SIGTERM');

            // Force kill if it doesn't exit gracefully; unref() so this grace
            // timer does not keep the host process alive for 2 seconds.
            const killTimer = setTimeout(() => {
                if (this.serverProcess && this.serverProcess.exitCode === null) {
                    this.serverProcess.kill('SIGKILL');
                }
            }, 2000);
            killTimer.unref();
        }
    }
}

module.exports = { MCPClient };
```
--------------------------------------------------------------------------------
/scripts/validation/validate_timestamp_integrity.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Timestamp Integrity Validation Script
Detects timestamp anomalies that could indicate the regression bug where
created_at timestamps were being reset during metadata sync operations.
Checks for:
1. Suspicious clusters of recent created_at timestamps
2. Created_at timestamps that are newer than they should be
3. Memories with identical or very similar created_at timestamps (indicating bulk reset)
4. created_at > updated_at (logically impossible)
"""
import asyncio
import sys
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from collections import Counter
from typing import List, Dict, Tuple, Optional
# Add project root to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root / "src"))
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
import mcp_memory_service.config as config_module
class TimestampIntegrityValidator:
    """Validator for detecting timestamp integrity issues.

    All checks are read-only. SQLite-specific checks detect the backend by
    the presence of a raw ``conn`` attribute (sqlite-vec exposes one) and
    are skipped for other backends. Findings accumulate in ``self.warnings``
    (suspicious but possible) and ``self.errors`` (logically impossible).
    """

    def __init__(self, storage):
        # Storage backend under inspection (duck-typed, see class docstring).
        self.storage = storage
        self.warnings = []
        self.errors = []

    async def validate_all(self) -> Tuple[bool, List[str], List[str]]:
        """
        Run all timestamp integrity checks.

        Returns:
            Tuple of (is_healthy, warnings, errors); is_healthy is True when
            there are no errors (warnings alone do not fail validation)
        """
        print("🔍 Running timestamp integrity validation...\n")
        await self.check_impossible_timestamps()
        await self.check_suspicious_clusters()
        await self.check_future_timestamps()
        await self.check_timestamp_distribution()
        # Print summary
        print("\n" + "="*60)
        print("📊 VALIDATION SUMMARY")
        print("="*60)
        if not self.errors and not self.warnings:
            print("✅ No timestamp integrity issues detected!")
            return True, [], []
        if self.errors:
            print(f"\n❌ ERRORS: {len(self.errors)}")
            for error in self.errors:
                print(f" - {error}")
        if self.warnings:
            print(f"\n⚠️ WARNINGS: {len(self.warnings)}")
            for warning in self.warnings:
                print(f" - {warning}")
        return len(self.errors) == 0, self.warnings, self.errors

    async def check_impossible_timestamps(self):
        """Check for logically impossible timestamps (created_at > updated_at)."""
        print("1️⃣ Checking for impossible timestamps (created_at > updated_at)...")
        if hasattr(self.storage, 'conn'):  # SQLite-vec
            cursor = self.storage.conn.execute('''
                SELECT content_hash, created_at, updated_at,
                       created_at_iso, updated_at_iso
                FROM memories
                WHERE created_at > updated_at
            ''')
            impossible = cursor.fetchall()
            if impossible:
                self.errors.append(
                    f"Found {len(impossible)} memories with created_at > updated_at (impossible!)"
                )
                for row in impossible[:5]:  # Show first 5
                    content_hash, created_at, updated_at, created_iso, updated_iso = row
                    print(f" ❌ {content_hash[:8]}: created={created_iso}, updated={updated_iso}")
            else:
                print(" ✅ No impossible timestamps found")
        else:
            print(" ⚠️ Skipped (not SQLite backend)")

    async def check_suspicious_clusters(self):
        """Check for suspicious clusters of recent created_at timestamps.

        Many memories sharing one exact created_at suggests a bulk reset
        (the regression this script was written to detect).
        """
        print("\n2️⃣ Checking for suspicious timestamp clusters...")
        if hasattr(self.storage, 'conn'):  # SQLite-vec
            # Get all created_at timestamps
            # GROUP BY + HAVING surfaces only duplicated timestamps.
            cursor = self.storage.conn.execute('''
                SELECT created_at, created_at_iso, COUNT(*) as count
                FROM memories
                GROUP BY created_at
                HAVING COUNT(*) > 1
                ORDER BY count DESC
                LIMIT 10
            ''')
            clusters = cursor.fetchall()
            if clusters:
                # Check if there are large clusters (> 5 memories with same timestamp)
                large_clusters = [c for c in clusters if c[2] > 5]
                if large_clusters:
                    self.warnings.append(
                        f"Found {len(large_clusters)} suspicious timestamp clusters "
                        f"(multiple memories with identical created_at)"
                    )
                    print(f" ⚠️ {len(large_clusters)} suspicious clusters found:")
                    for created_at, created_iso, count in large_clusters[:5]:
                        age_hours = (time.time() - created_at) / 3600
                        print(f" - {count} memories at {created_iso} ({age_hours:.1f}h ago)")
                else:
                    print(f" ✅ No suspicious clusters (some duplicates normal)")
            else:
                print(" ✅ No timestamp clusters found")
        else:
            print(" ⚠️ Skipped (not SQLite backend)")

    async def check_future_timestamps(self):
        """Check for timestamps in the future (beyond a 5-minute clock-skew tolerance)."""
        print("\n3️⃣ Checking for future timestamps...")
        now = time.time()
        future_threshold = now + 300  # 5 minutes tolerance
        if hasattr(self.storage, 'conn'):  # SQLite-vec
            cursor = self.storage.conn.execute('''
                SELECT content_hash, created_at, updated_at,
                       created_at_iso, updated_at_iso
                FROM memories
                WHERE created_at > ? OR updated_at > ?
            ''', (future_threshold, future_threshold))
            future_timestamps = cursor.fetchall()
            if future_timestamps:
                self.errors.append(
                    f"Found {len(future_timestamps)} memories with timestamps in the future!"
                )
                for row in future_timestamps[:5]:
                    content_hash, created_at, updated_at, created_iso, updated_iso = row
                    if created_at > future_threshold:
                        print(f" ❌ {content_hash[:8]}: created_at in future: {created_iso}")
                    if updated_at > future_threshold:
                        print(f" ❌ {content_hash[:8]}: updated_at in future: {updated_iso}")
            else:
                print(" ✅ No future timestamps found")
        else:
            print(" ⚠️ Skipped (not SQLite backend)")

    async def check_timestamp_distribution(self):
        """Check timestamp distribution for anomalies (e.g., all recent)."""
        print("\n4️⃣ Checking timestamp distribution...")
        if hasattr(self.storage, 'conn'):  # SQLite-vec
            # Get timestamp statistics
            cursor = self.storage.conn.execute('''
                SELECT
                    COUNT(*) as total,
                    MIN(created_at) as oldest,
                    MAX(created_at) as newest,
                    AVG(created_at) as avg_timestamp
                FROM memories
            ''')
            row = cursor.fetchone()
            if not row or row[0] == 0:
                print(" ℹ️ No memories to analyze")
                return
            total, oldest, newest, avg_timestamp = row
            # Calculate time ranges
            now = time.time()
            oldest_age_days = (now - oldest) / 86400
            newest_age_hours = (now - newest) / 3600
            print(f" 📈 Total memories: {total}")
            # NOTE(review): datetime.utcfromtimestamp is deprecated since
            # Python 3.12; consider datetime.fromtimestamp(ts, tz=timezone.utc).
            print(f" 📅 Oldest: {datetime.utcfromtimestamp(oldest).isoformat()}Z ({oldest_age_days:.1f} days ago)")
            print(f" 📅 Newest: {datetime.utcfromtimestamp(newest).isoformat()}Z ({newest_age_hours:.1f} hours ago)")
            # Check for anomaly: if > 50% of memories created in last 24 hours
            # but oldest is > 7 days old (indicates bulk timestamp reset)
            cursor = self.storage.conn.execute('''
                SELECT COUNT(*) FROM memories
                WHERE created_at > ?
            ''', (now - 86400,))
            recent_count = cursor.fetchone()[0]
            recent_percentage = (recent_count / total) * 100
            if recent_percentage > 50 and oldest_age_days > 7:
                self.warnings.append(
                    f"Suspicious: {recent_percentage:.1f}% of memories created in last 24h, "
                    f"but oldest memory is {oldest_age_days:.1f} days old. "
                    f"This could indicate timestamp reset bug!"
                )
                print(f" ⚠️ {recent_percentage:.1f}% created in last 24h (suspicious if many old memories)")
            # Check distribution by age buckets.
            # Buckets are cumulative ("Last week" includes "Last 24 hours"),
            # so percentages are not expected to sum to 100.
            buckets = [
                ("Last hour", 3600),
                ("Last 24 hours", 86400),
                ("Last week", 604800),
                ("Last month", 2592000),
                ("Older", float('inf'))
            ]
            print(f"\n 📊 Distribution by age:")
            for label, seconds in buckets:
                if seconds == float('inf'):
                    # "Older" is the complement of the one-month bucket.
                    cursor = self.storage.conn.execute('''
                        SELECT COUNT(*) FROM memories
                        WHERE created_at < ?
                    ''', (now - 2592000,))
                else:
                    cursor = self.storage.conn.execute('''
                        SELECT COUNT(*) FROM memories
                        WHERE created_at > ?
                    ''', (now - seconds,))
                count = cursor.fetchone()[0]
                percentage = (count / total) * 100
                print(f" {label:15}: {count:4} ({percentage:5.1f}%)")
        else:
            print(" ⚠️ Skipped (not SQLite backend)")
async def main():
    """Main validation function.

    Exit codes: 1 when errors are found or an exception occurs;
    0 on success, including warnings-only outcomes.
    """
    print("="*60)
    print("⏰ TIMESTAMP INTEGRITY VALIDATION")
    print("="*60)
    print()
    try:
        # Get database path from config
        # storage_backend is informational only; validation always uses the
        # sqlite-vec storage class below.
        storage_backend = os.getenv('MCP_MEMORY_STORAGE_BACKEND', 'sqlite_vec')
        db_path = config_module.MEMORY_SQLITE_DB_PATH
        print(f"Backend: {storage_backend}")
        print(f"Database: {db_path}")
        print()
        # Initialize storage
        storage = SqliteVecMemoryStorage(
            db_path=db_path,
            embedding_model="all-MiniLM-L6-v2"
        )
        await storage.initialize()
        # Run validation
        validator = TimestampIntegrityValidator(storage)
        # is_healthy mirrors "no errors"; the exit code below is derived
        # directly from the errors/warnings lists instead.
        is_healthy, warnings, errors = await validator.validate_all()
        # Close storage (guarded: not all backends expose close())
        if hasattr(storage, 'close'):
            storage.close()
        # Exit with appropriate code
        if errors:
            print("\n❌ Validation FAILED with errors")
            sys.exit(1)
        elif warnings:
            print("\n⚠️ Validation completed with warnings")
            sys.exit(0)
        else:
            print("\n✅ Validation PASSED - Timestamps are healthy!")
            sys.exit(0)
    except Exception as e:
        print(f"\n❌ Validation failed with exception: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/docs/troubleshooting/session-end-hooks.md:
--------------------------------------------------------------------------------
```markdown
# SessionEnd Hook Troubleshooting Guide
## Overview
SessionEnd hooks automatically consolidate conversation outcomes when you exit Claude Code. However, many users are confused about **when these hooks actually fire** and why memories might not be created as expected.
This guide clarifies the session lifecycle and common troubleshooting scenarios.
---
## Critical Concept: Session Lifecycle
Claude Code distinguishes between **session pause/suspend** and **session termination**:
| User Action | Session State | Hook Triggered | Memory Created? |
|-------------|---------------|----------------|-----------------|
| **Ctrl+C (once)** | Interrupt input | None | ❌ No |
| **Ctrl+C (twice)** | Suspend session | None | ❌ No |
| **Resume session** | Continue existing | `SessionStart:resume` | ❌ No (loads existing) |
| **`/exit` command** | Terminate | `SessionEnd` | ✅ **Yes** |
| **Close terminal** | Terminate | `SessionEnd` | ✅ **Yes** |
| **Kill process** | May terminate | `SessionEnd` (if graceful) | ⚠️ Maybe |
### Key Takeaway
**Ctrl+C does NOT trigger SessionEnd hooks.** It suspends the session, which you can later resume. Only actual session termination (e.g., `/exit`) triggers SessionEnd.
---
## Common Issue: "My Session Didn't Create a Memory"
### Symptom
You exited Claude Code with Ctrl+C (twice), resumed later, and noticed no `session-consolidation` memory was created for your previous session.
### Root Cause
**Ctrl+C suspends the session rather than ending it.** When you resume with `SessionStart:resume`, the session continues from where you left off - no SessionEnd hook fires.
### Evidence
When you resume a session, you'll see:
```
SessionStart:resume hook success
```
This confirms you **resumed** an existing session, not started a new one.
### Solution
**Always use `/exit` to properly terminate sessions** if you want SessionEnd memories created:
```bash
# In Claude Code prompt:
/exit
```
This triggers graceful shutdown and SessionEnd hook execution.
---
## Common Issue: Connection Failures (SessionEnd & SessionStart)
> **Note**: This issue affects both SessionEnd and SessionStart hooks, but with different symptoms:
> - **SessionEnd**: Hard failure - cannot store session memory
> - **SessionStart**: Soft failure - falls back to MCP tools, shows "No relevant memories found"
>
> See [hooks-quick-reference.md](hooks-quick-reference.md#sessionstart-hook-issues) for detailed SessionStart troubleshooting.
### Symptom (SessionEnd)
At the start of your next session you see the connection banner below — the same connection failure also prevents SessionEnd from storing session memories:
```
⚠️ Memory Connection → Failed to connect using any available protocol
💾 Storage → 💾 Unknown Storage (http://127.0.0.1:8000)
```
### Symptom (SessionStart)
Multiple "MCP Fallback" messages and no memories loaded:
```
↩️ MCP Fallback → Using standard MCP tools
↩️ MCP Fallback → Using standard MCP tools
↩️ MCP Fallback → Using standard MCP tools
📭 Memory Search → No relevant memories found
```
### Root Cause
**HTTP/HTTPS protocol mismatch** between hook configuration and memory service.
**Example**:
- **Server running**: `https://localhost:8000` (HTTPS)
- **Hook configured**: `http://127.0.0.1:8000` (HTTP)
### Diagnosis
Check your server protocol:
```bash
# Check server status
systemctl --user status mcp-memory-http.service
# Look for: "Uvicorn running on https://0.0.0.0:8000" or "http://..."
# Or test connection
curl -sk "https://localhost:8000/api/health" # HTTPS
curl -s "http://127.0.0.1:8000/api/health" # HTTP
```
Check your hook configuration:
```bash
grep endpoint ~/.claude/hooks/config.json
# Should show: "endpoint": "https://localhost:8000"
```
### Solution
Update `~/.claude/hooks/config.json` to match server protocol:
```json
{
"memoryService": {
"http": {
"endpoint": "https://localhost:8000", // Match your server
"apiKey": "your-api-key-here"
}
}
}
```
**No restart required** - hooks reload config on next execution.
---
## SessionEnd Requirements
Even if SessionEnd fires correctly, memory creation requires:
### 1. Minimum Session Length
- Default: **100+ characters** total conversation
- Configurable: `sessionAnalysis.minSessionLength` in `config.json`
- Reason: Prevents noise from trivial sessions
### 2. Minimum Confidence Score
- Default: **> 0.1** (10% confidence)
- Based on conversation analysis quality
- Low confidence = session too generic to extract insights
### 3. Session Consolidation Enabled
```json
{
"memoryService": {
"enableSessionConsolidation": true // Must be true
}
}
```
### What Gets Extracted
SessionEnd analyzes your conversation to extract:
- **Topics**: Keywords like "implementation", "debugging", "architecture", "performance"
- **Decisions**: Phrases like "decided to", "will use", "chose to", "going with"
- **Insights**: Phrases like "learned that", "discovered", "realized"
- **Code Changes**: Phrases like "implemented", "created", "refactored"
- **Next Steps**: Phrases like "next we need", "TODO", "remaining"
If conversation lacks these patterns, confidence will be low and memory won't be created.
---
## Verification & Debugging
### 1. Check Recent Session Memories
```bash
# Search for recent session consolidation memories
curl -sk "https://localhost:8000/api/search/by-tag" \
-H "Content-Type: application/json" \
-d '{"tags": ["session-consolidation"], "limit": 5}' | \
python -m json.tool | grep created_at_iso
```
Look for recent timestamps (today/yesterday).
### 2. Test SessionEnd Hook Manually
```bash
# Run hook with test conversation
node ~/.claude/hooks/core/session-end.js
```
Check output for:
- `[Memory Hook] Session ending - consolidating outcomes...`
- `[Memory Hook] Session analysis: X topics, Y decisions, confidence: Z%`
- `[Memory Hook] Session consolidation stored successfully`
### 3. Verify Connection
```bash
# Test server health
curl -sk "https://localhost:8000/api/health"
# Check config matches
grep endpoint ~/.claude/hooks/config.json
```
### 4. Check SessionEnd Configuration
```bash
# Verify SessionEnd hook is configured
grep -A 10 "SessionEnd" ~/.claude/settings.json
# Should show:
# "SessionEnd": [
# {
# "hooks": [
# {
# "type": "command",
# "command": "node \"/home/user/.claude/hooks/core/session-end.js\"",
# "timeout": 15
# }
# ]
# }
# ]
```
---
## Quick Diagnosis Checklist
Use this checklist when SessionEnd memories aren't being created:
- [ ] **Did I use `/exit`** or just Ctrl+C?
- **Fix**: Use `/exit` command for proper termination
- [ ] **Does `config.json` endpoint match server protocol?**
- **Check**: HTTP vs HTTPS in both config and server
- **Fix**: Update endpoint in `~/.claude/hooks/config.json`
- [ ] **Is the memory service running?**
- **Check**: `curl https://localhost:8000/api/health`
- **Fix**: Start server with `systemctl --user start mcp-memory-http.service`
- [ ] **Was conversation meaningful?**
- **Check**: Total length > 100 characters
- **Fix**: Have longer conversations with decisions/insights
- [ ] **Is session consolidation enabled?**
- **Check**: `enableSessionConsolidation: true` in config
- **Fix**: Update `~/.claude/hooks/config.json`
- [ ] **Is SessionEnd hook installed?**
- **Check**: `grep SessionEnd ~/.claude/settings.json`
- **Fix**: Run `cd claude-hooks && python install_hooks.py --all`
---
## Best Practices
### For Reliable Memory Consolidation
1. **Always use `/exit`** when you want session memories created
2. **Avoid Ctrl+C for final exit** - Use it only for interrupts/corrections
3. **Have meaningful conversations** - Include decisions, insights, plans
4. **Verify endpoint configuration** - HTTP vs HTTPS must match
5. **Check session memories periodically** - Confirm system is working
### For Debugging
1. **Check recent memories** - Look for session-consolidation tag
2. **Test hook manually** - Run `session-end.js` directly
3. **Verify connection** - Test health endpoint
4. **Read hook logs** - Look for error messages in terminal
5. **Consult session requirements** - Length, confidence, enabled settings
---
## Technical Details
### SessionEnd Hook Implementation
**File**: `~/.claude/hooks/core/session-end.js`
**Key Code Sections**:
- **Lines 298-365**: Main `onSessionEnd()` function
- **Line 316**: Minimum session length check (100 chars)
- **Line 329**: Minimum confidence check (0.1)
- **Line 305**: Session consolidation enabled check
- **Lines 213-293**: `storeSessionMemory()` - HTTP API call
### Configuration Structure
**File**: `~/.claude/hooks/config.json`
```json
{
"memoryService": {
"protocol": "auto",
"preferredProtocol": "http",
"http": {
"endpoint": "https://localhost:8000", // Must match server
"apiKey": "your-api-key",
"healthCheckTimeout": 3000
},
"enableSessionConsolidation": true
},
"sessionAnalysis": {
"extractTopics": true,
"extractDecisions": true,
"extractInsights": true,
"extractCodeChanges": true,
"extractNextSteps": true,
"minSessionLength": 100,
"minConfidence": 0.1
}
}
```
### Hook Settings
**File**: `~/.claude/settings.json`
```json
{
"hooks": {
"SessionEnd": [
{
"hooks": [
{
"type": "command",
"command": "node \"/home/user/.claude/hooks/core/session-end.js\"",
"timeout": 15 // 15 seconds (vs 10s for SessionStart)
}
]
}
]
}
}
```
---
## Related Documentation
- **Hook Installation**: `claude-hooks/README.md`
- **Configuration Guide**: `claude-hooks/CONFIGURATION.md`
- **HTTP Server Management**: `docs/http-server-management.md`
- **General Troubleshooting**: `docs/troubleshooting/general.md`
- **SessionStart Windows Bug**: `claude-hooks/WINDOWS-SESSIONSTART-BUG.md`
---
## Common Questions
### Q: Why didn't my session create a memory even though I used `/exit`?
**A**: Check these conditions:
1. Conversation was too short (< 100 chars)
2. Conversation lacked decision/insight patterns (low confidence)
3. Connection to memory service failed (check endpoint)
4. Session consolidation disabled in config
### Q: Does Ctrl+C ever trigger SessionEnd?
**A**: No. Ctrl+C sends SIGINT which interrupts/suspends but doesn't terminate the session. Use `/exit` for proper termination.
### Q: Can I test if SessionEnd will work before exiting?
**A**: Yes:
```bash
node ~/.claude/hooks/core/session-end.js
```
This runs the hook with a test conversation and shows what would happen.
### Q: How do I see all my session consolidation memories?
**A**:
```bash
curl -sk "https://localhost:8000/api/search/by-tag" \
-H "Content-Type: application/json" \
-d '{"tags": ["session-consolidation"]}' | \
python -m json.tool
```
### Q: What's the difference between SessionStart and SessionEnd hooks?
**A**:
- **SessionStart**: Loads and injects memory context at session start
- **SessionEnd**: Analyzes and stores session outcomes at session end
- Both can have connection issues (check endpoint configuration)
- SessionStart has timeout issues on Windows (Ctrl+C hang bug)
---
**Last Updated**: 2025-11-01
**Applies to**: v8.15.1+
**Author**: Community Documentation
```
--------------------------------------------------------------------------------
/tests/integration/test_api_memories_chronological.py:
--------------------------------------------------------------------------------
```python
"""
Test chronological ordering and pagination for the /api/memories endpoint.
Tests verify that the GitHub issue #79 has been properly resolved by ensuring:
1. Memories are returned in chronological order (newest first)
2. Pagination works correctly with chronological ordering
3. All storage backends support the new ordering
"""
import pytest
import asyncio
import time
import tempfile
from datetime import datetime, timedelta
from typing import List, Dict, Any
import os
# Import project modules
# Note: This assumes the project is installed in editable mode with `pip install -e .`
# or PYTHONPATH is configured correctly for the test environment
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
class TestChronologicalOrdering:
    """Test chronological ordering functionality across all storage backends."""

    async def create_test_memories(self, storage) -> List[Memory]:
        """Create test memories with different timestamps.

        Stores five memories spaced 10 minutes apart, starting one hour ago,
        asserting that every store() call succeeds.

        Args:
            storage: An initialized storage backend exposing async ``store()``.

        Returns:
            The list of Memory objects in creation (oldest-first) order.
        """
        memories = []
        base_time = time.time() - 3600  # Start 1 hour ago
        # Create 5 test memories with 10-minute intervals
        for i in range(5):
            timestamp = base_time + (i * 600)  # 10-minute intervals
            memory = Memory(
                content=f"Test memory {i + 1}",
                content_hash=f"hash_{i + 1}",
                tags=[f"tag{i + 1}", "test"],
                memory_type="test",
                metadata={"index": i + 1},
                created_at=timestamp,
                updated_at=timestamp
            )
            memories.append(memory)
            success, message = await storage.store(memory)
            assert success, f"Failed to store memory {i + 1}: {message}"
        return memories

    @pytest.mark.asyncio
    async def test_get_all_memories_chronological_order_sqlite(self):
        """Test that get_all_memories returns memories in chronological order (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Create test memories
            original_memories = await self.create_test_memories(storage)

            # Get all memories
            retrieved_memories = await storage.get_all_memories()

            # Verify we got all memories
            assert len(retrieved_memories) == 5, f"Expected 5 memories, got {len(retrieved_memories)}"

            # Verify chronological order (newest first); pairwise non-increasing created_at
            for i in range(len(retrieved_memories) - 1):
                current_time = retrieved_memories[i].created_at or 0
                next_time = retrieved_memories[i + 1].created_at or 0
                assert current_time >= next_time, f"Memory at index {i} is older than memory at index {i + 1}"

            # Verify the actual order matches expectations (newest first)
            expected_order = [5, 4, 3, 2, 1]  # Newest to oldest
            # Memory content ends with its 1-based creation index ("Test memory N")
            actual_order = [int(mem.content.split()[-1]) for mem in retrieved_memories]
            assert actual_order == expected_order, f"Expected order {expected_order}, got {actual_order}"

    @pytest.mark.asyncio
    async def test_pagination_with_chronological_order_sqlite(self):
        """Test pagination maintains chronological order (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Create test memories
            await self.create_test_memories(storage)

            # Test pagination: Get first 2 memories
            first_page = await storage.get_all_memories(limit=2, offset=0)
            assert len(first_page) == 2

            # Test pagination: Get next 2 memories
            second_page = await storage.get_all_memories(limit=2, offset=2)
            assert len(second_page) == 2

            # Test pagination: Get last memory
            third_page = await storage.get_all_memories(limit=2, offset=4)
            assert len(third_page) == 1

            # Verify chronological order across pages
            all_paginated = first_page + second_page + third_page

            # Should be in chronological order (newest first)
            for i in range(len(all_paginated) - 1):
                current_time = all_paginated[i].created_at or 0
                next_time = all_paginated[i + 1].created_at or 0
                assert current_time >= next_time, f"Pagination broke chronological order at position {i}"

            # Verify content order
            expected_content_order = ["Test memory 5", "Test memory 4", "Test memory 3", "Test memory 2", "Test memory 1"]
            actual_content_order = [mem.content for mem in all_paginated]
            assert actual_content_order == expected_content_order

    @pytest.mark.asyncio
    async def test_count_all_memories_sqlite(self):
        """Test count_all_memories method (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Initially should be empty
            initial_count = await storage.count_all_memories()
            assert initial_count == 0

            # Create test memories
            await self.create_test_memories(storage)

            # Should now have 5 memories
            final_count = await storage.count_all_memories()
            assert final_count == 5

    @pytest.mark.asyncio
    async def test_empty_storage_handling_sqlite(self):
        """Test handling of empty storage (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Test get_all_memories on empty storage
            memories = await storage.get_all_memories()
            assert memories == []

            # Test with pagination on empty storage
            paginated = await storage.get_all_memories(limit=10, offset=0)
            assert paginated == []

            # Test count on empty storage
            count = await storage.count_all_memories()
            assert count == 0

    @pytest.mark.asyncio
    async def test_offset_beyond_total_sqlite(self):
        """Test offset beyond total records (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Create test memories
            await self.create_test_memories(storage)

            # Test offset beyond total records: expect an empty page, not an error
            memories = await storage.get_all_memories(limit=10, offset=100)
            assert memories == []

    @pytest.mark.asyncio
    async def test_large_limit_sqlite(self):
        """Test large limit parameter (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Create test memories
            await self.create_test_memories(storage)

            # Test limit larger than total records
            memories = await storage.get_all_memories(limit=100, offset=0)
            assert len(memories) == 5  # Should return all 5 memories

    @pytest.mark.asyncio
    async def test_mixed_timestamps_ordering_sqlite(self):
        """Test ordering with mixed/unsorted timestamps (SQLite)."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            storage = SqliteVecMemoryStorage(os.path.join(tmp_dir, "test.db"))
            await storage.initialize()

            # Create memories with deliberately mixed timestamps so insertion
            # order differs from chronological order
            base_time = time.time()
            timestamps = [base_time + 300, base_time + 100, base_time + 500, base_time + 200, base_time + 400]

            for i, timestamp in enumerate(timestamps):
                memory = Memory(
                    content=f"Mixed memory {i + 1}",
                    content_hash=f"mixed_hash_{i + 1}",
                    tags=["mixed", "test"],
                    memory_type="mixed",
                    metadata={"timestamp": timestamp},
                    created_at=timestamp,
                    updated_at=timestamp
                )
                success, message = await storage.store(memory)
                assert success, f"Failed to store mixed memory {i + 1}: {message}"

            # Retrieve all memories
            memories = await storage.get_all_memories()

            # Should be ordered by timestamp (newest first)
            expected_order = [base_time + 500, base_time + 400, base_time + 300, base_time + 200, base_time + 100]
            actual_timestamps = [mem.created_at for mem in memories]
            assert actual_timestamps == expected_order, f"Expected {expected_order}, got {actual_timestamps}"
class TestAPIChronologicalIntegration:
    """Integration tests that would test the actual API endpoints.

    These tests are structured to be easily adaptable for testing the actual
    FastAPI endpoints when a test client is available.
    """

    @staticmethod
    def _model_fields(model) -> set:
        """Return a Pydantic model's declared field names (v1 and v2 compatible).

        Pydantic v2 renamed ``__fields__`` to ``model_fields`` and removed the
        old attribute's public contract; accessing ``__fields__`` on v2 emits a
        deprecation warning (or fails on newer releases). Prefer the v2
        attribute and fall back to ``__fields__`` for v1.
        """
        fields = getattr(model, "model_fields", None)
        if fields is None:
            fields = model.__fields__  # Pydantic v1 fallback
        return set(fields.keys())

    def test_api_endpoint_structure(self):
        """Test that the API endpoint imports and structure are correct."""
        # Import the API router to ensure it loads correctly
        from mcp_memory_service.web.api.memories import router

        # Verify the router exists and has the expected endpoints
        routes = [route.path for route in router.routes]
        assert "/memories" in routes
        assert "/memories/{content_hash}" in routes

    def test_memory_response_model(self):
        """Test that the response models include necessary fields for chronological ordering."""
        from mcp_memory_service.web.api.memories import MemoryResponse, MemoryListResponse

        # Verify MemoryResponse has timestamp fields
        response_fields = self._model_fields(MemoryResponse)
        assert "created_at" in response_fields
        assert "created_at_iso" in response_fields
        assert "updated_at" in response_fields
        assert "updated_at_iso" in response_fields

        # Verify MemoryListResponse has pagination fields
        list_fields = self._model_fields(MemoryListResponse)
        assert "memories" in list_fields
        assert "total" in list_fields
        assert "page" in list_fields
        assert "page_size" in list_fields
        assert "has_more" in list_fields

    def test_storage_backend_type_compatibility(self):
        """Test that the API endpoints use the correct base storage type."""
        from mcp_memory_service.web.api.memories import list_memories
        import inspect

        # Get the signature of the list_memories function
        sig = inspect.signature(list_memories)
        storage_param = sig.parameters['storage']

        # Check that it uses the base MemoryStorage type, not a specific implementation
        assert 'MemoryStorage' in str(storage_param.annotation)
if __name__ == "__main__":
    # Run tests directly (supports `python <this file>` without the pytest CLI)
    pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/discovery/client.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Discovery client for MCP Memory Service.
This module provides a high-level client for discovering and connecting to
MCP Memory Service instances on the local network.
"""
import asyncio
import logging
import aiohttp
from typing import List, Optional, Dict, Any
from dataclasses import dataclass
from .mdns_service import ServiceDiscovery, ServiceDetails
from ..config import MDNS_DISCOVERY_TIMEOUT
logger = logging.getLogger(__name__)
@dataclass
class HealthStatus:
    """Health status of a discovered service."""

    healthy: bool  # True only when the /health endpoint answered HTTP 200
    status: str  # Service-reported status, or 'error'/'timeout'/'unknown' on failure
    backend: str  # Reported storage backend type; 'unknown' when unavailable
    statistics: Dict[str, Any]  # Raw 'statistics' payload from the health response
    response_time_ms: float  # Measured round-trip time of the health request (ms)
    error: Optional[str] = None  # Failure description; None when the check passed
class DiscoveryClient:
    """High-level client for discovering and validating MCP Memory Services.

    Wraps mDNS :class:`ServiceDiscovery` and adds HTTP health probing so
    callers can select the best service instance on the local network.
    """

    def __init__(self, discovery_timeout: int = MDNS_DISCOVERY_TIMEOUT):
        """
        Args:
            discovery_timeout: Seconds to wait for mDNS discovery responses.
        """
        self.discovery_timeout = discovery_timeout
        self._discovery = ServiceDiscovery(discovery_timeout=discovery_timeout)

    async def find_best_service(
        self,
        prefer_https: bool = True,
        require_auth: Optional[bool] = None,
        validate_health: bool = True
    ) -> Optional[ServiceDetails]:
        """
        Find the best MCP Memory Service on the network.

        Args:
            prefer_https: Prefer HTTPS services over HTTP
            require_auth: Require (True) or reject (False) services with auth, None for any
            validate_health: Validate service health before returning

        Returns:
            Best service found, or None if no suitable service
        """
        services = await self.discover_services()
        if not services:
            logger.info("No MCP Memory Services found on the network")
            return None

        # Keep only services matching the auth requirement (None accepts any)
        filtered_services = [
            service for service in services
            if require_auth is None or service.requires_auth == require_auth
        ]
        if not filtered_services:
            logger.info("No services match the specified requirements")
            return None

        # Sort services by preference (HTTPS first if preferred)
        def service_priority(service: ServiceDetails) -> tuple:
            https_score = 1 if service.https else 0
            if not prefer_https:
                https_score = 1 - https_score  # Invert preference
            return (https_score, service.port)  # Secondary sort by port for consistency

        filtered_services.sort(key=service_priority, reverse=True)

        if not validate_health:
            # Return first service without health validation
            best_service = filtered_services[0]
            logger.info(f"Selected service: {best_service.name} at {best_service.url}")
            return best_service

        # Probe candidates in preference order; return the first healthy one
        for service in filtered_services:
            health = await self.check_service_health(service)
            if health and health.healthy:
                logger.info(f"Selected healthy service: {service.name} at {service.url}")
                return service
            logger.warning(f"Service {service.name} failed health check: {health.error if health else 'Unknown error'}")

        logger.warning("No healthy services found")
        return None

    async def discover_services(self) -> List[ServiceDetails]:
        """Discover all MCP Memory Services on the network."""
        logger.info("Discovering MCP Memory Services on the network...")
        services = await self._discovery.discover_services()

        if services:
            logger.info(f"Found {len(services)} MCP Memory Services:")
            for service in services:
                logger.info(f"  - {service.name} at {service.url} (Auth: {service.requires_auth})")
        else:
            logger.info("No MCP Memory Services found")

        return services

    async def check_service_health(
        self,
        service: ServiceDetails,
        timeout: float = 5.0
    ) -> Optional[HealthStatus]:
        """
        Check the health of a discovered service.

        Args:
            service: Service to check
            timeout: Request timeout in seconds

        Returns:
            HealthStatus describing the probe; timeouts and transport errors
            are reported as unhealthy statuses rather than raised.
        """
        health_url = f"{service.api_url}/health"

        try:
            import time
            # Monotonic clock: immune to wall-clock adjustments while timing
            start_time = time.monotonic()

            timeout_config = aiohttp.ClientTimeout(total=timeout)
            # ssl=False disables certificate verification (allows self-signed
            # certs); the former verify_ssl= keyword is deprecated in aiohttp 3.x
            connector = aiohttp.TCPConnector(ssl=False)

            async with aiohttp.ClientSession(
                timeout=timeout_config,
                connector=connector
            ) as session:
                async with session.get(health_url) as response:
                    response_time = (time.monotonic() - start_time) * 1000  # Convert to ms

                    if response.status == 200:
                        data = await response.json()
                        return HealthStatus(
                            healthy=True,
                            status=data.get('status', 'unknown'),
                            backend=data.get('storage_type', 'unknown'),
                            statistics=data.get('statistics', {}),
                            response_time_ms=response_time
                        )
                    return HealthStatus(
                        healthy=False,
                        status='error',
                        backend='unknown',
                        statistics={},
                        response_time_ms=response_time,
                        error=f"HTTP {response.status}"
                    )
        except asyncio.TimeoutError:
            return HealthStatus(
                healthy=False,
                status='timeout',
                backend='unknown',
                statistics={},
                response_time_ms=timeout * 1000,
                error="Request timeout"
            )
        except Exception as e:
            return HealthStatus(
                healthy=False,
                status='error',
                backend='unknown',
                statistics={},
                response_time_ms=0,
                error=str(e)
            )

    async def get_service_capabilities(
        self,
        service: ServiceDetails,
        api_key: Optional[str] = None,
        timeout: float = 5.0
    ) -> Optional[Dict[str, Any]]:
        """
        Get detailed capabilities of a service from its OpenAPI spec.

        Args:
            service: Service to query
            api_key: API key if required
            timeout: Request timeout

        Returns:
            Parsed OpenAPI document, or None if the request failed.
        """
        try:
            headers = {}
            if api_key and service.requires_auth:
                headers['Authorization'] = f'Bearer {api_key}'

            timeout_config = aiohttp.ClientTimeout(total=timeout)
            connector = aiohttp.TCPConnector(ssl=False)  # Allow self-signed certs

            async with aiohttp.ClientSession(
                timeout=timeout_config,
                connector=connector
            ) as session:
                # Try to get OpenAPI spec
                openapi_url = f"{service.api_url}/openapi.json"
                async with session.get(openapi_url, headers=headers) as response:
                    if response.status == 200:
                        return await response.json()
        except Exception as e:
            logger.error(f"Failed to get service capabilities: {e}")
        return None

    async def find_services_with_health(
        self,
        prefer_https: bool = True,
        require_auth: Optional[bool] = None
    ) -> List[tuple[ServiceDetails, HealthStatus]]:
        """
        Find all services and their health status.

        Returns:
            List of (service, health_status) tuples, sorted by preference
        """
        services = await self.discover_services()
        if not services:
            return []

        # Filter by auth requirement
        if require_auth is not None:
            services = [s for s in services if s.requires_auth == require_auth]

        # Check health for all services concurrently
        health_tasks = [self.check_service_health(service) for service in services]
        health_results = await asyncio.gather(*health_tasks, return_exceptions=True)

        # Combine services with health status; failures become unhealthy entries
        service_health_pairs = []
        for service, health_result in zip(services, health_results):
            if isinstance(health_result, Exception):
                health = HealthStatus(
                    healthy=False,
                    status='error',
                    backend='unknown',
                    statistics={},
                    response_time_ms=0,
                    error=str(health_result)
                )
            else:
                health = health_result or HealthStatus(
                    healthy=False,
                    status='unknown',
                    backend='unknown',
                    statistics={},
                    response_time_ms=0,
                    error="No response"
                )
            service_health_pairs.append((service, health))

        # Sort by preference: healthy first, then HTTPS if preferred, then response time
        def sort_key(pair: tuple[ServiceDetails, HealthStatus]) -> tuple:
            service, health = pair
            healthy_score = 1 if health.healthy else 0
            https_score = 1 if service.https else 0
            if not prefer_https:
                https_score = 1 - https_score
            # Unhealthy services sort last regardless of their timing
            response_time = health.response_time_ms if health.healthy else float('inf')
            return (healthy_score, https_score, -response_time)  # Negative for ascending order

        service_health_pairs.sort(key=sort_key, reverse=True)
        return service_health_pairs

    async def stop(self) -> None:
        """Stop the discovery client."""
        await self._discovery.stop_discovery()
```
--------------------------------------------------------------------------------
/tests/unit/test_csv_loader.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for CSV document loader.
"""
import pytest
import asyncio
import csv
import io
from pathlib import Path
from mcp_memory_service.ingestion.csv_loader import CSVLoader
from mcp_memory_service.ingestion.base import DocumentChunk
from conftest import extract_chunks_from_temp_file
class TestCSVLoader:
    """Test suite for CSVLoader class."""

    def test_initialization(self):
        """Test basic initialization of CSVLoader."""
        loader = CSVLoader(chunk_size=500, chunk_overlap=50)
        assert loader.chunk_size == 500
        assert loader.chunk_overlap == 50
        assert 'csv' in loader.supported_extensions

    def test_can_handle_file(self):
        """Test file format detection."""
        loader = CSVLoader()

        # Create temporary test files
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            csv_file = Path(tmpdir) / "test.csv"
            csv_file.touch()
            txt_file = Path(tmpdir) / "test.txt"
            txt_file.touch()

            # Test supported formats
            assert loader.can_handle(csv_file) is True

            # Test unsupported formats
            assert loader.can_handle(txt_file) is False

    @pytest.mark.asyncio
    async def test_extract_chunks_simple_csv(self):
        """Test extraction from simple CSV file."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file (data rows are flush-left so headers/values
        # parse without stray whitespace)
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            csv_file = Path(tmpdir) / "test.csv"
            csv_content = """name,age,city
John,25,New York
Jane,30,San Francisco"""
            csv_file.write_text(csv_content)

            chunks = []
            async for chunk in loader.extract_chunks(csv_file):
                chunks.append(chunk)

            # Verify chunks were created
            assert len(chunks) > 0

            # Verify chunk structure
            first_chunk = chunks[0]
            assert isinstance(first_chunk, DocumentChunk)
            assert isinstance(first_chunk.content, str)
            assert first_chunk.source_file == csv_file

            # Verify content contains formatted rows ("header: value" pairs)
            content = first_chunk.content
            assert "name: John" in content
            assert "age: 25" in content
            assert "city: New York" in content
            assert "name: Jane" in content
            assert "age: 30" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_csv_with_headers(self):
        """Test extraction from CSV with header detection."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file with headers
        csv_content = """product,price,category
Widget,19.99,Electronics
Gadget,29.99,Electronics
Book,12.99,Media"""
        chunks = await extract_chunks_from_temp_file(loader, "test.csv", csv_content)

        content = chunks[0].content
        assert "product: Widget" in content
        assert "price: 19.99" in content
        assert "category: Electronics" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_csv_no_headers(self):
        """Test extraction from CSV without headers."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file without headers
        csv_content = """John,25,New York
Jane,30,San Francisco"""
        chunks = await extract_chunks_from_temp_file(
            loader,
            "test.csv",
            csv_content,
            has_header=False
        )

        content = chunks[0].content
        # Should use col_1, col_2, col_3 as headers
        assert "col_1: John" in content
        assert "col_2: 25" in content
        assert "col_3: New York" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_different_delimiters(self):
        """Test extraction with different CSV delimiters."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Test semicolon delimiter
        csv_content = "name;age;city\nJohn;25;New York\nJane;30;San Francisco"
        chunks = await extract_chunks_from_temp_file(
            loader,
            "test.csv",
            csv_content,
            delimiter=';'
        )

        content = chunks[0].content
        assert "name: John" in content
        assert "age: 25" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_row_numbers(self):
        """Test extraction with row numbers."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file
        csv_content = """name,age
John,25
Jane,30"""
        chunks = await extract_chunks_from_temp_file(
            loader,
            "test.csv",
            csv_content,
            include_row_numbers=True
        )

        content = chunks[0].content
        assert "Row 1:" in content
        assert "Row 2:" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_no_row_numbers(self):
        """Test extraction without row numbers."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file
        csv_content = """name,age
John,25"""
        chunks = await extract_chunks_from_temp_file(
            loader,
            "test.csv",
            csv_content,
            include_row_numbers=False
        )

        content = chunks[0].content
        # NOTE(review): with numbering disabled rows are apparently still
        # prefixed "Row:" (unnumbered) — confirm against CSVLoader formatting
        assert "Row:" in content
        assert "Row 1:" not in content

    @pytest.mark.asyncio
    async def test_extract_chunks_large_file_chunking(self):
        """Test that large CSV files are processed correctly."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create CSV with many rows
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            csv_file = Path(tmpdir) / "large.csv"
            rows = ["name,value"] + [f"item{i},{i}" for i in range(10)]
            csv_content = "\n".join(rows)
            csv_file.write_text(csv_content)

            # Process the file
            chunks = []
            async for chunk in loader.extract_chunks(csv_file, max_rows_per_chunk=50):
                chunks.append(chunk)

            # Should create at least one chunk
            assert len(chunks) >= 1

            # Verify all content is included (first and last rows survive chunking)
            all_content = "".join(chunk.content for chunk in chunks)
            assert "item0" in all_content
            assert "item9" in all_content
            assert "name: item0" in all_content
            assert "value: 0" in all_content

    @pytest.mark.asyncio
    async def test_extract_chunks_empty_file(self):
        """Test handling of empty CSV files."""
        loader = CSVLoader()

        # Create empty CSV file
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            csv_file = Path(tmpdir) / "empty.csv"
            csv_file.write_text("")

            # Should not raise error but return no chunks
            chunks = []
            async for chunk in loader.extract_chunks(csv_file):
                chunks.append(chunk)

            assert len(chunks) == 0

    @pytest.mark.asyncio
    async def test_extract_chunks_malformed_csv(self):
        """Test handling of malformed CSV files."""
        loader = CSVLoader()

        # Create malformed CSV file
        # CSV with inconsistent columns - should still work
        csv_content = """name,age,city
John,25
Jane,30,San Francisco,Extra"""
        chunks = await extract_chunks_from_temp_file(loader, "malformed.csv", csv_content)

        # Should handle gracefully
        assert len(chunks) > 0
        content = chunks[0].content
        assert "name: John" in content
        assert "name: Jane" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_encoding_detection(self):
        """Test automatic encoding detection."""
        loader = CSVLoader()

        # Create CSV file with UTF-8 content (non-ASCII names/cities)
        csv_content = """name,city
José,São Paulo
François,Montréal"""
        chunks = await extract_chunks_from_temp_file(
            loader,
            "utf8.csv",
            csv_content,
            encoding='utf-8'
        )

        content = chunks[0].content
        assert "José" in content
        assert "São Paulo" in content

    @pytest.mark.asyncio
    async def test_extract_chunks_metadata(self):
        """Test that metadata is properly included."""
        loader = CSVLoader(chunk_size=1000, chunk_overlap=200)

        # Create test CSV file
        import tempfile
        with tempfile.TemporaryDirectory() as tmpdir:
            csv_file = Path(tmpdir) / "test.csv"
            csv_content = """name,age
John,25
Jane,30"""
            csv_file.write_text(csv_content)

            chunks = []
            async for chunk in loader.extract_chunks(csv_file):
                chunks.append(chunk)

            # Per-chunk metadata: content type, header info, counts and loader id
            first_chunk = chunks[0]
            assert first_chunk.metadata['content_type'] == 'csv'
            assert first_chunk.metadata['has_header'] is True
            assert first_chunk.metadata['column_count'] == 2
            assert first_chunk.metadata['row_count'] == 2  # data rows, header excluded
            assert first_chunk.metadata['headers'] == ['name', 'age']
            assert 'file_size' in first_chunk.metadata
            assert first_chunk.metadata['loader_type'] == 'CSVLoader'
class TestCSVLoaderRegistry:
    """Verify that the ingestion registry resolves CSV files to CSVLoader."""

    def test_loader_registration(self):
        """A .csv file on disk must be matched to a CSVLoader instance."""
        import tempfile
        from mcp_memory_service.ingestion.registry import get_loader_for_file

        with tempfile.TemporaryDirectory() as workdir:
            sample = Path(workdir) / "test.csv"
            sample.write_text("name,value\nJohn,25")

            resolved = get_loader_for_file(sample)

            # The registry must return a loader, and specifically the CSV one.
            assert resolved is not None
            assert isinstance(resolved, CSVLoader)
class TestCSVDelimiterDetection:
    """Exercise CSVLoader._detect_delimiter over the common delimiter set."""

    @staticmethod
    def _detect(sample: str) -> str:
        """Run delimiter detection on *sample* with a fresh loader."""
        return CSVLoader()._detect_delimiter(sample)

    def test_detect_delimiter_comma(self):
        """Comma-separated content must be detected as ','."""
        sample = "name,age,city\nJohn,25,New York\nJane,30,San Francisco"
        assert self._detect(sample) == ','

    def test_detect_delimiter_semicolon(self):
        """Semicolon-separated content must be detected as ';'."""
        sample = "name;age;city\nJohn;25;New York\nJane;30;San Francisco"
        assert self._detect(sample) == ';'

    def test_detect_delimiter_tab(self):
        """Tab-separated content must be detected as '\\t'."""
        sample = "name\tage\tcity\nJohn\t25\tNew York\nJane\t30\tSan Francisco"
        assert self._detect(sample) == '\t'

    def test_detect_delimiter_pipe(self):
        """Pipe-separated content must be detected as '|'."""
        sample = "name|age|city\nJohn|25|New York\nJane|30|San Francisco"
        assert self._detect(sample) == '|'
if __name__ == '__main__':
    # Allow executing this test module directly without the pytest CLI
    pytest.main([__file__, '-v'])
```