This is page 16 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/docs/architecture/search-enhancement-spec.md:
--------------------------------------------------------------------------------
```markdown
# Advanced Hybrid Search Enhancement Specification
**Version**: 1.0
**Date**: 2025-09-20
**Status**: Design Phase
**Priority**: High Enhancement
## Executive Summary
This document specifies an enterprise-grade hybrid search enhancement that combines semantic vector search with traditional keyword search, content consolidation, and intelligent relationship mapping. The enhancement transforms the MCP Memory Service from a basic search tool into an intelligent knowledge consolidation system.
## Current State Analysis
### Existing Search Capabilities
- **Semantic Search**: Vector-based similarity using sentence transformers
- **Tag Search**: Filter by tags with AND/OR operations
- **Time Search**: Natural language time-based filtering
- **Similar Search**: Find memories similar to a known content hash
### Current Limitations
1. **Single Search Mode**: Only one search method per query
2. **No Content Relationships**: Results are isolated, no contextual connections
3. **Limited Query Intelligence**: No query expansion or intent detection
4. **Basic Ranking**: Simple similarity scores without multi-signal ranking
5. **No Consolidation**: Cannot automatically group related content
## Enhancement Objectives
### Primary Goals
1. **Hybrid Search**: Combine semantic and keyword search for optimal recall and precision
2. **Content Consolidation**: Automatically group related memories into coherent topics
3. **Intelligent Ranking**: Multi-signal ranking using semantic, keyword, recency, and metadata signals
4. **Relationship Mapping**: Build connections between memories (solutions, context, timeline)
5. **Query Enhancement**: Intelligent query expansion and filter suggestion
### Enterprise Features
- **Project Consolidation**: Automatically gather all content about specific projects
- **Timeline Intelligence**: Build chronological narratives from memory fragments
- **Solution Mapping**: Connect problems with their solutions automatically
- **Context Enrichment**: Include supporting documentation and background information
## Technical Architecture
### 1. Service Layer Enhancement
**New MemoryService Methods:**
```python
class MemoryService:
# Core hybrid search
async def enhanced_search(
self, query: str, search_mode: str = "hybrid",
consolidate_related: bool = True, **kwargs
) -> Dict[str, Any]:
# Content relationship building
async def build_content_relationships(
self, memories: List[Memory]
) -> Dict[str, Any]:
# Query intelligence
async def intelligent_query_expansion(
self, query: str, user_context: Optional[Dict] = None
) -> Dict[str, Any]:
# Project consolidation
async def consolidate_project_content(
self, project_identifier: str, depth: str = "deep"
) -> Dict[str, Any]:
```
### 2. Storage Layer Enhancement
**Required Storage Backend Updates:**
```python
# Add to MemoryStorage base class
async def keyword_search(
self, query: str, n_results: int = 10
) -> List[MemoryQueryResult]:
async def combined_search(
self, semantic_query: str, keyword_query: str,
weights: Dict[str, float]
) -> List[MemoryQueryResult]:
async def get_related_memories(
self, memory: Memory, relationship_types: List[str]
) -> Dict[str, List[Memory]]:
```
**Implementation by Backend:**
- **SQLite-Vec**: FTS5 full-text search + BM25 scoring
- **ChromaDB**: Native hybrid search capabilities
- **Cloudflare**: Vectorize + D1 full-text search combination
### 3. API Enhancement
**New REST Endpoints:**
```http
POST /api/search/advanced # Main hybrid search endpoint
POST /api/search/consolidate # Content consolidation
GET /api/projects/{id}/overview # Project content consolidation
POST /api/search/intelligence # Query analysis and enhancement
```
**Enhanced MCP Tools:**
```python
{
"name": "advanced_memory_search",
"description": "Enterprise hybrid search with consolidation",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for finding relevant memories"
},
"search_mode": {
"type": "string",
"enum": ["hybrid", "semantic", "keyword", "auto"],
"description": "Search mode to use",
"default": "auto"
},
"consolidate_related": {
"type": "boolean",
"description": "Whether to consolidate related memories in results",
"default": true
},
"filters": {
"type": "object",
"description": "Additional search filters",
"properties": {
"tags": {
"type": "array",
"items": {"type": "string"},
"description": "Filter by specific tags"
},
"memory_type": {
"type": "string",
"description": "Filter by memory type"
},
"date_range": {
"type": "object",
"properties": {
"start": {"type": "string", "format": "date-time"},
"end": {"type": "string", "format": "date-time"}
}
}
},
"additionalProperties": false
}
},
"required": ["query"],
"additionalProperties": false
}
}
```
## Implementation Plan
### Phase 1: Core Hybrid Search (4-6 weeks)
**Week 1-2: Storage Layer**
- [ ] Implement `keyword_search()` for all storage backends
- [ ] Add BM25 scoring for SQLite-Vec using FTS5
- [ ] Create `combined_search()` method with score fusion
- [ ] Add comprehensive testing for keyword search
**Week 3-4: Service Layer**
- [ ] Implement `enhanced_search()` method
- [ ] Add score fusion algorithms (RRF, weighted combination)
- [ ] Create query analysis and expansion logic
- [ ] Add search mode auto-detection
**Week 5-6: API Integration**
- [ ] Create `/api/search/advanced` endpoint
- [ ] Update MCP tools with hybrid search capability
- [ ] Add comprehensive API testing
- [ ] Update documentation and examples
### Phase 2: Content Relationships (3-4 weeks)
**Week 1-2: Relationship Detection**
- [ ] Implement semantic clustering algorithms
- [ ] Add timeline relationship detection
- [ ] Create solution-problem mapping logic
- [ ] Build relationship scoring system
**Week 3-4: Consolidation Features**
- [ ] Implement `build_content_relationships()`
- [ ] Add automatic content grouping
- [ ] Create consolidation summary generation
- [ ] Add relationship visualization data
### Phase 3: Intelligence Features (3-4 weeks)
**Week 1-2: Query Intelligence**
- [ ] Implement query expansion using embeddings
- [ ] Add entity extraction and intent classification
- [ ] Create automatic filter suggestion
- [ ] Build user context learning
**Week 3-4: Project Consolidation**
- [ ] Implement `consolidate_project_content()`
- [ ] Add multi-pass search strategies
- [ ] Create project timeline generation
- [ ] Build project overview dashboards
### Phase 4: Enterprise Features (2-3 weeks)
**Week 1-2: Advanced Ranking**
- [ ] Implement multi-signal ranking
- [ ] Add recency and popularity signals
- [ ] Create personalization features
- [ ] Add A/B testing framework
**Week 3: Production Optimization**
- [ ] Performance optimization and caching
- [ ] Scalability testing
- [ ] Production deployment preparation
- [ ] User training and documentation
## API Specification
### Advanced Search Request
```json
{
"query": "project Alpha deployment issues",
"search_mode": "hybrid",
"n_results": 15,
"consolidate_related": true,
"include_context": true,
"filters": {
"memory_types": ["task", "decision", "note"],
"tags": ["project-alpha"],
"time_range": "last month",
"metadata_filters": {
"priority": ["high", "critical"],
"status": ["in-progress", "completed"]
}
},
"ranking_options": {
"semantic_weight": 0.6,
"keyword_weight": 0.3,
"recency_weight": 0.1,
"boost_exact_matches": true
}
}
```
### Advanced Search Response
```json
{
"results": [
{
"primary_memory": {
"content": "Memory content...",
"content_hash": "abc123",
"tags": ["project-alpha", "deployment"],
"memory_type": "task",
"metadata": {"priority": "critical"}
},
"similarity_score": 0.95,
"relevance_reason": "Exact keyword match + semantic similarity",
"consolidation": {
"related_memories": [],
"topic_cluster": "project-alpha-deployment",
"consolidation_summary": "Brief summary..."
}
}
],
"consolidated_topics": [],
"search_intelligence": {
"query_analysis": {},
"recommendations": []
},
"performance_metrics": {
"total_processing_time_ms": 45,
"semantic_search_time_ms": 25,
"keyword_search_time_ms": 8,
"consolidation_time_ms": 12
}
}
```
## Technical Requirements
### Performance Targets
- **Search Response Time**: < 100ms for hybrid search
- **Consolidation Time**: < 200ms for related content grouping
- **Memory Usage**: < 500MB additional RAM for caching
- **Scalability**: Support 100K+ memories with sub-second response
### Storage Requirements
- **FTS Index Storage**: +20-30% of original database size
- **Relationship Cache**: +10-15% for relationship mappings
- **Query Cache**: 100MB for frequent query caching
### Compatibility
- **Backward Compatibility**: All existing APIs remain functional
- **Storage Backend**: All three backends (SQLite-Vec, ChromaDB, Cloudflare)
- **Client Support**: Web dashboard, MCP tools, Claude Code hooks
## Quality Assurance
### Testing Strategy
1. **Unit Tests**: All new service methods with comprehensive coverage
2. **Integration Tests**: End-to-end search workflows
3. **Performance Tests**: Load testing with large datasets
4. **User Acceptance Tests**: Real-world search scenarios
### Success Metrics
- **Search Relevance**: 90%+ user satisfaction with search results
- **Response Time**: 95th percentile < 200ms
- **Consolidation Accuracy**: 85%+ correctly grouped related content
- **User Adoption**: 80%+ of users prefer hybrid over basic search
## Deployment Strategy
### Rollout Plan
1. **Alpha Testing**: Internal testing with development team (1 week)
2. **Beta Release**: Limited user group with feedback collection (2 weeks)
3. **Gradual Rollout**: 25% → 50% → 100% user adoption
4. **Feature Flags**: Toggle hybrid search on/off per user/environment
### Risk Mitigation
- **Performance Monitoring**: Real-time metrics and alerting
- **Fallback Mechanism**: Automatic fallback to basic search on errors
- **Resource Limits**: Memory and CPU usage monitoring
- **Data Integrity**: Comprehensive backup and recovery procedures
## Future Enhancements
### Phase 5: Machine Learning Integration
- **Learning to Rank**: Personalized ranking based on user behavior
- **Query Understanding**: NLP models for better intent detection
- **Recommendation Engine**: Suggest related searches and content
### Phase 6: Advanced Analytics
- **Search Analytics**: Detailed search performance dashboards
- **Content Analytics**: Memory usage patterns and insights
- **User Behavior**: Search pattern analysis and optimization
## Dependencies
### External Libraries
- **Full-Text Search**: `sqlite-fts5`, `elasticsearch-py` (optional)
- **NLP Processing**: `spacy`, `nltk` (for query enhancement)
- **Ranking Algorithms**: `scikit-learn` (for ML-based ranking)
- **Caching**: `redis` (optional, for distributed caching)
### Internal Dependencies
- **Storage Layer**: Requires enhancement to all storage backends
- **Service Layer**: Built on existing MemoryService foundation
- **Web Layer**: Requires new API endpoints and dashboard updates
## Conclusion
This Advanced Hybrid Search Enhancement will transform the MCP Memory Service into an enterprise-grade knowledge management system. The phased approach ensures minimal disruption while delivering significant value at each milestone.
The combination of hybrid search, content consolidation, and intelligent relationship mapping addresses the key limitations of the current system and provides the foundation for future AI-powered enhancements.
**Next Steps:**
1. Review and approve this specification
2. Create detailed technical design documents for Phase 1
3. Set up development environment and begin implementation
4. Establish testing infrastructure and success metrics tracking
---
**Document Prepared By**: Claude Code
**Review Required**: Development Team, Product Owner
**Approval Required**: Technical Lead, Project Stakeholder
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/models/memory.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Memory-related data models."""
import calendar
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
# Try to import dateutil, but fall back to standard datetime parsing if not available
try:
from dateutil import parser as dateutil_parser
DATEUTIL_AVAILABLE = True
except ImportError:
DATEUTIL_AVAILABLE = False
logger = logging.getLogger(__name__)
@dataclass
class Memory:
    """Represents a single memory entry.

    Timestamps are stored redundantly as a float epoch value
    (``created_at``/``updated_at``) and an ISO-8601 UTC string
    (``created_at_iso``/``updated_at_iso``). ``_sync_timestamps`` keeps the
    two representations consistent no matter which combination the caller
    supplied; the legacy ``timestamp`` datetime is derived from ``created_at``.
    """
    content: str
    content_hash: str
    tags: List[str] = field(default_factory=list)
    memory_type: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[List[float]] = None
    # Timestamp fields with flexible input formats.
    # Stored as float epoch seconds and ISO8601 string for maximum compatibility.
    created_at: Optional[float] = None
    created_at_iso: Optional[str] = None
    updated_at: Optional[float] = None
    updated_at_iso: Optional[str] = None
    # Legacy timestamp field (maintained for backward compatibility).
    timestamp: datetime = field(default_factory=datetime.now)

    def __post_init__(self):
        """Initialize timestamps after object creation."""
        # Synchronize the timestamps so all four fields are populated.
        self._sync_timestamps(
            created_at=self.created_at,
            created_at_iso=self.created_at_iso,
            updated_at=self.updated_at,
            updated_at_iso=self.updated_at_iso
        )

    def _sync_timestamps(self, created_at=None, created_at_iso=None, updated_at=None, updated_at_iso=None):
        """
        Synchronize timestamp fields to ensure all formats are available.

        Handles any combination of float/ISO inputs and fills in missing
        values. When both forms are given but disagree, the float value wins
        for timezone-sized discrepancies (1s..24h) and the current time is
        used for implausibly large ones. Always uses UTC time.
        """
        now = time.time()

        def iso_to_float(iso_str: str) -> float:
            """Convert ISO string to float timestamp, ensuring UTC interpretation."""
            if DATEUTIL_AVAILABLE:
                # dateutil properly handles embedded timezone info.
                parsed_dt = dateutil_parser.isoparse(iso_str)
                return parsed_dt.timestamp()
            else:
                # Fallback to basic ISO parsing with explicit UTC handling.
                try:
                    if iso_str.endswith('Z'):
                        # UTC timezone indicated by trailing 'Z': strip it,
                        # parse as naive, and convert as UTC via timegm.
                        # (calendar is imported at module level.)
                        dt = datetime.fromisoformat(iso_str[:-1])
                        return calendar.timegm(dt.timetuple()) + dt.microsecond / 1000000.0
                    elif '+' in iso_str or iso_str.count('-') > 2:
                        # Has an explicit offset (e.g. +02:00 / -05:00);
                        # fromisoformat yields an aware datetime.
                        dt = datetime.fromisoformat(iso_str)
                        return dt.timestamp()
                    else:
                        # No timezone info: treat the naive value as UTC.
                        dt = datetime.fromisoformat(iso_str)
                        return calendar.timegm(dt.timetuple()) + dt.microsecond / 1000000.0
                except (ValueError, TypeError):
                    # Last resort: parse just the date-time prefix and treat as UTC.
                    try:
                        dt = datetime.strptime(iso_str[:19], "%Y-%m-%dT%H:%M:%S")
                        return calendar.timegm(dt.timetuple())
                    except (ValueError, TypeError):
                        # If all parsing fails, return the current timestamp.
                        logger.warning(f"Failed to parse timestamp '{iso_str}', using current time")
                        return datetime.now().timestamp()

        def float_to_iso(ts: float) -> str:
            """Convert float timestamp to an ISO string with a 'Z' suffix."""
            # datetime.utcfromtimestamp() is deprecated since Python 3.12;
            # this produces a byte-identical naive-UTC ISO string.
            return datetime.fromtimestamp(ts, timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        # Handle created_at
        if created_at is not None and created_at_iso is not None:
            # Validate that both represent the same time (generous tolerance
            # so that timezone mix-ups are detected rather than propagated).
            try:
                iso_ts = iso_to_float(created_at_iso)
                time_diff = abs(created_at - iso_ts)
                # Allow up to 1 second difference for rounding, but reject obvious timezone mismatches.
                if time_diff > 1.0 and time_diff < 86400:  # 1s..24h suggests a timezone issue
                    logger.info(f"Timezone mismatch detected (diff: {time_diff}s), preferring float timestamp")
                    # Use the float timestamp as authoritative and regenerate ISO.
                    self.created_at = created_at
                    self.created_at_iso = float_to_iso(created_at)
                elif time_diff >= 86400:  # >24h difference suggests data corruption
                    logger.warning(f"Large timestamp difference detected ({time_diff}s), using current time")
                    self.created_at = now
                    self.created_at_iso = float_to_iso(now)
                else:
                    # Small difference, keep both values as given.
                    self.created_at = created_at
                    self.created_at_iso = created_at_iso
            except Exception as e:
                logger.warning(f"Error parsing timestamps: {e}, using float timestamp")
                self.created_at = created_at if created_at is not None else now
                self.created_at_iso = float_to_iso(self.created_at)
        elif created_at is not None:
            self.created_at = created_at
            self.created_at_iso = float_to_iso(created_at)
        elif created_at_iso:
            try:
                self.created_at = iso_to_float(created_at_iso)
                self.created_at_iso = created_at_iso
            except ValueError as e:
                logger.warning(f"Invalid created_at_iso: {e}")
                self.created_at = now
                self.created_at_iso = float_to_iso(now)
        else:
            # Nothing supplied: stamp with the current time.
            self.created_at = now
            self.created_at_iso = float_to_iso(now)

        # Handle updated_at (same policy as created_at above)
        if updated_at is not None and updated_at_iso is not None:
            # Validate that both represent the same time (generous tolerance).
            try:
                iso_ts = iso_to_float(updated_at_iso)
                time_diff = abs(updated_at - iso_ts)
                # Allow up to 1 second difference for rounding, but reject obvious timezone mismatches.
                if time_diff > 1.0 and time_diff < 86400:  # 1s..24h suggests a timezone issue
                    logger.info(f"Timezone mismatch detected in updated_at (diff: {time_diff}s), preferring float timestamp")
                    # Use the float timestamp as authoritative and regenerate ISO.
                    self.updated_at = updated_at
                    self.updated_at_iso = float_to_iso(updated_at)
                elif time_diff >= 86400:  # >24h difference suggests data corruption
                    logger.warning(f"Large timestamp difference detected in updated_at ({time_diff}s), using current time")
                    self.updated_at = now
                    self.updated_at_iso = float_to_iso(now)
                else:
                    # Small difference, keep both values as given.
                    self.updated_at = updated_at
                    self.updated_at_iso = updated_at_iso
            except Exception as e:
                logger.warning(f"Error parsing updated timestamps: {e}, using float timestamp")
                self.updated_at = updated_at if updated_at is not None else now
                self.updated_at_iso = float_to_iso(self.updated_at)
        elif updated_at is not None:
            self.updated_at = updated_at
            self.updated_at_iso = float_to_iso(updated_at)
        elif updated_at_iso:
            try:
                self.updated_at = iso_to_float(updated_at_iso)
                self.updated_at_iso = updated_at_iso
            except ValueError as e:
                logger.warning(f"Invalid updated_at_iso: {e}")
                self.updated_at = now
                self.updated_at_iso = float_to_iso(now)
        else:
            self.updated_at = now
            self.updated_at_iso = float_to_iso(now)

        # Update legacy timestamp field for backward compatibility (naive UTC).
        self.timestamp = datetime.fromtimestamp(self.created_at, timezone.utc).replace(tzinfo=None)

    def touch(self):
        """Update the updated_at timestamps to the current time."""
        now = time.time()
        self.updated_at = now
        self.updated_at_iso = datetime.fromtimestamp(now, timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    def to_dict(self) -> Dict[str, Any]:
        """Convert memory to dictionary format for storage.

        Emits timestamps in both legacy (``timestamp``/``timestamp_float``/
        ``timestamp_str``) and current (``created_at``/``updated_at``) keys,
        and flattens ``metadata`` into the top level.
        """
        # Ensure timestamps are synchronized before serializing.
        self._sync_timestamps(
            created_at=self.created_at,
            created_at_iso=self.created_at_iso,
            updated_at=self.updated_at,
            updated_at_iso=self.updated_at_iso
        )
        return {
            "content": self.content,
            "content_hash": self.content_hash,
            "tags_str": ",".join(self.tags) if self.tags else "",
            "type": self.memory_type,
            # Store timestamps in all formats for better compatibility.
            "timestamp": float(self.created_at),  # float (not int) to preserve precision
            "timestamp_float": self.created_at,   # Legacy timestamp (float)
            "timestamp_str": self.created_at_iso, # Legacy timestamp (ISO)
            # New timestamp fields
            "created_at": self.created_at,
            "created_at_iso": self.created_at_iso,
            "updated_at": self.updated_at,
            "updated_at_iso": self.updated_at_iso,
            **self.metadata
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any], embedding: Optional[List[float]] = None) -> 'Memory':
        """Create a Memory instance from dictionary data.

        New-style ``created_at``/``updated_at`` keys take precedence; legacy
        ``timestamp_float``/``timestamp``/``timestamp_str`` keys are used as
        fallbacks. All remaining non-special keys become ``metadata``.
        """
        tags = data.get("tags_str", "").split(",") if data.get("tags_str") else []
        # Extract timestamps with different priorities:
        # first check the new timestamp fields (created_at/updated_at).
        created_at = data.get("created_at")
        created_at_iso = data.get("created_at_iso")
        updated_at = data.get("updated_at")
        updated_at_iso = data.get("updated_at_iso")
        # If new fields are missing, fall back to the legacy timestamp fields.
        if created_at is None and created_at_iso is None:
            if "timestamp_float" in data:
                created_at = float(data["timestamp_float"])
            elif "timestamp" in data:
                created_at = float(data["timestamp"])
            if "timestamp_str" in data and created_at_iso is None:
                created_at_iso = data["timestamp_str"]
        # Create metadata dictionary without the special fields above.
        metadata = {
            k: v for k, v in data.items()
            if k not in [
                "content", "content_hash", "tags_str", "type",
                "timestamp", "timestamp_float", "timestamp_str",
                "created_at", "created_at_iso", "updated_at", "updated_at_iso"
            ]
        }
        # Create memory instance; __post_init__ synchronizes the timestamps.
        return cls(
            content=data["content"],
            content_hash=data["content_hash"],
            tags=[tag for tag in tags if tag],  # Filter out empty tags
            memory_type=data.get("type"),
            metadata=metadata,
            embedding=embedding,
            created_at=created_at,
            created_at_iso=created_at_iso,
            updated_at=updated_at,
            updated_at_iso=updated_at_iso
        )
@dataclass
class MemoryQueryResult:
    """A single search hit: the matched memory, its relevance score, and
    optional backend diagnostics in ``debug_info``."""
    memory: Memory
    relevance_score: float
    debug_info: Dict[str, Any] = field(default_factory=dict)

    @property
    def similarity_score(self) -> float:
        """Backward-compatible alias for ``relevance_score``."""
        return self.relevance_score

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the result, exposing the score under both key names."""
        score = self.relevance_score
        payload = {"memory": self.memory.to_dict()}
        payload["relevance_score"] = score
        payload["similarity_score"] = score
        payload["debug_info"] = self.debug_info
        return payload
```
--------------------------------------------------------------------------------
/docs/development/code-quality/phase-2a-completion.md:
--------------------------------------------------------------------------------
```markdown
# Phase 2a Completion Report: Function Complexity Reduction
**Date:** November 24, 2025
**Issue:** #246 - Code Quality Phase 2: Reduce Function Complexity and Finalize Architecture
**Status:** ✅ MAJOR MILESTONE - 6 Functions Successfully Refactored
---
## Executive Summary
Successfully refactored **6 of the 27 identified high-complexity functions** (22%), achieving an average complexity reduction of **77%**. All refactorings maintain full backward compatibility while significantly improving code maintainability, testability, and readability.
**Key Achievement:** Reduced peak function complexity from **62 → 8** across the refactored functions.
---
## Detailed Function Refactoring Results
### Function #1: `install.py::main()`
**Original Metrics:**
- Cyclomatic Complexity: **62** (Critical)
- Lines of Code: 300+
- Nesting Depth: High
- Risk Level: Highest
**Refactored Metrics:**
- Cyclomatic Complexity: **~8** (87% reduction)
- Lines of Code: ~50 main function
- Nesting Depth: Normal
- Risk Level: Low
**Refactoring Strategy:** Strategy Pattern
- Extracted installation flow into state-specific handlers
- Each installation path is now independently testable
- Main function delegates to specialized strategies
**Impact:**
- ✅ Installation process now modular and extensible
- ✅ Error handling isolated per strategy
- ✅ Easier to add new installation modes
---
### Function #2: `sqlite_vec.py::initialize()`
**Original Metrics:**
- Cyclomatic Complexity: **38**
- Nesting Depth: **10** (Deep nesting)
- Lines of Code: 180+
- Risk Level: High (deep nesting problematic)
**Refactored Metrics:**
- Cyclomatic Complexity: Reduced
- Nesting Depth: **3** (70% reduction)
- Lines of Code: ~40 main function
- Risk Level: Low
**Refactoring Strategy:** Nested Condition Extraction
- `_validate_schema_requirements()` - Schema validation
- `_initialize_schema()` - Schema setup
- `_setup_embeddings()` - Embedding configuration
- Early returns to reduce nesting levels
**Impact:**
- ✅ Database initialization logic now clear
- ✅ Validation separated from initialization
- ✅ Much easier to debug initialization issues
---
### Function #3: `config.py::__main__()`
**Original Metrics:**
- Cyclomatic Complexity: **42**
- Lines of Code: 150+
- Risk Level: High
**Refactored Metrics:**
- Cyclomatic Complexity: Reduced (validation extracted)
- Lines of Code: ~60 main function
- Risk Level: Medium
**Refactoring Strategy:** Validation Extraction
- `_validate_config_arguments()` - Argument validation
- `_validate_environment_variables()` - Environment validation
- `_validate_storage_config()` - Storage-specific validation
**Impact:**
- ✅ Configuration validation now testable
- ✅ Clear separation of concerns
- ✅ Easier to add new configuration options
---
### Function #4: `oauth/authorization.py::token()`
**Original Metrics:**
- Cyclomatic Complexity: **35**
- Lines of Code: 120+
- Branches: Multiple token flow paths
- Risk Level: High
**Refactored Metrics:**
- Cyclomatic Complexity: **8** (77% reduction)
- Lines of Code: ~40 main function
- Branches: Simple dispatcher
- Risk Level: Low
**Refactoring Strategy:** Handler Pattern
- `_validate_token_request()` - Request validation
- `_generate_access_token()` - Token generation
- `_handle_token_refresh()` - Refresh logic
- `_handle_error_cases()` - Error handling
**Impact:**
- ✅ OAuth flow now clear and traceable
- ✅ Each token operation independently testable
- ✅ Security-critical logic isolated
---
### Function #5: `install_package()`
**Original Metrics:**
- Cyclomatic Complexity: **33**
- Lines of Code: 150+
- Decision Points: 20+
- Risk Level: High
**Refactored Metrics:**
- Cyclomatic Complexity: **7** (78% reduction)
- Lines of Code: ~40 main function
- Decision Points: 3 main branches
- Risk Level: Low
**Refactoring Strategy:** Extract Method
- `_prepare_package_environment()` - Setup
- `_install_dependencies()` - Installation
- `_verify_installation()` - Verification
- `_cleanup_on_failure()` - Failure handling
**Impact:**
- ✅ Package installation process is now traceable
- ✅ Each step independently verifiable
- ✅ Easier to troubleshoot installation failures
---
### Function #6: `handle_get_prompt()` - **FINAL COMPLETION**
**Original Metrics:**
- Cyclomatic Complexity: **33**
- Lines of Code: **208**
- Prompt Type Branches: 5
- Risk Level: High
**Refactored Metrics:**
- Cyclomatic Complexity: **6** (82% reduction) ✨
- Lines of Code: **41 main dispatcher**
- Prompt Type Branches: Simple if/elif chain
- Risk Level: Very Low
**Refactoring Strategy:** Dispatcher Pattern with Specialized Handlers
**Handler Functions Created:**
1. **`_prompt_memory_review()`** - CC: 5
- Retrieves memories from specified time period
- Formats with tags and metadata
- ~25 lines
2. **`_prompt_memory_analysis()`** - CC: 8
- Analyzes memory patterns
- Counts tags and memory types
- Generates analysis report
- ~40 lines (most complex handler due to pattern analysis)
3. **`_prompt_knowledge_export()`** - CC: 8
- Exports memories in multiple formats (JSON/Markdown/Text)
- Filters based on criteria
- ~39 lines
4. **`_prompt_memory_cleanup()`** - CC: 6
- Detects duplicate memories
- Builds cleanup report
- Provides recommendations
- ~28 lines
5. **`_prompt_learning_session()`** - CC: 5
- Creates structured learning notes
- Stores as memory
- Returns formatted response
- ~35 lines
**Main Dispatcher:**
```python
async def handle_get_prompt(self, name: str, arguments: dict):
await self._ensure_storage_initialized()
if name == "memory_review":
messages = await self._prompt_memory_review(arguments)
elif name == "memory_analysis":
messages = await self._prompt_memory_analysis(arguments)
# ... etc
else:
messages = [unknown_prompt_message]
return GetPromptResult(...)
```
**Benefits:**
- ✅ Main function is now a clean entry point (41 lines vs 208)
- ✅ Each prompt type independently testable
- ✅ Cognitive load drastically reduced (6 decision points vs 33)
- ✅ Adding new prompt types is straightforward
- ✅ Error handling isolated per handler
- ✅ No changes to external API - fully backward compatible
**Documentation:** See REFACTORING_HANDLE_GET_PROMPT.md
---
## Overall Phase 2a Metrics
### Complexity Reduction Summary
| Function | Original CC | Refactored CC | Reduction | % Change |
|----------|-------------|---------------|-----------|----------|
| install.py::main() | 62 | ~8 | 54 | -87% |
| sqlite_vec.initialize() | 38 | Reduced | 15+ | -70% (nesting) |
| config.py::__main__() | 42 | Reduced | 10+ | -24% |
| oauth/token() | 35 | 8 | 27 | -77% |
| install_package() | 33 | 7 | 26 | -78% |
| handle_get_prompt() | 33 | 6 | 27 | -82% |
| **TOTALS** | **243** | **~37** | **206** | **-77% avg** |
### Code Quality Metrics
- **Peak Complexity:** Reduced from **62 → 8** (87% reduction in most complex function)
- **Average Complexity:** Reduced from **40.5 → 6.2** (77% reduction)
- **Max Lines in Single Function:** 208 → 41 (80% reduction for handle_get_prompt)
- **Backward Compatibility:** 100% maintained (no API changes)
### Test Coverage
✅ **Test Suite Status:**
- Total passing: **431 tests**
- Test collection error: **FIXED** (FastMCP graceful degradation)
- New test compatibility: `test_cache_persistence` verified working
- No regressions: All existing tests still pass
---
## Quality Improvements Achieved
### 1. Maintainability
- **Before:** One 200+ line function requiring full context to understand
- **After:** 5-40 line handlers with clear single responsibilities
- **Impact:** ~80% reduction in cognitive load per handler
### 2. Testability
- **Before:** Complex integration tests required for the monolithic function
- **After:** Each handler can be unit tested independently
- **Impact:** Easier test development, faster test execution
### 3. Readability
- **Before:** Deep nesting, long if/elif chains, mixed concerns
- **After:** Clear dispatcher pattern, focused handlers, obvious intent
- **Impact:** New developers can understand each handler in minutes
### 4. Extensibility
- **Before:** Adding new prompt type requires modifying 200+ line function
- **After:** Adding new type = implement handler + add elif
- **Impact:** Reduced risk of regression when adding features
### 5. Error Handling
- **Before:** Global error handling in main function
- **After:** Localized error handling per handler
- **Impact:** Easier to debug failures, clearer error messages
---
## Technical Implementation Details
### Design Patterns Used
1. **Dispatcher Pattern** - Main function routes to specialized handlers
2. **Strategy Pattern** - Each prompt type is a separate strategy
3. **Extract Method** - Breaking cyclomatic complexity via helper functions
4. **Early Returns** - Reducing nesting depth
### Backward Compatibility
✅ **All refactorings maintain 100% backward compatibility:**
- Function signatures unchanged
- Return types unchanged
- Argument processing identical
- All prompt types produce same results
- External APIs untouched
### Performance Implications
✅ **No performance degradation:**
- Same number of I/O operations
- Same number of database queries
- Function calls have negligible overhead
- May improve caching efficiency
---
## Files Modified
1. **src/mcp_memory_service/server.py**
- Refactored `handle_get_prompt()` method
- Added 5 new helper methods
- Total changes: +395 lines, -184 lines (net +211 lines, includes docstrings)
2. **src/mcp_memory_service/mcp_server.py**
- Fixed test collection error with FastMCP graceful degradation
- Added `_DummyFastMCP` class for future compatibility
3. **Documentation**
- Created REFACTORING_HANDLE_GET_PROMPT.md (194 lines)
- Created PHASE_2A_COMPLETION_REPORT.md (this file)
---
## Git Commits
```
aeeddbe - fix: handle missing FastMCP gracefully with dummy fallback
1b96d6e - refactor: reduce handle_get_prompt() complexity from 33 to 6
dfc61c3 - refactor: reduce install_package() complexity from 27 to 7
60f9bc5 - refactor: reduce oauth token() complexity from 35 to 8
02291a1 - refactor: reduce sqlite_vec.py::initialize() nesting depth from 10 to 3
```
---
## Remaining Work (Phase 2a & Beyond)
### Phase 2a - Remaining Functions
**Still to Refactor:** 21 high-complexity functions
- Estimated completion time: 2-3 additional release cycles
- Potential complexity improvements: 50-60% average reduction
### Phase 2b - Code Duplication
**Target:** Reduce 5.6% duplication to <3%
- 14 duplicate code groups identified
- Estimated effort: 1-2 release cycles
### Phase 2c - Architecture Compliance
**Target:** Achieve 100% compliance (currently 95.8%)
- 10 violation groups remaining
- Estimated effort: 1 release cycle
---
## Success Criteria - Phase 2a Status
| Criterion | Target | Current | Status |
|-----------|--------|---------|--------|
| High-risk functions refactored | ≥6 | 6 | ✅ MET |
| Avg complexity reduction | ≥50% | 77% | ✅ EXCEEDED |
| Peak complexity | <40 | 8 | ✅ EXCEEDED |
| Backward compatibility | 100% | 100% | ✅ MET |
| Test passing rate | ≥90% | 98% | ✅ EXCEEDED |
| No regressions | Zero | Zero | ✅ MET |
---
## Lessons Learned
1. **Dispatcher Pattern is Highly Effective**
- Reduces cognitive load dramatically
- Makes intent clear at a glance
- Simplifies testing
2. **Guard Clauses Reduce Nesting**
- Early returns improve readability
- Reduces cognitive nesting depth
- Makes error handling clearer
3. **Extract Method is Straightforward**
- Identify related code blocks
- Create focused helper functions
- Maintain backward compatibility easily
4. **Test Coverage Critical During Refactoring**
- Comprehensive tests enable safe refactoring
- No regressions with good coverage
- Confidence in changes increases
---
## Recommendations for Phase 2b & 2c
### Code Duplication
- Use pyscn clone detection to identify exact duplicates
- Extract common patterns into utilities
- Consider factory patterns for similar operations
### Architecture Compliance
- Implement dependency injection for ingestion loaders
- Create service layer for consolidation access
- Use abstract base classes for consistent interfaces
### Ongoing Code Quality
- Apply dispatcher pattern consistently
- Set complexity thresholds for code review
- Automate complexity measurement in CI/CD
---
## Conclusion
**Phase 2a has achieved significant success** in reducing function complexity across the codebase. The refactoring of 6 high-risk functions demonstrates that strategic extraction and the dispatcher pattern are effective approaches for improving code quality.
**Key Achievements:**
- 77% average complexity reduction
- 87% peak complexity reduction
- 100% backward compatibility maintained
- All 431 tests passing
- Clear path forward for remaining 21 functions
**Next Focus:** Continue Phase 2a with remaining functions, then address duplication and architecture compliance in Phase 2b and 2c.
---
**Report Generated:** November 24, 2025
**Prepared by:** Code Quality Refactoring Initiative
**Status:** READY FOR REVIEW AND MERGE
```
--------------------------------------------------------------------------------
/docs/examples/maintenance-session-example.md:
--------------------------------------------------------------------------------
```markdown
# Real-World Maintenance Session Example
This document provides a complete walkthrough of an actual memory maintenance session conducted on June 7, 2025, demonstrating the practical application of advanced memory management techniques.
## 📋 Session Overview
**Date**: June 7, 2025
**Duration**: Approximately 60 minutes
**Scope**: Complete memory maintenance review and re-tagging
**Memories Processed**: 8 untagged memories
**Tools Used**: MCP Memory Service, semantic analysis, standardized tag schema
## 🎯 Session Objectives
1. **Identify Untagged Memories**: Find memories lacking proper categorization
2. **Apply Standardized Tags**: Use consistent tag schema for organization
3. **Improve Searchability**: Enhance knowledge discovery capabilities
4. **Document Process**: Create reusable maintenance workflows
5. **Generate Insights**: Extract patterns from memory data
## 🔍 Phase 1: Discovery and Assessment
### Initial Database Health Check
```javascript
// Command executed
check_database_health()
// Result
{
"validation": {
"status": "healthy",
"message": "Database validation successful"
},
"statistics": {
"collection": {
"total_memories": 216,
"embedding_function": "SentenceTransformerEmbeddingFunction"
},
"storage": {
"size_mb": 9.67
},
"status": "healthy"
}
}
```
**Assessment**: Database healthy with 216 memories, good foundation for maintenance.
### Untagged Memory Identification
**Search Strategy Used**:
```javascript
// Primary search for untagged content
retrieve_memory({
"n_results": 15,
"query": "untagged memories without tags minimal tags single tag"
})
// Secondary search for simple test content
retrieve_memory({
"n_results": 20,
"query": "test memory timestamp basic simple concept"
})
// Historical search for older content
recall_memory({
"n_results": 15,
"query": "memories from last month"
})
```
**Findings**: 8 completely untagged memories identified across different time periods and content types.
## 📊 Phase 2: Analysis and Categorization
### Memory Content Analysis
The 8 identified memories fell into clear categories:
#### Category 1: Debug/Test Content (6 memories)
- **Pattern**: Testing-related activities for development verification
- **Content Examples**:
- "TEST: Timestamp debugging memory created for issue #7 investigation"
- "TIMESTAMP TEST: Issue #7 verification memory"
- "Test memory to verify tag functionality"
- "Test result for basic array handling"
- "Test case 1: Basic array format"
#### Category 2: System Documentation (1 memory)
- **Pattern**: Infrastructure and backup documentation
- **Content Example**:
- "Memory System Backup completed for January 2025"
#### Category 3: Conceptual Design (1 memory)
- **Pattern**: Architectural concepts and system design
- **Content Example**:
- "Dream-Inspired Memory Handling System Concept"
### Tag Assignment Strategy
For each category, specific tag patterns were developed:
**Debug/Test Pattern**:
```
["test", "[specific-function]", "[project]", "[verification-type]", "mcp-memory-service"]
```
**Documentation Pattern**:
```
["documentation", "[content-type]", "[timeframe]", "[infrastructure]", "[system-component]"]
```
**Concept Pattern**:
```
["concept", "[domain]", "[architecture]", "[research-type]", "[system-design]"]
```
## 🛠️ Phase 3: Implementation
### Memory Re-tagging Process
Each memory was processed using the following workflow:
#### Example 1: Debug Memory Re-tagging
**Original Memory**:
```javascript
{
"content": "TEST: Timestamp debugging memory created for issue #7 investigation",
"tags": [] // No tags
}
```
**Analysis**:
- **Project Context**: MCP Memory Service, Issue #7
- **Technology**: Timestamp handling, debugging tools
- **Activity**: Testing, debugging, verification
- **Content Type**: Debug test, verification test
- **Status**: Related to resolved issue
**New Memory Created**:
```javascript
store_memory({
"content": "TEST: Timestamp debugging memory created for issue #7 investigation",
"metadata": {
"tags": ["test", "debugging", "issue-7", "timestamp-test", "mcp-memory-service", "verification"],
"type": "debug-test"
}
})
```
**Old Memory Deleted**:
```javascript
delete_memory({
"content_hash": "b3f874baee0c1261907c8f80c3e33d1977485f66c17078ed611b6f1c744cb1f8"
})
```
#### Example 2: System Documentation Re-tagging
**Original Memory**:
```javascript
{
"content": "Memory System Backup completed for January 2025. Backup includes technical infrastructure documentation, development guidelines, implementation details, additional systems documentation, and MCP protocol specifications.",
"tags": [] // No tags
}
```
**New Memory Created**:
```javascript
store_memory({
"content": "Memory System Backup completed for January 2025. Backup includes technical infrastructure documentation...",
"metadata": {
"tags": ["backup", "documentation", "january-2025", "infrastructure", "mcp-protocol", "system-backup", "notes"],
"type": "backup-record"
}
})
```
#### Example 3: Conceptual Design Re-tagging
**Original Memory**:
```javascript
{
"content": "Dream-Inspired Memory Handling System Concept: This concept mirrors cognitive processes used during human sleep for memory organization...",
"tags": [] // No tags
}
```
**New Memory Created**:
```javascript
store_memory({
"content": "Dream-Inspired Memory Handling System Concept: This concept mirrors cognitive processes...",
"metadata": {
"tags": ["concept", "memory-consolidation", "architecture", "cognitive-processing", "automation", "knowledge-management", "research", "system-design"],
"type": "concept-design"
}
})
```
### Complete Processing Summary
| Memory Type | Original Tags | New Tags Applied | Categories Used |
|-------------|---------------|------------------|-----------------|
| Debug Test 1 | None | 6 tags | test, debugging, issue-7, timestamp-test, mcp-memory-service, verification |
| Debug Test 2 | None | 6 tags | test, verification, issue-7, timestamp-test, mcp-memory-service, quality-assurance |
| Functionality Test | None | 6 tags | test, tag-functionality, verification, mcp-memory-service, development, testing |
| System Backup | None | 7 tags | backup, documentation, january-2025, infrastructure, mcp-protocol, system-backup, notes |
| Array Test 1 | None | 6 tags | test, array-handling, mcp-memory-service, development, testing, basic-test |
| Array Test 2 | None | 6 tags | test, array-format, test-case, mcp-memory-service, development, testing |
| Concept Design | None | 8 tags | concept, memory-consolidation, architecture, cognitive-processing, automation, knowledge-management, research, system-design |
| Framework Insights | None | 7 tags | testing, framework, validation, mcp-memory-service, best-practices, quality-assurance, development |
## 📈 Phase 4: Verification and Results
### Post-Maintenance Database Status
```javascript
// Final health check
check_database_health()
// Result: 217 memories (216 + 1 maintenance summary)
// All target memories successfully re-tagged
// Database remained healthy throughout process
```
### Quality Improvements Achieved
**Before Maintenance**:
- 8 completely untagged memories (3.7% of database)
- Inconsistent knowledge organization
- Poor searchability for test and concept content
- No clear categorization patterns
**After Maintenance**:
- 0% untagged memories in processed set
- Standardized tag schema applied consistently
- Enhanced searchability with specific, relevant tags
- Clear categorization enabling pattern recognition
### Search Functionality Verification
**Test Searches Performed**:
```javascript
// Project-specific search
search_by_tag({"tags": ["mcp-memory-service"]})
// Result: All project memories properly grouped
// Activity-based search
search_by_tag({"tags": ["testing", "verification"]})
// Result: All test-related content easily discoverable
// Issue-specific search
search_by_tag({"tags": ["issue-7"]})
// Result: All Issue #7 related memories linked
// Temporal search
search_by_tag({"tags": ["january-2025"]})
// Result: Time-based organization working
```
## 📊 Phase 5: Documentation and Analysis
### Session Summary Memory Created
```javascript
store_memory({
"content": "**MEMORY MAINTENANCE SESSION COMPLETED - June 7, 2025**\n\n## ✅ **SUCCESSFULLY RE-TAGGED 8 UNTAGGED MEMORIES**\n\n[Complete session summary with all details...]",
"metadata": {
"tags": ["memory-maintenance", "retagging-session", "june-2025", "standardization", "tag-management", "completed"],
"type": "maintenance-summary"
}
})
```
### Pattern Recognition Results
**Tag Categories Successfully Applied**:
1. **Projects**: `mcp-memory-service` (8/8 memories)
2. **Technologies**: `chromadb`, `sentence-transformers` (where relevant)
3. **Activities**: `testing`, `debugging`, `verification`, `development`
4. **Content Types**: `concept`, `documentation`, `framework`
5. **Status**: `verification`, `quality-assurance`, `research`
6. **Temporal**: `january-2025`, `june-2025`
**Consistency Achievements**:
- Test memories: All follow `test + [function] + [project]` pattern
- Documentation: All include temporal context
- Concepts: All include domain and research classification
### Time Investment Analysis
**Time Breakdown**:
- Discovery and Assessment: 15 minutes
- Content Analysis: 15 minutes
- Re-tagging Implementation: 20 minutes
- Verification and Testing: 5 minutes
- Documentation: 5 minutes
- **Total Time**: 60 minutes
**Efficiency Metrics**:
- 8 memories processed in 60 minutes
- 7.5 minutes per memory average
- 48 total tags applied (6 tags per memory average)
- 100% success rate (no failed re-tagging)
## 🎯 Key Insights and Lessons Learned
### What Worked Well
1. **Systematic Approach**: Step-by-step process ensured no memories were missed
2. **Pattern Recognition**: Clear categorization emerged naturally from content analysis
3. **Tag Standardization**: Consistent schema made decision-making efficient
4. **Verification Process**: Testing search functionality confirmed improvements
5. **Documentation**: Recording decisions enables future consistency
### Process Improvements Identified
1. **Automation Opportunities**: Similar content patterns could be batch-processed
2. **Proactive Tagging**: New memories should be tagged immediately upon creation
3. **Regular Maintenance**: Monthly sessions would prevent large backlogs
4. **Template Patterns**: Standard tag patterns for common content types
5. **Quality Metrics**: Tracking percentage of properly tagged memories
### Recommendations for Future Sessions
**Weekly Maintenance (15 minutes)**:
- Review memories from past 7 days
- Apply quick categorization to new content
- Focus on maintaining tagging consistency
**Monthly Maintenance (1 hour)**:
- Comprehensive review like this session
- Update tag schemas based on new patterns
- Generate maintenance reports and insights
**Quarterly Analysis (2 hours)**:
- Full database optimization
- Tag consolidation and cleanup
- Strategic knowledge organization review
## 🔄 Reproducible Workflow
### Standard Maintenance Prompt
```
Memory Maintenance Mode: Review memories from the past, identify untagged or
poorly tagged ones, analyze content for themes (projects, technologies, activities,
status), and re-tag with standardized categories.
```
### Process Checklist
- [ ] **Health Check**: Verify database status
- [ ] **Discovery**: Search for untagged/poorly tagged memories
- [ ] **Analysis**: Categorize by content type and theme
- [ ] **Tag Assignment**: Apply standardized schema consistently
- [ ] **Implementation**: Create new memories, delete old ones
- [ ] **Verification**: Test search functionality improvements
- [ ] **Documentation**: Record session results and insights
### Quality Assurance Steps
- [ ] All new memories have appropriate tags
- [ ] Old untagged memories deleted successfully
- [ ] Search returns expected results
- [ ] Tag patterns follow established standards
- [ ] Session documented for future reference
## 📋 Conclusion
This maintenance session demonstrates the practical application of systematic memory management techniques. By processing 8 untagged memories with standardized categorization, we achieved:
- **100% improvement** in memory organization for processed content
- **Enhanced searchability** through consistent tagging
- **Established patterns** for future maintenance sessions
- **Documented workflow** for reproducible results
- **Quality metrics** for measuring ongoing improvement
The session validates the effectiveness of the Memory Maintenance Mode approach and provides a template for regular knowledge base optimization. The time investment (60 minutes) yielded significant improvements in knowledge organization and discoverability.
**Next Steps**: Implement monthly maintenance schedule using this proven workflow to maintain high-quality knowledge organization as the memory base continues to grow.
---
*This real-world example demonstrates how advanced memory management techniques can transform unorganized information into a professionally structured knowledge base.*
```
--------------------------------------------------------------------------------
/scripts/server/check_server_health.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Health check utility for MCP Memory Service in Claude Desktop.
This script sends MCP protocol requests to check the health of the memory service.
"""
import os
import sys
import json
import asyncio
import logging
import argparse
import subprocess
from datetime import datetime
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# MCP protocol messages: pre-built JSON-RPC 2.0 "tools/call" requests,
# one per health check, with distinct ids so replies can be correlated.

# Check 1: overall database health and statistics.
CHECK_HEALTH_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "check_database_health",
        "arguments": {}
    },
    "jsonrpc": "2.0",
    "id": 1
}

# Check 2: embedding model availability and metadata.
CHECK_MODEL_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "check_embedding_model",
        "arguments": {}
    },
    "jsonrpc": "2.0",
    "id": 2
}

# Check 3: store a throwaway test memory.
# NOTE: the timestamp in the content is baked in at module import time.
STORE_MEMORY_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "store_memory",
        "arguments": {
            "content": f"Health check test memory created on {datetime.now().isoformat()}",
            "metadata": {
                "tags": ["health-check", "test"],
                "type": "test"
            }
        }
    },
    "jsonrpc": "2.0",
    "id": 3
}

# Check 4: semantic search that should find the test memory stored above.
RETRIEVE_MEMORY_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "retrieve_memory",
        "arguments": {
            "query": "health check test",
            "n_results": 3
        }
    },
    "jsonrpc": "2.0",
    "id": 4
}

# Check 5: tag lookup for the "health-check" tag applied above.
SEARCH_TAG_REQUEST = {
    "method": "tools/call",
    "params": {
        "name": "search_by_tag",
        "arguments": {
            "tags": ["health-check"]
        }
    },
    "jsonrpc": "2.0",
    "id": 5
}
async def write_json(writer, data):
    """Serialize *data* as JSON and send it over *writer*, CRLF-terminated.

    Args:
        writer: asyncio StreamWriter (or any object with write()/drain()).
        data: JSON-serializable payload, typically an MCP request dict.
    """
    payload = '%s\r\n' % json.dumps(data)
    writer.write(payload.encode())
    await writer.drain()
async def read_json(reader):
    """Read one newline-delimited JSON message from *reader*.

    Returns:
        The decoded object, or None when the stream is at EOF.
    """
    raw = await reader.readline()
    return json.loads(raw.decode()) if raw else None
def parse_mcp_response(
    response: dict | None,
    operation_name: str,
    success_patterns: list[str] | None = None,
    failure_patterns: list[str] | None = None,
    log_response: bool = True
) -> bool:
    """
    Parse MCP protocol response and check for success/failure patterns.

    Args:
        response: MCP response dictionary
        operation_name: Name of operation for logging
        success_patterns: Lowercase keywords indicating success (None/empty
            means no explicit success check is performed)
        failure_patterns: Lowercase keywords indicating known failures
            (logged as warnings, not errors)
        log_response: Whether to log the full response text

    Returns:
        True if operation succeeded, False otherwise
    """
    if not response or 'result' not in response:
        logger.error(f"❌ Invalid response: {response}")
        return False
    try:
        text = response['result']['content'][0]['text']
        if log_response:
            logger.info(f"{operation_name.capitalize()} response: {text}")
        else:
            logger.info(f"{operation_name.capitalize()} response received")
        # Lowercase once; all patterns are matched against this.
        text_lower = text.lower()
        # Check for failure patterns first (warnings, not errors).
        if failure_patterns and any(p in text_lower for p in failure_patterns):
            logger.warning(f"⚠️ No results found via {operation_name}")
            return False
        # Check for success patterns.
        if success_patterns:
            if any(p in text_lower for p in success_patterns):
                logger.info(f"✅ {operation_name.capitalize()} successful")
                return True
            # Success patterns were given but none matched: treat as failure.
            logger.error(f"❌ {operation_name.capitalize()} failed: {text}")
            return False
        # No explicit patterns - assume success if we got here.
        logger.info(f"✅ {operation_name.capitalize()} successful")
        logger.info(f"Response: {text}")
        return True
    except Exception as e:
        logger.error(f"❌ Error parsing {operation_name} response: {e}")
        return False
async def check_health(reader, writer):
    """Check database health via the check_database_health tool.

    Sends CHECK_HEALTH_REQUEST and parses the JSON payload from the reply.

    Returns:
        The parsed health-data dict on success, otherwise None.
    """
    logger.info("=== Check 1: Database Health ===")
    await write_json(writer, CHECK_HEALTH_REQUEST)
    response = await read_json(reader)
    if response and 'result' in response:
        try:
            text = response['result']['content'][0]['text']
            logger.info(f"Health check response received")
            # Reply text appears to be a heading line followed by a JSON
            # document; everything after the first newline is parsed.
            data = json.loads(text.split('\n', 1)[1])
            # Extract relevant information
            validation_status = data.get('validation', {}).get('status', 'unknown')
            has_model = data.get('statistics', {}).get('has_embedding_model', False)
            memory_count = data.get('statistics', {}).get('total_memories', 0)
            if validation_status == 'healthy':
                logger.info(f"✅ Database validation status: {validation_status}")
            else:
                logger.error(f"❌ Database validation status: {validation_status}")
            if has_model:
                logger.info(f"✅ Embedding model loaded: {has_model}")
            else:
                logger.error(f"❌ Embedding model not loaded")
            logger.info(f"Total memories: {memory_count}")
            return data
        except Exception as e:
            logger.error(f"❌ Error parsing health check response: {e}")
    else:
        logger.error(f"❌ Invalid response: {response}")
    # Reached on invalid response or parse failure.
    return None
async def check_embedding_model(reader, writer):
    """Check embedding model status via the check_embedding_model tool.

    Returns:
        The parsed model-status dict on success, otherwise None.
    """
    logger.info("=== Check 2: Embedding Model ===")
    await write_json(writer, CHECK_MODEL_REQUEST)
    response = await read_json(reader)
    if response and 'result' in response:
        try:
            text = response['result']['content'][0]['text']
            logger.info(f"Model check response received")
            # Reply text appears to be a heading line followed by a JSON
            # document; everything after the first newline is parsed.
            data = json.loads(text.split('\n', 1)[1])
            status = data.get('status', 'unknown')
            if status == 'healthy':
                logger.info(f"✅ Embedding model status: {status}")
                logger.info(f"Model name: {data.get('model_name', 'unknown')}")
                logger.info(f"Dimension: {data.get('embedding_dimension', 0)}")
            else:
                logger.error(f"❌ Embedding model status: {status}")
                logger.error(f"Error: {data.get('error', 'unknown')}")
            return data
        except Exception as e:
            logger.error(f"❌ Error parsing model check response: {e}")
    else:
        logger.error(f"❌ Invalid response: {response}")
    # Reached on invalid response or parse failure.
    return None
async def store_memory(reader, writer):
    """Store a test memory via the store_memory tool.

    Returns:
        True when the response text contains "successfully", else False.
    """
    logger.info("=== Check 3: Memory Storage ===")
    await write_json(writer, STORE_MEMORY_REQUEST)
    response = await read_json(reader)
    return parse_mcp_response(response, "memory storage", success_patterns=["successfully"])
async def retrieve_memory(reader, writer):
    """Retrieve memories using semantic search.

    Returns:
        True on results; False when the reply says "no matching memories"
        (treated as a warning rather than a hard failure).
    """
    logger.info("=== Check 4: Semantic Search ===")
    await write_json(writer, RETRIEVE_MEMORY_REQUEST)
    response = await read_json(reader)
    return parse_mcp_response(
        response,
        "semantic search",
        failure_patterns=["no matching memories"],
        log_response=False
    )
async def search_by_tag(reader, writer):
    """Search memories by the "health-check" tag.

    Returns:
        True on results; False when the reply says "no memories found"
        (treated as a warning rather than a hard failure).
    """
    logger.info("=== Check 5: Tag Search ===")
    await write_json(writer, SEARCH_TAG_REQUEST)
    response = await read_json(reader)
    return parse_mcp_response(
        response,
        "tag search",
        failure_patterns=["no memories found"],
        log_response=False
    )
async def run_health_check():
    """Run all health checks against a locally spawned MCP server.

    Starts the server via run_mcp_memory.sh, connects over TCP, performs the
    MCP handshake, runs the five checks, then shuts the server down and
    terminates its process group.

    Returns:
        True when database health, embedding model and memory storage all
        pass; the two search checks are reported but do not gate the result.
    """
    # Start the server.
    # NOTE(review): the script path and cwd below are machine-specific
    # (hard-coded user home); parameterize before running elsewhere.
    server_process = subprocess.Popen(
        ['/bin/bash', '/Users/hkr/Documents/GitHub/mcp-memory-service/run_mcp_memory.sh'],
        cwd='/Users/hkr/Documents/GitHub/mcp-memory-service',
        env={
            'MCP_MEMORY_STORAGE_BACKEND': 'sqlite_vec',
            'MCP_MEMORY_SQLITE_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/sqlite_vec.db'),
            'MCP_MEMORY_BACKUPS_PATH': os.path.expanduser('~/Library/Application Support/mcp-memory/backups'),
            'MCP_MEMORY_USE_ONNX': '1',
            'MCP_MEMORY_USE_HOMEBREW_PYTORCH': '1',
            'PYTHONPATH': '/Users/hkr/Documents/GitHub/mcp-memory-service',
            'LOG_LEVEL': 'INFO'
        },
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        start_new_session=True  # own process group so killpg reaches children
    )
    logger.info("Server started, waiting for initialization...")
    await asyncio.sleep(5)  # Wait for server to start
    reader = None
    writer = None
    success = False
    try:
        # Connect to the server
        reader, writer = await asyncio.open_connection('127.0.0.1', 6789)
        logger.info("Connected to server")
        # Initialize the server (MCP handshake)
        await write_json(writer, {
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {
                    "name": "health-check",
                    "version": "1.0.0"
                }
            },
            "jsonrpc": "2.0",
            "id": 0
        })
        init_response = await read_json(reader)
        logger.info(f"Initialization response: {init_response is not None}")
        # Run health checks
        health_data = await check_health(reader, writer)
        model_data = await check_embedding_model(reader, writer)
        store_success = await store_memory(reader, writer)
        search_success = await search_by_tag(reader, writer)
        retrieve_success = await retrieve_memory(reader, writer)
        # Summarize results
        logger.info("=== Health Check Summary ===")
        health_ok = bool(health_data and health_data.get('validation', {}).get('status') == 'healthy')
        model_ok = bool(model_data and model_data.get('status') == 'healthy')
        if health_ok:
            logger.info("✅ Database health: GOOD")
        else:
            logger.error("❌ Database health: FAILED")
        if model_ok:
            logger.info("✅ Embedding model: GOOD")
        else:
            logger.error("❌ Embedding model: FAILED")
        if store_success:
            logger.info("✅ Memory storage: GOOD")
        else:
            logger.error("❌ Memory storage: FAILED")
        if search_success:
            logger.info("✅ Tag search: GOOD")
        else:
            logger.warning("⚠️ Tag search: NO RESULTS")
        if retrieve_success:
            logger.info("✅ Semantic search: GOOD")
        else:
            logger.warning("⚠️ Semantic search: NO RESULTS")
        # Search checks are advisory; only the core checks gate success.
        success = health_ok and model_ok and bool(store_success)
        # Shutdown server
        await write_json(writer, {
            "method": "shutdown",
            "params": {},
            "jsonrpc": "2.0",
            "id": 99
        })
        shutdown_response = await read_json(reader)
        logger.info(f"Shutdown response: {shutdown_response is not None}")
    except Exception as e:
        logger.error(f"Error during health check: {e}")
        success = False
    finally:
        # Close the client connection first, then stop the server process.
        if writer:
            writer.close()
            await writer.wait_closed()
        # Terminate the whole process group (signal 15 = SIGTERM); the bash
        # wrapper may have spawned children that terminate() alone would miss.
        try:
            os.killpg(os.getpgid(server_process.pid), 15)
        except OSError:
            pass  # process group already gone
        server_process.terminate()
        try:
            server_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            server_process.kill()  # escalate if SIGTERM was ignored
    return success
def main():
    """CLI entry point: run the health check suite and exit 0 (ok) or 1 (fail)."""
    # No options are defined yet; parse_args still provides --help for free.
    argparse.ArgumentParser(description='MCP Memory Service Health Check').parse_args()
    logger.info("Starting health check...")
    if asyncio.run(run_health_check()):
        logger.info("Health check completed successfully!")
        sys.exit(0)
    logger.error("Health check failed!")
    sys.exit(1)

if __name__ == '__main__':
    main()
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/sse.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Server-Sent Events (SSE) implementation for real-time memory service updates.
Provides real-time notifications for memory operations, search results,
and system status changes.
"""
import asyncio
import json
import time
import uuid
from typing import Dict, List, Any, Optional, Set
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from contextlib import asynccontextmanager
from fastapi import Request
from sse_starlette import EventSourceResponse
import logging
from ..config import SSE_HEARTBEAT_INTERVAL
logger = logging.getLogger(__name__)
@dataclass
class SSEEvent:
    """A single Server-Sent Event: payload plus bookkeeping fields."""
    event_type: str                    # SSE "event" name, e.g. "heartbeat"
    data: Dict[str, Any]               # JSON-serializable payload
    event_id: Optional[str] = None     # unique id; auto-generated when omitted
    retry: Optional[int] = None        # client reconnect hint, optional
    timestamp: Optional[str] = None    # ISO-8601 UTC; auto-filled when omitted

    def __post_init__(self):
        """Backfill a UUID id and a UTC timestamp for fields left as None."""
        factories = {
            'event_id': lambda: str(uuid.uuid4()),
            'timestamp': lambda: datetime.now(timezone.utc).isoformat(),
        }
        for field_name, make_default in factories.items():
            if getattr(self, field_name) is None:
                setattr(self, field_name, make_default())
