This is page 24 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} (replacing {x} with a page number) to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/claude-hooks/tests/integration-test.js:
--------------------------------------------------------------------------------
```javascript
#!/usr/bin/env node
/**
* Integration Test for Claude Code Memory Awareness Hooks
* Tests the complete Phase 1 implementation end-to-end
*/
const fs = require('fs');
const path = require('path');
// Import hooks and utilities
const sessionStartHook = require('../core/session-start');
const sessionEndHook = require('../core/session-end');
const { detectProjectContext } = require('../utilities/project-detector');
const { scoreMemoryRelevance } = require('../utilities/memory-scorer');
const { formatMemoriesForContext } = require('../utilities/context-formatter');
/**
* Test Results Tracker
*/
class TestResults {
constructor() {
this.tests = [];
this.passed = 0;
this.failed = 0;
}
test(name, testFn) {
console.log(`\n🧪 Testing: ${name}`);
try {
const result = testFn();
if (result === true || (result && result.success !== false)) {
console.log(`✅ PASS: ${name}`);
this.passed++;
this.tests.push({ name, status: 'PASS', result });
} else {
console.log(`❌ FAIL: ${name} - ${result.error || 'Test returned false'}`);
this.failed++;
this.tests.push({ name, status: 'FAIL', error: result.error || 'Test returned false' });
}
} catch (error) {
console.log(`❌ FAIL: ${name} - ${error.message}`);
this.failed++;
this.tests.push({ name, status: 'FAIL', error: error.message });
}
}
async asyncTest(name, testFn) {
console.log(`\n🧪 Testing: ${name}`);
try {
const result = await testFn();
if (result === true || (result && result.success !== false)) {
console.log(`✅ PASS: ${name}`);
this.passed++;
this.tests.push({ name, status: 'PASS', result });
} else {
console.log(`❌ FAIL: ${name} - ${result.error || 'Test returned false'}`);
this.failed++;
this.tests.push({ name, status: 'FAIL', error: result.error || 'Test returned false' });
}
} catch (error) {
console.log(`❌ FAIL: ${name} - ${error.message}`);
this.failed++;
this.tests.push({ name, status: 'FAIL', error: error.message });
}
}
summary() {
console.log('\n' + '='.repeat(60));
console.log('🎯 TEST SUMMARY');
console.log('='.repeat(60));
console.log(`Total Tests: ${this.tests.length}`);
console.log(`✅ Passed: ${this.passed}`);
console.log(`❌ Failed: ${this.failed}`);
console.log(`Success Rate: ${((this.passed / this.tests.length) * 100).toFixed(1)}%`);
if (this.failed > 0) {
console.log('\n🔍 FAILED TESTS:');
this.tests.filter(t => t.status === 'FAIL').forEach(test => {
console.log(` - ${test.name}: ${test.error}`);
});
}
console.log('='.repeat(60));
return this.failed === 0;
}
}
/**
* Mock data for testing
*/
const mockMemories = [
{
content: 'Decided to use SQLite-vec instead of ChromaDB for better performance in MCP Memory Service. SQLite-vec provides 10x faster startup and uses 75% less memory.',
tags: ['mcp-memory-service', 'decision', 'sqlite-vec', 'performance'],
memory_type: 'decision',
created_at_iso: '2025-08-19T10:00:00Z'
},
{
content: 'Implemented comprehensive Claude Code hooks system for automatic memory awareness. Created session-start, session-end, and topic-change hooks with project detection.',
tags: ['claude-code', 'hooks', 'architecture', 'memory-awareness'],
memory_type: 'architecture',
created_at_iso: '2025-08-19T09:30:00Z'
},
{
content: 'Fixed critical bug in project detector - was not handling pyproject.toml files correctly. Added proper Python project detection.',
tags: ['bug-fix', 'project-detector', 'python'],
memory_type: 'bug-fix',
created_at_iso: '2025-08-18T15:30:00Z'
},
{
content: 'Learning session on memory relevance scoring algorithms. Implemented time decay, tag matching, and content analysis for intelligent memory selection.',
tags: ['learning', 'algorithms', 'memory-scoring'],
memory_type: 'insight',
created_at_iso: '2025-08-17T14:00:00Z'
},
{
content: 'Random note about completely unrelated project for testing filtering',
tags: ['other-project', 'unrelated', 'test'],
memory_type: 'note',
created_at_iso: '2025-08-01T08:00:00Z'
}
];
const mockProjectContext = {
name: 'mcp-memory-service',
directory: process.cwd(),
language: 'JavaScript',
frameworks: ['Node.js'],
tools: ['npm'],
git: {
isRepo: true,
branch: 'main',
repoName: 'mcp-memory-service',
lastCommit: 'abc1234 Implement memory awareness hooks'
},
confidence: 0.9
};
const mockConversation = {
messages: [
{
role: 'user',
content: 'I need to implement a memory awareness system for Claude Code that automatically injects relevant project memories.'
},
{
role: 'assistant',
content: 'I\'ll help you create a comprehensive memory awareness system. We decided to use Claude Code hooks for session management and implement automatic context injection. This will include project detection, memory scoring, and intelligent context formatting.'
},
{
role: 'user',
content: 'Great! I learned that we need project detection algorithms and memory scoring systems. Can you implement the project detector?'
},
{
role: 'assistant',
content: 'Exactly. I implemented the project detector in project-detector.js with support for multiple languages and frameworks. I also created memory scoring algorithms with time decay and relevance matching. Next we need to test the complete system and add session consolidation.'
}
]
};
/**
* Run comprehensive tests
*/
async function runTests() {
console.log('🚀 Claude Code Memory Awareness - Integration Tests');
console.log('Testing Phase 1 Implementation\n');
const results = new TestResults();
// Test 1: Project Detection
await results.asyncTest('Project Detection', async () => {
const context = await detectProjectContext(process.cwd());
if (!context.name) {
return { success: false, error: 'No project name detected' };
}
if (!context.language) {
return { success: false, error: 'No language detected' };
}
console.log(` Detected: ${context.name} (${context.language}), Confidence: ${(context.confidence * 100).toFixed(1)}%`);
return { success: true, context };
});
// Test 2: Memory Relevance Scoring
results.test('Memory Relevance Scoring', () => {
const scored = scoreMemoryRelevance(mockMemories, mockProjectContext);
if (!Array.isArray(scored)) {
return { success: false, error: 'Scoring did not return array' };
}
if (scored.length !== mockMemories.length) {
return { success: false, error: 'Scoring lost memories' };
}
// Check that memories have scores
for (const memory of scored) {
if (typeof memory.relevanceScore !== 'number') {
return { success: false, error: 'Memory missing relevance score' };
}
}
// Check that memories are sorted by relevance (highest first)
for (let i = 1; i < scored.length; i++) {
if (scored[i].relevanceScore > scored[i-1].relevanceScore) {
return { success: false, error: 'Memories not sorted by relevance' };
}
}
console.log(` Scored ${scored.length} memories, top score: ${scored[0].relevanceScore.toFixed(3)}`);
return { success: true, scored };
});
// Test 3: Context Formatting
results.test('Context Formatting', () => {
const scored = scoreMemoryRelevance(mockMemories, mockProjectContext);
const formatted = formatMemoriesForContext(scored, mockProjectContext);
if (typeof formatted !== 'string') {
return { success: false, error: 'Formatting did not return string' };
}
if (formatted.length < 100) {
return { success: false, error: 'Formatted context too short' };
}
// Check for key formatting elements
if (!formatted.includes('Memory Context')) {
return { success: false, error: 'Missing memory context header' };
}
if (!formatted.includes(mockProjectContext.name)) {
return { success: false, error: 'Missing project name in context' };
}
console.log(` Generated ${formatted.length} characters of formatted context`);
return { success: true, formatted };
});
// Test 4: Session Start Hook Structure
results.test('Session Start Hook Structure', () => {
if (typeof sessionStartHook.handler !== 'function') {
return { success: false, error: 'Session start hook missing handler function' };
}
if (!sessionStartHook.name || !sessionStartHook.version) {
return { success: false, error: 'Session start hook missing metadata' };
}
if (sessionStartHook.trigger !== 'session-start') {
return { success: false, error: 'Session start hook wrong trigger' };
}
console.log(` Hook: ${sessionStartHook.name} v${sessionStartHook.version}`);
return { success: true };
});
// Test 5: Session End Hook Structure
results.test('Session End Hook Structure', () => {
if (typeof sessionEndHook.handler !== 'function') {
return { success: false, error: 'Session end hook missing handler function' };
}
if (!sessionEndHook.name || !sessionEndHook.version) {
return { success: false, error: 'Session end hook missing metadata' };
}
if (sessionEndHook.trigger !== 'session-end') {
return { success: false, error: 'Session end hook wrong trigger' };
}
console.log(` Hook: ${sessionEndHook.name} v${sessionEndHook.version}`);
return { success: true };
});
// Test 6: Configuration Loading
results.test('Configuration Loading', () => {
const configPath = path.join(__dirname, '../config.json');
if (!fs.existsSync(configPath)) {
return { success: false, error: 'Configuration file not found' };
}
try {
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
if (!config.memoryService) {
return { success: false, error: 'Invalid configuration structure' };
}
// Support both old (direct endpoint) and new (dual-protocol) structures
const endpoint = config.memoryService.endpoint || config.memoryService.http?.endpoint;
if (!endpoint) {
return { success: false, error: 'No endpoint configured (checked both old and new format)' };
}
console.log(` Endpoint: ${endpoint}`);
return { success: true, config };
} catch (error) {
return { success: false, error: `Configuration parse error: ${error.message}` };
}
});
// Test 7: File Structure
results.test('File Structure Validation', () => {
const requiredFiles = [
'../core/session-start.js',
'../core/session-end.js',
'../utilities/project-detector.js',
'../utilities/memory-scorer.js',
'../utilities/context-formatter.js',
'../config.json',
'../config.template.json',
'../README.md'
];
for (const file of requiredFiles) {
const fullPath = path.join(__dirname, file);
if (!fs.existsSync(fullPath)) {
return { success: false, error: `Missing required file: ${file}` };
}
}
console.log(` All ${requiredFiles.length} required files present`);
return { success: true };
});
// Test 8: Mock Session Start (Limited Test)
await results.asyncTest('Mock Session Start Hook', async () => {
const mockContext = {
workingDirectory: process.cwd(),
sessionId: 'test-session',
injectSystemMessage: async (message) => {
if (typeof message !== 'string' || message.length < 50) {
throw new Error('Invalid message injection');
}
console.log(` Injected ${message.length} characters of context`);
return true;
}
};
try {
// Note: This will attempt to contact the memory service
// In a real test environment, we'd mock this
await sessionStartHook.handler(mockContext);
return { success: true };
} catch (error) {
// Expected to fail without real memory service connection or when dependencies are missing
if (error.message.includes('Network error') ||
error.message.includes('ENOTFOUND') ||
error.message.includes('memoryClient is not defined') ||
error.message.includes('No active connection')) {
console.log(' ⚠️ Expected error (no memory service or connection available)');
console.log(' This is expected if the service is not running during tests');
return { success: true }; // This is expected in test environment
}
throw error;
}
});
// Test 9: Package Dependencies
results.test('Package Dependencies Check', () => {
const requiredModules = ['fs', 'path', 'https', 'child_process'];
for (const module of requiredModules) {
try {
require(module);
} catch (error) {
return { success: false, error: `Missing required module: ${module}` };
}
}
console.log(` All ${requiredModules.length} required Node.js modules available`);
return { success: true };
});
// Test 10: Claude Code Settings Validation
results.test('Claude Code Settings Configuration', () => {
const settingsPath = path.join(process.env.HOME, '.claude', 'settings.json');
if (!fs.existsSync(settingsPath)) {
return { success: false, error: 'Claude Code settings.json not found' };
}
try {
const settings = JSON.parse(fs.readFileSync(settingsPath, 'utf8'));
// Check for hooks configuration
if (!settings.hooks) {
return { success: false, error: 'No hooks configuration found in settings' };
}
// Check for SessionStart hook
if (!settings.hooks.SessionStart || !Array.isArray(settings.hooks.SessionStart)) {
return { success: false, error: 'SessionStart hooks not configured' };
}
// Check for SessionEnd hook
if (!settings.hooks.SessionEnd || !Array.isArray(settings.hooks.SessionEnd)) {
return { success: false, error: 'SessionEnd hooks not configured' };
}
// Check hook command paths
const startHook = JSON.stringify(settings.hooks.SessionStart);
const endHook = JSON.stringify(settings.hooks.SessionEnd);
if (!startHook.includes('session-start.js')) {
return { success: false, error: 'SessionStart hook command not configured correctly' };
}
if (!endHook.includes('session-end.js')) {
return { success: false, error: 'SessionEnd hook command not configured correctly' };
}
console.log(' Claude Code settings configured correctly');
return { success: true, settings };
} catch (parseError) {
return { success: false, error: `Settings parse error: ${parseError.message}` };
}
});
// Test 11: Hook Files Location Validation
results.test('Hook Files in Correct Location', () => {
const hookDir = path.join(process.env.HOME, '.claude', 'hooks');
const requiredHooks = [
'core/session-start.js',
'core/session-end.js',
'utilities/project-detector.js',
'utilities/memory-scorer.js',
'utilities/context-formatter.js'
];
for (const hookFile of requiredHooks) {
const fullPath = path.join(hookDir, hookFile);
if (!fs.existsSync(fullPath)) {
return { success: false, error: `Hook file missing: ${hookFile}` };
}
}
console.log(` All hooks installed in ${hookDir}`);
return { success: true };
});
// Test 12: Claude Code CLI Availability
results.test('Claude Code CLI Availability', () => {
const { execSync } = require('child_process');
try {
execSync('which claude', { stdio: 'pipe' });
console.log(' Claude Code CLI available');
return { success: true };
} catch (error) {
return { success: false, error: 'Claude Code CLI not found in PATH' };
}
});
// Test 13: Memory Service Protocol
results.test('Memory Service Protocol Compatibility', () => {
// Test that we're generating the correct MCP JSON-RPC calls
const testCall = {
jsonrpc: '2.0',
id: 1,
method: 'tools/call',
params: {
name: 'retrieve_memory',
arguments: {
query: 'test query',
tags: ['test'],
limit: 5
}
}
};
const serialized = JSON.stringify(testCall);
const parsed = JSON.parse(serialized);
if (!parsed.jsonrpc || parsed.jsonrpc !== '2.0') {
return { success: false, error: 'Invalid JSON-RPC format' };
}
if (!parsed.params || !parsed.params.name || !parsed.params.arguments) {
return { success: false, error: 'Invalid MCP call structure' };
}
console.log(` MCP protocol structure valid`);
return { success: true };
});
// Test 14: Memory Service Connectivity
await results.asyncTest('Memory Service Connectivity', async () => {
const configPath = path.join(__dirname, '../config.json');
if (!fs.existsSync(configPath)) {
return { success: false, error: 'Configuration file not found for connectivity test' };
}
try {
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
// Support both old (direct) and new (dual-protocol) structures
const endpoint = config.memoryService?.endpoint || config.memoryService?.http?.endpoint;
const apiKey = config.memoryService?.apiKey || config.memoryService?.http?.apiKey;
if (!endpoint) {
return { success: false, error: 'No memory service endpoint configured (checked both old and new format)' };
}
// Test basic connectivity (simplified test)
const https = require('https');
const url = new URL('/api/health', endpoint);
return new Promise((resolve) => {
const options = {
hostname: url.hostname,
port: url.port || 8443,
path: url.pathname,
method: 'GET',
timeout: 5000,
rejectUnauthorized: false
};
const req = https.request(options, (res) => {
console.log(` Memory service responded with status: ${res.statusCode}`);
if (res.statusCode === 200 || res.statusCode === 401) {
// 401 is expected without API key, but service is running
resolve({ success: true });
} else {
resolve({ success: false, error: `Service returned status: ${res.statusCode}` });
}
});
req.on('error', (error) => {
// Mark as success with warning if service isn't running (expected in test environments)
console.log(` ⚠️ Memory service not available: ${error.message}`);
console.log(' This is expected if the service is not running during tests');
resolve({ success: true });
});
req.on('timeout', () => {
console.log(' ⚠️ Connection timeout - service may not be running');
console.log(' This is expected if the service is not running during tests');
resolve({ success: true });
});
req.end();
});
} catch (parseError) {
return { success: false, error: `Configuration parse error: ${parseError.message}` };
}
});
// Display summary
const allTestsPassed = results.summary();
if (allTestsPassed) {
console.log('\n🎉 ALL TESTS PASSED! Phase 1 implementation is ready.');
console.log('\n📋 Next Steps:');
console.log(' 1. Install hooks in Claude Code hooks directory');
console.log(' 2. Configure memory service endpoint in config.json');
console.log(' 3. Test with real Claude Code session');
console.log(' 4. Begin Phase 2 implementation (dynamic memory loading)');
} else {
console.log('\n⚠️ Some tests failed. Please fix issues before proceeding.');
}
return allTestsPassed;
}
// Run tests if called directly
if (require.main === module) {
runTests()
.then(success => {
process.exit(success ? 0 : 1);
})
.catch(error => {
console.error('\n💥 Test suite crashed:', error.message);
console.error(error.stack);
process.exit(1);
});
}
module.exports = { runTests };
```
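The suite exits with code 0 when every test passes and 1 otherwise (see the `require.main === module` block above), so it can also be driven from the project's Python tooling. A minimal sketch, assuming Node.js is on PATH and the repository root is the working directory; the 120-second timeout is an illustrative choice, not something the test suite itself requires:

```python
#!/usr/bin/env python3
"""Minimal sketch: run the hook integration tests from Python tooling."""
import subprocess
import sys


def run_hook_integration_tests() -> bool:
    """Invoke the Node test suite and report success via its exit code (0 = pass)."""
    result = subprocess.run(
        ["node", "claude-hooks/tests/integration-test.js"],
        capture_output=True,
        text=True,
        timeout=120,  # assumed upper bound; adjust for slower environments
    )
    # The suite prints its own summary; surface it for CI logs.
    print(result.stdout)
    if result.stderr:
        print(result.stderr, file=sys.stderr)
    return result.returncode == 0


if __name__ == "__main__":
    sys.exit(0 if run_hook_integration_tests() else 1)
```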
--------------------------------------------------------------------------------
/scripts/utils/claude_commands_utils.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utilities for installing and managing Claude Code commands for MCP Memory Service.
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Tuple
def print_info(text: str) -> None:
"""Print formatted info text."""
print(f" -> {text}")
def print_error(text: str) -> None:
"""Print formatted error text."""
print(f" [ERROR] {text}")
def print_success(text: str) -> None:
"""Print formatted success text."""
print(f" [OK] {text}")
def print_warning(text: str) -> None:
"""Print formatted warning text."""
print(f" [WARNING] {text}")
def check_claude_code_cli() -> Tuple[bool, Optional[str]]:
"""
Check if Claude Code CLI is installed and available.
Returns:
Tuple of (is_available, version_or_error)
"""
try:
# Try to run claude --version
result = subprocess.run(
['claude', '--version'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
version = result.stdout.strip()
return True, version
else:
return False, f"claude command failed: {result.stderr.strip()}"
except subprocess.TimeoutExpired:
return False, "claude command timed out"
except FileNotFoundError:
return False, "claude command not found in PATH"
except Exception as e:
return False, f"Error checking claude CLI: {str(e)}"
def get_claude_commands_directory() -> Path:
"""
Get the Claude Code commands directory path.
Returns:
Path to ~/.claude/commands/
"""
return Path.home() / ".claude" / "commands"
def get_claude_hooks_directory() -> Path:
"""
Get the Claude Code hooks directory path.
Returns:
Path to ~/.claude/hooks/
"""
return Path.home() / ".claude" / "hooks"
def check_for_legacy_claude_paths() -> Tuple[bool, List[str]]:
"""
Check for legacy .claude-code directory structure and provide migration guidance.
Returns:
Tuple of (legacy_found, list_of_issues_and_recommendations)
"""
issues = []
legacy_found = False
# Check for legacy .claude-code directories
legacy_paths = [
Path.home() / ".claude-code",
Path.home() / ".claude-code" / "hooks",
Path.home() / ".claude-code" / "commands"
]
for legacy_path in legacy_paths:
if legacy_path.exists():
legacy_found = True
issues.append(f"⚠ Found legacy directory: {legacy_path}")
# Check what's in the legacy directory
if legacy_path.name == "hooks" and any(legacy_path.glob("*.js")):
issues.append(f" → Contains hook files that should be moved to ~/.claude/hooks/")
elif legacy_path.name == "commands" and any(legacy_path.glob("*.md")):
issues.append(f" → Contains command files that should be moved to ~/.claude/commands/")
if legacy_found:
issues.append("")
issues.append("Migration steps:")
issues.append("1. Create new directory: ~/.claude/")
issues.append("2. Move hooks: ~/.claude-code/hooks/* → ~/.claude/hooks/")
issues.append("3. Move commands: ~/.claude-code/commands/* → ~/.claude/commands/")
issues.append("4. Update settings.json to reference new paths")
issues.append("5. Remove old ~/.claude-code/ directory when satisfied")
return legacy_found, issues
def validate_claude_settings_paths() -> Tuple[bool, List[str]]:
"""
Validate paths in Claude settings files and detect common Windows path issues.
Returns:
Tuple of (all_valid, list_of_issues_and_recommendations)
"""
import json
import platform
issues = []
all_valid = True
# Common Claude settings locations
settings_paths = [
Path.home() / ".claude" / "settings.json",
Path.home() / ".claude" / "settings.local.json"
]
for settings_path in settings_paths:
if not settings_path.exists():
continue
try:
with open(settings_path, 'r', encoding='utf-8') as f:
settings = json.load(f)
# Check hooks configuration
if 'hooks' in settings:
for hook in settings.get('hooks', []):
if 'command' in hook:
command = hook['command']
# Check for Windows path issues
if platform.system() == "Windows":
if '\\' in command and not command.startswith('"'):
all_valid = False
issues.append(f"⚠ Windows path with backslashes in {settings_path.name}:")
issues.append(f" → {command}")
issues.append(f" → Consider using forward slashes: {command.replace(chr(92), '/')}")
# Check for legacy .claude-code references
if '.claude-code' in command:
all_valid = False
issues.append(f"⚠ Legacy path reference in {settings_path.name}:")
issues.append(f" → {command}")
issues.append(f" → Update to use .claude instead of .claude-code")
# Check for missing session-start-wrapper.bat
if 'session-start-wrapper.bat' in command:
all_valid = False
issues.append(f"⚠ Reference to non-existent wrapper file in {settings_path.name}:")
issues.append(f" → {command}")
issues.append(f" → Use Node.js script directly: node path/to/session-start.js")
# Check if referenced files exist
if command.startswith('node '):
script_path_str = command.replace('node ', '').strip()
# Handle quoted paths
if script_path_str.startswith('"') and script_path_str.endswith('"'):
script_path_str = script_path_str[1:-1]
script_path = Path(script_path_str)
if not script_path.exists() and not script_path.is_absolute():
# Try to resolve relative to home directory
script_path = Path.home() / script_path_str
if not script_path.exists():
all_valid = False
issues.append(f"⚠ Hook script not found: {script_path_str}")
issues.append(f" → Check if hooks are properly installed")
except json.JSONDecodeError as e:
all_valid = False
issues.append(f"⚠ JSON parsing error in {settings_path.name}: {str(e)}")
except Exception as e:
all_valid = False
issues.append(f"⚠ Error reading {settings_path.name}: {str(e)}")
return all_valid, issues
def normalize_windows_path_for_json(path_str: str) -> str:
"""
Normalize a Windows path for use in JSON configuration files.
Args:
path_str: Path string that may contain backslashes
Returns:
Path string with forward slashes suitable for JSON
"""
import platform
if platform.system() == "Windows":
# Convert backslashes to forward slashes
normalized = path_str.replace('\\', '/')
# Handle double backslashes from escaped strings
normalized = normalized.replace('//', '/')
return normalized
return path_str
def check_commands_directory_access() -> Tuple[bool, str]:
"""
Check if we can access and write to the Claude commands directory.
Returns:
Tuple of (can_access, status_message)
"""
commands_dir = get_claude_commands_directory()
try:
# Check if directory exists
if not commands_dir.exists():
# Try to create it
commands_dir.mkdir(parents=True, exist_ok=True)
return True, f"Created commands directory: {commands_dir}"
# Check if we can write to it
test_file = commands_dir / ".test_write_access"
try:
test_file.write_text("test")
test_file.unlink()
return True, f"Commands directory accessible: {commands_dir}"
except PermissionError:
return False, f"No write permission to commands directory: {commands_dir}"
except Exception as e:
return False, f"Cannot access commands directory: {str(e)}"
def get_source_commands_directory() -> Path:
"""
Get the source directory containing the command markdown files.
Returns:
Path to the claude_commands directory in the project
"""
    # Get the directory containing this script (scripts/utils/)
    script_dir = Path(__file__).parent
    # Go up two levels to the project root, where claude_commands/ lives
    project_root = script_dir.parent.parent
    return project_root / "claude_commands"
def list_available_commands() -> List[Dict[str, str]]:
"""
List all available command files in the source directory.
Returns:
List of command info dictionaries
"""
source_dir = get_source_commands_directory()
commands = []
if not source_dir.exists():
return commands
for md_file in source_dir.glob("*.md"):
# Extract command name from filename
command_name = md_file.stem
# Read the first line to get the description
try:
with open(md_file, 'r', encoding='utf-8') as f:
first_line = f.readline().strip()
# Remove markdown header formatting
description = first_line.lstrip('# ').strip()
except Exception:
description = "Command description unavailable"
commands.append({
'name': command_name,
'file': md_file.name,
'description': description,
'path': str(md_file)
})
return commands
def backup_existing_commands() -> Optional[str]:
"""
Create a backup of existing command files before installation.
Returns:
Path to backup directory if backup was created, None otherwise
"""
commands_dir = get_claude_commands_directory()
if not commands_dir.exists():
return None
# Check if there are any existing .md files
existing_commands = list(commands_dir.glob("*.md"))
if not existing_commands:
return None
# Create backup directory with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_dir = commands_dir / f"backup_{timestamp}"
try:
backup_dir.mkdir(exist_ok=True)
for cmd_file in existing_commands:
shutil.copy2(cmd_file, backup_dir / cmd_file.name)
print_info(f"Backed up {len(existing_commands)} existing commands to: {backup_dir}")
return str(backup_dir)
except Exception as e:
print_error(f"Failed to create backup: {str(e)}")
return None
def install_command_files() -> Tuple[bool, List[str]]:
"""
Install command markdown files to the Claude commands directory.
Returns:
Tuple of (success, list_of_installed_files)
"""
source_dir = get_source_commands_directory()
commands_dir = get_claude_commands_directory()
installed_files = []
if not source_dir.exists():
print_error(f"Source commands directory not found: {source_dir}")
return False, []
try:
# Ensure destination directory exists
commands_dir.mkdir(parents=True, exist_ok=True)
# Copy all .md files
for md_file in source_dir.glob("*.md"):
dest_file = commands_dir / md_file.name
shutil.copy2(md_file, dest_file)
installed_files.append(md_file.name)
print_info(f"Installed: {md_file.name}")
if installed_files:
print_success(f"Successfully installed {len(installed_files)} Claude Code commands")
return True, installed_files
else:
print_warning("No command files found to install")
return False, []
except Exception as e:
print_error(f"Failed to install command files: {str(e)}")
return False, []
def verify_mcp_service_connectivity() -> Tuple[bool, str]:
"""
Verify that the MCP Memory Service is accessible.
Returns:
Tuple of (is_accessible, status_message)
"""
try:
# Try to import the MCP service modules
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
# Test basic connectivity
from mcp_memory_service import config
# Check if we can detect a running service
# This is a basic check - in a real scenario, we'd try to connect
return True, "MCP Memory Service modules available"
except ImportError as e:
return False, f"MCP Memory Service not properly installed: {str(e)}"
except Exception as e:
return False, f"Error checking MCP service: {str(e)}"
def test_command_functionality() -> Tuple[bool, List[str]]:
"""
Test that installed commands are accessible via Claude Code CLI.
Returns:
Tuple of (all_tests_passed, list_of_test_results)
"""
commands_dir = get_claude_commands_directory()
test_results = []
all_passed = True
# Check if command files exist and are readable
for md_file in commands_dir.glob("memory-*.md"):
try:
with open(md_file, 'r', encoding='utf-8') as f:
content = f.read()
if len(content) > 0:
test_results.append(f"✓ {md_file.name} - readable and non-empty")
else:
test_results.append(f"✗ {md_file.name} - file is empty")
all_passed = False
except Exception as e:
test_results.append(f"✗ {md_file.name} - error reading: {str(e)}")
all_passed = False
# Try to run claude commands (if Claude CLI is available)
claude_available, _ = check_claude_code_cli()
if claude_available:
try:
# Test that claude can see our commands
result = subprocess.run(
['claude', '--help'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
test_results.append("✓ Claude Code CLI is responsive")
else:
test_results.append("✗ Claude Code CLI returned error")
all_passed = False
except Exception as e:
test_results.append(f"✗ Error testing Claude CLI: {str(e)}")
all_passed = False
return all_passed, test_results
def uninstall_commands() -> Tuple[bool, List[str]]:
"""
Uninstall MCP Memory Service commands from Claude Code.
Returns:
Tuple of (success, list_of_removed_files)
"""
commands_dir = get_claude_commands_directory()
removed_files = []
if not commands_dir.exists():
return True, [] # Nothing to remove
try:
# Remove all memory-*.md files
for md_file in commands_dir.glob("memory-*.md"):
md_file.unlink()
removed_files.append(md_file.name)
print_info(f"Removed: {md_file.name}")
if removed_files:
print_success(f"Successfully removed {len(removed_files)} commands")
else:
print_info("No MCP Memory Service commands found to remove")
return True, removed_files
except Exception as e:
print_error(f"Failed to uninstall commands: {str(e)}")
return False, []
def install_claude_commands(verbose: bool = True) -> bool:
"""
Main function to install Claude Code commands for MCP Memory Service.
Args:
verbose: Whether to print detailed progress information
Returns:
True if installation was successful, False otherwise
"""
if verbose:
print_info("Installing Claude Code commands for MCP Memory Service...")
# Check for legacy paths and provide migration guidance
legacy_found, legacy_issues = check_for_legacy_claude_paths()
if legacy_found:
print_warning("Legacy Claude Code directory structure detected:")
for issue in legacy_issues:
print_info(issue)
print_info("")
# Validate existing settings paths
settings_valid, settings_issues = validate_claude_settings_paths()
if not settings_valid:
print_warning("Claude settings path issues detected:")
for issue in settings_issues:
print_info(issue)
print_info("")
# Check Claude Code CLI availability
claude_available, claude_status = check_claude_code_cli()
if not claude_available:
print_error(f"Claude Code CLI not available: {claude_status}")
print_info("Please install Claude Code CLI first: https://claude.ai/code")
return False
if verbose:
print_success(f"Claude Code CLI detected: {claude_status}")
# Check commands directory access
can_access, access_status = check_commands_directory_access()
if not can_access:
print_error(access_status)
return False
if verbose:
print_success(access_status)
# Create backup of existing commands
backup_path = backup_existing_commands()
# Install command files
install_success, installed_files = install_command_files()
if not install_success:
return False
# Verify MCP service connectivity (optional - warn but don't fail)
mcp_available, mcp_status = verify_mcp_service_connectivity()
if mcp_available:
if verbose:
print_success(mcp_status)
else:
if verbose:
print_warning(f"MCP service check: {mcp_status}")
print_info("Commands installed but MCP service may need to be started")
# Test command functionality
if verbose:
print_info("Testing installed commands...")
tests_passed, test_results = test_command_functionality()
for result in test_results:
print_info(result)
if tests_passed:
print_success("All command tests passed")
else:
print_warning("Some command tests failed - commands may still work")
# Show usage instructions
if verbose:
print_info("\nClaude Code commands installed successfully!")
print_info("Available commands:")
for cmd_file in installed_files:
cmd_name = cmd_file.replace('.md', '')
print_info(f" claude /{cmd_name}")
print_info("\nExample usage:")
print_info(' claude /memory-store "Important decision about architecture"')
print_info(' claude /memory-recall "what did we decide last week?"')
print_info(' claude /memory-search --tags "architecture,database"')
print_info(' claude /memory-health')
return True
if __name__ == "__main__":
# Allow running this script directly for testing
import argparse
parser = argparse.ArgumentParser(description="Install Claude Code commands for MCP Memory Service")
parser.add_argument('--test', action='store_true', help='Test installation without installing')
parser.add_argument('--uninstall', action='store_true', help='Uninstall commands')
parser.add_argument('--validate', action='store_true', help='Validate Claude configuration paths')
parser.add_argument('--quiet', action='store_true', help='Minimal output')
args = parser.parse_args()
if args.uninstall:
success, removed = uninstall_commands()
if success:
sys.exit(0)
else:
sys.exit(1)
elif args.validate:
# Path validation mode
print("Claude Code Configuration Validation")
print("=" * 40)
# Check for legacy paths
legacy_found, legacy_issues = check_for_legacy_claude_paths()
if legacy_found:
print("❌ Legacy paths detected:")
for issue in legacy_issues:
print(f" {issue}")
else:
print("✅ No legacy paths found")
print()
# Validate settings
settings_valid, settings_issues = validate_claude_settings_paths()
if settings_valid:
print("✅ Claude settings paths are valid")
else:
print("❌ Settings path issues detected:")
for issue in settings_issues:
print(f" {issue}")
sys.exit(0 if settings_valid and not legacy_found else 1)
elif args.test:
# Test mode - check prerequisites but don't install
claude_ok, claude_msg = check_claude_code_cli()
access_ok, access_msg = check_commands_directory_access()
mcp_ok, mcp_msg = verify_mcp_service_connectivity()
print("Claude Code commands installation test:")
print(f" Claude CLI: {'✓' if claude_ok else '✗'} {claude_msg}")
print(f" Directory access: {'✓' if access_ok else '✗'} {access_msg}")
print(f" MCP service: {'✓' if mcp_ok else '⚠'} {mcp_msg}")
if claude_ok and access_ok:
print("✓ Ready to install Claude Code commands")
sys.exit(0)
else:
print("✗ Prerequisites not met")
sys.exit(1)
else:
# Normal installation
success = install_claude_commands(verbose=not args.quiet)
sys.exit(0 if success else 1)
```
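Beyond the `--test`, `--validate`, and `--uninstall` CLI modes handled in `__main__`, the module's functions can be reused from other setup scripts. A minimal sketch, assuming it runs from the repository root; the `sys.path` insertion is only illustrative, since `scripts/utils/` is not an installed package:

```python
"""Minimal sketch: reuse the command-installation helpers from another setup script."""
import sys
from pathlib import Path

# Assumed invocation from the repository root; makes claude_commands_utils importable.
sys.path.insert(0, str(Path("scripts") / "utils"))

from claude_commands_utils import check_claude_code_cli, install_claude_commands

cli_ok, cli_status = check_claude_code_cli()
print(f"Claude CLI: {cli_status}")

# Install the /memory-* commands only when the Claude Code CLI is actually available.
if cli_ok and not install_claude_commands(verbose=True):
    sys.exit(1)
```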
--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/forgetting.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Controlled forgetting system with archival for memory management."""
import os
import json
import shutil
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from dataclasses import dataclass
from pathlib import Path
import hashlib
from .base import ConsolidationBase, ConsolidationConfig
from .decay import RelevanceScore
from ..models.memory import Memory
@dataclass
class ForgettingCandidate:
"""A memory candidate for forgetting."""
memory: Memory
relevance_score: RelevanceScore
forgetting_reasons: List[str]
archive_priority: int # 1=high, 2=medium, 3=low
can_be_deleted: bool
@dataclass
class ForgettingResult:
"""Result of forgetting operation."""
memory_hash: str
action_taken: str # 'archived', 'compressed', 'deleted', 'skipped'
archive_path: Optional[str]
compressed_version: Optional[Memory]
metadata: Dict[str, Any]
class ControlledForgettingEngine(ConsolidationBase):
"""
Implements intelligent forgetting to maintain memory system health.
Rather than deleting memories, this system compresses and archives
low-value memories while maintaining audit trails and recovery options.
"""
def __init__(self, config: ConsolidationConfig):
super().__init__(config)
self.relevance_threshold = config.relevance_threshold
self.access_threshold_days = config.access_threshold_days
self.archive_location = config.archive_location or "~/.mcp_memory_archive"
# Ensure archive directory exists
self.archive_path = Path(os.path.expanduser(self.archive_location))
self.archive_path.mkdir(parents=True, exist_ok=True)
# Create subdirectories for different archive types
self.daily_archive = self.archive_path / "daily"
self.compressed_archive = self.archive_path / "compressed"
self.metadata_archive = self.archive_path / "metadata"
for archive_dir in [self.daily_archive, self.compressed_archive, self.metadata_archive]:
archive_dir.mkdir(exist_ok=True)
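    # Usage sketch (illustrative): the consolidation pipeline typically drives this engine as
    #   engine = ControlledForgettingEngine(config)
    #   results = await engine.process(memories, relevance_scores,
    #                                  access_patterns=access_patterns,
    #                                  time_horizon='quarterly')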
async def process(self, memories: List[Memory], relevance_scores: List[RelevanceScore], **kwargs) -> List[ForgettingResult]:
"""Identify and process memories for controlled forgetting."""
if not self._validate_memories(memories):
return []
# Create score lookup
score_lookup = {score.memory_hash: score for score in relevance_scores}
# Get access patterns from kwargs
access_patterns = kwargs.get('access_patterns', {})
time_horizon = kwargs.get('time_horizon', 'monthly')
# Identify forgetting candidates
candidates = await self._identify_forgetting_candidates(
memories, score_lookup, access_patterns, time_horizon
)
if not candidates:
self.logger.info("No memories identified for forgetting")
return []
# Process candidates
results = []
for candidate in candidates:
result = await self._process_forgetting_candidate(candidate)
results.append(result)
# Log forgetting summary
actions_summary = {}
for result in results:
action = result.action_taken
actions_summary[action] = actions_summary.get(action, 0) + 1
self.logger.info(f"Forgetting results: {actions_summary}")
return results
async def _identify_forgetting_candidates(
self,
memories: List[Memory],
score_lookup: Dict[str, RelevanceScore],
access_patterns: Dict[str, datetime],
time_horizon: str
) -> List[ForgettingCandidate]:
"""Identify memories that are candidates for forgetting."""
candidates = []
current_time = datetime.now()
for memory in memories:
# Skip protected memories
if self._is_protected_memory(memory):
continue
# Get relevance score
relevance_score = score_lookup.get(memory.content_hash)
if not relevance_score:
continue
# Check if memory meets forgetting criteria
forgetting_reasons = []
can_be_deleted = False
archive_priority = 3 # Default to low priority
# Low relevance check
if relevance_score.total_score < self.relevance_threshold:
forgetting_reasons.append("low_relevance")
archive_priority = min(archive_priority, 2) # Medium priority
# Access pattern check
last_accessed = access_patterns.get(memory.content_hash)
if not last_accessed and memory.updated_at:
last_accessed = datetime.utcfromtimestamp(memory.updated_at)
if last_accessed:
days_since_access = (current_time - last_accessed).days
if days_since_access > self.access_threshold_days:
forgetting_reasons.append("old_access")
if days_since_access > self.access_threshold_days * 2:
archive_priority = min(archive_priority, 1) # High priority
can_be_deleted = True # Can be deleted if very old
# Memory type specific checks
memory_type = self._extract_memory_type(memory)
if memory_type == 'temporary':
age_days = self._get_memory_age_days(memory, current_time)
if age_days > 7: # Temporary memories older than a week
forgetting_reasons.append("expired_temporary")
can_be_deleted = True
archive_priority = 1
# Content quality checks
if self._is_low_quality_content(memory):
forgetting_reasons.append("low_quality")
archive_priority = min(archive_priority, 2)
# Duplicate content check
if self._appears_to_be_duplicate(memory, memories):
forgetting_reasons.append("potential_duplicate")
can_be_deleted = True
archive_priority = 1
# Create candidate if reasons exist
if forgetting_reasons:
# Override time horizon restriction for certain types of deletable content
can_delete_final = can_be_deleted
if not (time_horizon in ['quarterly', 'yearly']):
# Still allow deletion for expired temporary memories and duplicates
if not ('expired_temporary' in forgetting_reasons or 'potential_duplicate' in forgetting_reasons):
can_delete_final = False
candidate = ForgettingCandidate(
memory=memory,
relevance_score=relevance_score,
forgetting_reasons=forgetting_reasons,
archive_priority=archive_priority,
can_be_deleted=can_delete_final
)
candidates.append(candidate)
# Sort by priority (higher priority = lower number = first in list)
candidates.sort(key=lambda c: (c.archive_priority, -c.relevance_score.total_score))
self.logger.info(f"Identified {len(candidates)} forgetting candidates")
return candidates
def _is_low_quality_content(self, memory: Memory) -> bool:
"""Check if memory content appears to be low quality."""
content = memory.content.strip()
# Very short content
if len(content) < 10:
return True
# Mostly punctuation or special characters
alpha_chars = sum(1 for c in content if c.isalpha())
if alpha_chars / len(content) < 0.3: # Less than 30% alphabetic
return True
# Repetitive content patterns
if len(set(content.split())) < len(content.split()) * 0.5: # Less than 50% unique words
return True
# Common low-value patterns
low_value_patterns = [
'test', 'testing', 'hello world', 'lorem ipsum',
'asdf', 'qwerty', '1234', 'temp', 'temporary'
]
content_lower = content.lower()
for pattern in low_value_patterns:
if pattern in content_lower and len(content) < 100:
return True
return False
def _appears_to_be_duplicate(self, memory: Memory, all_memories: List[Memory]) -> bool:
"""Check if memory appears to be a duplicate of another memory."""
content = memory.content.strip().lower()
# Skip very short content for duplicate detection
if len(content) < 20:
return False
for other_memory in all_memories:
if other_memory.content_hash == memory.content_hash:
continue
other_content = other_memory.content.strip().lower()
# Exact match
if content == other_content:
return True
# Very similar content (simple check)
if len(content) > 50 and len(other_content) > 50:
# Check if one is a substring of the other with high overlap
if content in other_content or other_content in content:
return True
# Check word overlap
words1 = set(content.split())
words2 = set(other_content.split())
if len(words1) > 5 and len(words2) > 5:
overlap = len(words1.intersection(words2))
union = len(words1.union(words2))
if overlap / union > 0.8: # 80% word overlap
return True
return False
async def _process_forgetting_candidate(self, candidate: ForgettingCandidate) -> ForgettingResult:
"""Process a single forgetting candidate."""
memory = candidate.memory
try:
# Determine action based on candidate properties
if candidate.can_be_deleted and 'potential_duplicate' in candidate.forgetting_reasons:
# Delete obvious duplicates or expired temporary content
return await self._delete_memory(candidate)
elif candidate.archive_priority <= 2:
# Archive high and medium priority candidates
return await self._archive_memory(candidate)
else:
# Compress low priority candidates
return await self._compress_memory(candidate)
except Exception as e:
self.logger.error(f"Error processing forgetting candidate {memory.content_hash}: {e}")
return ForgettingResult(
memory_hash=memory.content_hash,
action_taken='skipped',
archive_path=None,
compressed_version=None,
metadata={'error': str(e)}
)
async def _archive_memory(self, candidate: ForgettingCandidate) -> ForgettingResult:
"""Archive a memory to the filesystem."""
memory = candidate.memory
# Create archive filename with timestamp and hash
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
short_hash = memory.content_hash[:12]
filename = f"{timestamp}_{short_hash}.json"
# Choose archive directory based on priority
if candidate.archive_priority == 1:
archive_dir = self.daily_archive
else:
archive_dir = self.compressed_archive
archive_file = archive_dir / filename
# Create archive data
archive_data = {
'memory': memory.to_dict(),
'relevance_score': {
'total_score': candidate.relevance_score.total_score,
'base_importance': candidate.relevance_score.base_importance,
'decay_factor': candidate.relevance_score.decay_factor,
'connection_boost': candidate.relevance_score.connection_boost,
'access_boost': candidate.relevance_score.access_boost,
'metadata': candidate.relevance_score.metadata
},
'forgetting_metadata': {
'reasons': candidate.forgetting_reasons,
'archive_priority': candidate.archive_priority,
'archive_date': datetime.now().isoformat(),
'original_hash': memory.content_hash
}
}
# Write to archive
with open(archive_file, 'w', encoding='utf-8') as f:
json.dump(archive_data, f, indent=2, ensure_ascii=False)
# Create metadata entry
await self._create_metadata_entry(memory, archive_file, 'archived')
return ForgettingResult(
memory_hash=memory.content_hash,
action_taken='archived',
archive_path=str(archive_file),
compressed_version=None,
metadata={
'archive_priority': candidate.archive_priority,
'reasons': candidate.forgetting_reasons,
'file_size': archive_file.stat().st_size
}
)
async def _compress_memory(self, candidate: ForgettingCandidate) -> ForgettingResult:
"""Create a compressed version of the memory."""
memory = candidate.memory
original_content = memory.content
# Simple compression: extract key information
compressed_content = self._create_compressed_content(original_content)
# Create compressed memory
compressed_hash = hashlib.sha256(compressed_content.encode()).hexdigest()
compressed_memory = Memory(
content=compressed_content,
content_hash=compressed_hash,
tags=memory.tags + ['compressed'],
memory_type='compressed',
metadata={
**memory.metadata,
'original_hash': memory.content_hash,
'original_length': len(original_content),
'compressed_length': len(compressed_content),
'compression_ratio': len(compressed_content) / len(original_content),
'compression_date': datetime.now().isoformat(),
'forgetting_reasons': candidate.forgetting_reasons
},
embedding=memory.embedding, # Preserve embedding
created_at=memory.created_at,
created_at_iso=memory.created_at_iso
)
# Archive original for recovery
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
short_hash = memory.content_hash[:12]
archive_file = self.compressed_archive / f"original_{timestamp}_{short_hash}.json"
with open(archive_file, 'w', encoding='utf-8') as f:
json.dump(memory.to_dict(), f, indent=2, ensure_ascii=False)
# Create metadata entry
await self._create_metadata_entry(memory, archive_file, 'compressed')
return ForgettingResult(
memory_hash=memory.content_hash,
action_taken='compressed',
archive_path=str(archive_file),
compressed_version=compressed_memory,
metadata={
'original_length': len(original_content),
'compressed_length': len(compressed_content),
'compression_ratio': len(compressed_content) / len(original_content),
'reasons': candidate.forgetting_reasons
}
)
async def _delete_memory(self, candidate: ForgettingCandidate) -> ForgettingResult:
"""Delete a memory (with metadata backup)."""
memory = candidate.memory
# Always create a metadata backup before deletion
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
short_hash = memory.content_hash[:12]
backup_file = self.metadata_archive / f"deleted_{timestamp}_{short_hash}.json"
backup_data = {
'memory': memory.to_dict(),
'deletion_metadata': {
'reasons': candidate.forgetting_reasons,
'deletion_date': datetime.now().isoformat(),
'relevance_score': candidate.relevance_score.total_score
}
}
with open(backup_file, 'w', encoding='utf-8') as f:
            json.dump(backup_data, f, indent=2, ensure_ascii=False)
        # Record the deletion in the forgetting log so get_forgetting_statistics() can count it
        await self._create_metadata_entry(memory, backup_file, 'deleted')
return ForgettingResult(
memory_hash=memory.content_hash,
action_taken='deleted',
archive_path=str(backup_file),
compressed_version=None,
metadata={
'reasons': candidate.forgetting_reasons,
'backup_location': str(backup_file)
}
)
def _create_compressed_content(self, original_content: str) -> str:
"""Create compressed version of content preserving key information."""
# Simple compression strategy: extract key sentences and important terms
sentences = original_content.split('.')
# Keep first and last sentences if content is long enough
if len(sentences) > 3:
key_sentences = [sentences[0].strip(), sentences[-1].strip()]
middle_content = ' '.join(sentences[1:-1])
# Extract important terms from middle content
important_terms = self._extract_important_terms(middle_content)
if important_terms:
key_sentences.append(f"Key terms: {', '.join(important_terms[:5])}")
compressed = '. '.join(key_sentences)
else:
# Content is already short, just clean it up
compressed = original_content.strip()
# Add compression indicator
if len(compressed) < len(original_content) * 0.8:
compressed += " [Compressed]"
return compressed
def _extract_important_terms(self, text: str) -> List[str]:
"""Extract important terms from text."""
import re
# Extract capitalized words, numbers, and technical terms
terms = set()
# Capitalized words (potential proper nouns)
terms.update(re.findall(r'\b[A-Z][a-z]+\b', text))
# Numbers and measurements
terms.update(re.findall(r'\b\d+(?:\.\d+)?(?:\s*[a-zA-Z]+)?\b', text))
# Words in quotes
terms.update(re.findall(r'"([^"]*)"', text))
# Technical-looking terms (CamelCase, with underscores, etc.)
terms.update(re.findall(r'\b[a-z]+[A-Z][a-zA-Z]*\b', text))
terms.update(re.findall(r'\b\w+_\w+\b', text))
return list(terms)[:10] # Limit to 10 terms
async def _create_metadata_entry(self, memory: Memory, archive_path: Path, action: str):
"""Create a metadata entry for tracking archived/compressed memories."""
metadata_file = self.metadata_archive / "forgetting_log.jsonl"
entry = {
'memory_hash': memory.content_hash,
'action': action,
'archive_path': str(archive_path),
'timestamp': datetime.now().isoformat(),
'memory_type': memory.memory_type,
'tags': memory.tags,
'content_length': len(memory.content)
}
# Append to log file
with open(metadata_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(entry) + '\n')
async def recover_memory(self, memory_hash: str) -> Optional[Memory]:
"""Recover a forgotten memory from archives."""
# Search through archive directories
for archive_dir in [self.daily_archive, self.compressed_archive]:
for archive_file in archive_dir.glob("*.json"):
try:
with open(archive_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Check if this is the memory we're looking for
if data.get('memory', {}).get('content_hash') == memory_hash:
memory_data = data['memory']
return Memory.from_dict(memory_data)
except Exception as e:
self.logger.warning(f"Error reading archive file {archive_file}: {e}")
return None
async def get_forgetting_statistics(self) -> Dict[str, Any]:
"""Get statistics about forgetting operations."""
stats = {
'total_archived': 0,
'total_compressed': 0,
'total_deleted': 0,
'archive_size_bytes': 0,
'oldest_archive': None,
'newest_archive': None
}
        # Sum archive sizes on disk; per-action counts come from the forgetting log below
        # (counting files here as well would double-count archived memories)
        for archive_dir in [self.daily_archive, self.compressed_archive]:
            if archive_dir.exists():
                for file in archive_dir.glob("*.json"):
                    stats['archive_size_bytes'] += file.stat().st_size
# Read forgetting log for detailed stats
log_file = self.metadata_archive / "forgetting_log.jsonl"
if log_file.exists():
try:
with open(log_file, 'r', encoding='utf-8') as f:
for line in f:
entry = json.loads(line.strip())
action = entry.get('action', 'unknown')
if action == 'archived':
stats['total_archived'] += 1
elif action == 'compressed':
stats['total_compressed'] += 1
elif action == 'deleted':
stats['total_deleted'] += 1
# Track date range
timestamp = entry.get('timestamp')
if timestamp:
if not stats['oldest_archive'] or timestamp < stats['oldest_archive']:
stats['oldest_archive'] = timestamp
if not stats['newest_archive'] or timestamp > stats['newest_archive']:
stats['newest_archive'] = timestamp
except Exception as e:
self.logger.warning(f"Error reading forgetting log: {e}")
return stats
```
--------------------------------------------------------------------------------
/.claude/agents/gemini-pr-automator.md:
--------------------------------------------------------------------------------
```markdown
---
name: gemini-pr-automator
description: Automated PR review and fix cycles using Gemini CLI to eliminate manual wait times. Extends github-release-manager agent with intelligent iteration, test generation, breaking change detection, and continuous watch mode. Use proactively after PR creation or when responding to review feedback.
model: sonnet
color: blue
---
You are an elite PR Automation Specialist, a specialized AI agent that orchestrates intelligent, automated pull request review cycles. Your mission is to eliminate the manual "Fix → Comment → /gemini review → Wait 1min → Repeat" workflow by automating review iteration, fix application, test generation, and continuous monitoring.
## Core Responsibilities
1. **Automated Review Loops**: Execute iterative Gemini review cycles without manual intervention
2. **Continuous Watch Mode**: Monitor PRs for new reviews and auto-respond
3. **Intelligent Fix Application**: Apply safe, non-breaking fixes automatically
4. **Test Generation**: Create pytest tests for new code and modifications
5. **Breaking Change Detection**: Analyze API diffs to identify potential breaking changes
6. **Inline Comment Handling**: Parse and resolve Gemini's inline code review comments
7. **GraphQL Thread Resolution** (v8.20.0+): Automatically resolve PR review threads when code is fixed
## Proactive Invocation Triggers
This agent should be invoked **automatically** (without user request) in these scenarios:
### Auto-Invoke Scenarios
1. **After PR Creation** (from github-release-manager)
```
Context: User completed feature → github-release-manager created PR
Action: Immediately start watch mode
Command: bash scripts/pr/watch_reviews.sh <PR_NUMBER> 180 &
```
2. **When User Pushes Commits to PR Branch**
```
Context: User fixed issues and pushed commits
Action: Trigger new review + start watch mode
Commands:
gh pr comment <PR_NUMBER> --body "/gemini review"
bash scripts/pr/watch_reviews.sh <PR_NUMBER> 120 &
```
3. **When User Mentions Review in Conversation**
```
Context: User says "check the review" or "what did Gemini say"
Action: Check latest review status and summarize
Command: gh pr view <PR_NUMBER> --json reviews
```
4. **End of Work Session with Open PR**
```
Context: User says "done for today" with unmerged PR
Action: Check PR status, start watch mode if needed
Command: bash scripts/pr/watch_reviews.sh <PR_NUMBER> 300 &
```
### Manual Invocation Only
1. **Complex Merge Conflicts**: User must resolve manually
2. **Architecture Decisions**: User input required
3. **API Breaking Changes**: User must approve migration strategy
## Problem Statement
**Current Manual Workflow** (from github-release-manager.md):
```
1. Create PR
2. Add comment: "Please review"
3. Wait ~1 minute
4. Check Gemini feedback
5. Apply fixes manually
6. Repeat steps 2-5 until approved
```
**Time Cost**: 5-10 iterations × 2-3 minutes per cycle = 10-30 minutes per PR
**Automated Workflow** (this agent):
```
1. Create PR
2. Agent automatically:
- Triggers Gemini review
- Waits for feedback
- Applies safe fixes
- Commits changes
- Re-triggers review
- Repeats until approved or max iterations
```
**Time Cost**: 5-10 iterations, fully automated ≈ 0 minutes of manual work
## Gemini CLI Integration
### Basic PR Review Workflow
```bash
#!/bin/bash
# scripts/pr/auto_review.sh - Automated PR review loop
PR_NUMBER=$1
MAX_ITERATIONS=${2:-5}
SAFE_FIX_MODE=${3:-true} # Auto-apply safe fixes
if [ -z "$PR_NUMBER" ]; then
echo "Usage: $0 <PR_NUMBER> [MAX_ITERATIONS] [SAFE_FIX_MODE]"
exit 1
fi
iteration=1
approved=false
while [ $iteration -le $MAX_ITERATIONS ] && [ "$approved" = false ]; do
echo "=== Iteration $iteration/$MAX_ITERATIONS ==="
# Trigger Gemini review (comment on PR)
gh pr comment $PR_NUMBER --body "Please review this PR for code quality, security, and best practices."
# Wait for Gemini to process
echo "Waiting for Gemini review..."
sleep 90 # Gemini typically responds in 60-90 seconds
# Fetch latest review comments
review_comments=$(gh pr view $PR_NUMBER --json comments --jq '.comments[-1].body')
echo "Review feedback:"
echo "$review_comments"
# Check if approved
if echo "$review_comments" | grep -qi "looks good\|approved\|lgtm"; then
echo "✅ PR approved by Gemini!"
approved=true
break
fi
# Extract issues and generate fixes
if [ "$SAFE_FIX_MODE" = true ]; then
echo "Generating fixes for review feedback..."
# Use Gemini to analyze feedback and suggest code changes
fixes=$(gemini "Based on this code review feedback, generate specific code fixes. Review feedback: $review_comments
Changed files: $(gh pr diff $PR_NUMBER)
Provide fixes in git diff format that can be applied with git apply. Focus only on safe, non-breaking changes.")
# Apply fixes (would need more sophisticated parsing in production)
echo "$fixes" > /tmp/pr_fixes_$PR_NUMBER.diff
# Apply and commit
if git apply --check /tmp/pr_fixes_$PR_NUMBER.diff 2>/dev/null; then
git apply /tmp/pr_fixes_$PR_NUMBER.diff
git add -A
git commit -m "fix: apply Gemini review feedback (iteration $iteration)"
git push
echo "✅ Fixes applied and pushed"
else
echo "⚠️ Fixes could not be auto-applied, manual intervention needed"
break
fi
else
echo "Manual fix mode - review feedback above and apply manually"
break
fi
iteration=$((iteration + 1))
echo ""
done
if [ "$approved" = true ]; then
echo "🎉 PR $PR_NUMBER is approved and ready to merge!"
exit 0
else
echo "⚠️ Max iterations reached or manual intervention needed"
exit 1
fi
```
### Test Generation Workflow
```bash
#!/bin/bash
# scripts/pr/generate_tests.sh - Auto-generate tests for new code
PR_NUMBER=$1
if [ -z "$PR_NUMBER" ]; then
echo "Usage: $0 <PR_NUMBER>"
exit 1
fi
echo "Analyzing PR $PR_NUMBER for test coverage..."
# Get changed Python files
changed_files=$(gh pr diff $PR_NUMBER --name-only | grep '\.py$' | grep -v '^tests/')
if [ -z "$changed_files" ]; then
echo "No Python files changed (excluding tests)"
exit 0
fi
for file in $changed_files; do
echo "Generating tests for: $file"
# Check if test file exists
test_file="tests/test_$(basename $file)"
if [ -f "$test_file" ]; then
echo "Test file exists, suggesting additional test cases..."
existing_tests=$(cat "$test_file")
prompt="Existing test file: $existing_tests
New/changed code: $(cat $file)
Suggest additional pytest test cases to cover the new/changed code. Output only the new test functions to append to the existing file."
else
echo "Creating new test file..."
prompt="Generate comprehensive pytest tests for this Python module: $(cat $file)
Include:
- Happy path tests
- Edge cases
- Error handling
- Async test cases if applicable
Output complete pytest test file."
fi
    # Use basename so nested paths like src/pkg/module.py don't produce invalid /tmp subpaths
    out_file="/tmp/test_gen_$(basename "$file")"
    gemini "$prompt" > "$out_file"
    echo "Generated tests saved to $out_file"
    echo "Review and apply with: cat $out_file >> $test_file"
echo ""
done
```
### Breaking Change Detection
```bash
#!/bin/bash
# scripts/pr/detect_breaking_changes.sh - Analyze API changes for breaking changes
BASE_BRANCH=${1:-main}
HEAD_BRANCH=${2:-$(git branch --show-current)}
echo "Detecting breaking changes: $BASE_BRANCH...$HEAD_BRANCH"
# Get API-related file changes
api_changes=$(git diff $BASE_BRANCH...$HEAD_BRANCH -- src/mcp_memory_service/tools.py src/mcp_memory_service/web/api/)
if [ -z "$api_changes" ]; then
echo "✅ No API changes detected"
exit 0
fi
echo "Analyzing API changes for breaking changes..."
result=$(gemini "Analyze these API changes for breaking changes. A breaking change is:
- Removed function/method/endpoint
- Changed function signature (parameters removed/reordered)
- Changed return type
- Renamed public API
- Changed HTTP endpoint path/method
Report ONLY breaking changes with severity (CRITICAL/HIGH/MEDIUM).
Changes:
$api_changes")
if echo "$result" | grep -qi "breaking\|CRITICAL\|HIGH"; then
echo "🔴 BREAKING CHANGES DETECTED:"
echo "$result"
exit 1
else
echo "✅ No breaking changes detected"
exit 0
fi
```
## Decision-Making Framework
### When to Use Auto-Iteration
**Use automated iteration when**:
- PR contains straightforward code quality fixes
- Changes are non-critical (not release-blocking)
- Reviewer feedback is typically formatting/style
- Team has confidence in automated fix safety
**Use manual iteration when**:
- PR touches critical paths (authentication, storage backends)
- Architectural changes requiring human judgment
- Security-related modifications
- Complex refactoring with cross-file dependencies
### Safe Fix Classification
**Safe Fixes** (auto-apply):
- Formatting changes (whitespace, line length)
- Import organization
- Type hint additions
- Docstring improvements
- Variable renaming for clarity
- Simple refactoring (extract method with identical behavior)
**Unsafe Fixes** (manual review required):
- Logic changes
- Error handling modifications
- API signature changes
- Database queries
- Authentication/authorization code
- Performance optimizations with side effects
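A minimal guard along these lines could be added to `apply_review_fixes.sh` before any patch is applied; it reuses the categorized-issues JSON that script already writes (a sketch, not an existing check):
```bash
# Sketch: refuse to auto-apply when the review contains any "unsafe" issues
unsafe_count=$(jq '.unsafe | length' /tmp/categorized_issues_$PR_NUMBER.json)
if [ "$unsafe_count" -gt 0 ]; then
    echo "⚠️ $unsafe_count unsafe issue(s) found - leaving this PR for manual review"
    exit 0
fi
```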
### Iteration Limits
- **Standard PRs**: Max 5 iterations
- **Urgent fixes**: Max 3 iterations (faster manual intervention if needed)
- **Experimental features**: Max 10 iterations (more tolerance for iteration)
- **Release PRs**: Max 2 iterations (strict human oversight)
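A small wrapper can translate these limits into the `auto_review.sh` invocation; a sketch (the `PR_TYPE` variable is illustrative, not an existing script parameter):
```bash
# Sketch: choose the iteration budget by PR type before starting the review loop
case "$PR_TYPE" in
    release)      MAX_ITER=2 ;;   # strict human oversight
    urgent)       MAX_ITER=3 ;;   # fail over to manual intervention quickly
    experimental) MAX_ITER=10 ;;  # more tolerance for iteration
    *)            MAX_ITER=5 ;;   # standard PRs
esac
bash scripts/pr/auto_review.sh "$PR_NUMBER" "$MAX_ITER" true
```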
## Operational Workflows
### 1. Full Automated PR Review Cycle
```bash
#!/bin/bash
# scripts/pr/full_auto_review.sh - Complete automated PR workflow
PR_NUMBER=$1
echo "Starting automated PR review for #$PR_NUMBER"
# Step 1: Run code quality checks
echo "Step 1: Code quality analysis..."
bash scripts/pr/quality_gate.sh $PR_NUMBER
if [ $? -ne 0 ]; then
echo "❌ Quality checks failed, fix issues first"
exit 1
fi
# Step 2: Generate tests for new code
echo "Step 2: Test generation..."
bash scripts/pr/generate_tests.sh $PR_NUMBER
# Step 3: Check for breaking changes
echo "Step 3: Breaking change detection..."
bash scripts/pr/detect_breaking_changes.sh main $(gh pr view $PR_NUMBER --json headRefName --jq '.headRefName')
if [ $? -ne 0 ]; then
echo "⚠️ Breaking changes detected, review carefully"
fi
# Step 4: Automated review loop
echo "Step 4: Automated Gemini review iteration..."
bash scripts/pr/auto_review.sh $PR_NUMBER 5 true
# Step 5: Final status
if [ $? -eq 0 ]; then
echo "🎉 PR #$PR_NUMBER is ready for merge!"
gh pr comment $PR_NUMBER --body "✅ Automated review completed successfully. All checks passed!"
else
echo "⚠️ Manual intervention needed for PR #$PR_NUMBER"
gh pr comment $PR_NUMBER --body "⚠️ Automated review requires manual attention. Please review feedback above."
fi
```
### 2. Intelligent Fix Application
```bash
#!/bin/bash
# scripts/pr/apply_review_fixes.sh - Parse and apply Gemini feedback
PR_NUMBER=$1
REVIEW_COMMENT_ID=$2
if [ -z "$PR_NUMBER" ] || [ -z "$REVIEW_COMMENT_ID" ]; then
echo "Usage: $0 <PR_NUMBER> <REVIEW_COMMENT_ID>"
exit 1
fi
# Fetch specific review comment
review_text=$(gh api "repos/:owner/:repo/pulls/$PR_NUMBER/comments/$REVIEW_COMMENT_ID" --jq '.body')
echo "Analyzing review feedback..."
# Use Gemini to categorize issues
categorized=$(gemini "Categorize these code review comments into: SAFE (can auto-fix), UNSAFE (needs manual review), NON-CODE (documentation/discussion).
Review comments:
$review_text
Output in JSON format:
{
\"safe\": [\"issue 1\", \"issue 2\"],
\"unsafe\": [\"issue 3\"],
\"non_code\": [\"comment 1\"]
}")
echo "$categorized" > /tmp/categorized_issues_$PR_NUMBER.json
# Extract safe issues
safe_issues=$(echo "$categorized" | jq -r '.safe[]')
if [ -z "$safe_issues" ]; then
echo "No safe auto-fixable issues found"
exit 0
fi
echo "Safe issues to auto-fix:"
echo "$safe_issues"
# Generate fixes for safe issues
fixes=$(gemini "Generate code fixes for these issues. Changed files: $(gh pr diff $PR_NUMBER)
Issues to fix:
$safe_issues
Provide fixes as git diff patches.")
echo "$fixes" > /tmp/fixes_$PR_NUMBER.patch
# Apply fixes
if git apply --check /tmp/fixes_$PR_NUMBER.patch 2>/dev/null; then
git apply /tmp/fixes_$PR_NUMBER.patch
git add -A
git commit -m "fix: apply Gemini review feedback
Addressed: $(echo \"$safe_issues\" | tr '\n' ', ')
Co-Authored-By: Gemini Code Assist <[email protected]>"
git push
echo "✅ Fixes applied successfully"
# Update PR with comment
gh pr comment $PR_NUMBER --body "✅ Auto-applied fixes for: $(echo \"$safe_issues\" | tr '\n' ', ')"
else
echo "❌ Could not apply fixes automatically"
exit 1
fi
```
### 3. PR Quality Gate Integration
```bash
#!/bin/bash
# scripts/pr/quality_gate.sh - Run all quality checks before review
PR_NUMBER=$1
echo "Running PR quality gate checks for #$PR_NUMBER..."
exit_code=0
# Check 1: Code complexity
echo "Check 1: Code complexity..."
changed_files=$(gh pr diff $PR_NUMBER --name-only | grep '\.py$')
for file in $changed_files; do
result=$(gemini "Check complexity. Report ONLY if any function scores >7: $(cat $file)")
if [ ! -z "$result" ]; then
echo "⚠️ High complexity in $file: $result"
exit_code=1
fi
done
# Check 2: Security scan
echo "Check 2: Security vulnerabilities..."
for file in $changed_files; do
result=$(gemini "Security scan. Report ONLY vulnerabilities: $(cat $file)")
if [ ! -z "$result" ]; then
echo "🔴 Security issue in $file: $result"
exit_code=2 # Critical failure
break
fi
done
# Check 3: Test coverage
echo "Check 3: Test coverage..."
# grep -c already emits 0 when nothing matches, so no "|| echo 0" fallback is needed
test_files=$(gh pr diff $PR_NUMBER --name-only | grep -c '^tests/.*\.py$')
code_files=$(gh pr diff $PR_NUMBER --name-only | grep '\.py$' | grep -vc '^tests/')
if [ $code_files -gt 0 ] && [ $test_files -eq 0 ]; then
echo "⚠️ No test files added/modified despite code changes"
exit_code=1
fi
# Check 4: Breaking changes
echo "Check 4: Breaking changes..."
bash scripts/pr/detect_breaking_changes.sh main $(gh pr view $PR_NUMBER --json headRefName --jq '.headRefName')
if [ $? -ne 0 ]; then
echo "⚠️ Potential breaking changes detected"
exit_code=1
fi
# Report results
if [ $exit_code -eq 0 ]; then
echo "✅ All quality gate checks passed"
gh pr comment $PR_NUMBER --body "✅ **Quality Gate PASSED**
All automated checks completed successfully:
- Code complexity: OK
- Security scan: OK
- Test coverage: OK
- Breaking changes: None detected
Ready for Gemini review."
elif [ $exit_code -eq 2 ]; then
echo "🔴 CRITICAL: Security issues found, blocking PR"
gh pr comment $PR_NUMBER --body "🔴 **Quality Gate FAILED - CRITICAL**
Security vulnerabilities detected. PR is blocked until issues are resolved.
Please run: \`bash scripts/security/scan_vulnerabilities.sh\` locally and fix all issues."
else
echo "⚠️ Quality gate checks found issues (non-blocking)"
gh pr comment $PR_NUMBER --body "⚠️ **Quality Gate WARNINGS**
Some checks require attention (non-blocking):
- See logs above for details
Consider addressing these before requesting review."
fi
exit $exit_code
```
### 4. Continuous Watch Mode (Recommended)
**NEW**: Automated monitoring for continuous PR review cycles.
```bash
#!/bin/bash
# scripts/pr/watch_reviews.sh - Monitor PR for Gemini reviews and auto-respond
PR_NUMBER=$1
CHECK_INTERVAL=${2:-180} # Default: 3 minutes
echo "Starting PR watch mode for #$PR_NUMBER"
echo "Checking every ${CHECK_INTERVAL}s for new reviews..."
last_review_time=""
while true; do
# Get latest Gemini review timestamp
repo=$(gh repo view --json nameWithOwner -q .nameWithOwner)
current_review_time=$(gh api "repos/$repo/pulls/$PR_NUMBER/reviews" | \
jq -r '[.[] | select(.user.login == "gemini-code-assist")] | last | .submitted_at')
# Detect new review
if [ -n "$current_review_time" ] && [ "$current_review_time" != "$last_review_time" ]; then
echo "🔔 NEW REVIEW DETECTED!"
last_review_time="$current_review_time"
# Get review state
review_state=$(gh pr view $PR_NUMBER --json reviews --jq \
'[.reviews[] | select(.author.login == "gemini-code-assist")] | last | .state')
# Get inline comments count
comments_count=$(gh api "repos/$repo/pulls/$PR_NUMBER/comments" | \
jq '[.[] | select(.user.login == "gemini-code-assist")] | length')
echo " State: $review_state"
echo " Inline Comments: $comments_count"
# Handle review state
if [ "$review_state" = "APPROVED" ]; then
echo "✅ PR APPROVED!"
echo " Ready to merge: gh pr merge $PR_NUMBER --squash"
exit 0
elif [ "$review_state" = "CHANGES_REQUESTED" ] || [ "$comments_count" -gt 0 ]; then
echo "📝 Review feedback received"
# Optionally auto-fix
read -t 30 -p "Auto-run review cycle? (y/N): " response || response="n"
if [[ "$response" =~ ^[Yy]$ ]]; then
echo "🤖 Starting automated fix cycle..."
bash scripts/pr/auto_review.sh $PR_NUMBER 3 true
fi
fi
fi
sleep $CHECK_INTERVAL
done
```
**Usage:**
```bash
# Start watch mode (checks every 3 minutes)
bash scripts/pr/watch_reviews.sh 212
# Faster polling (every 2 minutes)
bash scripts/pr/watch_reviews.sh 212 120
# Run in background
bash scripts/pr/watch_reviews.sh 212 180 &
```
**When to Use Watch Mode vs Auto-Review:**
| Scenario | Use | Command |
|----------|-----|---------|
| **Just created PR** | Auto-review (immediate) | `bash scripts/pr/auto_review.sh 212 5 true` |
| **Pushed new commits** | Watch mode (continuous) | `bash scripts/pr/watch_reviews.sh 212` |
| **Waiting for approval** | Watch mode (continuous) | `bash scripts/pr/watch_reviews.sh 212 180` |
| **One-time fix cycle** | Auto-review (immediate) | `bash scripts/pr/auto_review.sh 212 3 true` |
**Benefits:**
- ✅ Auto-detects new reviews (no manual `/gemini review` needed)
- ✅ Handles inline comments that auto-resolve when fixed
- ✅ Offers optional auto-fix at each iteration
- ✅ Exits automatically when approved
- ✅ Runs indefinitely until approved or stopped
### 5. GraphQL Thread Resolution (v8.20.0+)
**NEW**: Automatic PR review thread resolution using GitHub GraphQL API.
**Problem:** GitHub's REST API cannot resolve PR review threads. Manual clicking "Resolve" 30+ times per PR is time-consuming and error-prone.
**Solution:** GraphQL API provides `resolveReviewThread` mutation for programmatic thread resolution.
**Key Components:**
1. **GraphQL Helpers Library** (`scripts/pr/lib/graphql_helpers.sh`)
- `get_review_threads <PR_NUMBER>` - Fetch all threads with metadata
- `resolve_review_thread <THREAD_ID> [COMMENT]` - Resolve with explanation
- `get_thread_stats <PR_NUMBER>` - Get counts (total, resolved, unresolved)
- `was_line_modified <FILE> <LINE> <COMMIT>` - Check if code changed
2. **Smart Resolution Tool** (`scripts/pr/resolve_threads.sh`)
- Automatically resolves threads when referenced code is modified
- Interactive or auto mode (--auto flag)
- Adds explanatory comments with commit references
3. **Thread Status Tool** (`scripts/pr/thread_status.sh`)
- Display all threads with filtering (--unresolved, --resolved, --outdated)
- Comprehensive status including file paths, line numbers, authors
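For reference, `resolve_review_thread` boils down to the `resolveReviewThread` mutation mentioned above; a minimal sketch of the equivalent `gh` call (the thread ID is a placeholder):
```bash
gh api graphql -f query='
  mutation($threadId: ID!) {
    resolveReviewThread(input: {threadId: $threadId}) {
      thread { id isResolved }
    }
  }' -f threadId="<THREAD_ID>"
```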
**Usage:**
```bash
# Check thread status
bash scripts/pr/thread_status.sh 212
# Auto-resolve threads after pushing fixes
bash scripts/pr/resolve_threads.sh 212 HEAD --auto
# Interactive resolution (prompts for each thread)
bash scripts/pr/resolve_threads.sh 212 HEAD
```
**Integration with Auto-Review:**
The `auto_review.sh` script now **automatically resolves threads** after applying fixes:
```bash
# After pushing fixes
echo "Resolving review threads for fixed code..."
latest_commit=$(git rev-parse HEAD)
bash scripts/pr/resolve_threads.sh $PR_NUMBER $latest_commit --auto
# Output:
# Resolved: 8 threads
# Skipped: 3 threads (no changes detected)
# Failed: 0 threads
```
**Integration with Watch Mode:**
The `watch_reviews.sh` script now **displays thread status** during monitoring:
```bash
# On each check cycle
Review Threads: 45 total, 30 resolved, 15 unresolved
# When new review detected
Thread Status:
Thread #1: scripts/pr/auto_review.sh:89 (UNRESOLVED)
Thread #2: scripts/pr/quality_gate.sh:45 (UNRESOLVED)
...
Options:
1. View detailed thread status:
bash scripts/pr/thread_status.sh 212
2. Run auto-review (auto-resolves threads):
bash scripts/pr/auto_review.sh 212 5 true
3. Manually resolve after fixes:
bash scripts/pr/resolve_threads.sh 212 HEAD --auto
```
**Decision Logic for Thread Resolution:**
```
For each unresolved thread:
├─ Is the file modified in this commit?
│ ├─ YES → Was the specific line changed?
│ │ ├─ YES → ✅ Resolve with "Line X modified in commit ABC"
│ │ └─ NO → ⏭️ Skip (file changed but not this line)
│ └─ NO → Is thread marked "outdated" by GitHub?
│ ├─ YES → ✅ Resolve with "Thread outdated by subsequent commits"
│ └─ NO → ⏭️ Skip (file not modified)
```
**Benefits:**
- ✅ **Zero manual clicks** - Threads resolve automatically when code is fixed
- ✅ **Accurate resolution** - Only resolves when actual code changes match thread location
- ✅ **Audit trail** - Adds comments with commit references for transparency
- ✅ **Safe defaults** - Skips threads when unsure (conservative approach)
- ✅ **Graceful fallback** - Works without GraphQL (just disables auto-resolution)
**Time Savings:**
- **Before:** 30 threads × 5 seconds per click = 2.5 minutes of manual clicking
- **After:** `bash scripts/pr/resolve_threads.sh 212 HEAD --auto` = 2 seconds
**Complete Automated Workflow:**
```bash
# 1. Create PR (github-release-manager)
gh pr create --title "feat: new feature" --body "..."
# 2. Start watch mode with GraphQL tracking
bash scripts/pr/watch_reviews.sh 212 180 &
# 3. When review arrives, auto-review handles everything:
bash scripts/pr/auto_review.sh 212 5 true
# - Fetches review feedback
# - Categorizes issues
# - Generates fixes
# - Applies and commits
# - Pushes to PR branch
# - **Auto-resolves threads** ← NEW!
# - Triggers new review
# - Repeats until approved
# 4. Merge when approved (github-release-manager)
gh pr merge 212 --squash
```
**Documentation:**
See `docs/pr-graphql-integration.md` for:
- Complete API reference
- Troubleshooting guide
- GraphQL query examples
- Advanced usage patterns
- Performance considerations
## Integration with github-release-manager
This agent **extends** the github-release-manager workflow:
**github-release-manager handles**:
- Version bumping
- CHANGELOG/README updates
- PR creation
- Issue tracking
- Post-release actions
**gemini-pr-automator adds**:
- Automated review iteration
- Fix application
- Test generation
- Quality gates
- Breaking change detection
**Combined Workflow**:
1. `github-release-manager` creates release PR
2. `gemini-pr-automator` runs quality gate
3. `gemini-pr-automator` triggers automated review loop
4. `github-release-manager` merges when approved
5. `github-release-manager` handles post-release tasks
## Project-Specific Patterns
### MCP Memory Service PR Standards
**Required Checks**:
- ✅ All tests pass (`pytest tests/`)
- ✅ No security vulnerabilities
- ✅ Code complexity ≤7 for new functions
- ✅ Type hints on all new functions
- ✅ Breaking changes documented in CHANGELOG
**Review Focus Areas**:
- Storage backend modifications (critical path)
- MCP tool schema changes (protocol compliance)
- Web API endpoints (security implications)
- Hook system changes (user-facing)
- Performance-critical code (5ms target)
### Gemini Review Iteration Pattern
**Iteration 1**: Initial review (broad feedback)
**Iteration 2**: Apply safe fixes, re-review specific areas
**Iteration 3**: Address remaining issues, focus on edge cases
**Iteration 4**: Final polish, documentation review
**Iteration 5**: Approval, or escalation to manual review
## Usage Examples
### Quick Automated Review
```bash
# Standard automated review (5 iterations, safe fixes enabled)
bash scripts/pr/auto_review.sh 123
# Conservative mode (3 iterations, manual fixes)
bash scripts/pr/auto_review.sh 123 3 false
# Aggressive mode (10 iterations, auto-fix everything)
bash scripts/pr/auto_review.sh 123 10 true
```
### Generate Tests Only
```bash
# Generate tests for PR #123
bash scripts/pr/generate_tests.sh 123
# Review generated tests
ls -la /tmp/test_gen_*.py
```
### Breaking Change Check
```bash
# Check if PR introduces breaking changes
bash scripts/pr/detect_breaking_changes.sh main feature/new-api
# Exit code 0 = no breaking changes
# Exit code 1 = breaking changes detected
```
## Best Practices
1. **Always run quality gate first**: Catch issues before review iteration
2. **Start with safe-fix mode off**: Observe behavior before trusting automation
3. **Review auto-applied commits**: Ensure changes make sense before merging
4. **Limit iterations**: Prevent infinite loops, escalate to humans at max
5. **Document breaking changes**: Always update CHANGELOG for API changes
6. **Test generated tests**: Verify generated tests actually work before committing
## Limitations
- **Context limitations**: Gemini has context limits, very large PRs may need manual review
- **Fix quality**: Auto-generated fixes may not always be optimal (human review recommended)
- **False negatives**: Breaking change detection may miss subtle breaking changes
- **API rate limits**: Gemini CLI subject to rate limits, add delays between iterations
- **Complexity**: Multi-file refactoring with complex dependencies needs manual oversight
## Performance Considerations
- Single review iteration: ~90-120 seconds (Gemini response time)
- Full automated cycle (5 iterations): ~7-10 minutes
- Test generation per file: ~30-60 seconds
- Breaking change detection: ~15-30 seconds
**Time Savings**: ~10-30 minutes saved per PR vs manual iteration
---
**Quick Reference Card**:
```bash
# Full automated review
bash scripts/pr/full_auto_review.sh <PR_NUMBER>
# Quality gate only
bash scripts/pr/quality_gate.sh <PR_NUMBER>
# Generate tests
bash scripts/pr/generate_tests.sh <PR_NUMBER>
# Breaking changes
bash scripts/pr/detect_breaking_changes.sh main <BRANCH>
# Auto-review with options
bash scripts/pr/auto_review.sh <PR_NUMBER> <MAX_ITER> <SAFE_FIX:true/false>
```
```
--------------------------------------------------------------------------------
/scripts/migration/migrate_v5_enhanced.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Enhanced ChromaDB to SQLite-vec Migration Script for v5.0.0+
This script provides a robust migration path from ChromaDB to SQLite-vec with:
- Custom data path support
- Proper content hash generation
- Tag format validation and correction
- Progress indicators
- Transaction-based migration with rollback
- Dry-run mode for testing
- Comprehensive error handling
"""
import argparse
import asyncio
import hashlib
import json
import logging
import os
import sqlite3
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional, Union, Tuple
# Try importing with progress bar support
try:
from tqdm import tqdm
TQDM_AVAILABLE = True
except ImportError:
TQDM_AVAILABLE = False
print("Note: Install 'tqdm' for progress bars: pip install tqdm")
# Add project root to path
project_root = Path(__file__).parent.parent.parent  # scripts/migration/ -> scripts/ -> repo root
sys.path.insert(0, str(project_root / "src"))
# Import storage modules
try:
from mcp_memory_service.storage.chroma import ChromaMemoryStorage
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
except ImportError as e:
print(f"Error importing MCP modules: {e}")
print("Make sure you're running this from the MCP Memory Service directory")
sys.exit(1)
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
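# Typical invocations (sketch; flag names follow MigrationConfig.from_args below):
#   python scripts/migration/migrate_v5_enhanced.py --dry-run
#   python scripts/migration/migrate_v5_enhanced.py \
#       --chroma-path ~/.mcp_memory_chroma --sqlite-path ./sqlite_vec.db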
class MigrationConfig:
"""Configuration for migration process."""
def __init__(self):
self.chroma_path: Optional[str] = None
self.sqlite_path: Optional[str] = None
self.batch_size: int = 50
self.dry_run: bool = False
self.skip_duplicates: bool = True
self.backup_path: Optional[str] = None
self.verbose: bool = False
self.validate_only: bool = False
self.force: bool = False
@classmethod
def from_args(cls, args) -> 'MigrationConfig':
"""Create config from command line arguments."""
config = cls()
config.chroma_path = args.chroma_path
config.sqlite_path = args.sqlite_path
config.batch_size = args.batch_size
config.dry_run = args.dry_run
config.skip_duplicates = not args.no_skip_duplicates
config.backup_path = args.backup
config.verbose = args.verbose
config.validate_only = args.validate_only
config.force = args.force
return config
def resolve_paths(self):
"""Resolve and validate data paths."""
# Resolve ChromaDB path
if not self.chroma_path:
# Check environment variable first
self.chroma_path = os.environ.get('MCP_MEMORY_CHROMA_PATH')
if not self.chroma_path:
# Use default locations based on platform
home = Path.home()
if sys.platform == 'darwin': # macOS
default_base = home / 'Library' / 'Application Support' / 'mcp-memory'
elif sys.platform == 'win32': # Windows
default_base = Path(os.getenv('LOCALAPPDATA', '')) / 'mcp-memory'
else: # Linux
default_base = home / '.local' / 'share' / 'mcp-memory'
# Try multiple possible locations
possible_paths = [
home / '.mcp_memory_chroma', # Legacy location
default_base / 'chroma_db', # New standard location
Path.cwd() / 'chroma_db', # Current directory
]
for path in possible_paths:
if path.exists():
self.chroma_path = str(path)
logger.info(f"Found ChromaDB at: {path}")
break
if not self.chroma_path:
raise ValueError(
"Could not find ChromaDB data. Please specify --chroma-path or "
"set MCP_MEMORY_CHROMA_PATH environment variable"
)
# Resolve SQLite path
if not self.sqlite_path:
# Check environment variable first
self.sqlite_path = os.environ.get('MCP_MEMORY_SQLITE_PATH')
if not self.sqlite_path:
# Default to same directory as ChromaDB with different name
chroma_dir = Path(self.chroma_path).parent
self.sqlite_path = str(chroma_dir / 'sqlite_vec.db')
logger.info(f"Using default SQLite path: {self.sqlite_path}")
# Resolve backup path if needed
if self.backup_path is None and not self.dry_run and not self.validate_only:
backup_dir = Path(self.sqlite_path).parent / 'backups'
backup_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.backup_path = str(backup_dir / f'migration_backup_{timestamp}.json')
class EnhancedMigrationTool:
"""Enhanced migration tool with proper error handling and progress tracking."""
def __init__(self, config: MigrationConfig):
self.config = config
self.stats = {
'total': 0,
'migrated': 0,
'skipped': 0,
'failed': 0,
'errors': []
}
self.chroma_storage = None
self.sqlite_storage = None
def generate_proper_content_hash(self, content: str) -> str:
"""Generate a proper SHA256 content hash."""
return hashlib.sha256(content.encode()).hexdigest()
def validate_and_fix_tags(self, tags: Any) -> List[str]:
"""Validate and fix tag format."""
if not tags:
return []
if isinstance(tags, str):
# Handle comma-separated string
if ',' in tags:
return [tag.strip() for tag in tags.split(',') if tag.strip()]
# Handle single tag
return [tags.strip()] if tags.strip() else []
if isinstance(tags, list):
# Clean and validate list tags
clean_tags = []
for tag in tags:
if isinstance(tag, str) and tag.strip():
clean_tags.append(tag.strip())
return clean_tags
# Unknown format - log warning and return empty
logger.warning(f"Unknown tag format: {type(tags)} - {tags}")
return []
def safe_timestamp_convert(self, timestamp: Any) -> float:
"""Safely convert various timestamp formats to float."""
if timestamp is None:
return datetime.now().timestamp()
# Handle numeric timestamps
if isinstance(timestamp, (int, float)):
# Check if timestamp is reasonable (between 2000 and 2100)
if 946684800 <= float(timestamp) <= 4102444800:
return float(timestamp)
else:
logger.warning(f"Timestamp {timestamp} out of reasonable range, using current time")
return datetime.now().timestamp()
# Handle string timestamps
if isinstance(timestamp, str):
# Try ISO format
for fmt in [
'%Y-%m-%dT%H:%M:%S.%f',
'%Y-%m-%dT%H:%M:%S',
'%Y-%m-%d %H:%M:%S.%f',
'%Y-%m-%d %H:%M:%S',
]:
try:
dt = datetime.strptime(timestamp.rstrip('Z'), fmt)
return dt.timestamp()
except ValueError:
continue
# Try parsing as float string
try:
ts = float(timestamp)
if 946684800 <= ts <= 4102444800:
return ts
except ValueError:
pass
# Fallback to current time
logger.warning(f"Could not parse timestamp '{timestamp}', using current time")
return datetime.now().timestamp()
async def extract_memories_from_chroma(self) -> List[Dict[str, Any]]:
"""Extract all memories from ChromaDB with proper error handling."""
memories = []
try:
# Initialize ChromaDB storage
logger.info("Connecting to ChromaDB...")
self.chroma_storage = ChromaMemoryStorage(path=self.config.chroma_path)
# Access the collection directly
collection = self.chroma_storage.collection
if not collection:
raise ValueError("ChromaDB collection not initialized")
# Get all data from collection
logger.info("Extracting memories from ChromaDB...")
results = collection.get(include=['documents', 'metadatas', 'embeddings'])
if not results or not results.get('ids'):
logger.warning("No memories found in ChromaDB")
return memories
total = len(results['ids'])
logger.info(f"Found {total} memories to process")
# Process each memory
for i in range(total):
try:
# Extract data with defaults
doc_id = results['ids'][i]
content = results['documents'][i] if i < len(results.get('documents', [])) else ""
metadata = results['metadatas'][i] if i < len(results.get('metadatas', [])) else {}
embedding = results['embeddings'][i] if i < len(results.get('embeddings', [])) else None
if not content:
logger.warning(f"Skipping memory {doc_id}: empty content")
continue
# Generate proper content hash
content_hash = self.generate_proper_content_hash(content)
# Extract and validate tags
raw_tags = metadata.get('tags', metadata.get('tags_str', []))
tags = self.validate_and_fix_tags(raw_tags)
# Extract timestamps
created_at = self.safe_timestamp_convert(
metadata.get('created_at', metadata.get('timestamp'))
)
updated_at = self.safe_timestamp_convert(
metadata.get('updated_at', created_at)
)
# Extract memory type
memory_type = metadata.get('memory_type', metadata.get('type', 'imported'))
# Clean metadata (remove special fields)
clean_metadata = {}
exclude_keys = {
'tags', 'tags_str', 'created_at', 'updated_at',
'timestamp', 'timestamp_float', 'timestamp_str',
'memory_type', 'type', 'content_hash',
'created_at_iso', 'updated_at_iso'
}
for key, value in metadata.items():
if key not in exclude_keys and value is not None:
clean_metadata[key] = value
# Create memory record
memory_data = {
'content': content,
'content_hash': content_hash,
'tags': tags,
'memory_type': memory_type,
'metadata': clean_metadata,
'embedding': embedding,
'created_at': created_at,
'updated_at': updated_at,
'original_id': doc_id # Keep for reference
}
memories.append(memory_data)
if self.config.verbose and (i + 1) % 100 == 0:
logger.info(f"Processed {i + 1}/{total} memories")
except Exception as e:
logger.error(f"Failed to extract memory {i}: {e}")
self.stats['errors'].append(f"Extract error at index {i}: {str(e)}")
continue
logger.info(f"Successfully extracted {len(memories)} memories")
return memories
except Exception as e:
logger.error(f"Critical error extracting from ChromaDB: {e}")
raise
async def migrate_to_sqlite(self, memories: List[Dict[str, Any]]) -> bool:
"""Migrate memories to SQLite-vec with transaction support."""
if not memories:
logger.warning("No memories to migrate")
return True
try:
# Initialize SQLite-vec storage
logger.info(f"Initializing SQLite-vec at {self.config.sqlite_path}")
self.sqlite_storage = SqliteVecMemoryStorage(
db_path=self.config.sqlite_path,
embedding_model=os.environ.get('MCP_MEMORY_EMBEDDING_MODEL', 'all-MiniLM-L6-v2')
)
await self.sqlite_storage.initialize()
# Start transaction
conn = self.sqlite_storage.conn
conn.execute("BEGIN TRANSACTION")
try:
# Migrate memories in batches
total = len(memories)
batch_size = self.config.batch_size
# Use progress bar if available
if TQDM_AVAILABLE and not self.config.dry_run:
progress_bar = tqdm(total=total, desc="Migrating memories")
else:
progress_bar = None
for i in range(0, total, batch_size):
batch = memories[i:i + batch_size]
if not self.config.dry_run:
for memory_data in batch:
try:
# Check for duplicates
if self.config.skip_duplicates:
existing = conn.execute(
"SELECT 1 FROM memories WHERE content_hash = ? LIMIT 1",
(memory_data['content_hash'],)
).fetchone()
if existing:
self.stats['skipped'] += 1
if progress_bar:
progress_bar.update(1)
continue
# Create Memory object
memory = Memory(
content=memory_data['content'],
content_hash=memory_data['content_hash'],
tags=memory_data['tags'],
memory_type=memory_data.get('memory_type'),
metadata=memory_data.get('metadata', {}),
created_at=memory_data['created_at'],
updated_at=memory_data['updated_at']
)
# Store memory
success, message = await self.sqlite_storage.store(memory)
if success:
self.stats['migrated'] += 1
else:
raise Exception(f"Failed to store: {message}")
if progress_bar:
progress_bar.update(1)
except Exception as e:
self.stats['failed'] += 1
self.stats['errors'].append(
f"Migration error for {memory_data['content_hash'][:8]}: {str(e)}"
)
if progress_bar:
progress_bar.update(1)
else:
# Dry run - just count
self.stats['migrated'] += len(batch)
if progress_bar:
progress_bar.update(len(batch))
if progress_bar:
progress_bar.close()
# Commit transaction
if not self.config.dry_run:
conn.execute("COMMIT")
logger.info("Transaction committed successfully")
else:
conn.execute("ROLLBACK")
logger.info("Dry run - transaction rolled back")
return True
except Exception as e:
# Rollback on error
conn.execute("ROLLBACK")
logger.error(f"Migration failed, transaction rolled back: {e}")
raise
except Exception as e:
logger.error(f"Critical error during migration: {e}")
return False
finally:
# Clean up
if self.sqlite_storage:
self.sqlite_storage.close()
async def create_backup(self, memories: List[Dict[str, Any]]):
"""Create a JSON backup of memories."""
if not self.config.backup_path or self.config.dry_run:
return
logger.info(f"Creating backup at {self.config.backup_path}")
backup_data = {
'version': '2.0',
'created_at': datetime.now().isoformat(),
'source': self.config.chroma_path,
'total_memories': len(memories),
'memories': memories
}
# Remove embeddings from backup to reduce size
for memory in backup_data['memories']:
memory.pop('embedding', None)
with open(self.config.backup_path, 'w') as f:
json.dump(backup_data, f, indent=2, default=str)
logger.info(f"Backup created: {self.config.backup_path}")
async def validate_migration(self) -> bool:
"""Validate the migrated data."""
logger.info("Validating migration...")
try:
# Connect to SQLite database
conn = sqlite3.connect(self.config.sqlite_path)
# Check memory count
count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
logger.info(f"SQLite database contains {count} memories")
# Check for required fields
sample = conn.execute("""
SELECT content_hash, content, tags, created_at
FROM memories
LIMIT 10
""").fetchall()
issues = []
for row in sample:
if not row[0]: # content_hash
issues.append("Missing content_hash")
if not row[1]: # content
issues.append("Missing content")
if row[3] is None: # created_at
issues.append("Missing created_at")
conn.close()
if issues:
logger.warning(f"Validation issues found: {', '.join(set(issues))}")
return False
logger.info("Validation passed!")
return True
except Exception as e:
logger.error(f"Validation failed: {e}")
return False
async def run(self) -> bool:
"""Run the migration process."""
try:
# Resolve paths
self.config.resolve_paths()
# Print configuration
print("\n" + "="*60)
print("MCP Memory Service - Enhanced Migration Tool v2.0")
print("="*60)
print(f"ChromaDB source: {self.config.chroma_path}")
print(f"SQLite-vec target: {self.config.sqlite_path}")
if self.config.backup_path:
print(f"Backup location: {self.config.backup_path}")
print(f"Mode: {'DRY RUN' if self.config.dry_run else 'LIVE MIGRATION'}")
print(f"Batch size: {self.config.batch_size}")
print(f"Skip duplicates: {self.config.skip_duplicates}")
print()
# Check if validation only
if self.config.validate_only:
return await self.validate_migration()
# Check if target exists
if Path(self.config.sqlite_path).exists() and not self.config.force:
if not self.config.dry_run:
response = input(f"Target database exists. Overwrite? (y/N): ")
if response.lower() != 'y':
print("Migration cancelled")
return False
# Extract memories from ChromaDB
memories = await self.extract_memories_from_chroma()
self.stats['total'] = len(memories)
if not memories:
print("No memories found to migrate")
return True
# Create backup
if self.config.backup_path and not self.config.dry_run:
await self.create_backup(memories)
# Confirm migration
if not self.config.dry_run and not self.config.force:
print(f"\nAbout to migrate {len(memories)} memories")
response = input("Proceed? (y/N): ")
if response.lower() != 'y':
print("Migration cancelled")
return False
# Perform migration
success = await self.migrate_to_sqlite(memories)
# Print summary
print("\n" + "="*60)
print("MIGRATION SUMMARY")
print("="*60)
print(f"Total memories found: {self.stats['total']}")
print(f"Successfully migrated: {self.stats['migrated']}")
print(f"Duplicates skipped: {self.stats['skipped']}")
print(f"Failed migrations: {self.stats['failed']}")
if self.stats['errors'] and self.config.verbose:
print("\nErrors encountered:")
for i, error in enumerate(self.stats['errors'][:10], 1):
print(f" {i}. {error}")
if len(self.stats['errors']) > 10:
print(f" ... and {len(self.stats['errors']) - 10} more")
if success and not self.config.dry_run:
# Validate migration
if await self.validate_migration():
print("\n✅ Migration completed successfully!")
print("\nNext steps:")
print("1. Set environment variable:")
print(" export MCP_MEMORY_STORAGE_BACKEND=sqlite_vec")
print(f"2. Set database path:")
print(f" export MCP_MEMORY_SQLITE_PATH={self.config.sqlite_path}")
print("3. Restart MCP Memory Service")
print("4. Test that your memories are accessible")
else:
print("\n⚠️ Migration completed with validation warnings")
return success
except Exception as e:
logger.error(f"Migration failed: {e}")
if self.config.verbose:
import traceback
traceback.print_exc()
return False
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Enhanced ChromaDB to SQLite-vec migration tool",
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument(
'--chroma-path',
help='Path to ChromaDB data directory (default: auto-detect)'
)
parser.add_argument(
'--sqlite-path',
help='Path for SQLite-vec database (default: same dir as ChromaDB)'
)
parser.add_argument(
'--batch-size',
type=int,
default=50,
help='Number of memories to migrate per batch (default: 50)'
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Simulate migration without making changes'
)
parser.add_argument(
'--no-skip-duplicates',
action='store_true',
help='Migrate all memories including duplicates'
)
parser.add_argument(
'--backup',
help='Path for JSON backup file (default: auto-generate)'
)
parser.add_argument(
'--verbose',
action='store_true',
help='Enable verbose logging'
)
parser.add_argument(
'--validate-only',
action='store_true',
help='Only validate existing SQLite database'
)
parser.add_argument(
'--force',
action='store_true',
help='Skip confirmation prompts'
)
args = parser.parse_args()
# Set logging level
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Create configuration
config = MigrationConfig.from_args(args)
# Run migration
tool = EnhancedMigrationTool(config)
success = asyncio.run(tool.run())
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
```
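A minimal dry-run sketch of driving the tool programmatically rather than via the CLI. It assumes it runs in the same module as the `MigrationConfig` and `EnhancedMigrationTool` classes defined above, and that `MigrationConfig.from_args` reads exactly the attribute names produced by the argparse flags; treat it as an illustration, not part of the script.

```python
import argparse
import asyncio

# Namespace attributes mirror the CLI flags defined in main() above
# (assumption: MigrationConfig.from_args reads these exact names).
args = argparse.Namespace(
    chroma_path=None,          # auto-detect ChromaDB location
    sqlite_path=None,          # default: same directory as ChromaDB
    batch_size=50,
    dry_run=True,              # simulate only; the transaction is rolled back
    no_skip_duplicates=False,  # keep duplicate skipping enabled
    backup=None,
    verbose=True,
    validate_only=False,
    force=False,
)

config = MigrationConfig.from_args(args)
success = asyncio.run(EnhancedMigrationTool(config).run())
print("dry run ok" if success else "dry run failed")
```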
--------------------------------------------------------------------------------
/tests/unit/test_mdns.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for mDNS service discovery functionality.
"""
import pytest
import asyncio
import socket
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from zeroconf import ServiceInfo, Zeroconf
# Import the modules under test
from mcp_memory_service.discovery.mdns_service import (
ServiceAdvertiser,
ServiceDiscovery,
DiscoveryListener,
ServiceDetails
)
from mcp_memory_service.discovery.client import DiscoveryClient, HealthStatus
class TestServiceDetails:
"""Test ServiceDetails dataclass."""
def test_service_details_creation(self):
"""Test ServiceDetails creation with basic parameters."""
service_info = Mock()
details = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=True,
service_info=service_info
)
assert details.name == "Test Service"
assert details.host == "192.168.1.100"
assert details.port == 8000
assert details.https is False
assert details.api_version == "2.1.0"
assert details.requires_auth is True
assert details.service_info == service_info
def test_service_details_url_http(self):
"""Test URL generation for HTTP service."""
details = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=False,
service_info=Mock()
)
assert details.url == "http://192.168.1.100:8000"
assert details.api_url == "http://192.168.1.100:8000/api"
def test_service_details_url_https(self):
"""Test URL generation for HTTPS service."""
details = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8443,
https=True,
api_version="2.1.0",
requires_auth=True,
service_info=Mock()
)
assert details.url == "https://192.168.1.100:8443"
assert details.api_url == "https://192.168.1.100:8443/api"
class TestServiceAdvertiser:
"""Test ServiceAdvertiser class."""
def test_init_default_parameters(self):
"""Test ServiceAdvertiser initialization with default parameters."""
advertiser = ServiceAdvertiser()
assert advertiser.service_name == "MCP Memory Service"
assert advertiser.service_type == "_mcp-memory._tcp.local."
assert advertiser.host == "0.0.0.0"
assert advertiser.port == 8000
assert advertiser.https_enabled is False
assert advertiser.api_key_required is False
assert advertiser._registered is False
def test_init_custom_parameters(self):
"""Test ServiceAdvertiser initialization with custom parameters."""
advertiser = ServiceAdvertiser(
service_name="Custom Service",
service_type="_custom._tcp.local.",
host="192.168.1.100",
port=8443,
https_enabled=True,
api_key_required=True
)
assert advertiser.service_name == "Custom Service"
assert advertiser.service_type == "_custom._tcp.local."
assert advertiser.host == "192.168.1.100"
assert advertiser.port == 8443
assert advertiser.https_enabled is True
assert advertiser.api_key_required is True
@patch('socket.socket')
def test_get_local_ip(self, mock_socket):
"""Test local IP address detection."""
mock_sock_instance = Mock()
mock_sock_instance.getsockname.return_value = ("192.168.1.100", 12345)
mock_socket.return_value.__enter__.return_value = mock_sock_instance
advertiser = ServiceAdvertiser()
ip = advertiser._get_local_ip()
assert ip == "192.168.1.100"
mock_sock_instance.connect.assert_called_once_with(("8.8.8.8", 80))
@patch('socket.socket')
def test_get_local_ip_fallback(self, mock_socket):
"""Test local IP address detection fallback."""
mock_socket.side_effect = Exception("Network error")
advertiser = ServiceAdvertiser()
ip = advertiser._get_local_ip()
assert ip == "127.0.0.1"
@patch('socket.inet_aton')
@patch.object(ServiceAdvertiser, '_get_local_ip')
def test_create_service_info(self, mock_get_ip, mock_inet_aton):
"""Test ServiceInfo creation."""
mock_get_ip.return_value = "192.168.1.100"
mock_inet_aton.return_value = b'\xc0\xa8\x01\x64' # 192.168.1.100
advertiser = ServiceAdvertiser(
service_name="Test Service",
https_enabled=True,
api_key_required=True
)
service_info = advertiser._create_service_info()
assert service_info.type == "_mcp-memory._tcp.local."
assert service_info.name == "Test Service._mcp-memory._tcp.local."
assert service_info.port == 8000
assert service_info.server == "test-service.local."
# Check properties
properties = service_info.properties
assert properties[b'https'] == b'True'
assert properties[b'auth_required'] == b'True'
assert properties[b'api_path'] == b'/api'
@pytest.mark.asyncio
async def test_start_success(self):
"""Test successful service advertisement start."""
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class:
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
advertiser = ServiceAdvertiser()
with patch.object(advertiser, '_create_service_info') as mock_create_info:
mock_service_info = Mock()
mock_create_info.return_value = mock_service_info
result = await advertiser.start()
assert result is True
assert advertiser._registered is True
mock_zeroconf.async_register_service.assert_called_once_with(mock_service_info)
@pytest.mark.asyncio
async def test_start_already_registered(self):
"""Test starting advertisement when already registered."""
advertiser = ServiceAdvertiser()
advertiser._registered = True
result = await advertiser.start()
assert result is True # Should return True but log warning
@pytest.mark.asyncio
async def test_start_failure(self):
"""Test service advertisement start failure."""
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class:
mock_zeroconf = AsyncMock()
mock_zeroconf.async_register_service.side_effect = Exception("Registration failed")
mock_zeroconf_class.return_value = mock_zeroconf
advertiser = ServiceAdvertiser()
with patch.object(advertiser, '_create_service_info'):
result = await advertiser.start()
assert result is False
assert advertiser._registered is False
@pytest.mark.asyncio
async def test_stop_success(self):
"""Test successful service advertisement stop."""
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class:
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
advertiser = ServiceAdvertiser()
advertiser._registered = True
advertiser._zeroconf = mock_zeroconf
advertiser._service_info = Mock()
await advertiser.stop()
assert advertiser._registered is False
mock_zeroconf.async_unregister_service.assert_called_once()
mock_zeroconf.async_close.assert_called_once()
@pytest.mark.asyncio
async def test_stop_not_registered(self):
"""Test stopping advertisement when not registered."""
advertiser = ServiceAdvertiser()
# Should not raise exception
await advertiser.stop()
class TestDiscoveryListener:
"""Test DiscoveryListener class."""
def test_init_no_callback(self):
"""Test DiscoveryListener initialization without callback."""
listener = DiscoveryListener()
assert listener.callback is None
assert len(listener.services) == 0
def test_init_with_callback(self):
"""Test DiscoveryListener initialization with callback."""
callback = Mock()
listener = DiscoveryListener(callback)
assert listener.callback == callback
@patch('socket.inet_ntoa')
def test_parse_service_info(self, mock_inet_ntoa):
"""Test parsing ServiceInfo into ServiceDetails."""
mock_inet_ntoa.return_value = "192.168.1.100"
# Create mock ServiceInfo
service_info = Mock()
service_info.name = "Test Service._mcp-memory._tcp.local."
service_info.type = "_mcp-memory._tcp.local."
service_info.port = 8000
service_info.addresses = [b'\xc0\xa8\x01\x64'] # 192.168.1.100
service_info.properties = {
b'https': b'true',
b'api_version': b'2.1.0',
b'auth_required': b'false'
}
listener = DiscoveryListener()
details = listener._parse_service_info(service_info)
assert details.name == "Test Service"
assert details.host == "192.168.1.100"
assert details.port == 8000
assert details.https is True
assert details.api_version == "2.1.0"
assert details.requires_auth is False
@patch('socket.inet_ntoa')
def test_parse_service_info_no_addresses(self, mock_inet_ntoa):
"""Test parsing ServiceInfo with no addresses."""
service_info = Mock()
service_info.name = "Test Service._mcp-memory._tcp.local."
service_info.type = "_mcp-memory._tcp.local."
service_info.port = 8000
service_info.addresses = []
service_info.properties = {}
listener = DiscoveryListener()
details = listener._parse_service_info(service_info)
assert details.host == "localhost"
def test_add_service_success(self):
"""Test successful service addition."""
callback = Mock()
listener = DiscoveryListener(callback)
# Mock zeroconf and service info
mock_zc = Mock()
mock_service_info = Mock()
mock_zc.get_service_info.return_value = mock_service_info
with patch.object(listener, '_parse_service_info') as mock_parse:
mock_details = Mock()
mock_details.name = "Test Service"
mock_parse.return_value = mock_details
listener.add_service(mock_zc, "_mcp-memory._tcp.local.", "test-service")
assert "test-service" in listener.services
callback.assert_called_once_with(mock_details)
def test_add_service_no_info(self):
"""Test service addition when no service info available."""
listener = DiscoveryListener()
mock_zc = Mock()
mock_zc.get_service_info.return_value = None
listener.add_service(mock_zc, "_mcp-memory._tcp.local.", "test-service")
assert "test-service" not in listener.services
def test_remove_service(self):
"""Test service removal."""
listener = DiscoveryListener()
mock_details = Mock()
mock_details.name = "Test Service"
listener.services["test-service"] = mock_details
listener.remove_service(Mock(), "_mcp-memory._tcp.local.", "test-service")
assert "test-service" not in listener.services
class TestServiceDiscovery:
"""Test ServiceDiscovery class."""
def test_init_default_parameters(self):
"""Test ServiceDiscovery initialization with defaults."""
discovery = ServiceDiscovery()
assert discovery.service_type == "_mcp-memory._tcp.local."
assert discovery.discovery_timeout == 5
assert discovery._discovering is False
def test_init_custom_parameters(self):
"""Test ServiceDiscovery initialization with custom parameters."""
discovery = ServiceDiscovery(
service_type="_custom._tcp.local.",
discovery_timeout=10
)
assert discovery.service_type == "_custom._tcp.local."
assert discovery.discovery_timeout == 10
@pytest.mark.asyncio
async def test_discover_services_success(self):
"""Test successful service discovery."""
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class, \
patch('mcp_memory_service.discovery.mdns_service.AsyncServiceBrowser') as mock_browser_class:
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
mock_browser = Mock()
mock_browser_class.return_value = mock_browser
discovery = ServiceDiscovery(discovery_timeout=1) # Short timeout for testing
# Mock discovered services
mock_listener = Mock()
mock_service = Mock()
mock_service.name = "Test Service"
mock_listener.services = {"test-service": mock_service}
with patch.object(discovery, '_listener', mock_listener):
services = await discovery.discover_services()
assert len(services) == 1
assert services[0] == mock_service
@pytest.mark.asyncio
async def test_discover_services_already_discovering(self):
"""Test discovery when already in progress."""
discovery = ServiceDiscovery()
discovery._discovering = True
# Mock existing services
mock_listener = Mock()
mock_service = Mock()
mock_listener.services = {"test-service": mock_service}
discovery._listener = mock_listener
services = await discovery.discover_services()
assert len(services) == 1
assert services[0] == mock_service
@pytest.mark.asyncio
async def test_start_continuous_discovery(self):
"""Test starting continuous service discovery."""
callback = Mock()
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class, \
patch('mcp_memory_service.discovery.mdns_service.AsyncServiceBrowser') as mock_browser_class:
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
mock_browser = Mock()
mock_browser_class.return_value = mock_browser
discovery = ServiceDiscovery()
result = await discovery.start_continuous_discovery(callback)
assert result is True
assert discovery._discovering is True
@pytest.mark.asyncio
async def test_start_continuous_discovery_already_started(self):
"""Test starting continuous discovery when already started."""
discovery = ServiceDiscovery()
discovery._discovering = True
result = await discovery.start_continuous_discovery(Mock())
assert result is False
@pytest.mark.asyncio
async def test_stop_discovery(self):
"""Test stopping service discovery."""
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class, \
patch('mcp_memory_service.discovery.mdns_service.AsyncServiceBrowser') as mock_browser_class:
mock_zeroconf = AsyncMock()
mock_browser = AsyncMock()
discovery = ServiceDiscovery()
discovery._discovering = True
discovery._zeroconf = mock_zeroconf
discovery._browser = mock_browser
await discovery.stop_discovery()
assert discovery._discovering is False
mock_browser.async_cancel.assert_called_once()
mock_zeroconf.async_close.assert_called_once()
def test_get_discovered_services(self):
"""Test getting discovered services."""
discovery = ServiceDiscovery()
# No listener
services = discovery.get_discovered_services()
assert len(services) == 0
# With listener
mock_listener = Mock()
mock_service = Mock()
mock_listener.services = {"test": mock_service}
discovery._listener = mock_listener
services = discovery.get_discovered_services()
assert len(services) == 1
assert services[0] == mock_service
class TestDiscoveryClient:
"""Test DiscoveryClient class."""
def test_init_default_timeout(self):
"""Test DiscoveryClient initialization with default timeout."""
client = DiscoveryClient()
assert client.discovery_timeout == 5
def test_init_custom_timeout(self):
"""Test DiscoveryClient initialization with custom timeout."""
client = DiscoveryClient(discovery_timeout=10)
assert client.discovery_timeout == 10
@pytest.mark.asyncio
async def test_discover_services(self):
"""Test service discovery."""
client = DiscoveryClient()
mock_service = Mock()
mock_service.name = "Test Service"
mock_service.url = "http://192.168.1.100:8000"
mock_service.requires_auth = False
with patch.object(client._discovery, 'discover_services', return_value=[mock_service]):
services = await client.discover_services()
assert len(services) == 1
assert services[0] == mock_service
@pytest.mark.asyncio
async def test_find_best_service_no_services(self):
"""Test finding best service when no services available."""
client = DiscoveryClient()
with patch.object(client, 'discover_services', return_value=[]):
service = await client.find_best_service()
assert service is None
@pytest.mark.asyncio
async def test_find_best_service_with_validation(self):
"""Test finding best service with health validation."""
client = DiscoveryClient()
# Create mock services
http_service = Mock()
http_service.https = False
http_service.requires_auth = False
http_service.port = 8000
http_service.name = "HTTP Service"
http_service.url = "http://192.168.1.100:8000"
https_service = Mock()
https_service.https = True
https_service.requires_auth = False
https_service.port = 8443
https_service.name = "HTTPS Service"
https_service.url = "https://192.168.1.100:8443"
with patch.object(client, 'discover_services', return_value=[http_service, https_service]), \
patch.object(client, 'check_service_health') as mock_health:
# Mock health check results
def health_side_effect(service):
if service.https:
return HealthStatus(
healthy=True, status='ok', backend='sqlite_vec',
statistics={}, response_time_ms=50.0
)
else:
return HealthStatus(
healthy=False, status='error', backend='unknown',
statistics={}, response_time_ms=0, error='Connection failed'
)
mock_health.side_effect = health_side_effect
service = await client.find_best_service(prefer_https=True)
assert service == https_service
@pytest.mark.asyncio
async def test_check_service_health_success(self):
"""Test successful service health check."""
client = DiscoveryClient()
mock_service = Mock()
mock_service.api_url = "http://192.168.1.100:8000/api"
mock_response = Mock()
mock_response.status = 200
mock_response.json = AsyncMock(return_value={
'status': 'healthy',
'storage_type': 'sqlite_vec',
'statistics': {'memory_count': 100}
})
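        # aiohttp.ClientSession() and session.get() are both async context managers,
        # so the mocks below wire __aenter__ on each to yield the fake session and
        # fake response respectively.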
with patch('aiohttp.ClientSession') as mock_session_class:
mock_session = AsyncMock()
mock_session_class.return_value.__aenter__.return_value = mock_session
mock_session.get.return_value.__aenter__.return_value = mock_response
health = await client.check_service_health(mock_service)
assert health is not None
assert health.healthy is True
assert health.status == 'healthy'
assert health.backend == 'sqlite_vec'
assert health.statistics == {'memory_count': 100}
@pytest.mark.asyncio
async def test_check_service_health_failure(self):
"""Test service health check failure."""
client = DiscoveryClient()
mock_service = Mock()
mock_service.api_url = "http://192.168.1.100:8000/api"
with patch('aiohttp.ClientSession') as mock_session_class:
mock_session_class.side_effect = Exception("Connection failed")
health = await client.check_service_health(mock_service)
assert health is not None
assert health.healthy is False
assert health.error == "Connection failed"
@pytest.mark.asyncio
async def test_find_services_with_health(self):
"""Test finding services with health status."""
client = DiscoveryClient()
# Create mock services
service1 = Mock()
service1.https = True
service1.requires_auth = False
service2 = Mock()
service2.https = False
service2.requires_auth = False
health1 = HealthStatus(
healthy=True, status='ok', backend='sqlite_vec',
statistics={}, response_time_ms=50.0
)
health2 = HealthStatus(
healthy=False, status='error', backend='unknown',
statistics={}, response_time_ms=0, error='Connection failed'
)
with patch.object(client, 'discover_services', return_value=[service1, service2]), \
patch.object(client, 'check_service_health', side_effect=[health1, health2]):
services_with_health = await client.find_services_with_health()
assert len(services_with_health) == 2
# Should be sorted with healthy services first
assert services_with_health[0][1].healthy is True
assert services_with_health[1][1].healthy is False
@pytest.mark.asyncio
async def test_stop(self):
"""Test stopping the discovery client."""
client = DiscoveryClient()
with patch.object(client._discovery, 'stop_discovery') as mock_stop:
await client.stop()
mock_stop.assert_called_once()
class TestHealthStatus:
"""Test HealthStatus dataclass."""
def test_health_status_creation(self):
"""Test HealthStatus creation."""
health = HealthStatus(
healthy=True,
status='ok',
backend='sqlite_vec',
statistics={'memory_count': 100},
response_time_ms=50.0,
error=None
)
assert health.healthy is True
assert health.status == 'ok'
assert health.backend == 'sqlite_vec'
assert health.statistics == {'memory_count': 100}
assert health.response_time_ms == 50.0
assert health.error is None
def test_health_status_with_error(self):
"""Test HealthStatus creation with error."""
health = HealthStatus(
healthy=False,
status='error',
backend='unknown',
statistics={},
response_time_ms=0,
error='Connection timeout'
)
assert health.healthy is False
assert health.error == 'Connection timeout'
# Integration tests that can run without actual network
class TestMDNSIntegration:
"""Integration tests for mDNS functionality."""
@pytest.mark.asyncio
async def test_advertiser_discovery_integration(self):
"""Test integration between advertiser and discovery (mocked)."""
# This test uses mocks to simulate the integration without actual network traffic
with patch('mcp_memory_service.discovery.mdns_service.AsyncZeroconf') as mock_zeroconf_class, \
patch('mcp_memory_service.discovery.mdns_service.AsyncServiceBrowser') as mock_browser_class:
# Setup mocks
mock_zeroconf = AsyncMock()
mock_zeroconf_class.return_value = mock_zeroconf
mock_browser = Mock()
mock_browser_class.return_value = mock_browser
# Start advertiser
advertiser = ServiceAdvertiser(service_name="Test Service")
with patch.object(advertiser, '_create_service_info'):
await advertiser.start()
assert advertiser._registered is True
# Start discovery
discovery = ServiceDiscovery(discovery_timeout=1)
# Mock discovered service
mock_service = ServiceDetails(
name="Test Service",
host="192.168.1.100",
port=8000,
https=False,
api_version="2.1.0",
requires_auth=False,
service_info=Mock()
)
with patch.object(discovery, '_listener') as mock_listener:
mock_listener.services = {"test-service": mock_service}
services = await discovery.discover_services()
assert len(services) == 1
assert services[0].name == "Test Service"
# Clean up
await advertiser.stop()
await discovery.stop_discovery()
if __name__ == '__main__':
pytest.main([__file__])
```
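For readers tracing the assertions in `TestServiceDetails`, a minimal sketch of the dataclass shape those tests assume. Field names and URL behaviour are inferred from the tests above; the real implementation lives in `src/mcp_memory_service/discovery/mdns_service.py` and may differ in detail.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ServiceDetailsSketch:
    """Illustrative stand-in for ServiceDetails as exercised by the tests above."""
    name: str
    host: str
    port: int
    https: bool
    api_version: str
    requires_auth: bool
    service_info: Any

    @property
    def url(self) -> str:
        scheme = "https" if self.https else "http"
        return f"{scheme}://{self.host}:{self.port}"

    @property
    def api_url(self) -> str:
        return f"{self.url}/api"
```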
--------------------------------------------------------------------------------
/docs/examples/tag-schema.json:
--------------------------------------------------------------------------------
```json
{
"tag_schema": {
"version": "2.0",
"last_updated": "2025-06-07",
"description": "Standardized tag schema for MCP Memory Service knowledge management",
"total_categories": 6,
"naming_convention": {
"format": "lowercase-with-hyphens",
"rules": [
"Use lowercase letters only",
"Replace spaces with hyphens",
"Use descriptive but concise terms",
"Avoid abbreviations unless widely understood",
"Use singular form when possible"
],
"examples": {
"good": ["memory-service", "github-integration", "best-practices"],
"bad": ["memoryservice", "GitHub_Integration", "bestPractices"]
}
},
"categories": {
"projects_and_repositories": {
"description": "Primary projects, repositories, and major components",
"color": "#3b82f6",
"icon": "🚀",
"tags": {
"primary_projects": [
{
"tag": "mcp-memory-service",
"description": "Core memory service development",
"usage_count": 45,
"examples": ["Core development", "Memory storage features", "Database operations"]
},
{
"tag": "memory-dashboard",
"description": "Dashboard application for memory management",
"usage_count": 23,
"examples": ["UI development", "Dashboard features", "User interface"]
},
{
"tag": "github-integration",
"description": "GitHub connectivity and automation",
"usage_count": 15,
"examples": ["Issue tracking", "Repository management", "CI/CD"]
},
{
"tag": "mcp-protocol",
"description": "Model Context Protocol development",
"usage_count": 12,
"examples": ["Protocol specifications", "Communication standards"]
},
{
"tag": "cloudflare-workers",
"description": "Edge computing integration",
"usage_count": 8,
"examples": ["Edge deployment", "Worker scripts", "Distributed systems"]
}
],
"project_components": [
{
"tag": "frontend",
"description": "User interface components and client-side development",
"usage_count": 18,
"examples": ["React components", "UI/UX", "Client applications"]
},
{
"tag": "backend",
"description": "Server-side development and APIs",
"usage_count": 32,
"examples": ["Server logic", "Database operations", "API development"]
},
{
"tag": "api",
"description": "API design and implementation",
"usage_count": 14,
"examples": ["REST APIs", "Endpoints", "API documentation"]
},
{
"tag": "database",
"description": "Data storage and management",
"usage_count": 22,
"examples": ["ChromaDB", "Data models", "Database optimization"]
},
{
"tag": "infrastructure",
"description": "Deployment and DevOps",
"usage_count": 16,
"examples": ["Docker", "Cloud deployment", "System architecture"]
}
]
}
},
"technologies_and_tools": {
"description": "Programming languages, frameworks, libraries, and development tools",
"color": "#10b981",
"icon": "⚙️",
"tags": {
"programming_languages": [
{
"tag": "python",
"description": "Python development and scripts",
"usage_count": 38,
"examples": ["Backend development", "Scripts", "Data processing"]
},
{
"tag": "typescript",
"description": "TypeScript development",
"usage_count": 25,
"examples": ["Frontend development", "Type safety", "React applications"]
},
{
"tag": "javascript",
"description": "JavaScript development",
"usage_count": 20,
"examples": ["Client-side logic", "Node.js", "Web development"]
},
{
"tag": "bash",
"description": "Shell scripting and command-line operations",
"usage_count": 12,
"examples": ["Automation scripts", "System administration", "Build processes"]
},
{
"tag": "sql",
"description": "Database queries and operations",
"usage_count": 8,
"examples": ["Database queries", "Data analysis", "Schema design"]
}
],
"frameworks_and_libraries": [
{
"tag": "react",
"description": "React framework development",
"usage_count": 22,
"examples": ["Component development", "UI frameworks", "Frontend applications"]
},
{
"tag": "fastapi",
"description": "FastAPI framework for Python APIs",
"usage_count": 15,
"examples": ["API development", "Web services", "Backend frameworks"]
},
{
"tag": "chromadb",
"description": "ChromaDB vector database",
"usage_count": 28,
"examples": ["Vector storage", "Embedding operations", "Similarity search"]
},
{
"tag": "sentence-transformers",
"description": "Sentence transformer models for embeddings",
"usage_count": 18,
"examples": ["Text embeddings", "Semantic search", "NLP models"]
},
{
"tag": "pytest",
"description": "Python testing framework",
"usage_count": 10,
"examples": ["Unit testing", "Test automation", "Quality assurance"]
}
],
"tools_and_platforms": [
{
"tag": "git",
"description": "Version control and repository management",
"usage_count": 24,
"examples": ["Version control", "Collaboration", "Code management"]
},
{
"tag": "docker",
"description": "Containerization and deployment",
"usage_count": 16,
"examples": ["Container deployment", "Application packaging", "DevOps"]
},
{
"tag": "github",
"description": "GitHub platform and repository management",
"usage_count": 20,
"examples": ["Repository hosting", "Issue tracking", "Collaboration"]
},
{
"tag": "aws",
"description": "Amazon Web Services cloud platform",
"usage_count": 12,
"examples": ["Cloud infrastructure", "Deployment", "Scalability"]
},
{
"tag": "npm",
"description": "Node package management",
"usage_count": 8,
"examples": ["Package management", "Dependencies", "JavaScript ecosystem"]
}
]
}
},
"activities_and_processes": {
"description": "Development activities, operational processes, and workflows",
"color": "#f59e0b",
"icon": "🔧",
"tags": {
"development_activities": [
{
"tag": "development",
"description": "General development work and programming",
"usage_count": 35,
"examples": ["Feature development", "Code writing", "Implementation"]
},
{
"tag": "implementation",
"description": "Feature implementation and code realization",
"usage_count": 28,
"examples": ["Feature implementation", "Code realization", "System building"]
},
{
"tag": "debugging",
"description": "Bug investigation and problem solving",
"usage_count": 22,
"examples": ["Bug fixes", "Problem investigation", "Issue resolution"]
},
{
"tag": "testing",
"description": "Quality assurance and testing activities",
"usage_count": 30,
"examples": ["Unit testing", "Integration testing", "Quality assurance"]
},
{
"tag": "refactoring",
"description": "Code improvement and restructuring",
"usage_count": 12,
"examples": ["Code cleanup", "Architecture improvement", "Optimization"]
},
{
"tag": "optimization",
"description": "Performance enhancement and efficiency improvements",
"usage_count": 15,
"examples": ["Performance tuning", "Resource optimization", "Speed improvements"]
}
],
"documentation_activities": [
{
"tag": "documentation",
"description": "Writing documentation and guides",
"usage_count": 25,
"examples": ["Technical documentation", "User guides", "API documentation"]
},
{
"tag": "tutorial",
"description": "Creating tutorials and learning materials",
"usage_count": 8,
"examples": ["Step-by-step guides", "Learning materials", "How-to documents"]
},
{
"tag": "guide",
"description": "Comprehensive guides and references",
"usage_count": 12,
"examples": ["Best practice guides", "Implementation guides", "Reference materials"]
},
{
"tag": "reference",
"description": "Reference materials and quick lookups",
"usage_count": 18,
"examples": ["Quick reference", "Lookup tables", "Technical specifications"]
},
{
"tag": "examples",
"description": "Code examples and practical demonstrations",
"usage_count": 15,
"examples": ["Code samples", "Usage examples", "Demonstrations"]
}
],
"operational_activities": [
{
"tag": "deployment",
"description": "Application deployment and release management",
"usage_count": 18,
"examples": ["Production deployment", "Release management", "Environment setup"]
},
{
"tag": "monitoring",
"description": "System monitoring and observability",
"usage_count": 10,
"examples": ["Performance monitoring", "Health checks", "System observability"]
},
{
"tag": "backup",
"description": "Data backup and recovery processes",
"usage_count": 8,
"examples": ["Data backup", "Disaster recovery", "Data preservation"]
},
{
"tag": "migration",
"description": "Data or system migration processes",
"usage_count": 12,
"examples": ["Data migration", "System upgrades", "Platform transitions"]
},
{
"tag": "maintenance",
"description": "System maintenance and upkeep",
"usage_count": 15,
"examples": ["Regular maintenance", "System updates", "Preventive care"]
},
{
"tag": "troubleshooting",
"description": "Problem resolution and diagnostic work",
"usage_count": 20,
"examples": ["Issue diagnosis", "Problem solving", "System repair"]
}
]
}
},
"content_types_and_formats": {
"description": "Types of knowledge content and documentation formats",
"color": "#8b5cf6",
"icon": "📚",
"tags": {
"knowledge_types": [
{
"tag": "concept",
"description": "Conceptual information and theoretical content",
"usage_count": 18,
"examples": ["Design concepts", "Theoretical frameworks", "Ideas"]
},
{
"tag": "architecture",
"description": "System architecture and design patterns",
"usage_count": 22,
"examples": ["System design", "Architecture patterns", "Technical blueprints"]
},
{
"tag": "design",
"description": "Design decisions and design patterns",
"usage_count": 16,
"examples": ["Design decisions", "UI/UX design", "System design"]
},
{
"tag": "best-practices",
"description": "Proven methodologies and recommended approaches",
"usage_count": 20,
"examples": ["Industry standards", "Recommended practices", "Quality guidelines"]
},
{
"tag": "methodology",
"description": "Systematic approaches and methodologies",
"usage_count": 12,
"examples": ["Development methodologies", "Process frameworks", "Systematic approaches"]
},
{
"tag": "workflow",
"description": "Process workflows and operational procedures",
"usage_count": 14,
"examples": ["Business processes", "Development workflows", "Operational procedures"]
}
],
"documentation_formats": [
{
"tag": "tutorial",
"description": "Step-by-step instructional content",
"usage_count": 15,
"examples": ["Learning tutorials", "How-to guides", "Educational content"]
},
{
"tag": "reference",
"description": "Quick reference materials and lookups",
"usage_count": 18,
"examples": ["API reference", "Command reference", "Quick lookups"]
},
{
"tag": "example",
"description": "Practical examples and demonstrations",
"usage_count": 22,
"examples": ["Code examples", "Use cases", "Practical demonstrations"]
},
{
"tag": "template",
"description": "Reusable templates and boilerplates",
"usage_count": 10,
"examples": ["Document templates", "Code templates", "Process templates"]
},
{
"tag": "checklist",
"description": "Verification checklists and task lists",
"usage_count": 8,
"examples": ["Quality checklists", "Process verification", "Task lists"]
},
{
"tag": "summary",
"description": "Condensed information and overviews",
"usage_count": 12,
"examples": ["Executive summaries", "Project overviews", "Condensed reports"]
}
],
"technical_content": [
{
"tag": "configuration",
"description": "System configuration and setup information",
"usage_count": 16,
"examples": ["System setup", "Configuration files", "Environment setup"]
},
{
"tag": "specification",
"description": "Technical specifications and requirements",
"usage_count": 14,
"examples": ["Technical specs", "Requirements", "Standards"]
},
{
"tag": "analysis",
"description": "Technical analysis and research findings",
"usage_count": 18,
"examples": ["Performance analysis", "Technical research", "Data analysis"]
},
{
"tag": "research",
"description": "Research findings and investigations",
"usage_count": 15,
"examples": ["Research results", "Investigations", "Study findings"]
},
{
"tag": "review",
"description": "Code reviews and process evaluations",
"usage_count": 10,
"examples": ["Code reviews", "Process reviews", "Quality assessments"]
}
]
}
},
"status_and_progress": {
"description": "Development status, progress indicators, and priority levels",
"color": "#ef4444",
"icon": "📊",
"tags": {
"development_status": [
{
"tag": "resolved",
"description": "Completed and verified work",
"usage_count": 25,
"examples": ["Completed features", "Fixed bugs", "Resolved issues"]
},
{
"tag": "in-progress",
"description": "Currently being worked on",
"usage_count": 18,
"examples": ["Active development", "Ongoing work", "Current tasks"]
},
{
"tag": "blocked",
"description": "Waiting for external dependencies",
"usage_count": 8,
"examples": ["Dependency blocks", "External waiting", "Resource constraints"]
},
{
"tag": "needs-investigation",
"description": "Requires further analysis or research",
"usage_count": 12,
"examples": ["Research needed", "Analysis required", "Investigation pending"]
},
{
"tag": "planned",
"description": "Scheduled for future work",
"usage_count": 15,
"examples": ["Future work", "Roadmap items", "Planned features"]
},
{
"tag": "cancelled",
"description": "No longer being pursued",
"usage_count": 5,
"examples": ["Cancelled projects", "Deprecated features", "Abandoned work"]
}
],
"quality_status": [
{
"tag": "verified",
"description": "Tested and confirmed working",
"usage_count": 20,
"examples": ["Verified functionality", "Confirmed working", "Quality assured"]
},
{
"tag": "tested",
"description": "Has undergone testing",
"usage_count": 22,
"examples": ["Tested code", "QA complete", "Testing done"]
},
{
"tag": "reviewed",
"description": "Has been peer reviewed",
"usage_count": 15,
"examples": ["Code reviewed", "Peer reviewed", "Quality checked"]
},
{
"tag": "approved",
"description": "Officially approved for use",
"usage_count": 12,
"examples": ["Management approved", "Officially sanctioned", "Authorized"]
},
{
"tag": "experimental",
"description": "Proof of concept or experimental stage",
"usage_count": 8,
"examples": ["Proof of concept", "Experimental features", "Research stage"]
},
{
"tag": "deprecated",
"description": "No longer recommended for use",
"usage_count": 6,
"examples": ["Legacy code", "Outdated practices", "Superseded methods"]
}
],
"priority_levels": [
{
"tag": "urgent",
"description": "Immediate attention required",
"usage_count": 8,
"examples": ["Critical bugs", "Emergency fixes", "Immediate action"]
},
{
"tag": "high-priority",
"description": "Important, should be addressed soon",
"usage_count": 15,
"examples": ["Important features", "Key improvements", "High-impact work"]
},
{
"tag": "normal-priority",
"description": "Standard priority work",
"usage_count": 25,
"examples": ["Regular work", "Standard features", "Normal development"]
},
{
"tag": "low-priority",
"description": "Can be addressed when time allows",
"usage_count": 18,
"examples": ["Nice-to-have features", "Minor improvements", "Low-impact work"]
},
{
"tag": "nice-to-have",
"description": "Enhancement, not critical",
"usage_count": 12,
"examples": ["Optional features", "Enhancements", "Convenience improvements"]
}
]
}
},
"context_and_temporal": {
"description": "Temporal markers, environmental context, and scope indicators",
"color": "#06b6d4",
"icon": "🕒",
"tags": {
"temporal_markers": [
{
"tag": "january-2025",
"description": "Content from January 2025",
"usage_count": 50,
"examples": ["Project initialization", "Early development", "Planning phase"]
},
{
"tag": "june-2025",
"description": "Content from June 2025",
"usage_count": 45,
"examples": ["Recent development", "Current work", "Latest updates"]
},
{
"tag": "q1-2025",
"description": "First quarter 2025 content",
"usage_count": 18,
"examples": ["Quarterly planning", "Q1 objectives", "First quarter work"]
},
{
"tag": "milestone-v1",
"description": "Version 1 milestone content",
"usage_count": 12,
"examples": ["Version milestones", "Release markers", "Development phases"]
},
{
"tag": "sprint-3",
"description": "Development sprint markers",
"usage_count": 8,
"examples": ["Sprint work", "Iteration markers", "Development cycles"]
}
],
"environmental_context": [
{
"tag": "development",
"description": "Development environment context",
"usage_count": 30,
"examples": ["Development work", "Local environment", "Dev testing"]
},
{
"tag": "staging",
"description": "Staging environment context",
"usage_count": 12,
"examples": ["Staging deployment", "Pre-production", "Staging testing"]
},
{
"tag": "production",
"description": "Production environment context",
"usage_count": 20,
"examples": ["Live systems", "Production deployment", "Production issues"]
},
{
"tag": "testing",
"description": "Testing environment context",
"usage_count": 25,
"examples": ["Test environment", "QA testing", "Testing infrastructure"]
},
{
"tag": "local",
"description": "Local development context",
"usage_count": 15,
"examples": ["Local development", "Local testing", "Local setup"]
}
],
"scope_and_impact": [
{
"tag": "breaking-change",
"description": "Introduces breaking changes",
"usage_count": 8,
"examples": ["API changes", "Backwards incompatible", "Major updates"]
},
{
"tag": "feature",
"description": "New feature development",
"usage_count": 28,
"examples": ["New features", "Feature additions", "Functionality expansion"]
},
{
"tag": "enhancement",
"description": "Improvement to existing features",
"usage_count": 22,
"examples": ["Feature improvements", "Performance enhancements", "User experience"]
},
{
"tag": "hotfix",
"description": "Critical fix for production issues",
"usage_count": 6,
"examples": ["Emergency fixes", "Critical patches", "Production fixes"]
},
{
"tag": "security",
"description": "Security-related content",
"usage_count": 10,
"examples": ["Security fixes", "Security analysis", "Vulnerability patches"]
},
{
"tag": "performance",
"description": "Performance-related improvements",
"usage_count": 15,
"examples": ["Performance optimization", "Speed improvements", "Efficiency gains"]
}
]
}
}
},
"tag_combination_patterns": {
"description": "Common patterns for combining tags across categories",
"examples": [
{
"pattern": "Project + Technology + Activity + Status",
"example": ["mcp-memory-service", "python", "debugging", "resolved"],
"usage": "Most comprehensive tagging for technical work"
},
{
"pattern": "Content Type + Domain + Technology + Context",
"example": ["documentation", "backend", "chromadb", "production"],
"usage": "Documentation and reference materials"
},
{
"pattern": "Activity + Status + Priority + Temporal",
"example": ["testing", "in-progress", "high-priority", "june-2025"],
"usage": "Active work items with clear status"
},
{
"pattern": "Concept + Architecture + Research + Domain",
"example": ["concept", "architecture", "research", "system-design"],
"usage": "Conceptual and design-related content"
}
]
},
"usage_guidelines": {
"recommended_tag_count": {
"minimum": 3,
"maximum": 8,
"optimal": "4-6 tags from different categories"
},
"category_distribution": {
"required": ["Project context (1-2 tags)", "Content type or activity (1-2 tags)"],
"recommended": ["Technology (1-2 tags)", "Status (1 tag)"],
"optional": ["Context/Temporal (0-2 tags)", "Priority (0-1 tags)"]
},
"quality_indicators": [
"Tags from multiple categories",
"Specific rather than generic terms",
"Consistent with established patterns",
"Relevant to content and future retrieval"
]
},
"maintenance": {
"review_schedule": {
"weekly": "Check new tag usage patterns",
"monthly": "Review tag frequency and consistency",
"quarterly": "Update schema based on usage patterns"
},
"evolution_process": [
"Identify new patterns in content",
"Propose new tags or categories",
"Test with sample content",
"Update schema documentation",
"Migrate existing content if needed"
]
}
}
  }
}
```
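As a usage illustration of the "Project + Technology + Activity + Status" combination pattern and the 4-6 tag guideline above, a short sketch of constructing a memory with schema-compliant tags. The `Memory` fields mirror those used in the migration script earlier in this dump; the SHA-256 hashing and the `memory_type` value are illustrative assumptions, not the service's canonical behaviour.

```python
import hashlib
import time

from mcp_memory_service.models.memory import Memory

content = "Fixed duplicate detection in the sqlite-vec migration path."
now = time.time()

memory = Memory(
    content=content,
    # Illustrative hash; the service computes its own canonical content hash.
    content_hash=hashlib.sha256(content.encode("utf-8")).hexdigest(),
    # One tag per category: project, technology, activity, status.
    tags=["mcp-memory-service", "python", "debugging", "resolved"],
    memory_type="note",
    metadata={"tag_schema_version": "2.0"},
    created_at=now,
    updated_at=now,
)
```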
--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/consolidator.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Main dream-inspired consolidation orchestrator."""
import asyncio
from typing import List, Dict, Any, Optional, Protocol, Tuple
from datetime import datetime, timedelta, timezone
import logging
import time
from .base import ConsolidationConfig, ConsolidationReport, ConsolidationError
from .decay import ExponentialDecayCalculator
from .associations import CreativeAssociationEngine
from .clustering import SemanticClusteringEngine
from .compression import SemanticCompressionEngine
from .forgetting import ControlledForgettingEngine
from .health import ConsolidationHealthMonitor
from ..models.memory import Memory
# Protocol for storage backend interface
class StorageProtocol(Protocol):
async def get_all_memories(self) -> List[Memory]: ...
async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]: ...
async def store(self, memory: Memory) -> Tuple[bool, str]: ...
async def update_memory(self, memory: Memory) -> bool: ...
async def delete_memory(self, content_hash: str) -> bool: ...
async def get_memory_connections(self) -> Dict[str, int]: ...
async def get_access_patterns(self) -> Dict[str, datetime]: ...
class SyncPauseContext:
"""Context manager for pausing/resuming hybrid backend sync."""
def __init__(self, storage, logger):
self.storage = storage
self.logger = logger
self.is_hybrid = hasattr(storage, 'pause_sync') and hasattr(storage, 'resume_sync')
self.sync_paused = False
async def __aenter__(self):
if self.is_hybrid:
self.logger.info("Pausing hybrid backend sync during consolidation")
await self.storage.pause_sync()
self.sync_paused = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.sync_paused:
try:
self.logger.info("Resuming hybrid backend sync after consolidation")
await self.storage.resume_sync()
except Exception as e:
self.logger.error(f"Failed to resume sync: {e}", exc_info=True)
def check_horizon_requirements(time_horizon: str, phase_name: str,
enabled_phases: Dict[str, List[str]]) -> bool:
"""Check if a consolidation phase should run for the given horizon.
Args:
time_horizon: Current time horizon (daily, weekly, etc.)
phase_name: Phase identifier (clustering, associations, etc.)
enabled_phases: Dict mapping phase names to applicable horizons
Returns:
bool: True if phase should run
"""
applicable_horizons = enabled_phases.get(phase_name, [])
return time_horizon in applicable_horizons
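# Example (using the ENABLED_PHASES mapping defined on DreamInspiredConsolidator below):
#   check_horizon_requirements('weekly', 'forgetting', ENABLED_PHASES)   -> False
#   check_horizon_requirements('monthly', 'forgetting', ENABLED_PHASES)  -> True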
# Horizon configuration
HORIZON_CONFIGS = {
'daily': {'delta': timedelta(days=1), 'cutoff_days': 2},
'weekly': {'delta': timedelta(days=7), 'cutoff_days': None},
'monthly': {'delta': timedelta(days=30), 'cutoff_days': None},
'quarterly': {'delta': timedelta(days=90), 'cutoff_days': 90},
'yearly': {'delta': timedelta(days=365), 'cutoff_days': 365}
}
def filter_memories_by_age(memories: List[Memory], cutoff_date: datetime) -> List[Memory]:
"""Filter memories created before the cutoff date.
Args:
memories: List of Memory objects
cutoff_date: Only keep memories older than this
Returns:
Filtered list of memories
"""
return [
m for m in memories
        if m.created_at and datetime.fromtimestamp(m.created_at, tz=timezone.utc) < cutoff_date  # aware UTC, matches the aware cutoff
]
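# Example: keep only memories created more than 90 days before "now"
#   cutoff = datetime.now(timezone.utc) - timedelta(days=90)
#   stale_memories = filter_memories_by_age(memories, cutoff)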
class DreamInspiredConsolidator:
"""
Main consolidation engine with biologically-inspired processing.
Orchestrates the full consolidation pipeline including:
- Exponential decay scoring
- Creative association discovery
- Semantic clustering and compression
- Controlled forgetting with archival
"""
# Phase enablement configuration
ENABLED_PHASES = {
'clustering': ['weekly', 'monthly', 'quarterly'],
'associations': ['weekly', 'monthly'],
'compression': ['weekly', 'monthly', 'quarterly'],
'forgetting': ['monthly', 'quarterly', 'yearly']
}
def __init__(self, storage: StorageProtocol, config: ConsolidationConfig):
self.storage = storage
self.config = config
self.logger = logging.getLogger(__name__)
# Initialize component engines
self.decay_calculator = ExponentialDecayCalculator(config)
self.association_engine = CreativeAssociationEngine(config)
self.clustering_engine = SemanticClusteringEngine(config)
self.compression_engine = SemanticCompressionEngine(config)
self.forgetting_engine = ControlledForgettingEngine(config)
# Initialize health monitoring
self.health_monitor = ConsolidationHealthMonitor(config)
# Performance tracking
self.last_consolidation_times = {}
self.consolidation_stats = {
'total_runs': 0,
'successful_runs': 0,
'total_memories_processed': 0,
'total_associations_created': 0,
'total_clusters_created': 0,
'total_memories_compressed': 0,
'total_memories_archived': 0
}
async def consolidate(self, time_horizon: str, **kwargs) -> ConsolidationReport:
"""
Run full consolidation pipeline for given time horizon.
Args:
time_horizon: 'daily', 'weekly', 'monthly', 'quarterly', 'yearly'
**kwargs: Additional parameters for consolidation
Returns:
ConsolidationReport with results and performance metrics
"""
start_time = datetime.now()
report = ConsolidationReport(
time_horizon=time_horizon,
start_time=start_time,
end_time=start_time, # Will be updated at the end
memories_processed=0
)
try:
self.logger.info(f"Starting {time_horizon} consolidation - this may take several minutes depending on memory count...")
# Use context manager for sync pause/resume
async with SyncPauseContext(self.storage, self.logger):
# 1. Retrieve memories for processing
memories = await self._get_memories_for_horizon(time_horizon, **kwargs)
report.memories_processed = len(memories)
if not memories:
self.logger.info(f"No memories to process for {time_horizon} consolidation")
return self._finalize_report(report, [])
self.logger.info(f"✓ Found {len(memories)} memories to process")
# 2. Calculate/update relevance scores
self.logger.info(f"📊 Phase 1/6: Calculating relevance scores for {len(memories)} memories...")
performance_start = time.time()
relevance_scores = await self._update_relevance_scores(memories, time_horizon)
self.logger.info(f"✓ Relevance scoring completed in {time.time() - performance_start:.1f}s")
# 3. Cluster by semantic similarity (if enabled and appropriate)
clusters = []
if self.config.clustering_enabled and check_horizon_requirements(
time_horizon, 'clustering', self.ENABLED_PHASES
):
self.logger.info(f"🔗 Phase 2/6: Clustering memories by semantic similarity...")
performance_start = time.time()
clusters = await self.clustering_engine.process(memories)
report.clusters_created = len(clusters)
self.logger.info(f"✓ Clustering completed in {time.time() - performance_start:.1f}s, created {len(clusters)} clusters")
# 4. Run creative associations (if enabled and appropriate)
associations = []
if self.config.associations_enabled and check_horizon_requirements(
time_horizon, 'associations', self.ENABLED_PHASES
):
self.logger.info(f"🧠 Phase 3/6: Discovering creative associations...")
performance_start = time.time()
existing_associations = await self._get_existing_associations()
associations = await self.association_engine.process(
memories, existing_associations=existing_associations
)
report.associations_discovered = len(associations)
self.logger.info(f"✓ Association discovery completed in {time.time() - performance_start:.1f}s, found {len(associations)} associations")
# Store new associations as memories
await self._store_associations_as_memories(associations)
# 5. Compress clusters (if enabled and clusters exist)
compression_results = []
if self.config.compression_enabled and clusters and check_horizon_requirements(
time_horizon, 'compression', self.ENABLED_PHASES
):
self.logger.info(f"🗜️ Phase 4/6: Compressing memory clusters...")
performance_start = time.time()
compression_results = await self.compression_engine.process(clusters, memories)
report.memories_compressed = len(compression_results)
self.logger.info(f"✓ Compression completed in {time.time() - performance_start:.1f}s, compressed {len(compression_results)} clusters")
# Store compressed memories and update originals
await self._handle_compression_results(compression_results)
# 6. Controlled forgetting (if enabled and appropriate)
forgetting_results = []
if self.config.forgetting_enabled and check_horizon_requirements(
time_horizon, 'forgetting', self.ENABLED_PHASES
):
self.logger.info(f"🗂️ Phase 5/6: Applying controlled forgetting...")
performance_start = time.time()
access_patterns = await self._get_access_patterns()
forgetting_results = await self.forgetting_engine.process(
memories, relevance_scores,
access_patterns=access_patterns,
time_horizon=time_horizon
)
report.memories_archived = len([r for r in forgetting_results if r.action_taken in ['archived', 'deleted']])
self.logger.info(f"✓ Forgetting completed in {time.time() - performance_start:.1f}s, processed {len(forgetting_results)} candidates")
# Apply forgetting results to storage
await self._apply_forgetting_results(forgetting_results)
# 7. Update consolidation statistics
self._update_consolidation_stats(report)
# 8. Track consolidation timestamp for incremental mode
if self.config.incremental_mode:
await self._update_consolidation_timestamps(memories)
# 9. Finalize report
return self._finalize_report(report, [])
except ConsolidationError as e:
# Re-raise configuration and validation errors
self.logger.error(f"Configuration error during {time_horizon} consolidation: {e}")
self.health_monitor.record_error('consolidator', e, {'time_horizon': time_horizon})
raise
except Exception as e:
self.logger.error(f"Error during {time_horizon} consolidation: {e}")
self.health_monitor.record_error('consolidator', e, {'time_horizon': time_horizon})
report.errors.append(str(e))
return self._finalize_report(report, [str(e)])
async def _get_memories_for_horizon(self, time_horizon: str, **kwargs) -> List[Memory]:
"""Get memories appropriate for the given time horizon.
        When incremental mode is enabled, returns an oldest-first batch of memories
        that have not been consolidated recently.
"""
now = datetime.now(timezone.utc)
# Validate time horizon
if time_horizon not in HORIZON_CONFIGS:
raise ConsolidationError(f"Unknown time horizon: {time_horizon}")
config = HORIZON_CONFIGS[time_horizon]
# For daily processing, get recent memories (no change - already efficient)
if time_horizon == 'daily':
cutoff_days = config.get('cutoff_days', 2)
start_time = (now - timedelta(days=cutoff_days)).timestamp()
end_time = now.timestamp()
memories = await self.storage.get_memories_by_time_range(start_time, end_time)
else:
# For longer horizons: incremental oldest-first processing
memories = await self.storage.get_all_memories()
# Filter by relevance to time horizon (quarterly/yearly still focus on old memories)
cutoff_days = config.get('cutoff_days')
if cutoff_days is not None:
cutoff_date = now - timedelta(days=cutoff_days)
memories = filter_memories_by_age(memories, cutoff_date)
# Incremental mode: Sort oldest-first and batch
if self.config.incremental_mode:
# Sort by last_consolidated_at (oldest first), fallback to created_at
def get_consolidation_sort_key(memory: Memory) -> float:
# Check metadata for last consolidation timestamp
if memory.metadata and 'last_consolidated_at' in memory.metadata:
return float(memory.metadata['last_consolidated_at'])
# Fall back to created_at (treat never-consolidated as oldest)
return memory.created_at if memory.created_at else 0.0
memories.sort(key=get_consolidation_sort_key)
# Limit to batch size
batch_size = self.config.batch_size
if len(memories) > batch_size:
self.logger.info(f"Incremental mode: Processing {batch_size} oldest memories (out of {len(memories)} total)")
memories = memories[:batch_size]
return memories
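# Illustrative example of the incremental batching above (hypothetical numbers):
# with batch_size=500 and 2,000 eligible memories, the 500 entries with the oldest
# last_consolidated_at are processed this run (never-consolidated memories sort by
# their created_at fallback, so they surface first); the rest wait for later runs.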
async def _update_relevance_scores(self, memories: List[Memory], time_horizon: str) -> List:
"""Calculate and update relevance scores for memories."""
# Get connection and access data
connections = await self._get_memory_connections()
access_patterns = await self._get_access_patterns()
# Calculate relevance scores
relevance_scores = await self.decay_calculator.process(
memories,
connections=connections,
access_patterns=access_patterns,
reference_time=datetime.now()
)
# Update memory metadata with relevance scores
for memory in memories:
score = next((s for s in relevance_scores if s.memory_hash == memory.content_hash), None)
if score:
updated_memory = await self.decay_calculator.update_memory_relevance_metadata(memory, score)
await self.storage.update_memory(updated_memory)
return relevance_scores
async def _get_memory_connections(self) -> Dict[str, int]:
"""Get memory connection counts from storage."""
try:
return await self.storage.get_memory_connections()
except AttributeError:
# Fallback if storage doesn't implement connection tracking
self.logger.warning("Storage backend doesn't support connection tracking")
return {}
async def _get_access_patterns(self) -> Dict[str, datetime]:
"""Get memory access patterns from storage."""
try:
return await self.storage.get_access_patterns()
except AttributeError:
# Fallback if storage doesn't implement access tracking
self.logger.warning("Storage backend doesn't support access pattern tracking")
return {}
async def _get_existing_associations(self) -> set:
"""Get existing memory associations to avoid duplicates."""
try:
# Look for existing association memories
all_memories = await self.storage.get_all_memories()
associations = set()
for memory in all_memories:
if memory.memory_type == 'association' and 'source_memory_hashes' in memory.metadata:
source_hashes = memory.metadata['source_memory_hashes']
if isinstance(source_hashes, list) and len(source_hashes) >= 2:
# Create canonical pair representation
pair_key = tuple(sorted(source_hashes[:2]))
associations.add(pair_key)
return associations
except Exception as e:
self.logger.warning(f"Error getting existing associations: {e}")
return set()
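# Example of the canonical pair key built above (illustrative hashes): the pairs
# ('abc123', 'def456') and ('def456', 'abc123') both normalize via
# tuple(sorted(...)) to ('abc123', 'def456'), so an existing association is
# detected no matter which order its source hashes were stored in.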
async def _store_associations_as_memories(self, associations) -> None:
"""Store discovered associations as first-class memories."""
for association in associations:
# Create memory content from association
source_hashes = association.source_memory_hashes
similarity = association.similarity_score
connection_type = association.connection_type
content = f"Association between memories {source_hashes[0][:8]} and {source_hashes[1][:8]}: {connection_type} (similarity: {similarity:.3f})"
# Create association memory
association_memory = Memory(
content=content,
content_hash=f"assoc_{source_hashes[0][:8]}_{source_hashes[1][:8]}",
tags=['association', 'discovered'] + connection_type.split(', '),
memory_type='association',
metadata={
'source_memory_hashes': source_hashes,
'similarity_score': similarity,
'connection_type': connection_type,
'discovery_method': association.discovery_method,
'discovery_date': association.discovery_date.isoformat(),
**association.metadata
},
created_at=datetime.now().timestamp(),
created_at_iso=datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
)
# Store the association memory
success, _ = await self.storage.store(association_memory)
if not success:
logger.warning(f"Failed to store association memory for {memory1_hash} <-> {memory2_hash}")
async def _handle_compression_results(self, compression_results) -> None:
"""Handle storage of compressed memories and linking to originals."""
for result in compression_results:
# Store compressed memory
success, _ = await self.storage.store(result.compressed_memory)
if not success:
logger.warning(f"Failed to store compressed memory")
# Update original memories with compression links
# This could involve adding metadata pointing to the compressed version
# Implementation depends on how the storage backend handles relationships
pass
async def _apply_forgetting_results(self, forgetting_results) -> None:
"""Apply forgetting results to the storage backend."""
for result in forgetting_results:
if result.action_taken == 'deleted':
await self.storage.delete_memory(result.memory_hash)
elif result.action_taken == 'compressed' and result.compressed_version:
# Replace original with compressed version
await self.storage.delete_memory(result.memory_hash)
success, _ = await self.storage.store(result.compressed_version)
if not success:
logger.warning(f"Failed to store compressed version for {result.memory_hash}")
# 'archived' memories are handled by the forgetting engine
async def _update_consolidation_timestamps(self, memories: List[Memory]) -> None:
"""Mark memories with last_consolidated_at timestamp for incremental mode using batch updates."""
consolidation_time = datetime.now().timestamp()
self.logger.info(f"Marking {len(memories)} memories with consolidation timestamp (batch mode)")
# Update all memories in-place
for memory in memories:
if memory.metadata is None:
memory.metadata = {}
memory.metadata['last_consolidated_at'] = consolidation_time
# Use batch update for optimal performance
try:
results = await self.storage.update_memories_batch(memories)
success_count = sum(results)
self.logger.info(f"Consolidation timestamps updated: {success_count}/{len(memories)} memories")
if success_count < len(memories):
failed_count = len(memories) - success_count
self.logger.warning(f"{failed_count} memories failed to update during timestamp marking")
except Exception as e:
self.logger.error(f"Batch timestamp update failed: {e}")
# Fallback to individual updates if batch fails
self.logger.info("Falling back to individual timestamp updates")
success_count = 0
for memory in memories:
try:
success = await self.storage.update_memory(memory)
if success:
success_count += 1
except Exception as mem_error:
self.logger.warning(f"Failed to update consolidation timestamp for {memory.content_hash}: {mem_error}")
self.logger.info(f"Fallback completed: {success_count}/{len(memories)} memories updated")
def _update_consolidation_stats(self, report: ConsolidationReport) -> None:
"""Update internal consolidation statistics."""
self.consolidation_stats['total_runs'] += 1
if not report.errors:
self.consolidation_stats['successful_runs'] += 1
self.consolidation_stats['total_memories_processed'] += report.memories_processed
self.consolidation_stats['total_associations_created'] += report.associations_discovered
self.consolidation_stats['total_clusters_created'] += report.clusters_created
self.consolidation_stats['total_memories_compressed'] += report.memories_compressed
self.consolidation_stats['total_memories_archived'] += report.memories_archived
# Update last consolidation time
self.last_consolidation_times[report.time_horizon] = report.start_time
def _finalize_report(self, report: ConsolidationReport, errors: List[str]) -> ConsolidationReport:
"""Finalize the consolidation report."""
report.end_time = datetime.now()
report.errors.extend(errors)
# Add performance metrics
duration = (report.end_time - report.start_time).total_seconds()
success = len(errors) == 0
report.performance_metrics = {
'duration_seconds': duration,
'memories_per_second': report.memories_processed / duration if duration > 0 else 0,
'success': success
}
# Record performance in health monitor
self.health_monitor.record_consolidation_performance(
time_horizon=report.time_horizon,
duration=duration,
memories_processed=report.memories_processed,
success=success,
errors=errors
)
# Log summary
if errors:
self.logger.error(f"Consolidation {report.time_horizon} completed with errors: {errors}")
else:
self.logger.info(
f"Consolidation {report.time_horizon} completed successfully: "
f"{report.memories_processed} memories, {report.associations_discovered} associations, "
f"{report.clusters_created} clusters, {report.memories_compressed} compressed, "
f"{report.memories_archived} archived in {duration:.2f}s"
)
return report
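# Illustrative report.performance_metrics produced above (hypothetical values):
# {'duration_seconds': 12.4, 'memories_per_second': 40.3, 'success': True}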
async def health_check(self) -> Dict[str, Any]:
"""Perform health check on the consolidation system."""
return await self.health_monitor.check_overall_health()
async def get_health_summary(self) -> Dict[str, Any]:
"""Get a summary of consolidation system health."""
return await self.health_monitor.get_health_summary()
def get_error_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent error history."""
return self.health_monitor.error_history[-limit:]
def get_performance_history(self, limit: int = 100) -> List[Dict[str, Any]]:
"""Get recent performance history."""
return self.health_monitor.performance_history[-limit:]
def resolve_health_alert(self, alert_id: str):
"""Resolve a health alert."""
self.health_monitor.resolve_alert(alert_id)
async def get_consolidation_recommendations(self, time_horizon: str) -> Dict[str, Any]:
"""Get recommendations for consolidation based on current memory state."""
try:
memories = await self._get_memories_for_horizon(time_horizon)
if not memories:
return {
'recommendation': 'no_action',
'reason': 'No memories to process',
'memory_count': 0
}
# Analyze memory distribution
memory_types = {}
total_size = 0
old_memories = 0
now = datetime.now(timezone.utc)
for memory in memories:
memory_type = memory.memory_type or 'standard'
memory_types[memory_type] = memory_types.get(memory_type, 0) + 1
total_size += len(memory.content)
if memory.created_at:
age_days = (now - datetime.fromtimestamp(memory.created_at, tz=timezone.utc)).days
if age_days > 30:
old_memories += 1
# Generate recommendations
recommendations = []
if len(memories) > 1000:
recommendations.append("Consider running compression to reduce memory usage")
if old_memories > len(memories) * 0.5:
recommendations.append("Many old memories present - consider forgetting/archival")
if len(memories) > 100 and time_horizon in ['weekly', 'monthly']:
recommendations.append("Good candidate for association discovery")
if not recommendations:
recommendations.append("Memory state looks healthy")
return {
'recommendation': 'consolidation_beneficial' if len(recommendations) > 1 else 'optional',
'reasons': recommendations,
'memory_count': len(memories),
'memory_types': memory_types,
'total_size_bytes': total_size,
'old_memory_percentage': (old_memories / len(memories)) * 100,
'estimated_duration_seconds': len(memories) * 0.01 # Rough estimate
}
except Exception as e:
return {
'recommendation': 'error',
'error': str(e),
'memory_count': 0
}
```
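
For orientation, here is a minimal usage sketch of the recommendation and health APIs defined above. Only `get_consolidation_recommendations()` and `health_check()` are taken from this file; `build_consolidator()` is a hypothetical placeholder for however the project actually constructs the consolidator, not an API of this module.

```python
# Minimal sketch, assuming a consolidator instance is obtained elsewhere.
import asyncio

async def main() -> None:
    consolidator = build_consolidator()  # hypothetical factory (assumption)

    # Ask whether a weekly run is worthwhile before scheduling one.
    advice = await consolidator.get_consolidation_recommendations('weekly')
    print(advice['recommendation'], advice.get('reasons', advice.get('reason')))

    # Overall health, delegated to the health monitor used throughout the class.
    print(await consolidator.health_check())

asyncio.run(main())
```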