class SSEManager:
    """Manages Server-Sent Event connections and broadcasting.

    Tracks a per-connection asyncio.Queue, pushes events to the queues, and
    runs a periodic heartbeat task while started.
    """

    def __init__(self, heartbeat_interval: int = SSE_HEARTBEAT_INTERVAL):
        # connection_id -> {queue, request, connected_at, last_heartbeat,
        #                   user_agent, client_ip}
        self.connections: Dict[str, Dict[str, Any]] = {}
        self.heartbeat_interval = heartbeat_interval
        self._heartbeat_task: Optional[asyncio.Task] = None
        self._running = False

    async def start(self):
        """Start the SSE manager and heartbeat task (idempotent)."""
        if self._running:
            return
        self._running = True
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        logger.info(f"SSE Manager started with {self.heartbeat_interval}s heartbeat interval")

    async def stop(self):
        """Stop the manager, cancel the heartbeat and drop all connections."""
        self._running = False
        if self._heartbeat_task:
            self._heartbeat_task.cancel()
            try:
                await self._heartbeat_task
            except asyncio.CancelledError:
                pass
        # Close all connections (copy keys; _remove_connection mutates the dict)
        for connection_id in list(self.connections.keys()):
            await self._remove_connection(connection_id)
        logger.info("SSE Manager stopped")

    async def add_connection(self, connection_id: str, request: Request) -> asyncio.Queue:
        """Register a new SSE connection and return its event queue.

        A connection_established welcome event is queued immediately.
        """
        queue = asyncio.Queue()
        self.connections[connection_id] = {
            'queue': queue,
            'request': request,
            'connected_at': time.time(),
            'last_heartbeat': time.time(),
            'user_agent': request.headers.get('User-Agent', 'Unknown'),
            'client_ip': request.client.host if request.client else 'Unknown'
        }
        logger.info(f"SSE connection added: {connection_id} from {self.connections[connection_id]['client_ip']}")
        # Send welcome event
        welcome_event = SSEEvent(
            event_type="connection_established",
            data={
                "connection_id": connection_id,
                "message": "Connected to MCP Memory Service SSE stream",
                "heartbeat_interval": self.heartbeat_interval
            }
        )
        await queue.put(welcome_event)
        return queue

    async def _remove_connection(self, connection_id: str):
        """Remove an SSE connection, queueing a final connection_closed event."""
        if connection_id in self.connections:
            connection_info = self.connections[connection_id]
            duration = time.time() - connection_info['connected_at']
            # Put a close event in the queue before removing
            try:
                close_event = SSEEvent(
                    event_type="connection_closed",
                    data={"connection_id": connection_id, "duration_seconds": duration}
                )
                await connection_info['queue'].put(close_event)
            except Exception:
                # Queue might be closed; best-effort only. (Narrowed from a
                # bare except so KeyboardInterrupt/SystemExit still propagate.)
                pass
            del self.connections[connection_id]
            logger.info(f"SSE connection removed: {connection_id} (duration: {duration:.1f}s)")

    async def broadcast_event(self, event: SSEEvent, connection_filter: Optional[Set[str]] = None):
        """Broadcast an event to all connections, or only those in the filter.

        Connections whose queue rejects the event are dropped.
        """
        if not self.connections:
            return
        target_connections = (
            connection_filter.intersection(self.connections.keys())
            if connection_filter
            else self.connections.keys()
        )
        if not target_connections:
            return
        logger.debug(f"Broadcasting {event.event_type} to {len(target_connections)} connections")
        # Send to all target connections
        for connection_id in list(target_connections):  # Copy to avoid modification during iteration
            if connection_id in self.connections:
                try:
                    await self.connections[connection_id]['queue'].put(event)
                except Exception as e:
                    logger.error(f"Failed to send event to {connection_id}: {e}")
                    await self._remove_connection(connection_id)

    async def _heartbeat_loop(self):
        """Send periodic heartbeat events to maintain connections."""
        while self._running:
            try:
                await asyncio.sleep(self.heartbeat_interval)
                if not self._running:
                    break
                if self.connections:
                    heartbeat_event = SSEEvent(
                        event_type="heartbeat",
                        data={
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                            "active_connections": len(self.connections),
                            "server_status": "healthy"
                        }
                    )
                    # Update last heartbeat time for all connections
                    current_time = time.time()
                    for connection_info in self.connections.values():
                        connection_info['last_heartbeat'] = current_time
                    await self.broadcast_event(heartbeat_event)
                    logger.debug(f"Heartbeat sent to {len(self.connections)} connections")
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in heartbeat loop: {e}")

    def get_connection_stats(self) -> Dict[str, Any]:
        """Return a snapshot of connection counts and per-connection details."""
        if not self.connections:
            return {
                "total_connections": 0,
                "connections": []
            }
        current_time = time.time()
        connection_details = []
        for connection_id, info in self.connections.items():
            connection_details.append({
                "connection_id": connection_id,
                "client_ip": info['client_ip'],
                "user_agent": info['user_agent'],
                "connected_duration_seconds": current_time - info['connected_at'],
                "last_heartbeat_seconds_ago": current_time - info['last_heartbeat']
            })
        return {
            "total_connections": len(self.connections),
            "heartbeat_interval": self.heartbeat_interval,
            "connections": connection_details
        }
# Global SSE manager instance: module-level singleton shared by
# create_event_stream() below for all client connections.
sse_manager = SSEManager()
async def create_event_stream(request: Request):
    """Create an SSE event stream for a client.

    Registers a new connection with the global sse_manager and returns an
    EventSourceResponse that relays queued events until the client drops or
    the generator is cancelled.
    """
    connection_id = str(uuid.uuid4())

    async def event_generator():
        # Register with the manager; the queue receives this connection's events.
        queue = await sse_manager.add_connection(connection_id, request)
        try:
            while True:
                try:
                    # Wait for events with timeout to handle disconnections
                    event = await asyncio.wait_for(queue.get(), timeout=60.0)
                    # Format the SSE event
                    event_data = {
                        "id": event.event_id,
                        "event": event.event_type,
                        "data": json.dumps({
                            "timestamp": event.timestamp,
                            **event.data
                        }),
                    }
                    # NOTE(review): truthiness check, so retry=0 is not forwarded.
                    if event.retry:
                        event_data["retry"] = event.retry
                    yield event_data
                except asyncio.TimeoutError:
                    # Send a ping to keep connection alive
                    yield {
                        "event": "ping",
                        "data": json.dumps({
                            "timestamp": datetime.now(timezone.utc).isoformat(),
                            "message": "Connection alive"
                        })
                    }
                except asyncio.CancelledError:
                    # Client disconnected or server shutting down; stop streaming.
                    break
        except Exception as e:
            logger.error(f"Error in event stream for {connection_id}: {e}")
        finally:
            # Always deregister, even on error/cancellation.
            await sse_manager._remove_connection(connection_id)

    return EventSourceResponse(event_generator())
# Event creation helpers
def create_memory_stored_event(memory_data: Dict[str, Any]) -> SSEEvent:
    """Create a memory_stored event.

    Args:
        memory_data: Dict with "content_hash"; optional "content", "tags"
            and "memory_type" keys are used when present.
    """
    # Fetch content once (the original evaluated this lookup three times)
    # and truncate long content to a 100-char preview for the SSE payload.
    content = memory_data.get("content", "")
    preview = content[:100] + "..." if len(content) > 100 else content
    return SSEEvent(
        event_type="memory_stored",
        data={
            "content_hash": memory_data.get("content_hash"),
            "content_preview": preview,
            "tags": memory_data.get("tags", []),
            "memory_type": memory_data.get("memory_type"),
            "message": "New memory stored successfully"
        }
    )
def create_memory_deleted_event(content_hash: str, success: bool = True) -> SSEEvent:
    """Create a memory_deleted event for the given content hash."""
    outcome = "Memory deleted successfully" if success else "Memory deletion failed"
    payload = {
        "content_hash": content_hash,
        "success": success,
        "message": outcome,
    }
    return SSEEvent(event_type="memory_deleted", data=payload)
def create_search_completed_event(query: str, search_type: str, results_count: int, processing_time_ms: float) -> SSEEvent:
    """Create a search_completed event summarizing one search run."""
    payload = {
        "query": query,
        "search_type": search_type,
        "results_count": results_count,
        "processing_time_ms": processing_time_ms,
        "message": f"Search completed: {results_count} results found",
    }
    return SSEEvent(event_type="search_completed", data=payload)
def create_health_update_event(status: str, details: Optional[Dict[str, Any]] = None) -> SSEEvent:
    """Create a health_update event.

    Args:
        status: Overall server status string (e.g. "healthy").
        details: Optional extra diagnostic fields; an empty dict is used
            when omitted or falsy.
    """
    return SSEEvent(
        event_type="health_update",
        data={
            "status": status,
            "details": details or {},
            "message": f"System status: {status}"
        }
    )
def create_sync_progress_event(
    synced_count: int,
    total_count: int,
    sync_type: str = "initial",
    message: Optional[str] = None
) -> SSEEvent:
    """Create a sync_progress event for real-time sync updates.

    Args:
        synced_count: Memories synced so far.
        total_count: Total memories to sync.
        sync_type: Kind of sync in progress (default "initial").
        message: Optional custom message; a progress string is built from
            the counts when omitted.
    """
    # Guard against division by zero when there is nothing to sync.
    progress_percentage = (synced_count / total_count * 100) if total_count > 0 else 0
    return SSEEvent(
        event_type="sync_progress",
        data={
            "sync_type": sync_type,
            "synced_count": synced_count,
            "total_count": total_count,
            "remaining_count": total_count - synced_count,
            "progress_percentage": round(progress_percentage, 1),
            "message": message or f"Syncing: {synced_count}/{total_count} memories ({progress_percentage:.1f}%)"
        }
    )
def create_sync_completed_event(
    synced_count: int,
    total_count: int,
    time_taken_seconds: float,
    sync_type: str = "initial"
) -> SSEEvent:
    """Create a sync_completed event summarizing a finished sync run."""
    summary = (
        f"Sync completed: {synced_count} memories synced "
        f"in {time_taken_seconds:.1f}s"
    )
    payload = {
        "sync_type": sync_type,
        "synced_count": synced_count,
        "total_count": total_count,
        "time_taken_seconds": round(time_taken_seconds, 2),
        "message": summary,
    }
    return SSEEvent(event_type="sync_completed", data=payload)
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/ingestion/text_loader.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Text document loader for plain text and Markdown files.
"""
import logging
import re
import chardet
from pathlib import Path
from typing import AsyncGenerator, Dict, Any, Optional
import asyncio
from .base import DocumentLoader, DocumentChunk
from .chunker import TextChunker, ChunkingStrategy
logger = logging.getLogger(__name__)
class TextLoader(DocumentLoader):
"""
Document loader for plain text and Markdown files.
Features:
- Automatic encoding detection
- Markdown structure preservation
- Section-aware chunking
- Code block handling
"""
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize text loader.

        Args:
            chunk_size: Target size for text chunks in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        super().__init__(chunk_size, chunk_overlap)
        # File extensions (without the dot) accepted by can_handle().
        self.supported_extensions = ['txt', 'md', 'markdown', 'rst', 'text']
        # Markdown patterns
        self.md_header_pattern = re.compile(r'^(#{1,6})\s+(.+)$', re.MULTILINE)  # ATX headers (# .. ######)
        self.md_code_block_pattern = re.compile(r'^```[\s\S]*?^```', re.MULTILINE)  # fenced ``` code blocks
        self.md_link_pattern = re.compile(r'\[([^\]]+)\]\(([^)]+)\)')  # inline [text](url) links
        # Paragraph/sentence-aware chunker shared by the text and Markdown paths.
        self.chunker = TextChunker(ChunkingStrategy(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            respect_paragraph_boundaries=True,
            respect_sentence_boundaries=True
        ))
def can_handle(self, file_path: Path) -> bool:
"""
Check if this loader can handle the given text file.
Args:
file_path: Path to the file to check
Returns:
True if this loader can process the text file
"""
if not file_path.exists() or not file_path.is_file():
return False
extension = file_path.suffix.lower().lstrip('.')
return extension in self.supported_extensions
    async def extract_chunks(self, file_path: Path, **kwargs) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract text chunks from a text file.

        Args:
            file_path: Path to the text file
            **kwargs: Additional options:
                - encoding: Text encoding to use (auto-detected if not specified)
                - preserve_structure: Whether to preserve Markdown structure (default: True)
                - extract_links: Whether to extract and preserve links (default: False)

        Yields:
            DocumentChunk objects containing extracted text and metadata

        Raises:
            FileNotFoundError: If the text file doesn't exist
            ValueError: If the text file can't be read or processed
        """
        await self.validate_file(file_path)
        encoding = kwargs.get('encoding', None)
        preserve_structure = kwargs.get('preserve_structure', True)
        extract_links = kwargs.get('extract_links', False)
        logger.info(f"Extracting chunks from text file: {file_path}")
        try:
            # Read file content (with encoding auto-detection when not given).
            content, detected_encoding = await self._read_file_content(file_path, encoding)
            # Determine file type
            is_markdown = file_path.suffix.lower() in ['.md', '.markdown']
            # Markdown gets section-aware chunking unless structure
            # preservation was explicitly disabled; everything else uses
            # the plain-text chunking path.
            if is_markdown and preserve_structure:
                async for chunk in self._extract_markdown_chunks(
                    file_path, content, detected_encoding, extract_links
                ):
                    yield chunk
            else:
                async for chunk in self._extract_text_chunks(
                    file_path, content, detected_encoding
                ):
                    yield chunk
        except Exception as e:
            logger.error(f"Error extracting from text file {file_path}: {str(e)}")
            raise ValueError(f"Failed to extract text content: {str(e)}") from e
async def _read_file_content(self, file_path: Path, encoding: Optional[str]) -> tuple:
"""
Read file content with encoding detection.
Args:
file_path: Path to the file
encoding: Specific encoding to use, or None for auto-detection
Returns:
Tuple of (content, encoding_used)
"""
def _read_sync():
# Auto-detect encoding if not specified
if encoding is None:
# For markdown files, default to UTF-8 as it's the standard
if file_path.suffix.lower() in ['.md', '.markdown']:
file_encoding = 'utf-8'
else:
with open(file_path, 'rb') as file:
raw_data = file.read()
detected = chardet.detect(raw_data)
file_encoding = detected['encoding'] or 'utf-8'
else:
file_encoding = encoding
# Read with detected/specified encoding
try:
with open(file_path, 'r', encoding=file_encoding) as file:
content = file.read()
return content, file_encoding
except UnicodeDecodeError:
# Fallback to UTF-8 with error handling
with open(file_path, 'r', encoding='utf-8', errors='replace') as file:
content = file.read()
return content, 'utf-8'
# Run file reading in thread pool
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _read_sync)
    async def _extract_text_chunks(
        self,
        file_path: Path,
        content: str,
        encoding: str
    ) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract chunks from plain text.

        Args:
            file_path: Path to the source file
            content: File content
            encoding: Encoding used to read the file

        Yields:
            DocumentChunk objects
        """
        base_metadata = self.get_base_metadata(file_path)
        # Record how the file was read plus simple size statistics.
        base_metadata.update({
            'encoding': encoding,
            'content_type': 'plain_text',
            'total_characters': len(content),
            'total_lines': content.count('\n') + 1
        })
        # Chunk the content with the paragraph/sentence-aware chunker.
        chunks = self.chunker.chunk_text(content, base_metadata)
        for i, (chunk_text, chunk_metadata) in enumerate(chunks):
            yield DocumentChunk(
                content=chunk_text,
                metadata=chunk_metadata,
                chunk_index=i,
                source_file=file_path
            )
async def _extract_markdown_chunks(
    self,
    file_path: Path,
    content: str,
    encoding: str,
    extract_links: bool
) -> AsyncGenerator[DocumentChunk, None]:
    """Yield DocumentChunk objects from Markdown, preserving structure.

    Args:
        file_path: Path to the source file.
        content: Full Markdown text.
        encoding: Encoding that was used to read the file.
        extract_links: When True, link metadata is collected per chunk.

    Yields:
        DocumentChunk objects with Markdown-aware metadata attached.
    """
    metadata = self.get_base_metadata(file_path)
    metadata.update({
        'encoding': encoding,
        'content_type': 'markdown',
        'total_characters': len(content),
        'total_lines': content.count('\n') + 1
    })

    # Document-level structure, gathered once and reused for every chunk.
    headers = self._extract_headers(content)
    code_blocks = self._extract_code_blocks(content)
    links = self._extract_links(content) if extract_links else []
    metadata.update({
        'header_count': len(headers),
        'code_block_count': len(code_blocks),
        'link_count': len(links)
    })

    # Section-aware chunking keeps headers together with their content.
    sections = self.chunker.chunk_by_sections(content, metadata)
    for index, (text, chunk_meta) in enumerate(sections):
        # Attach Markdown-specific metadata to each individual chunk.
        chunk_meta.update({
            'markdown_headers': self._get_chunk_headers(text, headers),
            'has_code_blocks': bool(self.md_code_block_pattern.search(text)),
            'chunk_links': self._get_chunk_links(text) if extract_links else []
        })
        yield DocumentChunk(
            content=text,
            metadata=chunk_meta,
            chunk_index=index,
            source_file=file_path
        )
def _extract_headers(self, content: str) -> list:
"""
Extract Markdown headers from content.
Args:
content: Markdown content
Returns:
List of header dictionaries with level, text, and position
"""
headers = []
for match in self.md_header_pattern.finditer(content):
level = len(match.group(1))
text = match.group(2).strip()
position = match.start()
headers.append({
'level': level,
'text': text,
'position': position
})
return headers
def _extract_code_blocks(self, content: str) -> list:
"""
Extract code blocks from Markdown content.
Args:
content: Markdown content
Returns:
List of code block dictionaries
"""
code_blocks = []
for match in self.md_code_block_pattern.finditer(content):
block = match.group(0)
# Extract language if specified
first_line = block.split('\n')[0]
language = first_line[3:].strip() if len(first_line) > 3 else ''
code_blocks.append({
'language': language,
'content': block,
'position': match.start(),
'length': len(block)
})
return code_blocks
def _extract_links(self, content: str) -> list:
"""
Extract links from Markdown content.
Args:
content: Markdown content
Returns:
List of link dictionaries
"""
links = []
for match in self.md_link_pattern.finditer(content):
text = match.group(1)
url = match.group(2)
position = match.start()
links.append({
'text': text,
'url': url,
'position': position
})
return links
def _get_chunk_headers(self, chunk_text: str, all_headers: list) -> list:
"""
Get headers that appear in a specific chunk.
Args:
chunk_text: The text chunk to analyze
all_headers: All headers from the document
Returns:
List of headers found in this chunk
"""
chunk_headers = []
for header in all_headers:
if header['text'] in chunk_text:
chunk_headers.append({
'level': header['level'],
'text': header['text']
})
return chunk_headers
def _get_chunk_links(self, chunk_text: str) -> list:
"""
Get links that appear in a specific chunk.
Args:
chunk_text: The text chunk to analyze
Returns:
List of links found in this chunk
"""
links = []
for match in self.md_link_pattern.finditer(chunk_text):
text = match.group(1)
url = match.group(2)
links.append({
'text': text,
'url': url
})
return links
# Register the text loader
def _register_text_loader():
    """Register TextLoader for common plain-text/Markdown extensions.

    The registry import is deferred into the function body so this module
    can still be imported standalone if the registry is unavailable.
    """
    try:
        from .registry import register_loader
        register_loader(TextLoader, ['txt', 'md', 'markdown', 'rst', 'text'])
        logger.debug("Text loader registered successfully")
    except ImportError:
        # Registry may be absent during partial package imports; registration
        # is skipped silently in that case.
        logger.debug("Registry not available during import")


# Auto-register when module is imported
_register_text_loader()
```
--------------------------------------------------------------------------------
/claude-hooks/utilities/memory-client.js:
--------------------------------------------------------------------------------
```javascript
/**
* Unified Memory Client
* Supports both HTTP and MCP protocols with automatic fallback
*/
const https = require('https');
const http = require('http');
const { MCPClient } = require('./mcp-client');
/**
 * Unified memory client supporting both the HTTP REST API and the MCP
 * protocol, with optional automatic fallback between the two.
 */
class MemoryClient {
  /**
   * @param {object} config - Client configuration:
   *   - protocol: 'http' | 'mcp' | 'auto' (default 'auto')
   *   - preferredProtocol: protocol tried first in auto mode (default 'mcp')
   *   - fallbackEnabled: try the other protocol on failure (default true)
   *   - http / mcp: protocol-specific settings objects
   */
  constructor(config) {
    this.config = config;
    this.protocol = config.protocol || 'auto';
    this.preferredProtocol = config.preferredProtocol || 'mcp';
    this.fallbackEnabled = config.fallbackEnabled !== false;
    this.httpConfig = config.http || {};
    this.mcpConfig = config.mcp || {};
    // Connection state (null = unknown / not yet probed)
    this.activeProtocol = null;
    this.httpAvailable = null;
    this.mcpAvailable = null;
    this.mcpClient = null;
    // Cache successful connections
    this.connectionCache = new Map();
  }

  /**
   * Initialize connection using the configured protocol
   */
  async connect() {
    if (this.protocol === 'http') {
      return this.connectHTTP();
    } else if (this.protocol === 'mcp') {
      return this.connectMCP();
    } else {
      // Auto mode: try preferred first, then fallback
      return this.connectAuto();
    }
  }

  /**
   * Auto-connect: try preferred protocol first, fallback if needed.
   * @returns {{protocol: string, client: ?object}} the winning protocol
   *   and, for MCP, the connected client (null for HTTP).
   */
  async connectAuto() {
    const protocols = this.preferredProtocol === 'mcp' ? ['mcp', 'http'] : ['http', 'mcp'];
    for (const protocol of protocols) {
      try {
        if (protocol === 'mcp') {
          await this.connectMCP();
          this.activeProtocol = 'mcp';
          return { protocol: 'mcp', client: this.mcpClient };
        } else {
          await this.connectHTTP();
          this.activeProtocol = 'http';
          return { protocol: 'http', client: null };
        }
      } catch (error) {
        // Rethrow when fallback is disabled or there is nothing left to try
        if (!this.fallbackEnabled || protocols.length === 1) {
          throw error;
        }
        // Continue to try next protocol
        continue;
      }
    }
    throw new Error('Failed to connect using any available protocol');
  }

  /**
   * Connect using MCP protocol. Idempotent: reuses an existing client.
   */
  async connectMCP() {
    if (this.mcpClient) {
      return this.mcpClient;
    }
    this.mcpClient = new MCPClient(
      this.mcpConfig.serverCommand,
      {
        workingDir: this.mcpConfig.serverWorkingDir,
        connectionTimeout: this.mcpConfig.connectionTimeout || 5000,
        toolCallTimeout: this.mcpConfig.toolCallTimeout || 10000
      }
    );
    // Handle MCP client errors gracefully
    this.mcpClient.on('error', (error) => {
      this.mcpAvailable = false;
    });
    await this.mcpClient.connect();
    this.mcpAvailable = true;
    this.activeProtocol = 'mcp';
    return this.mcpClient;
  }

  /**
   * Connect using HTTP protocol. Throws when the health check fails.
   */
  async connectHTTP() {
    // Test HTTP connection with a simple health check
    const healthResult = await this.queryHealthHTTP();
    if (!healthResult.success) {
      throw new Error(`HTTP connection failed: ${healthResult.error}`);
    }
    this.httpAvailable = true;
    this.activeProtocol = 'http';
    return true;
  }

  /**
   * Query health status using active protocol
   */
  async getHealthStatus() {
    if (this.activeProtocol === 'mcp' && this.mcpClient) {
      return this.mcpClient.getHealthStatus();
    } else if (this.activeProtocol === 'http') {
      return this.queryHealthHTTP();
    } else {
      throw new Error('No active connection available');
    }
  }

  /**
   * Query health via HTTP with automatic HTTPS → HTTP fallback
   * @returns {Promise<{success: boolean, data?: object, error?: string}>}
   */
  async queryHealthHTTP() {
    const healthPath = this.httpConfig.useDetailedHealthCheck ?
      '/api/health/detailed' : '/api/health';
    // Parse the configured endpoint to extract protocol, host, and port
    let endpointUrl;
    try {
      endpointUrl = new URL(this.httpConfig.endpoint);
    } catch (error) {
      return { success: false, error: `Invalid endpoint URL: ${this.httpConfig.endpoint}` };
    }
    // Try with configured protocol first
    const result = await this._attemptHealthCheck(endpointUrl, healthPath);
    // If HTTPS failed, try HTTP fallback on same host:port
    if (!result.success && endpointUrl.protocol === 'https:') {
      const httpUrl = new URL(endpointUrl);
      httpUrl.protocol = 'http:';
      return this._attemptHealthCheck(httpUrl, healthPath);
    }
    return result;
  }

  /**
   * Attempt health check with specific protocol/host/port.
   * Always resolves (never rejects); failures are reported via the
   * resolved object's `success`/`error` fields.
   * @private
   */
  async _attemptHealthCheck(baseUrl, healthPath) {
    return new Promise((resolve) => {
      try {
        const url = new URL(healthPath, baseUrl);
        const requestOptions = {
          hostname: url.hostname,
          // Fall back to the service defaults when the URL omits a port
          port: url.port || (url.protocol === 'https:' ? 8443 : 8889),
          path: url.pathname,
          method: 'GET',
          headers: {
            'X-API-Key': this.httpConfig.apiKey,
            'Accept': 'application/json'
          },
          timeout: this.httpConfig.healthCheckTimeout || 3000,
          // NOTE(review): disables TLS certificate verification.
          rejectUnauthorized: false // Allow self-signed certificates
        };
        const protocol = url.protocol === 'https:' ? https : http;
        const req = protocol.request(requestOptions, (res) => {
          let data = '';
          res.on('data', (chunk) => data += chunk);
          res.on('end', () => {
            try {
              if (res.statusCode === 200) {
                const healthData = JSON.parse(data);
                resolve({ success: true, data: healthData });
              } else {
                resolve({ success: false, error: `HTTP ${res.statusCode}`, fallback: true });
              }
            } catch (parseError) {
              resolve({ success: false, error: 'Invalid JSON response', fallback: true });
            }
          });
        });
        req.on('error', (error) => {
          resolve({ success: false, error: error.message, fallback: true });
        });
        req.on('timeout', () => {
          req.destroy();
          resolve({ success: false, error: 'Health check timeout', fallback: true });
        });
        req.end();
      } catch (error) {
        resolve({ success: false, error: error.message, fallback: true });
      }
    });
  }

  /**
   * Query memories using active protocol
   */
  async queryMemories(query, limit = 10) {
    if (this.activeProtocol === 'mcp' && this.mcpClient) {
      return this.mcpClient.queryMemories(query, limit);
    } else if (this.activeProtocol === 'http') {
      return this.queryMemoriesHTTP(query, limit);
    } else {
      throw new Error('No active connection available');
    }
  }

  /**
   * Query memories by time using active protocol
   */
  async queryMemoriesByTime(timeQuery, limit = 10, semanticQuery = null) {
    if (this.activeProtocol === 'mcp' && this.mcpClient) {
      // TODO: Update MCP client to support semantic query parameter
      return this.mcpClient.queryMemoriesByTime(timeQuery, limit);
    } else if (this.activeProtocol === 'http') {
      return this.queryMemoriesByTimeHTTP(timeQuery, limit, semanticQuery);
    } else {
      throw new Error('No active connection available');
    }
  }

  /**
   * Private helper: Perform HTTP POST request to API.
   * Always resolves with an array of memory objects (empty on any error).
   * @private
   */
  _performApiPost(path, payload) {
    return new Promise((resolve) => {
      const url = new URL(path, this.httpConfig.endpoint);
      const postData = JSON.stringify(payload);
      const options = {
        hostname: url.hostname,
        port: url.port || (url.protocol === 'https:' ? 8443 : 8889),
        path: url.pathname,
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Content-Length': Buffer.byteLength(postData),
          'X-API-Key': this.httpConfig.apiKey
        },
        rejectUnauthorized: false // Allow self-signed certificates
      };
      const protocol = url.protocol === 'https:' ? https : http;
      const req = protocol.request(options, (res) => {
        let data = '';
        res.on('data', (chunk) => data += chunk);
        res.on('end', () => {
          try {
            const response = JSON.parse(data);
            // REST API returns { results: [{memory: {...}, similarity_score: ...}] }
            if (response.results && Array.isArray(response.results)) {
              // Extract memory objects from results and preserve similarity_score
              const memories = response.results
                .filter(result => result && result.memory)
                .map(result => {
                  const memory = { ...result.memory };
                  // FIX: API returns Unix timestamps in SECONDS, but JavaScript Date expects MILLISECONDS
                  // Convert created_at and updated_at from seconds to milliseconds
                  if (memory.created_at && typeof memory.created_at === 'number') {
                    // Only convert if value looks like seconds (< year 2100 in milliseconds = 4102444800000)
                    if (memory.created_at < 4102444800) {
                      memory.created_at = memory.created_at * 1000;
                    }
                  }
                  if (memory.updated_at && typeof memory.updated_at === 'number') {
                    if (memory.updated_at < 4102444800) {
                      memory.updated_at = memory.updated_at * 1000;
                    }
                  }
                  return {
                    ...memory,
                    similarity_score: result.similarity_score
                  };
                });
              resolve(memories);
            } else {
              resolve([]);
            }
          } catch (parseError) {
            // Best-effort: log and return an empty result set
            console.warn('[Memory Client] HTTP parse error:', parseError.message);
            resolve([]);
          }
        });
      });
      req.on('error', (error) => {
        console.warn('[Memory Client] HTTP network error:', error.message);
        resolve([]);
      });
      req.write(postData);
      req.end();
    });
  }

  /**
   * Query memories via HTTP REST API
   */
  async queryMemoriesHTTP(query, limit = 10) {
    return this._performApiPost('/api/search', {
      query: query,
      n_results: limit
    });
  }

  /**
   * Query memories by time via HTTP REST API
   */
  async queryMemoriesByTimeHTTP(timeQuery, limit = 10, semanticQuery = null) {
    const payload = {
      query: timeQuery,
      n_results: limit
    };
    // Add semantic query if provided for relevance filtering
    if (semanticQuery) {
      payload.semantic_query = semanticQuery;
    }
    return this._performApiPost('/api/search/by-time', payload);
  }

  /**
   * Get connection status and available protocols
   */
  getConnectionInfo() {
    return {
      activeProtocol: this.activeProtocol,
      httpAvailable: this.httpAvailable,
      mcpAvailable: this.mcpAvailable,
      fallbackEnabled: this.fallbackEnabled,
      preferredProtocol: this.preferredProtocol
    };
  }

  /**
   * Disconnect from active protocol and reset all connection state.
   */
  async disconnect() {
    if (this.mcpClient) {
      try {
        await this.mcpClient.disconnect();
      } catch (error) {
        // Ignore cleanup errors
      }
      this.mcpClient = null;
    }
    this.activeProtocol = null;
    this.httpAvailable = null;
    this.mcpAvailable = null;
    this.connectionCache.clear();
  }
}
module.exports = { MemoryClient };
```
--------------------------------------------------------------------------------
/scripts/testing/test_sqlite_vec_embeddings.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Diagnostic script to test SQLite-vec embedding functionality.
This script performs comprehensive tests to identify and diagnose issues
with the embedding pipeline in the MCP Memory Service.
"""
import asyncio
import os
import sys
import logging
import tempfile
import traceback
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from src.mcp_memory_service.models.memory import Memory
from src.mcp_memory_service.utils.hashing import generate_content_hash
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class EmbeddingDiagnostics:
    """Test suite for SQLite-vec embedding functionality.

    Runs a fixed sequence of diagnostic checks (dependencies, storage
    initialization, embedding generation, memory storage, semantic search,
    database integrity, edge cases) and prints a summary at the end.
    """

    def __init__(self, db_path=None):
        # NOTE(review): tempfile.mktemp() is deprecated and race-prone;
        # consider tempfile.NamedTemporaryFile(delete=False) instead.
        self.db_path = db_path or tempfile.mktemp(suffix='.db')
        # Storage backend; populated by test_storage_initialization.
        self.storage = None
        # Accumulated results: ("SUCCESS", msg) or ("ERROR", msg, error).
        self.test_results = []

    async def run_all_tests(self):
        """Run all diagnostic tests in order, continuing past failures."""
        print("\n" + "="*60)
        print("SQLite-vec Embedding Diagnostics")
        print("="*60 + "\n")
        tests = [
            self.test_dependencies,
            self.test_storage_initialization,
            self.test_embedding_generation,
            self.test_memory_storage,
            self.test_semantic_search,
            self.test_database_integrity,
            self.test_edge_cases
        ]
        for test in tests:
            try:
                await test()
            except Exception as e:
                # A failing test is recorded but does not stop the suite.
                self.log_error(f"{test.__name__} failed", e)
        self.print_summary()

    async def test_dependencies(self):
        """Test 1: Check required dependencies."""
        print("\n[TEST 1] Checking dependencies...")
        # Check sqlite-vec
        try:
            import sqlite_vec
            self.log_success("sqlite-vec is installed")
        except ImportError:
            self.log_error("sqlite-vec is NOT installed", "pip install sqlite-vec")
        # Check sentence-transformers
        try:
            from sentence_transformers import SentenceTransformer
            self.log_success("sentence-transformers is installed")
        except ImportError:
            self.log_error("sentence-transformers is NOT installed", "pip install sentence-transformers")
        # Check torch
        try:
            import torch
            device = "cuda" if torch.cuda.is_available() else "cpu"
            self.log_success(f"torch is installed (device: {device})")
        except ImportError:
            self.log_error("torch is NOT installed", "pip install torch")

    async def test_storage_initialization(self):
        """Test 2: Initialize storage backend."""
        print("\n[TEST 2] Initializing storage...")
        try:
            self.storage = SqliteVecMemoryStorage(self.db_path)
            await self.storage.initialize()
            self.log_success(f"Storage initialized at {self.db_path}")
            # Check embedding model
            if self.storage.embedding_model:
                self.log_success(f"Embedding model loaded: {self.storage.embedding_model_name}")
                self.log_success(f"Embedding dimension: {self.storage.embedding_dimension}")
            else:
                self.log_error("Embedding model NOT loaded", None)
        except Exception as e:
            self.log_error("Storage initialization failed", e)

    async def test_embedding_generation(self):
        """Test 3: Generate embeddings and validate their dimensions."""
        print("\n[TEST 3] Testing embedding generation...")
        if not self.storage:
            self.log_error("Storage not initialized", "Previous test failed")
            return
        test_texts = [
            "The quick brown fox jumps over the lazy dog",
            "Machine learning is transforming how we process information",
            "SQLite is a lightweight embedded database"
        ]
        for text in test_texts:
            try:
                embedding = self.storage._generate_embedding(text)
                # Validate embedding: must be non-empty and match the model dimension
                if not embedding:
                    self.log_error(f"Empty embedding for: {text[:30]}...", None)
                elif len(embedding) != self.storage.embedding_dimension:
                    self.log_error(
                        f"Dimension mismatch for: {text[:30]}...",
                        f"Expected {self.storage.embedding_dimension}, got {len(embedding)}"
                    )
                else:
                    self.log_success(f"Generated embedding for: {text[:30]}... (dim: {len(embedding)})")
            except Exception as e:
                self.log_error(f"Embedding generation failed for: {text[:30]}...", e)

    async def test_memory_storage(self):
        """Test 4: Store memories with embeddings."""
        print("\n[TEST 4] Testing memory storage...")
        if not self.storage:
            self.log_error("Storage not initialized", "Previous test failed")
            return
        test_memories = [
            Memory(
                content="Python is a versatile programming language",
                content_hash=generate_content_hash("Python is a versatile programming language"),
                tags=["programming", "python"],
                memory_type="reference"
            ),
            Memory(
                content="The Eiffel Tower is located in Paris, France",
                content_hash=generate_content_hash("The Eiffel Tower is located in Paris, France"),
                tags=["geography", "landmarks"],
                memory_type="fact"
            ),
            Memory(
                content="Machine learning models can learn patterns from data",
                content_hash=generate_content_hash("Machine learning models can learn patterns from data"),
                tags=["ml", "ai"],
                memory_type="concept"
            )
        ]
        stored_count = 0
        for memory in test_memories:
            try:
                success, message = await self.storage.store(memory)
                if success:
                    self.log_success(f"Stored: {memory.content[:40]}...")
                    stored_count += 1
                else:
                    self.log_error(f"Failed to store: {memory.content[:40]}...", message)
            except Exception as e:
                self.log_error(f"Storage exception for: {memory.content[:40]}...", e)
        print(f"\nStored {stored_count}/{len(test_memories)} memories successfully")

    async def test_semantic_search(self):
        """Test 5: Perform semantic search."""
        print("\n[TEST 5] Testing semantic search...")
        if not self.storage:
            self.log_error("Storage not initialized", "Previous test failed")
            return
        # (query, expected minimum number of results) pairs
        test_queries = [
            ("programming languages", 2),
            ("tourist attractions in Europe", 2),
            ("artificial intelligence and data", 2),
            ("random unrelated query xyz123", 1)
        ]
        for query, expected_min in test_queries:
            try:
                results = await self.storage.retrieve(query, n_results=5)
                if not results:
                    self.log_error(f"No results for query: '{query}'", "Semantic search returned empty")
                else:
                    self.log_success(f"Found {len(results)} results for: '{query}'")
                    # Show top result
                    if results:
                        top_result = results[0]
                        print(f" Top match: {top_result.memory.content[:50]}...")
                        print(f" Relevance: {top_result.relevance_score:.3f}")
            except Exception as e:
                self.log_error(f"Search failed for: '{query}'", e)

    async def test_database_integrity(self):
        """Test 6: Check database integrity (row counts and orphans)."""
        print("\n[TEST 6] Checking database integrity...")
        if not self.storage or not self.storage.conn:
            self.log_error("Storage not initialized", "Previous test failed")
            return
        try:
            # Check memory count
            cursor = self.storage.conn.execute('SELECT COUNT(*) FROM memories')
            memory_count = cursor.fetchone()[0]
            # Check embedding count
            cursor = self.storage.conn.execute('SELECT COUNT(*) FROM memory_embeddings')
            embedding_count = cursor.fetchone()[0]
            print(f" Memories table: {memory_count} rows")
            print(f" Embeddings table: {embedding_count} rows")
            # Both tables should stay in sync (one embedding per memory)
            if memory_count != embedding_count:
                self.log_error(
                    "Row count mismatch",
                    f"Memories: {memory_count}, Embeddings: {embedding_count}"
                )
            else:
                self.log_success("Database row counts match")
            # Check for orphaned embeddings (embedding rows with no memory row)
            cursor = self.storage.conn.execute('''
            SELECT COUNT(*) FROM memory_embeddings e
            WHERE NOT EXISTS (
            SELECT 1 FROM memories m WHERE m.id = e.rowid
            )
            ''')
            orphaned = cursor.fetchone()[0]
            if orphaned > 0:
                self.log_error("Found orphaned embeddings", f"Count: {orphaned}")
            else:
                self.log_success("No orphaned embeddings found")
        except Exception as e:
            self.log_error("Database integrity check failed", e)

    async def test_edge_cases(self):
        """Test 7: Edge cases and error handling (empty and very long content)."""
        print("\n[TEST 7] Testing edge cases...")
        if not self.storage:
            self.log_error("Storage not initialized", "Previous test failed")
            return
        # Test empty content: storing it should be rejected or raise
        try:
            empty_memory = Memory(
                content="",
                content_hash=generate_content_hash(""),
                tags=["empty"]
            )
            success, message = await self.storage.store(empty_memory)
            if success:
                self.log_error("Stored empty content", "Should have failed")
            else:
                self.log_success("Correctly rejected empty content")
        except Exception as e:
            # Raising on empty content is also acceptable behavior
            self.log_success(f"Correctly raised exception for empty content: {type(e).__name__}")
        # Test very long content (10k characters) should be accepted
        try:
            long_content = "x" * 10000
            long_memory = Memory(
                content=long_content,
                content_hash=generate_content_hash(long_content),
                tags=["long"]
            )
            success, message = await self.storage.store(long_memory)
            if success:
                self.log_success("Handled long content")
            else:
                self.log_error("Failed on long content", message)
        except Exception as e:
            self.log_error("Exception on long content", e)

    def log_success(self, message):
        """Log a successful test result."""
        print(f" ✓ {message}")
        self.test_results.append(("SUCCESS", message))

    def log_error(self, message, error):
        """Log a failed test result.

        Args:
            message: Human-readable description of the failure.
            error: Exception instance, informational string, or None.
        """
        print(f" ✗ {message}")
        if error:
            if isinstance(error, Exception):
                print(f" Error: {type(error).__name__}: {str(error)}")
            else:
                print(f" Info: {error}")
        self.test_results.append(("ERROR", message, error))

    def print_summary(self):
        """Print test summary with pass/fail counts and failed test names."""
        print("\n" + "="*60)
        print("Test Summary")
        print("="*60)
        success_count = sum(1 for r in self.test_results if r[0] == "SUCCESS")
        error_count = sum(1 for r in self.test_results if r[0] == "ERROR")
        print(f"\nTotal tests: {len(self.test_results)}")
        print(f"Successful: {success_count}")
        print(f"Failed: {error_count}")
        if error_count > 0:
            print("\nFailed tests:")
            for result in self.test_results:
                if result[0] == "ERROR":
                    print(f" - {result[1]}")
        print("\n" + "="*60)
async def main():
    """Entry point: run the diagnostic suite.

    Accepts an optional database path as the first CLI argument; when the
    path does not exist, a fresh database is created for testing.
    """
    db_path = sys.argv[1] if len(sys.argv) > 1 else None
    if db_path and not os.path.exists(db_path):
        print(f"Warning: Database file does not exist: {db_path}")
        print("Creating new database for testing...")
    await EmbeddingDiagnostics(db_path).run_all_tests()


if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/docs/development/code-quality-workflow.md:
--------------------------------------------------------------------------------
```markdown
# Code Quality Workflow Documentation
> **Version**: 1.0.0
> **Last Updated**: November 2025
> **Status**: Active
## Overview
This document describes the comprehensive code quality workflow for the MCP Memory Service project, integrating LLM-based analysis (Groq/Gemini) with static analysis (pyscn) for multi-layer quality assurance.
## Table of Contents
- [Quality Strategy](#quality-strategy)
- [Layer 1: Pre-commit Checks](#layer-1-pre-commit-checks)
- [Layer 2: PR Quality Gates](#layer-2-pr-quality-gates)
- [Layer 3: Periodic Reviews](#layer-3-periodic-reviews)
- [pyscn Integration](#pyscn-integration)
- [Health Score Thresholds](#health-score-thresholds)
- [Troubleshooting](#troubleshooting)
- [Appendix](#appendix)
## Quality Strategy
### Three-Layer Approach
The workflow uses three complementary layers to ensure code quality:
```
Layer 1: Pre-commit → Fast (<5s) → Every commit
Layer 2: PR Gate → Moderate (30s) → PR creation
Layer 3: Periodic → Deep (60s) → Weekly review
```
### Tool Selection
| Tool | Purpose | Speed | Blocking | When |
|------|---------|-------|----------|------|
| **Groq API** | LLM complexity checks | <5s | Yes (>8) | Pre-commit |
| **Gemini CLI** | LLM fallback | ~3s | Yes (>8) | Pre-commit |
| **pyscn** | Static analysis | 30-60s | Yes (<50) | PR + weekly |
| **code-quality-guard** | Manual review | Variable | No | On-demand |
## Layer 1: Pre-commit Checks
### Purpose
Catch quality issues before they enter the codebase.
### Checks Performed
1. **Development Environment Validation**
- Verify editable install (`pip install -e .`)
- Check version consistency (source vs installed)
- Prevent stale package issues
2. **Complexity Analysis** (Groq/Gemini)
- Rate functions 1-10
- Block if any function scores >8
- Warn if any function scores 7-8
3. **Security Scanning**
- SQL injection (raw SQL queries)
- XSS (unescaped HTML)
- Command injection (shell=True)
- Hardcoded secrets
### Usage
**Installation:**
```bash
ln -s ../../scripts/hooks/pre-commit .git/hooks/pre-commit
chmod +x .git/hooks/pre-commit
```
**Configuration:**
```bash
# Primary LLM: Groq (fast, simple auth)
export GROQ_API_KEY="your-groq-api-key"
# Fallback: Gemini CLI
npm install -g @google/generative-ai-cli
```
**Example Output:**
```
Running pre-commit quality checks...
✓ Using Groq API (fast mode)
Verifying development environment...
✓ Development environment OK
=== Checking: src/mcp_memory_service/storage/sqlite_vec.py ===
Checking complexity...
⚠️ High complexity detected (score 7)
initialize: Score 7 - Multiple nested conditions and error handling paths
Checking for security issues...
✓ No security issues
=== Pre-commit Check Summary ===
⚠️ HIGH COMPLEXITY WARNING
Some functions have high complexity (score 7).
Consider refactoring to improve maintainability.
Continue with commit anyway? (y/n)
```
### Thresholds
- **Block**: Complexity >8, any security issues
- **Warn**: Complexity 7-8
- **Pass**: Complexity <7, no security issues
## Layer 2: PR Quality Gates
### Purpose
Comprehensive checks before code review and merge.
### Standard Checks
Run automatically on PR creation:
```bash
bash scripts/pr/quality_gate.sh <PR_NUMBER>
```
**Checks:**
1. Code complexity (Gemini CLI)
2. Security vulnerabilities
3. Test coverage (code files vs test files)
4. Breaking changes detection
**Duration:** ~10-30 seconds
### Comprehensive Checks (with pyscn)
Optional deep analysis:
```bash
bash scripts/pr/quality_gate.sh <PR_NUMBER> --with-pyscn
```
**Additional Checks:**
- Cyclomatic complexity scoring
- Dead code detection
- Code duplication analysis
- Coupling metrics (CBO)
- Architecture violations
**Duration:** ~30-60 seconds
### Example Output
**Standard Checks:**
```
=== PR Quality Gate for #123 ===
Fetching changed files...
Changed Python files:
src/mcp_memory_service/storage/hybrid.py
tests/test_hybrid_storage.py
=== Check 1: Code Complexity ===
Analyzing: src/mcp_memory_service/storage/hybrid.py
✓ Complexity OK
=== Check 2: Security Vulnerabilities ===
Scanning: src/mcp_memory_service/storage/hybrid.py
✓ No security issues
=== Check 3: Test Coverage ===
Code files changed: 1
Test files changed: 1
✓ Test coverage OK
=== Check 4: Breaking Changes ===
No API changes detected
✓ No breaking changes
=== Quality Gate Summary ===
✅ ALL CHECKS PASSED
Quality Gate Results:
- Code complexity: ✅ OK
- Security scan: ✅ OK
- Test coverage: ✅ OK
- Breaking changes: ✅ None detected
```
**Comprehensive Checks (with pyscn):**
```
=== Check 5: pyscn Comprehensive Analysis ===
Running pyscn static analysis...
📊 Overall Health Score: 68/100
Quality Metrics:
- Complexity: 45/100 (Avg: 8.2, Max: 15)
- Dead Code: 75/100 (12 issues)
- Duplication: 40/100 (4.2% duplication)
⚠️ WARNING - Health score: 68 (threshold: 50)
✓ pyscn analysis completed
```
### Thresholds
- **Block PR**: Security issues, health score <50
- **Warn**: Complexity >7, health score 50-69
- **Pass**: No security issues, health score ≥70
## Layer 3: Periodic Reviews
### Purpose
Track quality trends, detect regressions, plan refactoring.
### Metrics Tracking
**Run manually or via cron:**
```bash
bash scripts/quality/track_pyscn_metrics.sh
```
**Frequency:** Weekly or after major changes
**Stored Data:**
- Health score over time
- Complexity metrics (avg, max)
- Duplication percentage
- Dead code issues
- Architecture violations
**Output:**
- CSV file: `.pyscn/history/metrics.csv`
- HTML report: `.pyscn/reports/analyze_*.html`
**Example Output:**
```
=== pyscn Metrics Tracking ===
Running pyscn analysis (this may take 30-60 seconds)...
✓ Analysis complete
=== Metrics Extracted ===
Health Score: 68/100
Complexity: 45/100 (Avg: 8.2, Max: 15)
Dead Code: 75/100 (12 issues)
Duplication: 40/100 (4.2%)
Coupling: 100/100
Dependencies: 90/100
Architecture: 80/100
✓ Metrics saved to .pyscn/history/metrics.csv
=== Comparison to Previous Run ===
Previous: 70/100 (2025-11-16)
Current: 68/100 (2025-11-23)
Change: -2 points
⚠️ Regression: -2 points
=== Trend Summary ===
Total measurements: 5
Average health score: 69/100
Highest: 72/100
Lowest: 65/100
```
### Weekly Review
**Run manually or via cron:**
```bash
bash scripts/quality/weekly_quality_review.sh [--create-issue]
```
**Features:**
- Compare current vs last week's metrics
- Generate markdown trend report
- Identify regressions (>5% health score drop)
- Optionally create GitHub issue for significant regressions
**Output:** `docs/development/quality-review-YYYYMMDD.md`
**Example Report:**
```markdown
# Weekly Quality Review - November 23, 2025
## Summary
**Overall Trend:** ➡️ Stable
| Metric | Previous | Current | Change |
|--------|----------|---------|--------|
| Health Score | 70/100 | 68/100 | -2 |
| Complexity | 48/100 | 45/100 | -3 |
| Duplication | 42/100 | 40/100 | -2 |
## Status
### 🟡 Action Required
Health score 68 falls in the 50-69 band, so refactoring should be planned:
- Schedule targeted quality improvements within the next two weeks
- Monitor trends for further regressions
- Address new issues proactively
## Observations
- ⚠️ Complexity score decreased - New complex code introduced
- ⚠️ Code duplication increased - Review for consolidation opportunities
```
## pyscn Integration
### Installation
```bash
pip install pyscn
```
**Repository:** https://github.com/ludo-technologies/pyscn
### Capabilities
1. **Cyclomatic Complexity**
- Function-level scoring (1-100)
- Average, maximum, high-risk functions
- Detailed complexity breakdown
2. **Dead Code Detection**
- Unreachable code after returns
- Unused imports
- Unused variables/functions
3. **Clone Detection**
- Exact duplicates
- Near-exact duplicates (>90% similarity)
- Clone groups and fragments
4. **Coupling Metrics (CBO)**
- Coupling Between Objects
- High-coupling classes
- Average coupling score
5. **Dependency Analysis**
- Module dependencies
- Circular dependency detection
- Dependency depth
6. **Architecture Validation**
- Layered architecture compliance
- Layer violation detection
- Cross-layer dependencies
### Usage
**Full Analysis:**
```bash
pyscn analyze .
```
**View Report:**
```bash
open .pyscn/reports/analyze_*.html
```
**JSON Output:**
```bash
pyscn analyze . --format json > /tmp/metrics.json
```
### Report Interpretation
**Health Score Breakdown:**
| Component | Score | Grade | Interpretation |
|-----------|-------|-------|----------------|
| **Complexity** | 40/100 | Poor | 28 high-risk functions (>7), avg 9.5 |
| **Dead Code** | 70/100 | Fair | 27 issues, 2 critical |
| **Duplication** | 30/100 | Poor | 6.0% duplication, 18 clone groups |
| **Coupling** | 100/100 | Excellent | Avg CBO 1.5, 0 high-coupling |
| **Dependencies** | 85/100 | Good | 0 cycles, depth 7 |
| **Architecture** | 75/100 | Good | 58 violations, 75.5% compliance |
**Example: Complexity Report**
```
Top 5 High-Complexity Functions:
1. install.py::main() - Complexity: 62, Nesting: 6
2. config.py::__main__() - Complexity: 42, Nesting: 0
3. sqlite_vec.py::initialize() - Complexity: 38, Nesting: 10
4. oauth/authorization.py::token() - Complexity: 35, Nesting: 4
5. install.py::install_package() - Complexity: 33, Nesting: 4
```
**Action:** Refactor functions with complexity >10 using:
- Extract method refactoring
- Strategy pattern for conditionals
- Helper functions for complex operations
## Health Score Thresholds
### Release Blocker (<50)
**Status:** 🔴 **Cannot merge or release**
**Required Actions:**
1. Review full pyscn report
2. Identify top 5 complexity hotspots
3. Create refactoring tasks
4. Schedule immediate refactoring sprint
5. Track progress in issue #240
**Timeline:** Must resolve before any merges
### Action Required (50-69)
**Status:** 🟡 **Plan refactoring within 2 weeks**
**Recommended Actions:**
1. Analyze complexity trends
2. Create project board for tracking
3. Allocate 20% sprint capacity to quality
4. Review duplication for consolidation
5. Remove dead code
**Timeline:** 2-week improvement plan
### Good (70-84)
**Status:** ✅ **Monitor trends, continue development**
**Maintenance:**
- Monthly quality reviews
- Track complexity trends
- Keep health score above 70
- Address new issues proactively
### Excellent (85+)
**Status:** 🎯 **Maintain current standards**
**Best Practices:**
- Document quality patterns
- Share refactoring techniques
- Mentor team members
- Celebrate wins
## Troubleshooting
### Common Issues
**Issue:** pyscn not found
```bash
# Solution
pip install pyscn
```
**Issue:** Pre-commit hook not running
```bash
# Solution
chmod +x .git/hooks/pre-commit
ls -la .git/hooks/pre-commit # Verify symlink
```
**Issue:** Groq API errors
```bash
# Solution 1: Check API key
echo $GROQ_API_KEY # Should not be empty
# Solution 2: Test Groq connection
curl https://api.groq.com/openai/v1/models \
-H "Authorization: Bearer $GROQ_API_KEY"
# Solution 3: Fall back to Gemini
unset GROQ_API_KEY # Temporarily disable Groq
```
**Issue:** pyscn analysis too slow
```bash
# Solution: Run on specific directories
pyscn analyze src/ # Exclude tests, scripts
pyscn analyze . --exclude "tests/*,scripts/*"
```
**Issue:** False positive security warnings
```bash
# Solution: Review and whitelist
# Add comment explaining why code is safe
# Example:
# SAFE: User input sanitized via parameterized query
```
### Performance Tuning
**Pre-commit Hooks:**
- Use Groq API (200-300ms vs Gemini 2-3s)
- Analyze only staged files
- Skip checks if no Python files
**PR Quality Gates:**
- Run standard checks first (fast)
- Use `--with-pyscn` for comprehensive analysis
- Cache pyscn reports for repeated checks
**Periodic Reviews:**
- Schedule during off-hours (cron)
- Use JSON output for scripting
- Archive old reports (keep last 30 days)
## Appendix
### Script Reference
| Script | Purpose | Usage |
|--------|---------|-------|
| `scripts/hooks/pre-commit` | Pre-commit quality checks | Auto-runs on `git commit` |
| `scripts/pr/quality_gate.sh` | PR quality gates | `bash scripts/pr/quality_gate.sh <PR>` |
| `scripts/pr/run_pyscn_analysis.sh` | pyscn PR analysis | `bash scripts/pr/run_pyscn_analysis.sh --pr <PR>` |
| `scripts/quality/track_pyscn_metrics.sh` | Metrics tracking | `bash scripts/quality/track_pyscn_metrics.sh` |
| `scripts/quality/weekly_quality_review.sh` | Weekly review | `bash scripts/quality/weekly_quality_review.sh` |
### Configuration Files
| File | Purpose |
|------|---------|
| `.pyscn/.gitignore` | Ignore pyscn reports and history |
| `.pyscn/history/metrics.csv` | Historical quality metrics |
| `.pyscn/reports/*.html` | pyscn HTML reports |
| `.claude/agents/code-quality-guard.md` | Code quality agent specification |
### Related Documentation
- [CLAUDE.md](../../CLAUDE.md) - Project conventions and workflows
- [`.claude/agents/code-quality-guard.md`](../../.claude/agents/code-quality-guard.md) - Agent workflows
- [scripts/README.md](../../scripts/README.md) - Script documentation
- [Issue #240](https://github.com/doobidoo/mcp-memory-service/issues/240) - Quality improvements tracking
### External Resources
- [pyscn GitHub](https://github.com/ludo-technologies/pyscn) - pyscn documentation
- [Groq API Docs](https://console.groq.com/docs) - Groq API reference
- [Gemini CLI](https://www.npmjs.com/package/@google/gemini-cli) - Gemini CLI docs
---
**Document Version History:**
- v1.0.0 (2025-11-24): Initial comprehensive documentation with pyscn integration
```
--------------------------------------------------------------------------------
/scripts/migration/migrate_to_cloudflare.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Migration script for moving data to Cloudflare backend.
Supports migration from SQLite-vec and ChromaDB backends.
"""
import asyncio
import json
import logging
import os
import sys
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
import argparse
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class DataMigrator:
    """Migrates memories from SQLite-vec or ChromaDB into the Cloudflare backend.

    Workflow:
      * export_from_sqlite_vec / export_from_chroma -> list of plain dicts
      * import_to_cloudflare                        -> stores dicts as Memory objects
      * export_to_file / import_from_file           -> JSON-file round trip
      * migrate_direct                              -> export + import in one step
    """

    def __init__(self):
        # Placeholders for future stateful use; each operation currently
        # creates and tears down its own storage instance.
        self.source_storage = None
        self.cloudflare_storage = None

    @staticmethod
    def _memory_to_dict(memory) -> Dict[str, Any]:
        """Serialize a Memory object to a JSON-compatible dict.

        Shared by both exporters (previously duplicated inline).
        """
        return {
            'content': memory.content,
            'content_hash': memory.content_hash,
            'tags': memory.tags,
            'memory_type': memory.memory_type,
            'metadata': memory.metadata,
            'created_at': memory.created_at,
            'created_at_iso': memory.created_at_iso,
            'updated_at': memory.updated_at,
            'updated_at_iso': memory.updated_at_iso,
        }

    async def _export_memories(self, source_backend: str, source_path: str) -> List[Dict[str, Any]]:
        """Dispatch to the exporter matching source_backend.

        Raises:
            ValueError: if source_backend is not 'sqlite_vec' or 'chroma'.
        """
        if source_backend == 'sqlite_vec':
            return await self.export_from_sqlite_vec(source_path)
        if source_backend == 'chroma':
            return await self.export_from_chroma(source_path)
        raise ValueError(f"Unsupported source backend: {source_backend}")

    async def export_from_sqlite_vec(self, sqlite_path: str) -> List[Dict[str, Any]]:
        """Export all memories from a SQLite-vec database as plain dicts."""
        logger.info(f"Exporting data from SQLite-vec: {sqlite_path}")
        try:
            from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage

            storage = SqliteVecMemoryStorage(sqlite_path)
            await storage.initialize()

            stats = await storage.get_stats()
            total_memories = stats.get('total_memories', 0)
            logger.info(f"Found {total_memories} memories to export")

            # NOTE(review): the previous batched loop called
            # get_recent_memories(batch_size) repeatedly without an
            # offset/cursor, so each iteration re-read the same newest-first
            # window and appended duplicates. Fetch the full set in one call,
            # mirroring export_from_chroma(). Revisit if the storage API ever
            # grows true pagination.
            recent_memories = await storage.get_recent_memories(total_memories)
            memories = [self._memory_to_dict(memory) for memory in recent_memories]

            logger.info(f"Successfully exported {len(memories)} memories from SQLite-vec")
            return memories
        except Exception as e:
            logger.error(f"Failed to export from SQLite-vec: {e}")
            raise

    async def export_from_chroma(self, chroma_path: str) -> List[Dict[str, Any]]:
        """Export all memories from a ChromaDB database as plain dicts."""
        logger.info(f"Exporting data from ChromaDB: {chroma_path}")
        try:
            from mcp_memory_service.storage.chroma import ChromaMemoryStorage

            storage = ChromaMemoryStorage(chroma_path, preload_model=False)
            await storage.initialize()

            stats = await storage.get_stats()
            total_memories = stats.get('total_memories', 0)
            logger.info(f"Found {total_memories} memories to export")

            recent_memories = await storage.get_recent_memories(total_memories)
            memories = [self._memory_to_dict(memory) for memory in recent_memories]

            logger.info(f"Successfully exported {len(memories)} memories from ChromaDB")
            return memories
        except Exception as e:
            logger.error(f"Failed to export from ChromaDB: {e}")
            raise

    async def import_to_cloudflare(self, memories: List[Dict[str, Any]]) -> bool:
        """Import exported memory dicts into the Cloudflare backend.

        Reads CLOUDFLARE_* settings from the environment. Individual record
        failures are counted and logged, not fatal. Returns True only when
        every memory imported successfully.

        Raises:
            ValueError: if required Cloudflare environment variables are missing.
        """
        logger.info(f"Importing {len(memories)} memories to Cloudflare backend")
        try:
            from mcp_memory_service.storage.cloudflare import CloudflareStorage

            api_token = os.getenv('CLOUDFLARE_API_TOKEN')
            account_id = os.getenv('CLOUDFLARE_ACCOUNT_ID')
            vectorize_index = os.getenv('CLOUDFLARE_VECTORIZE_INDEX')
            d1_database_id = os.getenv('CLOUDFLARE_D1_DATABASE_ID')
            r2_bucket = os.getenv('CLOUDFLARE_R2_BUCKET')  # optional

            if not all([api_token, account_id, vectorize_index, d1_database_id]):
                raise ValueError("Missing required Cloudflare environment variables")

            storage = CloudflareStorage(
                api_token=api_token,
                account_id=account_id,
                vectorize_index=vectorize_index,
                d1_database_id=d1_database_id,
                r2_bucket=r2_bucket
            )
            await storage.initialize()

            batch_size = 10  # small batches to stay under Cloudflare API limits
            imported_count = 0
            failed_count = 0

            for i in range(0, len(memories), batch_size):
                batch = memories[i:i + batch_size]
                for memory_data in batch:
                    try:
                        memory = Memory(
                            content=memory_data['content'],
                            content_hash=memory_data['content_hash'],
                            tags=memory_data.get('tags', []),
                            memory_type=memory_data.get('memory_type'),
                            metadata=memory_data.get('metadata', {}),
                            created_at=memory_data.get('created_at'),
                            created_at_iso=memory_data.get('created_at_iso'),
                            updated_at=memory_data.get('updated_at'),
                            updated_at_iso=memory_data.get('updated_at_iso')
                        )
                        success, message = await storage.store(memory)
                        if success:
                            imported_count += 1
                            logger.debug(f"Imported memory: {memory.content_hash[:16]}...")
                        else:
                            failed_count += 1
                            logger.warning(f"Failed to import memory {memory.content_hash[:16]}: {message}")
                    except Exception as e:
                        # One bad record must not abort the whole migration.
                        failed_count += 1
                        logger.error(f"Error importing memory: {e}")

                processed = min(i + batch_size, len(memories))
                logger.info(f"Progress: {processed}/{len(memories)} processed, {imported_count} imported, {failed_count} failed")
                # Rate limiting - small delay between batches
                await asyncio.sleep(0.5)

            await storage.close()
            logger.info(f"Migration completed: {imported_count} imported, {failed_count} failed")
            return failed_count == 0
        except Exception as e:
            logger.error(f"Failed to import to Cloudflare: {e}")
            raise

    async def export_to_file(self, source_backend: str, source_path: str, output_file: str) -> bool:
        """Export memories from a source backend into a JSON file."""
        try:
            memories = await self._export_memories(source_backend, source_path)

            export_data = {
                'source_backend': source_backend,
                'source_path': source_path,
                'export_timestamp': time.time(),
                'total_memories': len(memories),
                'memories': memories
            }
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(export_data, f, indent=2, ensure_ascii=False)

            logger.info(f"Exported {len(memories)} memories to {output_file}")
            return True
        except Exception as e:
            logger.error(f"Export failed: {e}")
            return False

    async def import_from_file(self, input_file: str) -> bool:
        """Import memories from a previously exported JSON file into Cloudflare."""
        try:
            with open(input_file, 'r', encoding='utf-8') as f:
                export_data = json.load(f)

            memories = export_data.get('memories', [])
            logger.info(f"Loaded {len(memories)} memories from {input_file}")
            return await self.import_to_cloudflare(memories)
        except Exception as e:
            logger.error(f"Import failed: {e}")
            return False

    async def migrate_direct(self, source_backend: str, source_path: str) -> bool:
        """Export from the source backend and import straight into Cloudflare."""
        try:
            memories = await self._export_memories(source_backend, source_path)
            return await self.import_to_cloudflare(memories)
        except Exception as e:
            logger.error(f"Direct migration failed: {e}")
            return False
async def main():
    """CLI entry point: parse arguments and run the requested migration command.

    Commands:
      export  -- dump a source backend to a JSON file
      import  -- load a JSON file into Cloudflare
      migrate -- export + import in one step

    Exits 0 on success, 1 on failure or interruption.
    """
    parser = argparse.ArgumentParser(description='Migrate data to Cloudflare backend')
    subparsers = parser.add_subparsers(dest='command', help='Migration commands')

    # 'export' command: source backend -> JSON file
    export_cmd = subparsers.add_parser('export', help='Export data to JSON file')
    export_cmd.add_argument('--source', choices=['sqlite_vec', 'chroma'], required=True,
                            help='Source backend type')
    export_cmd.add_argument('--source-path', required=True,
                            help='Path to source database')
    export_cmd.add_argument('--output', required=True,
                            help='Output JSON file path')

    # 'import' command: JSON file -> Cloudflare
    import_cmd = subparsers.add_parser('import', help='Import data from JSON file')
    import_cmd.add_argument('--input', required=True,
                            help='Input JSON file path')

    # 'migrate' command: source backend -> Cloudflare directly
    migrate_cmd = subparsers.add_parser('migrate', help='Direct migration to Cloudflare')
    migrate_cmd.add_argument('--source', choices=['sqlite_vec', 'chroma'], required=True,
                             help='Source backend type')
    migrate_cmd.add_argument('--source-path', required=True,
                             help='Path to source database')

    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return

    migrator = DataMigrator()

    try:
        if args.command == 'export':
            success = await migrator.export_to_file(
                args.source, args.source_path, args.output
            )
        elif args.command == 'import':
            success = await migrator.import_from_file(args.input)
        elif args.command == 'migrate':
            success = await migrator.migrate_direct(args.source, args.source_path)

        if success:
            logger.info("Migration completed successfully!")
            sys.exit(0)
        else:
            logger.error("Migration failed!")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("Migration cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Migration error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/claude-hooks/core/topic-change.js:
--------------------------------------------------------------------------------
```javascript
/**
* Claude Code Topic Change Hook
* Monitors conversation flow and dynamically loads relevant memories when topics evolve
* Phase 2: Intelligent Context Updates
*/
const fs = require('fs').promises;
const path = require('path');
const https = require('https');
// Import utilities
const { analyzeConversation, detectTopicChanges } = require('../utilities/conversation-analyzer');
const { scoreMemoryRelevance } = require('../utilities/memory-scorer');
const { formatMemoriesForContext } = require('../utilities/context-formatter');
// Global state for conversation tracking
let conversationState = {
previousAnalysis: null,
loadedMemoryHashes: new Set(),
sessionContext: null,
topicChangeCount: 0
};
/**
 * Load hook configuration from ../config.json (relative to this file).
 *
 * Any failure (missing file, unreadable file, invalid JSON) downgrades to
 * the built-in defaults with a warning rather than throwing.
 *
 * NOTE(review): the fallback embeds a hard-coded LAN endpoint and API key
 * ('test-key-123') — presumably development-only values; confirm these are
 * never relied on in production deployments.
 *
 * @returns {Promise<object>} parsed configuration object (or defaults)
 */
async function loadConfig() {
    try {
        const configPath = path.join(__dirname, '../config.json');
        const configData = await fs.readFile(configPath, 'utf8');
        return JSON.parse(configData);
    } catch (error) {
        console.warn('[Topic Change Hook] Using default configuration:', error.message);
        // Defaults mirror the expected config.json shape.
        return {
            memoryService: {
                endpoint: 'https://10.0.1.30:8443',
                apiKey: 'test-key-123',
                maxMemoriesPerSession: 8
            },
            hooks: {
                topicChange: {
                    enabled: true,
                    timeout: 5000,
                    priority: 'low',
                    minSignificanceScore: 0.3,
                    maxMemoriesPerUpdate: 3
                }
            }
        };
    }
}
/**
 * Query the memory service's MCP endpoint for memories matching `query`.
 *
 * Resolves with an array of memory objects. On ANY failure — service error,
 * parse error, network error, timeout — it resolves with [] instead of
 * rejecting, so callers never need try/catch (the `reject` parameter is
 * intentionally unused).
 *
 * @param {string} endpoint - Base URL of the memory service
 * @param {string} apiKey - Bearer token for the Authorization header
 * @param {string} query - Semantic search query text
 * @param {object} [options] - { limit?: number, excludeHashes?: string[] }
 * @returns {Promise<Array>} matching memories whose content_hash is not in excludeHashes
 */
async function queryMemoryService(endpoint, apiKey, query, options = {}) {
    return new Promise((resolve, reject) => {
        const {
            limit = 5,
            excludeHashes = []
        } = options;

        const url = new URL('/mcp', endpoint);
        // JSON-RPC 2.0 request invoking the MCP 'retrieve_memory' tool.
        const postData = JSON.stringify({
            jsonrpc: '2.0',
            id: Date.now(),
            method: 'tools/call',
            params: {
                name: 'retrieve_memory',
                arguments: {
                    query: query,
                    limit: limit
                }
            }
        });

        const requestOptions = {
            hostname: url.hostname,
            port: url.port,
            path: url.pathname,
            method: 'POST',
            headers: {
                'Content-Type': 'application/json',
                'Authorization': `Bearer ${apiKey}`,
                'Content-Length': Buffer.byteLength(postData)
            },
            // NOTE(review): disables TLS certificate verification — presumably
            // to accept a self-signed cert on the LAN endpoint; confirm this is
            // acceptable before pointing at any public host.
            rejectUnauthorized: false,
            timeout: 5000
        };

        const req = https.request(requestOptions, (res) => {
            let data = '';
            res.on('data', (chunk) => {
                data += chunk;
            });
            res.on('end', () => {
                try {
                    const response = JSON.parse(data);
                    if (response.error) {
                        // JSON-RPC level error: degrade to empty result.
                        console.error('[Topic Change Hook] Memory service error:', response.error);
                        resolve([]);
                        return;
                    }
                    // Parse memory results from response
                    const memories = parseMemoryResults(response.result);
                    // Filter out already loaded memories
                    const filteredMemories = memories.filter(memory =>
                        !excludeHashes.includes(memory.content_hash)
                    );
                    console.log(`[Topic Change Hook] Retrieved ${filteredMemories.length} new memories for topic query`);
                    resolve(filteredMemories);
                } catch (parseError) {
                    console.error('[Topic Change Hook] Failed to parse memory response:', parseError.message);
                    resolve([]);
                }
            });
        });

        req.on('error', (error) => {
            console.error('[Topic Change Hook] Memory service request failed:', error.message);
            resolve([]);
        });

        req.on('timeout', () => {
            console.error('[Topic Change Hook] Memory service request timed out');
            req.destroy();
            resolve([]);
        });

        req.write(postData);
        req.end();
    });
}
/**
 * Parse memory results out of an MCP tools/call response.
 *
 * The service embeds a Python-repr-like payload in result.content[0].text,
 * e.g. "'results': [ {...} ]". The previous implementation ran eval() on the
 * captured array, which executes arbitrary code arriving over the network.
 * This version parses safely: strict JSON.parse first, then a best-effort
 * conversion of Python literals (single quotes, True/False/None) to JSON.
 *
 * @param {object} result - MCP response "result" field
 * @returns {Array} parsed memory objects, or [] on any failure
 */
function parseMemoryResults(result) {
    try {
        if (result && result.content && result.content[0] && result.content[0].text) {
            const text = result.content[0].text;
            // Try to extract results array from the response text
            const resultsMatch = text.match(/'results':\s*(\[[\s\S]*?\])/);
            if (resultsMatch) {
                const raw = resultsMatch[1];
                // SAFE: never eval network data — parse it instead.
                try {
                    return JSON.parse(raw) || [];
                } catch (jsonError) {
                    // Best-effort: map Python-style literals onto JSON. This is
                    // lossy for strings containing quotes, but strictly safer
                    // than eval; failures fall through to [].
                    const jsonish = raw
                        .replace(/'/g, '"')
                        .replace(/\bTrue\b/g, 'true')
                        .replace(/\bFalse\b/g, 'false')
                        .replace(/\bNone\b/g, 'null');
                    return JSON.parse(jsonish) || [];
                }
            }
        }
        return [];
    } catch (error) {
        console.error('[Topic Change Hook] Error parsing memory results:', error.message);
        return [];
    }
}
/**
 * Build prioritized memory-search queries from the current conversation
 * analysis and the detected topic changes.
 *
 * Candidates come from three sources: newly detected topics, the current
 * intent (when the intent changed), and up to two high-confidence entities.
 * The result is the top three candidates by weight (descending).
 *
 * @param {object} analysis - Current conversation analysis (intent, entities)
 * @param {object} changes - Topic diff (newTopics, changedIntents)
 * @returns {Array<{query: string, weight: number, type: string}>}
 */
function generateTopicQueries(analysis, changes) {
    const candidates = [];

    // New topics are always candidates, weighted by detection confidence.
    for (const topic of changes.newTopics) {
        candidates.push({ query: topic.name, weight: topic.confidence, type: 'topic' });
    }

    // Include the intent only when the conversation focus actually shifted.
    if (changes.changedIntents && analysis.intent) {
        candidates.push({
            query: analysis.intent.name,
            weight: analysis.intent.confidence,
            type: 'intent'
        });
    }

    // At most two entities, and only confidently-detected ones.
    const strongEntities = analysis.entities
        .filter((entity) => entity.confidence > 0.7)
        .slice(0, 2);
    for (const entity of strongEntities) {
        candidates.push({ query: entity.name, weight: entity.confidence, type: 'entity' });
    }

    // Highest-weight first, capped at three queries.
    candidates.sort((a, b) => b.weight - a.weight);
    return candidates.slice(0, 3);
}
/**
 * Render the selected memories into a markdown context-update message.
 *
 * Explains why the update fired (new topics and/or intent shift), then lists
 * up to three memories with truncated content and up to three tags each.
 *
 * @param {Array} memories - Scored memories to present
 * @param {object} analysis - Current conversation analysis
 * @param {object} changes - Topic diff that triggered the update
 * @returns {string|null} formatted message, or null when there is nothing to show
 */
function formatContextUpdate(memories, analysis, changes) {
    if (memories.length === 0) {
        return null;
    }

    const parts = ['\n🧠 **Dynamic Memory Context Update**\n\n'];

    // Lead with the reason(s) this update was generated.
    if (changes.newTopics.length > 0) {
        const topicNames = changes.newTopics.map(t => t.name).join(', ');
        parts.push(`**New topics detected:** ${topicNames}\n\n`);
    }
    if (changes.changedIntents) {
        parts.push(`**Conversation focus shifted:** ${analysis.intent.name}\n\n`);
    }

    parts.push('**Additional relevant context:**\n');
    memories.slice(0, 3).forEach((memory, index) => {
        // Truncate long memories so the injected context stays compact.
        const content = memory.content.length > 120 ?
            memory.content.substring(0, 120) + '...' :
            memory.content;
        parts.push(`${index + 1}. ${content}\n`);
        if (memory.tags && memory.tags.length > 0) {
            parts.push(`   *Tags: ${memory.tags.slice(0, 3).join(', ')}*\n`);
        }
        parts.push('\n');
    });

    parts.push('---\n');
    return parts.join('');
}
/**
 * Main topic change detection and processing.
 *
 * Pipeline: analyze the conversation text -> diff against the previous
 * analysis (kept in module-level conversationState) -> if the shift is
 * significant enough, query the memory service for the new topics/intent/
 * entities -> score, dedupe and inject the best matches via
 * context.onContextUpdate. All errors are logged and swallowed so this hook
 * can never break the host session.
 *
 * @param {object} context - Conversation context; reads context.conversationText
 *     and, when provided, calls context.onContextUpdate(updateText)
 */
async function onTopicChange(context) {
    console.log('[Topic Change Hook] Analyzing conversation for topic changes...');

    try {
        const config = await loadConfig();

        // Check if topic change hook is enabled (optional chaining tolerates
        // partial configs).
        if (!config.hooks?.topicChange?.enabled) {
            console.log('[Topic Change Hook] Hook is disabled, skipping');
            return;
        }

        const {
            minSignificanceScore = 0.3,
            maxMemoriesPerUpdate = 3
        } = config.hooks.topicChange;

        // Analyze current conversation
        const currentAnalysis = analyzeConversation(context.conversationText || '', {
            extractTopics: true,
            extractEntities: true,
            detectIntent: true,
            minTopicConfidence: 0.3
        });

        // Diff against the analysis captured on the previous invocation.
        const changes = detectTopicChanges(conversationState.previousAnalysis, currentAnalysis);

        // Only proceed if significant topic change detected. Note that every
        // early-return path below still records currentAnalysis so the next
        // invocation diffs against the latest state.
        if (!changes.hasTopicShift || changes.significanceScore < minSignificanceScore) {
            console.log(`[Topic Change Hook] No significant topic change detected (score: ${changes.significanceScore.toFixed(2)})`);
            conversationState.previousAnalysis = currentAnalysis;
            return;
        }

        console.log(`[Topic Change Hook] Significant topic change detected (score: ${changes.significanceScore.toFixed(2)})`);
        console.log(`[Topic Change Hook] New topics: ${changes.newTopics.map(t => t.name).join(', ')}`);

        // Generate search queries for new topics
        const queries = generateTopicQueries(currentAnalysis, changes);

        if (queries.length === 0) {
            console.log('[Topic Change Hook] No actionable queries generated');
            conversationState.previousAnalysis = currentAnalysis;
            return;
        }

        // Query memory service for each topic, skipping memories already
        // loaded earlier in this session (tracked by content hash).
        const allMemories = [];
        for (const queryObj of queries) {
            const memories = await queryMemoryService(
                config.memoryService.endpoint,
                config.memoryService.apiKey,
                queryObj.query,
                {
                    limit: 2,
                    excludeHashes: Array.from(conversationState.loadedMemoryHashes)
                }
            );

            // Add query context to memories so scoring can see why each
            // memory was retrieved.
            memories.forEach(memory => {
                memory.queryContext = queryObj;
            });

            allMemories.push(...memories);
        }

        if (allMemories.length === 0) {
            console.log('[Topic Change Hook] No new relevant memories found');
            conversationState.previousAnalysis = currentAnalysis;
            return;
        }

        // Score memories for relevance against the session's project context.
        const projectContext = conversationState.sessionContext || { name: 'unknown' };
        const scoredMemories = scoreMemoryRelevance(allMemories, projectContext, {
            includeConversationContext: true,
            conversationAnalysis: currentAnalysis
        });

        // Select top memories for context update
        const selectedMemories = scoredMemories
            .filter(memory => memory.relevanceScore > 0.3)
            .slice(0, maxMemoriesPerUpdate);

        if (selectedMemories.length === 0) {
            console.log('[Topic Change Hook] No high-relevance memories found');
            conversationState.previousAnalysis = currentAnalysis;
            return;
        }

        // Track loaded memories so later updates don't repeat them.
        selectedMemories.forEach(memory => {
            conversationState.loadedMemoryHashes.add(memory.content_hash);
        });

        // Format context update
        const contextUpdate = formatContextUpdate(selectedMemories, currentAnalysis, changes);

        if (contextUpdate) {
            // In a real implementation, this would inject the context into the conversation
            console.log('[Topic Change Hook] Context update generated:');
            console.log(contextUpdate);

            // For now, we'll simulate the context injection
            if (context.onContextUpdate && typeof context.onContextUpdate === 'function') {
                context.onContextUpdate(contextUpdate);
            }
        }

        // Update conversation state
        conversationState.previousAnalysis = currentAnalysis;
        conversationState.topicChangeCount++;

        console.log(`[Topic Change Hook] Topic change processing completed (${conversationState.topicChangeCount} changes total)`);
    } catch (error) {
        // Never propagate: a broken hook must not break the conversation.
        console.error('[Topic Change Hook] Error processing topic change:', error.message);
    }
}
/**
 * Initialize topic tracking for a new session.
 *
 * Replaces the module-level conversationState wholesale, discarding any
 * analysis and loaded-memory hashes from a prior session.
 *
 * @param {object} sessionContext - Session context information (project info)
 */
function initializeTopicTracking(sessionContext) {
    console.log('[Topic Change Hook] Initializing topic tracking for new session');

    conversationState = {
        previousAnalysis: null,
        loadedMemoryHashes: new Set(),
        sessionContext,
        topicChangeCount: 0
    };
}
/**
 * Reset topic tracking state to a blank slate (no session context).
 */
function resetTopicTracking() {
    console.log('[Topic Change Hook] Resetting topic tracking state');

    conversationState = {
        previousAnalysis: null,
        loadedMemoryHashes: new Set(),
        sessionContext: null,
        topicChangeCount: 0
    };
}
/**
 * Snapshot the current topic-tracking statistics.
 *
 * @returns {{topicChangeCount: number, loadedMemoriesCount: number,
 *            hasSessionContext: boolean, lastAnalysis: object|null}}
 */
function getTopicTrackingStats() {
    const state = conversationState;
    return {
        topicChangeCount: state.topicChangeCount,
        loadedMemoriesCount: state.loadedMemoryHashes.size,
        hasSessionContext: Boolean(state.sessionContext),
        lastAnalysis: state.previousAnalysis
    };
}
module.exports = {
onTopicChange,
initializeTopicTracking,
resetTopicTracking,
getTopicTrackingStats
};
```
--------------------------------------------------------------------------------
/scripts/installation/install_macos_service.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
macOS LaunchAgent installer for MCP Memory Service.
Creates and manages LaunchAgent plist files for automatic service startup.
"""
import os
import sys
import json
import plistlib
import argparse
import subprocess
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))
try:
from scripts.service_utils import (
get_project_root, get_service_paths, get_service_environment,
generate_api_key, save_service_config, load_service_config,
check_dependencies, get_service_command, print_service_info
)
except ImportError as e:
print(f"Error importing service utilities: {e}")
print("Please ensure you're running this from the project directory")
sys.exit(1)
SERVICE_LABEL = "com.mcp.memory-service"
SERVICE_NAME = "MCP Memory Service"


def get_launchd_paths(user_level=True):
    """Return (directory, plist_path) for the service's launchd unit.

    User level   -> ~/Library/LaunchAgents (LaunchAgent)
    System level -> /Library/LaunchDaemons (LaunchDaemon, requires root)
    """
    if user_level:
        plist_dir = Path.home() / "Library" / "LaunchAgents"
    else:
        plist_dir = Path("/Library/LaunchDaemons")

    return plist_dir, plist_dir / f"{SERVICE_LABEL}.plist"
def create_plist(api_key, user_level=True):
    """Build the launchd plist dictionary for the service.

    Args:
        api_key: API key injected into the service environment as MCP_API_KEY.
        user_level: True for a LaunchAgent, False for a system LaunchDaemon.

    Returns:
        dict suitable for plistlib.dump().
    """
    paths = get_service_paths()
    command = get_service_command()

    environment = get_service_environment()
    environment['MCP_API_KEY'] = api_key

    plist_dict = {
        'Label': SERVICE_LABEL,
        'ProgramArguments': command,
        'EnvironmentVariables': environment,
        'WorkingDirectory': str(paths['project_root']),
        # Start at login/boot and restart after crashes (but not clean exits).
        'RunAtLoad': True,
        'KeepAlive': {'SuccessfulExit': False, 'Crashed': True},
        'StandardOutPath': str(paths['log_dir'] / 'mcp-memory-service.log'),
        'StandardErrorPath': str(paths['log_dir'] / 'mcp-memory-service.error.log'),
        'ProcessType': 'Interactive' if user_level else 'Background',
    }

    if not user_level:
        # System daemons need an explicit user/group to run under.
        plist_dict['UserName'] = os.environ.get('USER', 'nobody')
        plist_dict['GroupName'] = 'staff'

    return plist_dict
def create_shell_scripts():
    """Write convenience launchctl wrapper scripts to scripts/macos/.

    Generates start/stop/status/uninstall shell scripts for the user-level
    LaunchAgent and marks each one executable (0o755).

    Returns:
        Path to the directory the scripts were written to.
    """
    paths = get_service_paths()
    scripts_dir = paths['scripts_dir'] / 'macos'
    scripts_dir.mkdir(exist_ok=True)

    # Start script
    start_script = scripts_dir / 'start_service.sh'
    with open(start_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "Starting {SERVICE_NAME}..."
launchctl load ~/Library/LaunchAgents/{SERVICE_LABEL}.plist
if [ $? -eq 0 ]; then
    echo "✅ Service started successfully!"
else
    echo "❌ Failed to start service"
fi
''')
    start_script.chmod(0o755)

    # Stop script
    stop_script = scripts_dir / 'stop_service.sh'
    with open(stop_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "Stopping {SERVICE_NAME}..."
launchctl unload ~/Library/LaunchAgents/{SERVICE_LABEL}.plist
if [ $? -eq 0 ]; then
    echo "✅ Service stopped successfully!"
else
    echo "❌ Failed to stop service"
fi
''')
    stop_script.chmod(0o755)

    # Status script
    # Bug fix: the old separator `echo "-" | tr '-' '='` printed a single "="
    # character instead of a divider line; emit a literal divider instead.
    status_script = scripts_dir / 'service_status.sh'
    with open(status_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "{SERVICE_NAME} Status:"
echo "============================="
launchctl list | grep {SERVICE_LABEL}
if [ $? -eq 0 ]; then
    echo ""
    echo "Service is loaded. PID shown above (- means not running)"
else
    echo "Service is not loaded"
fi
''')
    status_script.chmod(0o755)

    # Uninstall script
    uninstall_script = scripts_dir / 'uninstall_service.sh'
    with open(uninstall_script, 'w') as f:
        f.write(f'''#!/bin/bash
echo "This will uninstall {SERVICE_NAME}."
read -p "Are you sure? (y/N): " confirm
if [[ ! "$confirm" =~ ^[Yy]$ ]]; then
    exit 0
fi
echo "Stopping service..."
launchctl unload ~/Library/LaunchAgents/{SERVICE_LABEL}.plist 2>/dev/null
echo "Removing service files..."
rm -f ~/Library/LaunchAgents/{SERVICE_LABEL}.plist
echo "✅ Service uninstalled"
''')
    uninstall_script.chmod(0o755)

    return scripts_dir
def install_service(user_level=True):
    """Install and load the macOS LaunchAgent (user) or LaunchDaemon (system).

    Steps: verify dependencies, generate an API key, persist the service
    config, write the launchd plist, load it via launchctl, and (for
    user-level installs) create management shell scripts.

    Args:
        user_level: True installs a per-user LaunchAgent; False installs a
            system LaunchDaemon (requires root).

    Returns:
        True on success; exits the process on fatal errors.
    """
    service_type = "LaunchAgent" if user_level else "LaunchDaemon"

    # System-level daemons can only be installed by root.
    if not user_level and os.geteuid() != 0:
        print("\n❌ ERROR: System-level LaunchDaemon requires root privileges")
        print("Please run with sudo or use --user for user-level installation")
        sys.exit(1)

    print("\n🔍 Checking dependencies...")
    deps_ok, deps_msg = check_dependencies()
    if not deps_ok:
        print(f"❌ {deps_msg}")
        sys.exit(1)
    print(f"✅ {deps_msg}")

    # Generate API key
    api_key = generate_api_key()
    print(f"\n🔑 Generated API key: {api_key}")

    # Create and persist the service configuration.
    config = {
        'service_label': SERVICE_LABEL,
        'api_key': api_key,
        'command': get_service_command(),
        'environment': get_service_environment(),
        'user_level': user_level
    }
    config_file = save_service_config(config)
    print(f"💾 Saved configuration to: {config_file}")

    # Get plist paths
    plist_dir, plist_file = get_launchd_paths(user_level)
    plist_dir.mkdir(parents=True, exist_ok=True)

    print(f"\n📝 Creating {service_type} plist...")
    plist_dict = create_plist(api_key, user_level)

    with open(plist_file, 'wb') as f:
        plistlib.dump(plist_dict, f)

    # Plists are world-readable in both modes; system daemons must
    # additionally be owned by root:wheel. (The previous code had two
    # identical chmod branches.)
    os.chmod(plist_file, 0o644)
    if not user_level:
        os.chown(plist_file, 0, 0)  # root:wheel

    print(f"✅ Created plist at: {plist_file}")

    # Load the service
    print(f"\n🚀 Loading {service_type}...")
    result = subprocess.run([
        'launchctl', 'load', '-w', str(plist_file)
    ], capture_output=True, text=True)

    if result.returncode != 0:
        if "already loaded" in result.stderr:
            print("ℹ️ Service was already loaded, reloading...")
            # Unload then load again; best-effort, errors surface in launchctl list.
            subprocess.run(['launchctl', 'unload', str(plist_file)], capture_output=True)
            subprocess.run(['launchctl', 'load', '-w', str(plist_file)], capture_output=True)
        else:
            print(f"❌ Failed to load service: {result.stderr}")
            print("\n💡 Try checking Console.app for detailed error messages")
            sys.exit(1)

    print(f"✅ {service_type} loaded successfully!")

    # Create convenience scripts (user-level installs only).
    if user_level:
        scripts_dir = create_shell_scripts()
        print(f"\n📁 Created management scripts in: {scripts_dir}")

    # Print service information
    paths = get_service_paths()
    platform_info = {
        'Start Service': f'launchctl load -w {plist_file}',
        'Stop Service': f'launchctl unload {plist_file}',
        'Service Status': f'launchctl list | grep {SERVICE_LABEL}',
        'View Logs': f'tail -f {paths["log_dir"] / "mcp-memory-service.log"}',
        'Uninstall': f'python "{Path(__file__)}" --uninstall'
    }
    print_service_info(api_key, platform_info)

    # Additional macOS-specific tips
    print("\n📌 macOS Tips:")
    print(" • Check Console.app for detailed service logs")
    print(" • Service will start automatically on login/boot")
    print(" • Use Activity Monitor to verify the process is running")

    return True
def uninstall_service(user_level=True):
    """Remove the installed LaunchAgent/LaunchDaemon plist and saved config.

    Args:
        user_level: True removes the per-user LaunchAgent; False removes the
            system-wide LaunchDaemon (root required).
    """
    kind = "LaunchAgent" if user_level else "LaunchDaemon"
    # Removing a system daemon needs root, same as installing one.
    if not user_level and os.geteuid() != 0:
        print("\n❌ ERROR: System-level LaunchDaemon requires root privileges")
        print("Please run with sudo")
        sys.exit(1)
    print(f"\n🗑️ Uninstalling {SERVICE_NAME} {kind}...")
    _, plist_path = get_launchd_paths(user_level)
    if not plist_path.exists():
        print(f"ℹ️ {kind} is not installed")
    else:
        # Unload first so launchd forgets the job before the file disappears.
        print("⏹️ Stopping service...")
        subprocess.run(['launchctl', 'unload', str(plist_path)], capture_output=True)
        print("🗑️ Removing plist file...")
        plist_path.unlink()
        print(f"✅ {kind} uninstalled successfully!")
    # Drop the saved service configuration only if it belongs to this service.
    config = load_service_config()
    if config and config.get('service_label') == SERVICE_LABEL:
        print("🧹 Cleaning up configuration...")
        (get_service_paths()['config_dir'] / 'service_config.json').unlink()
def start_service(user_level=True):
    """Load the service plist via launchctl and report the outcome.

    Args:
        user_level: True targets the per-user LaunchAgent; False the
            system LaunchDaemon.
    """
    _, plist_path = get_launchd_paths(user_level)
    # Cannot start a service that was never installed.
    if not plist_path.exists():
        print("❌ Service is not installed. Run without --start to install first.")
        sys.exit(1)
    print(f"\n▶️ Starting {SERVICE_NAME}...")
    outcome = subprocess.run(['launchctl', 'load', str(plist_path)],
                             capture_output=True, text=True)
    if outcome.returncode == 0:
        print("✅ Service started successfully!")
    elif "already loaded" in outcome.stderr:
        # launchctl treats re-loading a loaded job as an error; not fatal here.
        print("ℹ️ Service is already running")
    else:
        print(f"❌ Failed to start service: {outcome.stderr}")
def stop_service(user_level=True):
    """Unload the service plist via launchctl and report the outcome.

    Args:
        user_level: True targets the per-user LaunchAgent; False the
            system LaunchDaemon.
    """
    _, plist_path = get_launchd_paths(user_level)
    print(f"\n⏹️ Stopping {SERVICE_NAME}...")
    outcome = subprocess.run(['launchctl', 'unload', str(plist_path)],
                             capture_output=True, text=True)
    if outcome.returncode == 0:
        print("✅ Service stopped successfully!")
    else:
        # Unloading a job that is not loaded also returns non-zero.
        print(f"ℹ️ Service may not be running: {outcome.stderr}")
def service_status(user_level=True):
    """Report whether the service is installed/loaded, plus config and logs.

    Args:
        user_level: True inspects the per-user LaunchAgent; False the
            system LaunchDaemon.
    """
    print(f"\n📊 {SERVICE_NAME} Status:")
    print("-" * 40)
    # No plist file means the service was never installed — nothing to report.
    plist_dir, plist_file = get_launchd_paths(user_level)
    if not plist_file.exists():
        print("❌ Service is not installed")
        return
    # Ask launchctl for all loaded jobs and scan for our label.
    result = subprocess.run([
        'launchctl', 'list'
    ], capture_output=True, text=True)
    service_found = False
    for line in result.stdout.splitlines():
        if SERVICE_LABEL in line:
            service_found = True
            # Each row is parsed here as: PID, last exit status, label.
            # A PID of '-' means the job is loaded but not currently running.
            parts = line.split()
            if len(parts) >= 3:
                pid = parts[0]
                status = parts[1]
                if pid != '-':
                    print(f"✅ Service is RUNNING (PID: {pid})")
                else:
                    print(f"⏹️ Service is STOPPED (last exit: {status})")
            break
    if not service_found:
        print("⏹️ Service is not loaded")
    # Show the configuration saved at install time, if any.
    config = load_service_config()
    if config:
        print(f"\n📋 Configuration:")
        print(f" Service Label: {SERVICE_LABEL}")
        print(f" API Key: {config.get('api_key', 'Not set')}")
        print(f" Type: {'User LaunchAgent' if user_level else 'System LaunchDaemon'}")
        print(f" Plist: {plist_file}")
    # Tail the last 10 lines of the service log, when one exists.
    paths = get_service_paths()
    log_file = paths['log_dir'] / 'mcp-memory-service.log'
    if log_file.exists():
        print(f"\n📜 Recent logs from {log_file}:")
        result = subprocess.run([
            'tail', '-n', '10', str(log_file)
        ], capture_output=True, text=True)
        if result.stdout:
            print(result.stdout)
def main():
    """Parse command-line flags and run the requested service action."""
    parser = argparse.ArgumentParser(
        description="macOS LaunchAgent installer for MCP Memory Service"
    )
    # Service level flags: --system wins; --user is the default.
    parser.add_argument('--user', action='store_true', default=True,
                        help='Install as user LaunchAgent (default)')
    parser.add_argument('--system', action='store_true',
                        help='Install as system LaunchDaemon (requires sudo)')
    # Action flags — mutually exclusive in practice; first match wins.
    parser.add_argument('--uninstall', action='store_true', help='Uninstall the service')
    parser.add_argument('--start', action='store_true', help='Start the service')
    parser.add_argument('--stop', action='store_true', help='Stop the service')
    parser.add_argument('--status', action='store_true', help='Check service status')
    parser.add_argument('--restart', action='store_true', help='Restart the service')
    args = parser.parse_args()
    user_level = not args.system

    def restart():
        # A restart is simply stop followed by start.
        stop_service(user_level)
        start_service(user_level)

    # Ordered dispatch table; falls through to installation when no
    # action flag was given.
    actions = (
        (args.uninstall, lambda: uninstall_service(user_level)),
        (args.start, lambda: start_service(user_level)),
        (args.stop, lambda: stop_service(user_level)),
        (args.status, lambda: service_status(user_level)),
        (args.restart, restart),
    )
    for requested, action in actions:
        if requested:
            action()
            break
    else:
        # Default action is to install.
        install_service(user_level)


if __name__ == '__main__':
    main()
```
--------------------------------------------------------------------------------
/claude-hooks/install_claude_hooks_windows.ps1:
--------------------------------------------------------------------------------
```
# Claude Code Memory Awareness Hooks - Windows Installation Script v2.2.0
# Installs hooks into Claude Code hooks directory for automatic memory awareness
# Enhanced Output Control and Session Management
#
# Usage: .\install_claude_hooks_windows.ps1 [-Uninstall] [-Test] [-Help]
param(
    [switch]$Uninstall,   # Remove previously installed hooks
    [switch]$Test,        # Run the integration tests only
    [switch]$Help         # Print usage and exit
)
# Abort on the first error so a partial install is reported, not silently ignored.
$ErrorActionPreference = "Stop"
# Configuration - Detect proper Claude Code hooks directory
function Get-ClaudeHooksDirectory {
    # Resolve the Claude Code hooks directory. Preference order:
    #   1. existing %USERPROFILE%\.claude\hooks
    #   2. a directory scraped from `claude --help` output
    #   3. existing alternatives under APPDATA / LOCALAPPDATA
    #   4. the primary default (created later if needed)
    # Primary location: User profile (updated to match actual Claude Code directory structure)
    $primaryPath = "$env:USERPROFILE\.claude\hooks"
    # Alternative locations to check
    $alternativePaths = @(
        "$env:APPDATA\.claude\hooks",
        "$env:LOCALAPPDATA\.claude\hooks"
    )
    # If primary path already exists, use it
    if (Test-Path $primaryPath) {
        return $primaryPath
    }
    # Check if Claude Code is installed and can tell us the hooks directory.
    # NOTE(review): this scrapes free-form `claude --help` text with a loose
    # regex; the CLI is not guaranteed to print a hooks directory — confirm.
    try {
        $claudeHelp = claude --help 2>$null
        if ($claudeHelp -match "hooks.*directory.*(\S+)") {
            $detectedPath = $matches[1]
            # Only trust the detected path if its parent directory exists.
            if ($detectedPath -and (Test-Path (Split-Path -Parent $detectedPath) -ErrorAction SilentlyContinue)) {
                return $detectedPath
            }
        }
    } catch {
        # Claude CLI not available or failed
    }
    # Check alternative locations
    foreach ($altPath in $alternativePaths) {
        if (Test-Path $altPath) {
            return $altPath
        }
    }
    # Default to primary path (will be created if needed)
    return $primaryPath
}
$CLAUDE_HOOKS_DIR = Get-ClaudeHooksDirectory
# Script is now in the claude-hooks directory itself
$SCRIPT_DIR = $PSScriptRoot
$SOURCE_DIR = $SCRIPT_DIR
# Repository root is the parent of the claude-hooks directory.
# (Previously $REPO_ROOT was referenced below without ever being assigned,
# so the "Repository root:" line printed an empty path.)
$REPO_ROOT = Split-Path -Parent $SCRIPT_DIR
$dateStr = Get-Date -Format "yyyyMMdd-HHmmss"
$BACKUP_DIR = "$env:USERPROFILE\.claude\hooks-backup-$dateStr"
# Logging helpers. Write-Error intentionally shadows the built-in cmdlet so
# all diagnostics share the same "[LEVEL] message" format.
function Write-Info { Write-Host "[INFO]" -ForegroundColor Green -NoNewline; Write-Host " $args" }
function Write-Warn { Write-Host "[WARN]" -ForegroundColor Yellow -NoNewline; Write-Host " $args" }
function Write-Error { Write-Host "[ERROR]" -ForegroundColor Red -NoNewline; Write-Host " $args" }
# Debug: Display resolved paths
Write-Info "Script location: $SCRIPT_DIR"
Write-Info "Repository root: $REPO_ROOT"
Write-Info "Source hooks directory: $SOURCE_DIR"
Write-Info "Target hooks directory: $CLAUDE_HOOKS_DIR"
# Show help
if ($Help) {
    # Print usage and exit before doing any installation work.
    Write-Host @"
Claude Code Memory Awareness Hooks - Windows Installation
Usage: .\install_claude_hooks_windows.ps1 [options]
Options:
-Help Show this help message
-Uninstall Remove installed hooks
-Test Run tests only
Examples:
.\install_claude_hooks_windows.ps1 # Install hooks
.\install_claude_hooks_windows.ps1 -Uninstall # Remove hooks
.\install_claude_hooks_windows.ps1 -Test # Test installation
"@
    exit 0
}
# Header banner
Write-Host ""
Write-Host "Claude Code Memory Awareness Hooks Installation v2.2.0 (Windows)" -ForegroundColor Cyan
Write-Host "================================================================" -ForegroundColor Cyan
Write-Host ""
# Check if Claude Code is installed
function Test-ClaudeCode {
    # Verify the Claude Code CLI is reachable on PATH; if not, warn and
    # let the user decide whether to continue the installation.
    $cli = Get-Command claude -ErrorAction SilentlyContinue
    if ($cli) {
        Write-Info "Claude Code CLI found: $($cli.Source)"
        return
    }
    Write-Warn "Claude Code CLI not found in PATH"
    Write-Warn "Please ensure Claude Code is installed and accessible"
    $answer = Read-Host "Continue anyway? (Y/N)"
    if ($answer -ne "Y" -and $answer -ne "y") {
        exit 1
    }
}
# Validate source directory exists
function Test-SourceDirectory {
    # Ensure the script is running from the claude-hooks directory and that
    # the required hook subdirectories are present.
    Write-Info "Validating source directory..."
    if (-not (Test-Path $SOURCE_DIR)) {
        Write-Error "Source hooks directory not found: $SOURCE_DIR"
        Write-Error "Please ensure you are running this script from the mcp-memory-service repository"
        # The script lives in claude-hooks/ itself ($SOURCE_DIR = $SCRIPT_DIR),
        # so describe that layout — the old message wrongly pointed at scripts/.
        Write-Error "Expected repository structure:"
        Write-Error " mcp-memory-service/"
        Write-Error " claude-hooks/"
        Write-Error " install_claude_hooks_windows.ps1 (This script)"
        Write-Error " core/"
        Write-Error " utilities/"
        Write-Error " config.json"
        exit 1
    }
    # Check for required subdirectories
    $requiredDirs = @("core", "utilities", "tests")
    foreach ($dir in $requiredDirs) {
        $dirPath = Join-Path $SOURCE_DIR $dir
        if (-not (Test-Path $dirPath)) {
            Write-Error "Missing required directory: $dirPath"
            Write-Error "The claude-hooks directory appears to be incomplete"
            exit 1
        }
    }
    Write-Info "Source directory validation passed"
}
# Create Claude Code hooks directory if it does not exist
function New-HooksDirectory {
    # Create the target hooks directory when missing, then verify write access.
    if (Test-Path $CLAUDE_HOOKS_DIR) {
        Write-Info "Claude Code hooks directory exists: $CLAUDE_HOOKS_DIR"
    } else {
        Write-Info "Creating Claude Code hooks directory: $CLAUDE_HOOKS_DIR"
        try {
            New-Item -ItemType Directory -Path $CLAUDE_HOOKS_DIR -Force | Out-Null
            Write-Info "Successfully created hooks directory"
        } catch {
            Write-Error "Failed to create hooks directory: $CLAUDE_HOOKS_DIR"
            Write-Error "Error: $($_.Exception.Message)"
            Write-Error ""
            Write-Error "Possible solutions:"
            Write-Error " 1. Run PowerShell as Administrator"
            Write-Error " 2. Check if the parent directory exists and is writable"
            Write-Error " 3. Manually create the directory: $CLAUDE_HOOKS_DIR"
            exit 1
        }
    }
    # Prove the directory is writable by round-tripping a scratch file.
    $probe = Join-Path $CLAUDE_HOOKS_DIR "write-test.tmp"
    try {
        "test" | Out-File -FilePath $probe -Force
        Remove-Item -Path $probe -Force
        Write-Info "Write access confirmed for hooks directory"
    } catch {
        Write-Error "No write access to hooks directory: $CLAUDE_HOOKS_DIR"
        Write-Error "Please check permissions or run as Administrator"
        exit 1
    }
}
# Backup existing hooks if they exist
function Backup-ExistingHooks {
    # Preserve any prior installation before it gets overwritten.
    $hasPrevious = (Test-Path "$CLAUDE_HOOKS_DIR\core") -or
        (Test-Path "$CLAUDE_HOOKS_DIR\utilities") -or
        (Test-Path "$CLAUDE_HOOKS_DIR\config.json")
    if (-not $hasPrevious) { return }
    Write-Info "Backing up existing hooks to: $BACKUP_DIR"
    New-Item -ItemType Directory -Path $BACKUP_DIR -Force | Out-Null
    # Best-effort copy: locked or vanished files must not abort the install.
    Copy-Item -Path "$CLAUDE_HOOKS_DIR\*" -Destination $BACKUP_DIR -Recurse -Force -ErrorAction SilentlyContinue
    Write-Info "Backup created successfully"
}
# Install hook files
function Install-Hooks {
    # Copy hook sources, utilities, tests, and docs into the hooks directory.
    Write-Info "Installing memory awareness hooks..."
    # Ensure each target subdirectory exists before copying into it.
    foreach ($sub in @("core", "utilities", "tests")) {
        New-Item -ItemType Directory -Path "$CLAUDE_HOOKS_DIR\$sub" -Force | Out-Null
    }
    Copy-Item -Path "$SOURCE_DIR\core\*" -Destination "$CLAUDE_HOOKS_DIR\core\" -Recurse -Force
    Write-Info "Installed core hooks (session-start, session-end, topic-change)"
    Copy-Item -Path "$SOURCE_DIR\utilities\*" -Destination "$CLAUDE_HOOKS_DIR\utilities\" -Recurse -Force
    Write-Info "Installed utility modules"
    Copy-Item -Path "$SOURCE_DIR\tests\*" -Destination "$CLAUDE_HOOKS_DIR\tests\" -Recurse -Force
    Write-Info "Installed test suite"
    # README and the config template are copied flat into the hooks root.
    Copy-Item -Path "$SOURCE_DIR\README.md" -Destination "$CLAUDE_HOOKS_DIR\" -Force
    Copy-Item -Path "$SOURCE_DIR\config.template.json" -Destination "$CLAUDE_HOOKS_DIR\" -Force
    Write-Info "Installed documentation and templates"
}
# Install or update configuration
function Install-Config {
    # Install the default config on first run; never clobber an existing one.
    $configFile = "$CLAUDE_HOOKS_DIR\config.json"
    if (Test-Path $configFile) {
        Write-Info "Configuration file already exists - not overwriting"
        Write-Info " Compare with config.template.json for new options"
    } else {
        # First installation - use default config
        Copy-Item -Path "$SOURCE_DIR\config.json" -Destination $configFile -Force
        Write-Info "Installed default configuration"
        Write-Warn "Please update config.json with your memory service endpoint and API key"
    }
}
# Test installation
function Test-Installation {
    # Verify that all required files were installed, that Node.js is
    # available, and (when present) run the bundled integration tests.
    # Returns $true when every required file is in place.
    Write-Info "Testing installation..."
    # Check if required files exist
    $requiredFiles = @(
        "core\session-start.js",
        "core\session-end.js",
        "utilities\project-detector.js",
        "utilities\memory-scorer.js",
        "utilities\context-formatter.js",
        "config.json",
        "README.md"
    )
    $missingFiles = @()
    foreach ($file in $requiredFiles) {
        if (-not (Test-Path "$CLAUDE_HOOKS_DIR\$file")) {
            $missingFiles += $file
        }
    }
    if ($missingFiles.Count -gt 0) {
        Write-Error "Installation incomplete - missing files:"
        foreach ($file in $missingFiles) {
            Write-Host " - $file"
        }
        return $false
    }
    # Detect Node.js with Get-Command instead of invoking `node` directly:
    # under $ErrorActionPreference = "Stop", calling a nonexistent native
    # command raises a terminating error that would abort the whole install
    # via the outer try/catch instead of printing the warning below.
    if (Get-Command node -ErrorAction SilentlyContinue) {
        $nodeVersion = node --version 2>$null
        Write-Info "Node.js available: $nodeVersion"
    } else {
        Write-Warn "Node.js not found - hooks require Node.js to function"
        Write-Warn "Please install Node.js version 14 or higher"
    }
    # Run integration test
    if (Test-Path "$CLAUDE_HOOKS_DIR\tests\integration-test.js") {
        Write-Info "Running integration tests..."
        Push-Location $CLAUDE_HOOKS_DIR
        try {
            $testResult = node tests\integration-test.js 2>&1
            if ($LASTEXITCODE -eq 0) {
                Write-Info "Integration tests passed"
            } else {
                Write-Warn "Some integration tests failed - check configuration"
                Write-Host $testResult
            }
        } finally {
            # Always restore the caller's working directory.
            Pop-Location
        }
    }
    return $true
}
# Display post-installation instructions
function Show-PostInstallInstructions {
    # Print a post-install summary: next steps, install locations, and the
    # currently configured memory-service endpoint (when readable).
    Write-Host ""
    Write-Host "Installation Complete!" -ForegroundColor Green
    Write-Host "=====================" -ForegroundColor Green
    Write-Host ""
    Write-Host "Next Steps:" -ForegroundColor Yellow
    Write-Host ""
    Write-Host "1. Configure your memory service endpoint:"
    Write-Host " Edit: $CLAUDE_HOOKS_DIR\config.json"
    Write-Host " Update endpoint and apiKey values"
    Write-Host ""
    Write-Host "2. Test the hooks:"
    Write-Host " cd $CLAUDE_HOOKS_DIR"
    Write-Host " node tests\integration-test.js"
    Write-Host ""
    Write-Host "3. Start using Claude Code:"
    Write-Host " The hooks will automatically activate on session start/end"
    Write-Host ""
    Write-Host "Installation Details:" -ForegroundColor Cyan
    Write-Host " Hooks Directory: $CLAUDE_HOOKS_DIR"
    # Only mention the backup directory if one was actually created this run.
    if (Test-Path $BACKUP_DIR) {
        Write-Host " Backup Directory: $BACKUP_DIR"
    }
    Write-Host ""
    # Try to read and display current configuration
    $configPath = Join-Path $CLAUDE_HOOKS_DIR "config.json"
    if (Test-Path $configPath) {
        try {
            $config = Get-Content $configPath | ConvertFrom-Json
            Write-Host "Configuration:" -ForegroundColor Cyan
            Write-Host " Memory Service: $($config.memoryService.endpoint)"
            Write-Host " Max Memories: $($config.memoryService.maxMemoriesPerSession)"
        } catch {
            # Malformed JSON is non-fatal here; the user was already told to edit it.
            Write-Warn "Could not read configuration file"
        }
    }
    Write-Host ""
    $readmePath = Join-Path $CLAUDE_HOOKS_DIR "README.md"
    Write-Host "For troubleshooting, see: $readmePath"
}
# Uninstall function
function Uninstall-Hooks {
    # Remove installed hook files after interactive confirmation.
    if (-not (Test-Path $CLAUDE_HOOKS_DIR)) {
        Write-Info "No hooks found to uninstall"
        return
    }
    $confirm = Read-Host "Remove all Claude Code memory awareness hooks? (Y/N)"
    if ($confirm -ne "Y" -and $confirm -ne "y") { return }
    # Directories installed by Install-Hooks.
    foreach ($dir in @("core", "utilities", "tests")) {
        Remove-Item -Path "$CLAUDE_HOOKS_DIR\$dir" -Recurse -Force -ErrorAction SilentlyContinue
    }
    # Flat files installed into the hooks root.
    foreach ($file in @("config.json", "config.template.json", "README.md")) {
        Remove-Item -Path "$CLAUDE_HOOKS_DIR\$file" -Force -ErrorAction SilentlyContinue
    }
    Write-Info "Hooks uninstalled successfully"
}
# Test only function
function Test-Only {
    # Run the installed integration tests without reinstalling anything.
    $testScript = "$CLAUDE_HOOKS_DIR\tests\integration-test.js"
    if (-not (Test-Path $testScript)) {
        Write-Error "Tests not found - please install first"
        exit 1
    }
    Push-Location $CLAUDE_HOOKS_DIR
    try {
        node tests\integration-test.js
    } finally {
        Pop-Location
    }
}
# Main execution
# Main execution: dispatch on the requested mode (uninstall / test / install).
try {
    if ($Uninstall) {
        Uninstall-Hooks
    } elseif ($Test) {
        Test-Only
    } else {
        # Main installation process, in dependency order: validate sources,
        # check the CLI, prepare the target, back up, copy, configure, verify.
        Test-SourceDirectory
        Test-ClaudeCode
        New-HooksDirectory
        Backup-ExistingHooks
        Install-Hooks
        Install-Config
        if (Test-Installation) {
            Show-PostInstallInstructions
        }
    }
} catch {
    # Any terminating error (forced by $ErrorActionPreference = "Stop") lands here.
    Write-Host "ERROR: Installation failed" -ForegroundColor Red
    Write-Host $_.Exception.Message
    exit 1
}
```