This is page 27 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/claude-hooks/utilities/context-formatter.js:
--------------------------------------------------------------------------------
```javascript
/**
* Context Formatting Utility
* Formats memories for injection into Claude Code sessions
*/
/**
 * Detect if running in Claude Code CLI environment.
 *
 * Returns true when at least one of these indicators is present:
 *  - the CLAUDE_CODE_CLI environment variable equals 'true'
 *  - the TERM_PROGRAM environment variable equals 'claude-code'
 *  - any process argument contains the substring 'claude'
 *  - stdout is not attached to a terminal
 *
 * @returns {boolean} true if any CLI indicator was found
 */
function isCLIEnvironment() {
    const { env, argv, stdout } = process;

    if (env.CLAUDE_CODE_CLI === 'true' || env.TERM_PROGRAM === 'claude-code') {
        return true;
    }
    if (argv.some(arg => arg.includes('claude'))) {
        return true;
    }

    // Node.js only defines `isTTY` (as `true`) on TTY streams; for pipes and
    // files the property is undefined rather than false. A strict
    // `=== false` comparison would therefore never detect a non-TTY stdout,
    // so test for "not truthy" instead.
    return !stdout.isTTY;
}
/**
 * ANSI escape sequences for CLI formatting, keyed by style name.
 * Each value has the form ESC[<code>m, built from the numeric code below.
 */
const COLORS = Object.fromEntries(
    [
        ['RESET', 0],
        ['BRIGHT', 1],
        ['DIM', 2],
        ['CYAN', 36],
        ['GREEN', 32],
        ['BLUE', 34],
        ['YELLOW', 33],
        ['MAGENTA', 35],
        ['GRAY', 90]
    ].map(([styleName, sgrCode]) => [styleName, `\x1b[${sgrCode}m`])
);
/**
 * Convert markdown formatting to ANSI color codes for terminal display.
 * Provides clean, formatted output without raw markdown syntax.
 *
 * Handled constructs: headers (#..######), bold (** and __), italic (* and _),
 * fenced code blocks, inline code, unordered/ordered list items, links,
 * blockquotes and horizontal rules.
 *
 * Code spans and fenced code blocks are lifted out of the text before any
 * other rule runs and put back at the end, so their content is never
 * re-interpreted as markdown (a `# comment` line, `__init__` or
 * `snake_case_name` inside code stays intact).
 *
 * The conversion is skipped entirely when the CLAUDE_MARKDOWN_TO_ANSI
 * environment variable is set to 'false'.
 *
 * @param {string} text - Markdown source. Falsy or non-string values are
 *   returned unchanged.
 * @param {Object} [options]
 * @param {boolean} [options.stripOnly=false] - Remove markdown syntax without
 *   adding ANSI codes.
 * @param {boolean} [options.preserveStructure=true] - Accepted for
 *   compatibility; it has no effect on the output.
 * @returns {string} The converted text.
 */
function convertMarkdownToANSI(text, options = {}) {
    const { stripOnly = false } = options;

    if (!text || typeof text !== 'string') {
        return text;
    }
    // Check if markdown conversion is disabled via environment
    if (process.env.CLAUDE_MARKDOWN_TO_ANSI === 'false') {
        return text;
    }

    // Wrap content in the given ANSI codes, or pass it through in strip mode.
    const style = (codes, content) =>
        stripOnly ? content : `${codes}${content}${COLORS.RESET}`;

    // Rendered code fragments are parked here and replaced by placeholders.
    // Distinct open/close control characters keep a placeholder from being
    // confused with digits that sit between two neighbouring placeholders.
    const stash = [];
    const hold = (rendered) => {
        stash.push(rendered);
        return `\u0000${stash.length - 1}\u0001`;
    };

    // Code blocks first (```language\ncode\n```), then inline code, so that
    // backticks inside a fenced block are not treated as inline code.
    let processed = text.replace(/```(\w*)\n?([\s\S]*?)```/g, (match, lang, content) => {
        const body = content.trim();
        if (stripOnly) {
            return hold(body);
        }
        const grayLines = body.split('\n').map(line =>
            `${COLORS.GRAY}${line}${COLORS.RESET}`
        );
        return hold(grayLines.join('\n'));
    });
    processed = processed.replace(/`([^`]+)`/g, (match, content) =>
        hold(style(COLORS.GRAY, content))
    );

    // Line-anchored rules below use [ \t] instead of \s: \s also matches
    // newlines, which lets a match start on a blank line (swallowing it) or
    // run on into the following line.

    // H1 / H2: bold cyan
    processed = processed.replace(/^#[ \t]+(.+)$/gm, (match, content) =>
        style(`${COLORS.BRIGHT}${COLORS.CYAN}`, content)
    );
    processed = processed.replace(/^##[ \t]+(.+)$/gm, (match, content) =>
        style(`${COLORS.BRIGHT}${COLORS.CYAN}`, content)
    );
    // H3 and H4-H6: bold
    processed = processed.replace(/^###[ \t]+(.+)$/gm, (match, content) =>
        style(COLORS.BRIGHT, content)
    );
    processed = processed.replace(/^#{4,6}[ \t]+(.+)$/gm, (match, content) =>
        style(COLORS.BRIGHT, content)
    );

    // Bold text: **text** or __text__ (the underscore form must not start or
    // end inside a word, so identifiers such as a__b__c are left alone)
    processed = processed.replace(/\*\*([^*]+)\*\*/g, (match, content) =>
        style(COLORS.BRIGHT, content)
    );
    processed = processed.replace(/(?<!\w)__([^_]+)__(?!\w)/g, (match, content) =>
        style(COLORS.BRIGHT, content)
    );

    // Italic text: *text* or _text_.
    // The opening delimiter must be followed by, and the closing delimiter
    // preceded by, a non-space character; this keeps a "* " list bullet or a
    // lone " * " from pairing up with a later asterisk.
    processed = processed.replace(/(?<!\*)\*(?![\s*])([^*\n]*[^*\s])\*(?!\*)/g, (match, content) =>
        style(COLORS.DIM, content)
    );
    // Underscore emphasis additionally must not touch word characters on the
    // outside, so snake_case names, file names and URLs keep their underscores.
    processed = processed.replace(/(?<!\w)_(?![\s_])([^_\n]*[^_\s])_(?!\w)/g, (match, content) =>
        style(COLORS.DIM, content)
    );

    // Unordered lists: - item or * item
    processed = processed.replace(/^[ \t]*[-*][ \t]+(.+)$/gm, (match, content) => {
        return stripOnly ? content : ` ${COLORS.CYAN}•${COLORS.RESET} ${content}`;
    });
    // Ordered lists: 1. item
    processed = processed.replace(/^[ \t]*\d+\.[ \t]+(.+)$/gm, (match, content) => {
        return stripOnly ? content : ` ${COLORS.CYAN}›${COLORS.RESET} ${content}`;
    });

    // Links: [text](url) - only the link text is kept; processed before
    // blockquotes so links in quotes work
    processed = processed.replace(/\[([^\]]+)\]\(([^)]+)\)/g, (match, label) =>
        style(COLORS.CYAN, label)
    );

    // Blockquotes: > quote
    processed = processed.replace(/^>[ \t]+(.+)$/gm, (match, content) => {
        return stripOnly ? content : `${COLORS.DIM}│ ${content}${COLORS.RESET}`;
    });

    // Horizontal rules: --- or *** or ___
    processed = processed.replace(/^[-*_]{3,}$/gm, () => {
        return stripOnly ? '' : `${COLORS.DIM}${'─'.repeat(40)}${COLORS.RESET}`;
    });

    // Put the code fragments back. Newest first: an inline span may enclose a
    // placeholder of an earlier fenced block, which then gets restored on a
    // later iteration. split/join avoids `$` handling of String.replace.
    for (let i = stash.length - 1; i >= 0; i--) {
        processed = processed.split(`\u0000${i}\u0001`).join(stash[i]);
    }

    // Clean up any double resets or color artifacts
    processed = processed.replace(/(\x1b\[0m)+/g, COLORS.RESET);
    return processed;
}
/**
 * Wrap text to specified width while preserving words and indentation.
 *
 * Embedded newlines and runs of whitespace are collapsed to single spaces
 * before wrapping, so the result never contains line breaks inside an entry.
 * Widths are measured on the visible text, i.e. with ANSI color sequences
 * removed. A word that is wider than a whole line is hard-broken into as many
 * pieces as needed; escape sequences are never cut in half.
 *
 * @param {string} text - Text to wrap. Non-string values are converted with
 *   String(); null/undefined are treated as an empty string.
 * @param {number} [maxWidth=80] - Total line width, including the indent.
 * @param {number} [indent=0] - Number of spaces placed before continuation
 *   lines; it is also subtracted from maxWidth to get the usable width.
 * @param {string} [treePrefix=''] - Prefix placed before the indent on
 *   continuation lines (for tree-drawing characters).
 * @returns {string[]} Wrapped lines. The first line carries no prefix; every
 *   following line starts with treePrefix + indent.
 */
function wrapText(text, maxWidth = 80, indent = 0, treePrefix = '') {
    const indentStr = ' '.repeat(indent);
    // Never let the usable width drop below one column, otherwise the
    // hard-break logic could not make progress.
    const effectiveWidth = Math.max(1, maxWidth - indent);

    // Strip ANSI codes for accurate width calculation
    const visibleLength = (str) => str.replace(/\x1b\[[0-9;]*m/g, '').length;

    // Break a single over-long word into pieces of at most effectiveWidth
    // visible characters. Escape sequences are kept whole and count as zero
    // width, so a trailing reset code stays attached to the preceding piece.
    const hardSplit = (word) => {
        const tokens = word.match(/\x1b\[[0-9;]*m|[\s\S]/g) || [];
        const pieces = [];
        let piece = '';
        let visible = 0;
        for (const token of tokens) {
            // Only escape sequences produce tokens longer than one character.
            const isEscape = token.length > 1;
            if (!isEscape && visible === effectiveWidth) {
                pieces.push(piece);
                piece = '';
                visible = 0;
            }
            piece += token;
            if (!isEscape) {
                visible += 1;
            }
        }
        if (piece) {
            pieces.push(piece);
        }
        return pieces;
    };

    const source = typeof text === 'string' ? text : String(text ?? '');

    // Remove pre-existing newlines to consolidate text into single line
    // This prevents embedded newlines from breaking tree structure
    const normalizedText = source.replace(/\n/g, ' ').replace(/\s{2,}/g, ' ').trim();
    if (visibleLength(normalizedText) <= effectiveWidth) {
        return [normalizedText];
    }

    const lines = [];
    let currentLine = '';
    for (const word of normalizedText.split(/\s+/)) {
        const candidate = currentLine ? `${currentLine} ${word}` : word;
        if (visibleLength(candidate) <= effectiveWidth) {
            currentLine = candidate;
            continue;
        }

        // The word does not fit on the current line: flush what we have.
        if (currentLine) {
            lines.push(currentLine);
            currentLine = '';
        }

        if (visibleLength(word) <= effectiveWidth) {
            currentLine = word;
            continue;
        }

        // Single word longer than line width, force break. All full pieces
        // become lines of their own; the remainder starts the next line so
        // following words can still be appended to it.
        const pieces = hardSplit(word);
        currentLine = pieces.pop();
        lines.push(...pieces);
    }
    if (currentLine) {
        lines.push(currentLine);
    }

    // Apply tree prefix to continuation lines (not just spaces)
    return lines.map((line, idx) => (idx === 0 ? line : treePrefix + indentStr + line));
}
/**
* Format memories for CLI environment with enhanced visual formatting
*/
function formatMemoriesForCLI(memories, projectContext, options = {}) {
const {
includeProjectSummary = true,
maxMemories = 8,
includeTimestamp = true,
maxContentLengthCLI = 400,
maxContentLengthCategorized = 350,
storageInfo = null,
adaptiveTruncation = true,
contentLengthConfig = null
} = options;
if (!memories || memories.length === 0) {
return `\n${COLORS.CYAN}╭────────────────────────────────────────────────────────────────────────────────╮${COLORS.RESET}\n${COLORS.CYAN}│${COLORS.RESET} 🧠 ${COLORS.BRIGHT}Memory Context${COLORS.RESET} ${COLORS.CYAN}│${COLORS.RESET}\n${COLORS.CYAN}╰────────────────────────────────────────────────────────────────────────────────╯${COLORS.RESET}\n${COLORS.CYAN}┌─${COLORS.RESET} ${COLORS.GRAY}No relevant memories found for this session.${COLORS.RESET}\n`;
}
// Determine adaptive content length based on memory count
const estimatedMemoryCount = Math.min(memories.length, maxMemories);
let adaptiveContentLength = maxContentLengthCLI;
if (adaptiveTruncation && contentLengthConfig) {
if (estimatedMemoryCount >= 5) {
adaptiveContentLength = contentLengthConfig.manyMemories || 300;
} else if (estimatedMemoryCount >= 3) {
adaptiveContentLength = contentLengthConfig.fewMemories || 500;
} else {
adaptiveContentLength = contentLengthConfig.veryFewMemories || 800;
}
}
// Filter out null/generic memories and limit number
const validMemories = [];
let memoryIndex = 0;
for (const memory of memories) {
if (validMemories.length >= maxMemories) break;
const formatted = formatMemoryForCLI(memory, memoryIndex, {
maxContentLength: adaptiveContentLength,
includeDate: includeTimestamp
});
if (formatted) {
validMemories.push({ memory, formatted });
memoryIndex++;
}
}
// Build unified tree structure (no separate decorative box)
let contextMessage = '';
// Add project summary in enhanced CLI format
if (includeProjectSummary && projectContext) {
const { name, frameworks, tools, branch, lastCommit } = projectContext;
const projectInfo = [];
if (name) projectInfo.push(name);
if (frameworks?.length) projectInfo.push(frameworks.slice(0, 2).join(', '));
if (tools?.length) projectInfo.push(tools.slice(0, 2).join(', '));
contextMessage += `\n${COLORS.CYAN}┌─${COLORS.RESET} 🧠 ${COLORS.BRIGHT}Injected Memory Context${COLORS.RESET} ${COLORS.DIM}→${COLORS.RESET} ${COLORS.BLUE}${projectInfo.join(', ')}${COLORS.RESET}\n`;
// Add storage information if available
if (storageInfo) {
const locationText = storageInfo.location.length > 40 ?
storageInfo.location.substring(0, 37) + '...' :
storageInfo.location;
// Show rich storage info if health data is available
if (storageInfo.health && storageInfo.health.totalMemories > 0) {
const memoryInfo = `${storageInfo.health.totalMemories} memories`;
contextMessage += `${COLORS.CYAN}│${COLORS.RESET}\n`;
contextMessage += `${COLORS.CYAN}├─${COLORS.RESET} ${storageInfo.icon} ${COLORS.BRIGHT}${storageInfo.description}${COLORS.RESET} ${COLORS.DIM}•${COLORS.RESET} ${COLORS.GRAY}${memoryInfo}${COLORS.RESET}\n`;
} else {
contextMessage += `${COLORS.CYAN}│${COLORS.RESET}\n`;
contextMessage += `${COLORS.CYAN}├─${COLORS.RESET} ${storageInfo.icon} ${COLORS.BRIGHT}${storageInfo.description}${COLORS.RESET}\n`;
}
contextMessage += `${COLORS.CYAN}├─${COLORS.RESET} 📍 ${COLORS.GRAY}${locationText}${COLORS.RESET}\n`;
}
contextMessage += `${COLORS.CYAN}├─${COLORS.RESET} 📚 ${COLORS.BRIGHT}${validMemories.length} memories loaded${COLORS.RESET}\n`;
if (branch || lastCommit) {
const gitInfo = [];
if (branch) gitInfo.push(`${COLORS.GREEN}${branch}${COLORS.RESET}`);
if (lastCommit) gitInfo.push(`${COLORS.GRAY}${lastCommit.substring(0, 7)}${COLORS.RESET}`);
contextMessage += `${COLORS.CYAN}│${COLORS.RESET}\n`;
}
} else {
contextMessage += `\n${COLORS.CYAN}┌─${COLORS.RESET} 🧠 ${COLORS.BRIGHT}Injected Memory Context${COLORS.RESET}\n`;
contextMessage += `${COLORS.CYAN}├─${COLORS.RESET} 📚 ${COLORS.BRIGHT}${validMemories.length} memories loaded${COLORS.RESET}\n`;
}
contextMessage += `${COLORS.CYAN}│${COLORS.RESET}\n`;
if (validMemories.length > 3) {
// Group by category with enhanced formatting
const categories = groupMemoriesByCategory(validMemories.map(v => v.memory));
const categoryInfo = {
'recent-work': { title: 'Recent Work', icon: '🔥', color: COLORS.GREEN },
'current-problems': { title: 'Current Problems', icon: '⚠️', color: COLORS.YELLOW },
'key-decisions': { title: 'Key Decisions', icon: '🎯', color: COLORS.CYAN },
'additional-context': { title: 'Additional Context', icon: '📋', color: COLORS.GRAY }
};
let hasContent = false;
let categoryCount = 0;
const totalCategories = Object.values(categories).filter(cat => cat.length > 0).length;
Object.entries(categories).forEach(([category, categoryMemories]) => {
if (categoryMemories.length > 0) {
categoryCount++;
const isLast = categoryCount === totalCategories;
const categoryIcon = categoryInfo[category]?.icon || '📝';
const categoryTitle = categoryInfo[category]?.title || 'Context';
const categoryColor = categoryInfo[category]?.color || COLORS.GRAY;
contextMessage += `${COLORS.CYAN}${isLast ? '└─' : '├─'}${COLORS.RESET} ${categoryIcon} ${categoryColor}${COLORS.BRIGHT}${categoryTitle}${COLORS.RESET}:\n`;
hasContent = true;
categoryMemories.forEach((memory, idx) => {
const formatted = formatMemoryForCLI(memory, 0, {
maxContentLength: maxContentLengthCategorized,
includeDate: includeTimestamp,
indent: true
});
if (formatted) {
const isLastMemory = idx === categoryMemories.length - 1;
const connector = isLast ? ' ' : `${COLORS.CYAN}│${COLORS.RESET} `;
const prefix = isLastMemory
? `${connector}${COLORS.CYAN}└─${COLORS.RESET} `
: `${connector}${COLORS.CYAN}├─${COLORS.RESET} `;
// Calculate tree prefix for continuation lines
let treePrefix;
if (isLastMemory) {
// Last memory in category - no vertical line after └─
treePrefix = isLast ? ' ' : connector;
} else {
// Not last memory - maintain vertical tree structure
treePrefix = isLast
? ` ${COLORS.CYAN}│${COLORS.RESET} `
: `${COLORS.CYAN}│${COLORS.RESET} ${COLORS.CYAN}│${COLORS.RESET} `;
}
// Wrap long content lines with tree prefix for continuation
const lines = wrapText(formatted, 70, 6, treePrefix);
// Output all lines (first line with prefix, continuation lines already have tree chars)
lines.forEach((line, lineIdx) => {
if (lineIdx === 0) {
contextMessage += `${prefix}${line}\n`;
} else {
contextMessage += `${line}\n`;
}
});
}
});
if (!isLast) contextMessage += `${COLORS.CYAN}│${COLORS.RESET}\n`;
}
});
if (!hasContent) {
// Fallback to linear format
validMemories.forEach(({ formatted }, idx) => {
const isLast = idx === validMemories.length - 1;
const connector = isLast ? ' ' : `${COLORS.CYAN}│${COLORS.RESET} `;
const lines = wrapText(formatted, 76, 3, connector);
// Output all lines (first with tree char, continuation with connector prefix)
lines.forEach((line, lineIdx) => {
if (lineIdx === 0) {
contextMessage += `${COLORS.CYAN}${isLast ? '└─' : '├─'}${COLORS.RESET} ${line}\n`;
} else {
contextMessage += `${line}\n`;
}
});
});
}
} else {
// Simple linear formatting with enhanced visual elements
validMemories.forEach(({ formatted }, idx) => {
const isLast = idx === validMemories.length - 1;
const connector = isLast ? ' ' : `${COLORS.CYAN}│${COLORS.RESET} `;
const lines = wrapText(formatted, 76, 3, connector);
// Output all lines (first with tree char, continuation with connector prefix)
lines.forEach((line, lineIdx) => {
if (lineIdx === 0) {
contextMessage += `${COLORS.CYAN}${isLast ? '└─' : '├─'}${COLORS.RESET} ${line}\n`;
} else {
contextMessage += `${line}\n`;
}
});
});
}
// Tree structure ends naturally with └─, no need for separate closing frame
return contextMessage;
}
/**
 * Greedy word-wrap that measures width on the visible text only.
 *
 * ANSI SGR color sequences are ignored when measuring, but are kept in the
 * returned lines. A single word wider than `maxWidth` is emitted on its own
 * line rather than being split.
 *
 * @param {string} text - Text to wrap; falsy input yields an empty array.
 * @param {number} [maxWidth=80] - Maximum visible characters per line.
 * @param {string} [indentPrefix=' '] - Accepted for signature compatibility; not applied by this function.
 * @returns {string[]} Wrapped lines, or `[text]` when no non-empty line could be produced.
 */
function wrapTextForTree(text, maxWidth = 80, indentPrefix = ' ') {
    if (!text) {
        return [];
    }

    // Width is measured after dropping ANSI color codes
    const visibleLength = (value) => value.replace(/\x1b\[[0-9;]*m/g, '').length;

    const wrapped = [];
    let pending = '';

    text.split(/\s+/).forEach((token) => {
        const candidate = pending ? `${pending} ${token}` : token;
        if (visibleLength(candidate) <= maxWidth) {
            pending = candidate;
            return;
        }
        // Candidate overflows: flush what we have and start a new line with this token
        if (pending) {
            wrapped.push(pending);
        }
        pending = token;
    });

    if (pending) {
        wrapped.push(pending);
    }

    return wrapped.length > 0 ? wrapped : [text];
}
/**
 * Format a single memory as one ANSI-colored string for CLI display.
 *
 * @param {Object} memory - Memory record; reads `content`, `created_at_iso`,
 *   `memory_type` and `tags`.
 * @param {number} index - Accepted for signature parity with formatMemory; not used here.
 * @param {Object} [options]
 * @param {number} [options.maxContentLength=400] - Length limit passed to extractMeaningfulContent.
 * @param {boolean} [options.includeDate=true] - Append a recency indicator when the memory has a valid date.
 * @returns {string|null} The formatted text, `null` for generic session summaries
 *   (signal to skip), or a gray error placeholder if formatting throws.
 */
function formatMemoryForCLI(memory, index, options = {}) {
    try {
        const {
            maxContentLength = 400,
            includeDate = true
        } = options;

        // Skip generic summaries before doing any extraction work
        if (isGenericSessionSummary(memory.content)) {
            return null;
        }

        // Extract meaningful content with markdown conversion enabled for CLI
        const content = extractMeaningfulContent(
            memory.content || 'No content available',
            maxContentLength,
            { convertMarkdown: true, stripMarkdown: false }
        );

        // Age in days; NaN when the memory has no (or an unparseable) timestamp
        const MS_PER_DAY = 1000 * 60 * 60 * 24;
        const createdAt = memory.created_at_iso ? new Date(memory.created_at_iso) : null;
        const ageInDays = createdAt ? (Date.now() - createdAt.getTime()) / MS_PER_DAY : NaN;
        const hasValidDate = !Number.isNaN(ageInDays);

        // Format date with standardized recency indicators
        let dateStr = '';
        if (includeDate && hasValidDate) {
            if (ageInDays < 1) {
                dateStr = ` ${COLORS.GREEN}🕒 today${COLORS.RESET}`;
            } else if (ageInDays < 2) {
                dateStr = ` ${COLORS.CYAN}📅 yesterday${COLORS.RESET}`;
            } else if (ageInDays <= 7) {
                dateStr = ` ${COLORS.CYAN}📅 ${Math.floor(ageInDays)}d ago${COLORS.RESET}`;
            } else {
                // Older than a week: absolute date, grayed out once past 30 days
                const formattedDate = createdAt.toLocaleDateString('en-US', { month: 'short', day: 'numeric' });
                const dateColor = ageInDays <= 30 ? COLORS.CYAN : COLORS.GRAY;
                dateStr = ` ${dateColor}📅 ${formattedDate}${COLORS.RESET}`;
            }
        }

        // Recent memories (under a week old) stay undimmed so they remain prominent.
        // The previous implementation assigned '' in the "recent" branch, which made
        // the later `!contentColor` check always true and dimmed recent memories too.
        const isRecent = hasValidDate && ageInDays < 7;

        // Type-based dimming applies only to non-recent memories
        const DIMMED_TYPES = ['decision', 'insight', 'bug-fix', 'feature'];
        const hasDecisionTag = Array.isArray(memory.tags) &&
            memory.tags.some(tag => typeof tag === 'string' && tag.includes('decision'));
        const isDimmedType = DIMMED_TYPES.includes(memory.memory_type) || hasDecisionTag;
        const contentColor = (!isRecent && isDimmedType) ? COLORS.DIM : '';

        return `${contentColor}${content}${COLORS.RESET}${dateStr}`;
    } catch (error) {
        return `${COLORS.GRAY}[Error formatting memory: ${error.message}]${COLORS.RESET}`;
    }
}
/**
 * Extract meaningful content from session summaries and structured memories.
 *
 * Structured session summaries are condensed to
 * "Decisions: ... | Insights: ... | Changes: ... | Next: ..." (at most two items per
 * section). Other content is sanitized and then shortened to `maxLength`.
 *
 * @param {string} content - Raw memory content.
 * @param {number} [maxLength=500] - Maximum length of the returned text.
 * @param {Object} [options]
 * @param {boolean} [options.convertMarkdown] - Run the text through convertMarkdownToANSI;
 *   defaults to the result of isCLIEnvironment().
 * @param {boolean} [options.stripMarkdown=false] - Passed to convertMarkdownToANSI as `stripOnly`.
 * @returns {string} Extracted text, or 'No content available' for empty/non-string input.
 */
function extractMeaningfulContent(content, maxLength = 500, options = {}) {
    if (!content || typeof content !== 'string') {
        return 'No content available';
    }
    const {
        convertMarkdown = isCLIEnvironment(), // Auto-convert in CLI mode
        stripMarkdown = false // Just strip without ANSI colors
    } = options;

    // Sanitize content - remove embedded formatting characters that conflict with tree structure
    const sanitizedContent = content
        // Remove checkmarks and bullets
        .replace(/[✅✓✔]/g, '')
        .replace(/^[\s]*[•▪▫]\s*/gm, '')
        // Remove list markers at start of lines. The marker must be followed by a
        // space/tab so the first '*' of a line-leading "**bold**" is not eaten
        // (that used to break the **Date** removal below and leave a stray '*').
        .replace(/^[\s]*[-*][ \t]+/gm, '')
        // Remove embedded Date: lines from old session summaries
        .replace(/\*\*Date\*\*:.*?\n/gi, '')
        .replace(/^Date:\s*\n\s*\d{1,2}\.\d{1,2}\.(\d{2,4})?\s*/gim, '') // Multi-line: "Date:\n 9.11.2025"
        .replace(/^Date:.*?\n/gim, '') // Single-line: "Date: 9.11.2025"
        .replace(/^\d{1,2}\.\d{1,2}\.(\d{2,4})?\s*$/gim, '') // Standalone date lines
        // Clean up multiple spaces
        .replace(/\s{2,}/g, ' ')
        // Remove markdown bold/italic
        .replace(/\*\*([^*]+)\*\*/g, '$1')
        .replace(/\*([^*]+)\*/g, '$1')
        .replace(/__([^_]+)__/g, '$1')
        .replace(/_([^_]+)_/g, '$1')
        .trim();

    // Check if this is a session summary with structured sections
    const looksStructured = ['# Session Summary', '## 🎯', '## 🏛️', '## 💡']
        .some(marker => sanitizedContent.includes(marker));

    if (looksStructured) {
        const sections = {
            decisions: [],
            insights: [],
            codeChanges: [],
            nextSteps: [],
            topics: []
        };
        // Heading detection, checked in this order (first match wins)
        const sectionHeadings = [
            { key: 'decisions', emoji: '🏛️', words: ['Decision'] },
            { key: 'insights', emoji: '💡', words: ['Insight', 'Key'] },
            { key: 'codeChanges', emoji: '💻', words: ['Code'] },
            { key: 'nextSteps', emoji: '📋', words: ['Next'] },
            { key: 'topics', emoji: '🎯', words: ['Topic'] }
        ];
        // Inline cleanup applied to each collected bullet item
        const cleanItem = (text) => text
            .replace(/[✅✓✔]/g, '')
            .replace(/\*\*([^*]+)\*\*/g, '$1')
            .replace(/\*([^*]+)\*/g, '$1')
            .replace(/__([^_]+)__/g, '$1')
            .replace(/_([^_]+)_/g, '$1')
            .replace(/\s{2,}/g, ' ')
            .trim();

        // Parse the RAW content: the sanitized text has its list markers stripped and
        // blank lines collapsed, so bullets and headings can no longer be told apart there.
        let currentSection = null;
        for (const rawLine of content.split('\n')) {
            const trimmed = rawLine.trim();
            const heading = sectionHeadings.find(({ emoji, words }) =>
                trimmed.includes(emoji) && words.some(word => trimmed.includes(word)));
            if (heading) {
                currentSection = heading.key;
                continue;
            }
            if (trimmed.startsWith('#')) {
                currentSection = null; // Reset on any other heading
                continue;
            }
            if (!currentSection) {
                continue;
            }
            // Collect bullet points under current section
            const bullet = trimmed.match(/^[-*][ \t]+(.*)$/);
            if (!bullet) {
                continue;
            }
            const item = cleanItem(bullet[1]);
            if (item.length > 5 && item !== 'implementation' && item !== '...') {
                sections[currentSection].push(item);
            }
        }

        // Build meaningful summary from extracted sections (topics are collected but not shown)
        const meaningfulParts = [];
        if (sections.decisions.length > 0) {
            meaningfulParts.push(`Decisions: ${sections.decisions.slice(0, 2).join('; ')}`);
        }
        if (sections.insights.length > 0) {
            meaningfulParts.push(`Insights: ${sections.insights.slice(0, 2).join('; ')}`);
        }
        if (sections.codeChanges.length > 0) {
            meaningfulParts.push(`Changes: ${sections.codeChanges.slice(0, 2).join('; ')}`);
        }
        if (sections.nextSteps.length > 0) {
            meaningfulParts.push(`Next: ${sections.nextSteps.slice(0, 2).join('; ')}`);
        }

        if (meaningfulParts.length > 0) {
            // Re-sanitize to remove any Date: patterns that survived section extraction
            const extracted = meaningfulParts.join(' | ')
                .replace(/Date:\s*\d{1,2}\.\d{1,2}\.(\d{2,4})?/gi, '') // Remove "Date: 9.11.2025"
                // Remove standalone dates; the digit/dot guards keep version
                // numbers such as "2.0.1" intact
                .replace(/(?<![\d.])\d{1,2}\.\d{1,2}\.(?:\d{2,4})?(?![\d.])/g, '')
                .replace(/\s{2,}/g, ' ') // Clean up multiple spaces
                .trim();
            const truncated = extracted.length > maxLength
                ? extracted.substring(0, maxLength - 3) + '...'
                : extracted;
            // Apply markdown conversion if requested
            if (convertMarkdown) {
                return convertMarkdownToANSI(truncated, { stripOnly: stripMarkdown });
            }
            return truncated;
        }
    }

    // For non-structured content, use sanitized version
    let processedContent = sanitizedContent;
    if (convertMarkdown) {
        processedContent = convertMarkdownToANSI(sanitizedContent, { stripOnly: stripMarkdown });
    }

    // Smart first-sentence extraction for very short limits
    if (maxLength < 400) {
        // Try to get just the first 1-2 sentences
        const sentenceMatch = processedContent.match(/^[^.!?]+[.!?]\s*[^.!?]+[.!?]?/);
        if (sentenceMatch && sentenceMatch[0].length <= maxLength) {
            return sentenceMatch[0].trim();
        }
        // Try just first sentence
        const firstSentence = processedContent.match(/^[^.!?]+[.!?]/);
        if (firstSentence && firstSentence[0].length <= maxLength) {
            return firstSentence[0].trim();
        }
    }

    // Then use smart truncation
    if (processedContent.length <= maxLength) {
        return processedContent;
    }

    // Try to find a good breaking point (sentence, paragraph, or code block)
    const breakPoints = ['. ', '\n\n', '\n', '; '];
    for (const breakPoint of breakPoints) {
        const lastBreak = processedContent.lastIndexOf(breakPoint, maxLength - 3);
        if (lastBreak > maxLength * 0.7) { // Only use if we keep at least 70% of desired length
            return processedContent.substring(0, lastBreak + (breakPoint === '. ' ? 1 : 0)).trim();
        }
    }

    // Fallback to hard truncation
    return processedContent.substring(0, maxLength - 3).trim() + '...';
}
/**
 * Decide whether memory content is a boilerplate/empty session summary.
 *
 * @param {*} content - Memory content to inspect.
 * @returns {boolean} `true` for empty or non-string input, or when the text
 *   matches one of the known boilerplate signatures; `false` otherwise.
 */
function isGenericSessionSummary(content) {
    const isUsableText = typeof content === 'string' && content !== '';
    if (!isUsableText) {
        return true;
    }

    // Signatures of auto-generated summaries that carry no real information
    const boilerplateSignatures = [
        /## 🎯 Topics Discussed\s*-\s*implementation\s*-\s*\.\.\.?$/m,
        /Topics Discussed.*implementation.*\.\.\..*$/s,
        /Session Summary.*implementation.*\.\.\..*$/s
    ];

    for (const signature of boilerplateSignatures) {
        if (signature.test(content)) {
            return true;
        }
    }
    return false;
}
/**
 * Render one memory as a numbered plain-text entry for context display.
 *
 * @param {Object} memory - Memory record; reads `content`, `created_at_iso` and `tags`.
 * @param {number} [index=0] - Zero-based position; shown as `index + 1`.
 * @param {Object} [options]
 * @param {boolean} [options.includeScore=false] - When true, generic summaries are not skipped.
 * @param {number} [options.maxContentLength=500] - Length limit passed to extractMeaningfulContent.
 * @param {boolean} [options.includeDate=true] - Append a short "(Mon D)" date.
 * @param {boolean} [options.showOnlyRelevantTags=true] - Append a filtered tag line.
 * @returns {string|null} The formatted entry, `null` to signal "skip this memory",
 *   or a numbered error placeholder if formatting throws.
 */
function formatMemory(memory, index = 0, options = {}) {
    try {
        const {
            includeScore = false,
            maxContentLength = 500,
            includeDate = true,
            showOnlyRelevantTags = true
        } = options;

        // Non-CLI output: markdown is stripped rather than converted to ANSI colors
        const body = extractMeaningfulContent(
            memory.content || 'No content available',
            maxContentLength,
            { convertMarkdown: true, stripMarkdown: true }
        );

        // Generic/empty session summaries are dropped unless scores were requested
        if (!includeScore && isGenericSessionSummary(memory.content)) {
            return null;
        }

        const pieces = [`${index + 1}. ${body}`];

        if (includeDate && memory.created_at_iso) {
            const shortDate = new Date(memory.created_at_iso)
                .toLocaleDateString('en-US', { month: 'short', day: 'numeric' });
            pieces.push(` (${shortDate})`);
        }

        const hasTags = showOnlyRelevantTags && memory.tags && memory.tags.length > 0;
        if (hasTags) {
            // Bookkeeping tags that add no context for the reader
            const noisePrefixes = ['source:', 'claude-code-session', 'session-consolidation'];
            const noiseTags = ['claude-code', 'auto-generated', 'implementation'];

            const meaningfulTags = memory.tags.filter((tag) => {
                const lowered = tag.toLowerCase();
                if (noisePrefixes.some(prefix => lowered.startsWith(prefix))) {
                    return false;
                }
                if (noiseTags.includes(lowered)) {
                    return false;
                }
                return lowered.length > 2;
            });

            // Tag line appears only for 1-5 surviving tags, and lists at most the first 3
            if (meaningfulTags.length > 0 && meaningfulTags.length <= 5) {
                pieces.push(`\n Tags: ${meaningfulTags.slice(0, 3).join(', ')}`);
            }
        }

        return pieces.join('');
    } catch (error) {
        // Fail quietly with an inline placeholder to avoid noise
        return `${index + 1}. [Error formatting memory: ${error.message}]`;
    }
}
/**
 * Deduplicate memories based on content similarity.
 *
 * Memories are ranked by relevance score (highest first), then by recency, and a
 * memory is kept only if its normalized content is not more than 80% similar to
 * one already kept. Memories whose normalized content is shorter than 20
 * characters, or that look like generic session summaries, are dropped.
 *
 * The input array is NOT modified; ranking is done on a copy.
 *
 * @param {Array<Object>} memories - Memory records; reads `content`, `relevanceScore`, `created_at_iso`.
 * @param {Object} [options]
 * @param {boolean} [options.verbose] - Pass `false` to suppress the console summary.
 * @returns {Array<Object>} Deduplicated memories in ranked order. Non-array input and
 *   arrays of length 0 or 1 are returned unchanged.
 */
function deduplicateMemories(memories, options = {}) {
    if (!Array.isArray(memories) || memories.length <= 1) {
        return memories;
    }

    // Invalid or missing dates rank as the epoch so the comparator never returns NaN
    const toTime = (memory) => {
        const time = new Date(memory.created_at_iso || 0).getTime();
        return Number.isNaN(time) ? 0 : time;
    };

    // Sort a copy: Array.prototype.sort works in place and would otherwise
    // reorder the caller's array as a side effect
    const sorted = [...memories].sort((a, b) => {
        const scoreA = a.relevanceScore || 0;
        const scoreB = b.relevanceScore || 0;
        if (scoreA !== scoreB) return scoreB - scoreA;
        // If scores are equal, prefer more recent
        return toTime(b) - toTime(a);
    });

    const deduplicated = [];
    const seenContent = [];

    for (const memory of sorted) {
        const content = typeof memory.content === 'string' ? memory.content : '';

        // Create a normalized version for comparison
        const normalized = content.toLowerCase()
            .replace(/# session summary.*?\n/gi, '') // Remove session headers
            .replace(/\*\*date\*\*:.*?\n/gi, '') // Remove date lines
            .replace(/\*\*project\*\*:.*?\n/gi, '') // Remove project lines
            .replace(/\s+/g, ' ') // Normalize whitespace
            .trim();

        // Skip if content is too short or generic
        if (normalized.length < 20 || isGenericSessionSummary(content)) {
            continue;
        }

        // Check for substantial similarity (80% threshold) against everything kept so far
        const isDuplicate = seenContent.some(
            seenNormalized => calculateContentSimilarity(normalized, seenNormalized) > 0.8
        );

        if (!isDuplicate) {
            seenContent.push(normalized);
            deduplicated.push(memory);
        }
    }

    // Logged unless the caller explicitly passes verbose: false
    if (options?.verbose !== false && memories.length !== deduplicated.length) {
        console.log(`[Context Formatter] Deduplicated ${memories.length} → ${deduplicated.length} memories`);
    }

    return deduplicated;
}
/**
 * Calculate content similarity between two normalized strings.
 *
 * Uses Jaccard word overlap (|intersection| / |union|) over whitespace-separated
 * words longer than 3 characters. When neither string contains such a word, the
 * comparison falls back to all words, so that two different strings made only of
 * short words are not reported as identical.
 *
 * @param {string} str1 - First normalized string.
 * @param {string} str2 - Second normalized string.
 * @returns {number} Similarity in [0, 1]; 0 if either string is empty, 1 if they are equal.
 */
function calculateContentSimilarity(str1, str2) {
    if (!str1 || !str2) return 0;
    if (str1 === str2) return 1;

    const wordSet = (text, minLength) =>
        new Set(text.split(/\s+/).filter(word => word.length > minLength));

    let words1 = wordSet(str1, 3);
    let words2 = wordSet(str2, 3);

    if (words1.size === 0 && words2.size === 0) {
        // No significant words on either side: compare every word instead
        words1 = wordSet(str1, 0);
        words2 = wordSet(str2, 0);
        // Both strings are whitespace only
        if (words1.size === 0 && words2.size === 0) return 1;
    }
    if (words1.size === 0 || words2.size === 0) return 0;

    let shared = 0;
    for (const word of words1) {
        if (words2.has(word)) shared++;
    }
    // |A ∪ B| = |A| + |B| - |A ∩ B|
    return shared / (words1.size + words2.size - shared);
}
/**
 * Group memories by category for better organization.
 *
 * Memories are deduplicated first, then each one is placed in exactly one of
 * 'recent-work', 'current-problems', 'key-decisions' or 'additional-context'.
 * Every category is sorted newest first.
 *
 * @param {Array<Object>} memories - Memory records; reads `memory_type`, `tags`,
 *   `content`, `created_at_iso` and `_gitContextType`.
 * @param {Object} [options] - Forwarded to deduplicateMemories; `verbose: false`
 *   also suppresses the warning logged on failure.
 * @returns {Object<string, Array<Object>>} Always contains all four category keys,
 *   each holding an array. On failure every memory is placed in 'additional-context'.
 */
function groupMemoriesByCategory(memories, options = {}) {
    const emptyCategories = () => ({
        'recent-work': [],
        'current-problems': [],
        'key-decisions': [],
        'additional-context': []
    });

    try {
        // First deduplicate to remove redundant content
        const deduplicated = deduplicateMemories(memories, options);
        const categories = emptyCategories();
        const now = new Date();
        const MS_PER_DAY = 1000 * 60 * 60 * 24;
        const PROBLEM_TAGS = ['issue', 'bug', 'blocked', 'todo', 'problem', 'blocker'];
        const DECISION_TAGS = ['decision', 'architecture', 'design', 'key-decisions', 'why'];

        deduplicated.forEach(memory => {
            // Normalize once; non-string values are ignored instead of throwing
            const type = (typeof memory.memory_type === 'string' && memory.memory_type.toLowerCase()) || 'other';
            const tags = (Array.isArray(memory.tags) ? memory.tags : [])
                .filter(tag => typeof tag === 'string')
                .map(tag => tag.toLowerCase());
            const content = typeof memory.content === 'string' ? memory.content.toLowerCase() : '';

            // Check if memory is recent (within last week)
            let isRecent = false;
            if (memory.created_at_iso) {
                const daysDiff = (now - new Date(memory.created_at_iso)) / MS_PER_DAY;
                isRecent = daysDiff <= 7;
            }

            // Detect current problems (issues, bugs, blockers, TODOs)
            // Exclude session summaries which may mention fixes but aren't problems themselves
            const isSessionType = type === 'session' || type === 'session-summary' ||
                tags.includes('session-summary');
            const isProblem = !isSessionType && (
                type === 'issue' || type === 'bug' || type === 'bug-fix' ||
                tags.some(tag => PROBLEM_TAGS.includes(tag)) ||
                content.includes('issue #') || content.includes('bug:') || content.includes('blocked')
            );

            // Detect key decisions (architecture, design, technical choices)
            const isKeyDecision =
                type === 'decision' || type === 'architecture' ||
                tags.some(tag => DECISION_TAGS.includes(tag)) ||
                content.includes('decided to') || content.includes('architecture:');

            // Categorize with priority: recent git context > current-problems >
            // recent-work > key-decisions > additional-context
            if (isRecent && memory._gitContextType) {
                // Git context memories from recent development
                categories['recent-work'].push(memory);
            } else if (isProblem) {
                categories['current-problems'].push(memory);
            } else if (isRecent) {
                categories['recent-work'].push(memory);
            } else if (isKeyDecision) {
                categories['key-decisions'].push(memory);
            } else {
                categories['additional-context'].push(memory);
            }
        });

        // Sort each category by creation date (newest first)
        Object.keys(categories).forEach(category => {
            categories[category].sort((a, b) => {
                const dateA = a.created_at_iso ? new Date(a.created_at_iso) : new Date(0);
                const dateB = b.created_at_iso ? new Date(b.created_at_iso) : new Date(0);
                return dateB - dateA; // Newest first
            });
        });

        return categories;
    } catch (error) {
        if (options?.verbose !== false) {
            console.warn('[Context Formatter] Error grouping memories:', error.message);
        }
        // Keep the result shape stable: callers read `.length` on every category,
        // so the fallback must never hand back a non-array value
        const fallback = emptyCategories();
        fallback['additional-context'] = Array.isArray(memories) ? memories : [];
        return fallback;
    }
}
/**
 * Create a markdown context summary from project information.
 *
 * @param {Object} projectContext - Project description; reads `name`, `language`,
 *   `frameworks`, `tools` and `git` (`isRepo`, `branch`, `lastCommit`).
 * @returns {string} One line per available fact, starting with "**Project**: ...".
 *   A missing context or name yields "**Project**: Unknown Project" instead of throwing.
 */
function createProjectSummary(projectContext) {
    const FALLBACK_NAME = 'Unknown Project';
    try {
        const context = projectContext || {};
        let summary = `**Project**: ${context.name || FALLBACK_NAME}`;

        if (context.language && context.language !== 'Unknown') {
            summary += ` (${context.language})`;
        }
        if (Array.isArray(context.frameworks) && context.frameworks.length > 0) {
            summary += `\n**Frameworks**: ${context.frameworks.join(', ')}`;
        }
        if (Array.isArray(context.tools) && context.tools.length > 0) {
            summary += `\n**Tools**: ${context.tools.join(', ')}`;
        }
        if (context.git && context.git.isRepo) {
            summary += `\n**Branch**: ${context.git.branch || 'unknown'}`;
            if (context.git.lastCommit) {
                summary += `\n**Last Commit**: ${context.git.lastCommit}`;
            }
        }
        return summary;
    } catch (error) {
        // Silently fall back; optional chaining keeps the handler itself from
        // throwing when projectContext is null/undefined
        return `**Project**: ${projectContext?.name || FALLBACK_NAME}`;
    }
}
/**
 * Format memories for Claude Code context injection.
 *
 * Delegates to formatMemoriesForCLI when isCLIEnvironment() is true; otherwise
 * builds a markdown document with an optional project summary, optional storage
 * details, the memories (grouped by category when there are more than three),
 * and a footer.
 *
 * @param {Array<Object>} memories - Candidate memories, in priority order.
 * @param {Object} projectContext - Passed to createProjectSummary.
 * @param {Object} [options]
 * @param {boolean} [options.includeProjectSummary=true]
 * @param {boolean} [options.includeScore=false] - Forwarded to formatMemory.
 * @param {boolean} [options.groupByCategory=true]
 * @param {number} [options.maxMemories=8] - Maximum number of memories kept.
 * @param {boolean} [options.includeTimestamp=true] - Forwarded to formatMemory as `includeDate`.
 * @param {number} [options.maxContentLength=500]
 * @param {Object|null} [options.storageInfo=null] - Reads `description`, `location` and `health`.
 * @returns {string} The formatted context; a short markdown notice when nothing
 *   usable was found or an error occurred.
 */
function formatMemoriesForContext(memories, projectContext, options = {}) {
    try {
        // Use CLI formatting if in CLI environment
        if (isCLIEnvironment()) {
            return formatMemoriesForCLI(memories, projectContext, options);
        }

        const {
            includeProjectSummary = true,
            includeScore = false,
            groupByCategory = true,
            maxMemories = 8,
            includeTimestamp = true,
            maxContentLength = 500,
            storageInfo = null
        } = options;

        if (!memories || memories.length === 0) {
            return `## 📋 Memory Context\n\nNo relevant memories found for this session.\n`;
        }

        const memoryFormatOptions = {
            includeScore,
            maxContentLength: maxContentLength,
            includeDate: includeTimestamp,
            showOnlyRelevantTags: true
        };

        // Filter out null/generic memories and limit number
        const validMemories = [];
        let memoryIndex = 0;
        for (const memory of memories) {
            if (validMemories.length >= maxMemories) break;
            const formatted = formatMemory(memory, memoryIndex, memoryFormatOptions);
            if (formatted) { // formatMemory returns null for generic summaries
                validMemories.push({ memory, formatted });
                memoryIndex++;
            }
        }

        if (validMemories.length === 0) {
            return `## 📋 Memory Context\n\nNo meaningful memories found for this session (filtered out generic content).\n`;
        }

        // Start building context message
        let contextMessage = '## 🧠 Memory Context Loaded\n\n';

        // Add project summary
        if (includeProjectSummary && projectContext) {
            contextMessage += createProjectSummary(projectContext) + '\n\n';
        }

        // Add storage information
        if (storageInfo) {
            contextMessage += `**Storage**: ${storageInfo.description}`;
            // Add health information if available
            if (storageInfo.health && storageInfo.health.totalMemories > 0) {
                const memoryCount = storageInfo.health.totalMemories;
                const dbSize = storageInfo.health.databaseSizeMB;
                const uniqueTags = storageInfo.health.uniqueTags;
                contextMessage += ` - ${memoryCount} memories`;
                if (dbSize > 0) contextMessage += `, ${dbSize}MB`;
                if (uniqueTags > 0) contextMessage += `, ${uniqueTags} unique tags`;
            }
            contextMessage += '\n';
            // Location strings that carry an error marker are not shown
            if (storageInfo.location && !storageInfo.location.includes('Configuration Error') && !storageInfo.location.includes('Health parse error')) {
                contextMessage += `**Location**: \`${storageInfo.location}\`\n`;
            }
            if (storageInfo.health && storageInfo.health.embeddingModel && storageInfo.health.embeddingModel !== 'Unknown') {
                contextMessage += `**Embedding Model**: ${storageInfo.health.embeddingModel}\n`;
            }
            contextMessage += '\n';
        }

        contextMessage += `**Loaded ${validMemories.length} relevant memories from your project history:**\n\n`;

        if (groupByCategory && validMemories.length > 3) {
            // Group and format by category only if we have enough content
            const categories = groupMemoriesByCategory(validMemories.map(v => v.memory));

            // Keys must match the category names produced by groupMemoriesByCategory;
            // the previous table used unrelated keys, so every heading rendered as "undefined"
            const categoryTitles = {
                'recent-work': '### 🕒 Recent Work (Last Week)',
                'current-problems': '### ⚠️ Current Problems',
                'key-decisions': '### 🎯 Key Decisions',
                'additional-context': '### 📝 Additional Context'
            };
            const defaultTitle = '### 📝 Additional Context';

            let hasContent = false;
            Object.entries(categories).forEach(([category, categoryMemories]) => {
                if (categoryMemories.length > 0) {
                    contextMessage += `${categoryTitles[category] || defaultTitle}\n`;
                    hasContent = true;
                    // Numbering restarts at 1 inside each category
                    categoryMemories.forEach((memory, index) => {
                        const formatted = formatMemory(memory, index, memoryFormatOptions);
                        if (formatted) {
                            contextMessage += `${formatted}\n\n`;
                        }
                    });
                }
            });

            if (!hasContent) {
                // Fallback to linear format
                validMemories.forEach(({ formatted }) => {
                    contextMessage += `${formatted}\n\n`;
                });
            }
        } else {
            // Simple linear formatting for small lists
            validMemories.forEach(({ formatted }) => {
                contextMessage += `${formatted}\n\n`;
            });
        }

        // Add concise footer
        contextMessage += '---\n';
        contextMessage += '*This context was automatically loaded based on your project and recent activities. ';
        contextMessage += 'Use this information to maintain continuity with your previous work and decisions.*';
        return contextMessage;
    } catch (error) {
        // Return error context without logging to avoid noise
        return `## 📋 Memory Context\n\n*Error loading context: ${error.message}*\n`;
    }
}
/**
 * Build the markdown document stored as a session-end summary.
 *
 * @param {Object} sessionData - May contain `topics`, `decisions`, `insights`,
 *   `codeChanges` and `nextSteps`; each non-empty list becomes a bulleted section.
 * @param {Object} projectContext - Reads `name` and `language` for the header.
 * @returns {string} The summary ending with a capture timestamp, or
 *   "Session Summary Error: <message>" if building it throws.
 */
function formatSessionConsolidation(sessionData, projectContext) {
    try {
        const capturedAt = new Date().toISOString();

        // Sections are emitted in this order; empty or missing lists are skipped
        const sectionLayout = [
            ['topics', `## 🎯 Topics Discussed\n`],
            ['decisions', `## 🏛️ Decisions Made\n`],
            ['insights', `## 💡 Key Insights\n`],
            ['codeChanges', `## 💻 Code Changes\n`],
            ['nextSteps', `## 📋 Next Steps\n`]
        ];

        const chunks = [
            `# Session Summary - ${projectContext.name}\n`,
            `**Project**: ${projectContext.name} (${projectContext.language})\n\n`
        ];

        for (const [field, heading] of sectionLayout) {
            const entries = sessionData[field];
            if (!(entries && entries.length > 0)) {
                continue;
            }
            chunks.push(heading);
            entries.forEach((entry) => {
                chunks.push(`- ${entry}\n`);
            });
            chunks.push('\n');
        }

        chunks.push(`---\n*Session captured by Claude Code Memory Awareness at ${capturedAt}*`);
        return chunks.join('');
    } catch (error) {
        // Report the failure inline instead of logging, to avoid noise
        return `Session Summary Error: ${error.message}`;
    }
}
// Public API of this module (CommonJS).
// Helpers such as extractMeaningfulContent, isGenericSessionSummary,
// deduplicateMemories, calculateContentSimilarity and wrapTextForTree are
// intentionally not listed here and stay module-private.
module.exports = {
formatMemoriesForContext,
formatMemoriesForCLI,
formatMemory,
formatMemoryForCLI,
groupMemoriesByCategory,
createProjectSummary,
formatSessionConsolidation,
isCLIEnvironment,
convertMarkdownToANSI
};
// Direct execution support for testing.
// Runs only when this file is the entry script (e.g. `node <this file>`), not when required.
if (require.main === module) {
// Test with mock data: five memories covering the decision, architecture,
// bug-fix, feature and insight types. Timestamps are fixed, so how "recent"
// they appear depends on the date the demo is run.
const mockMemories = [
{
content: 'Decided to use SQLite-vec for better performance, 10x faster than ChromaDB',
tags: ['mcp-memory-service', 'decision', 'sqlite-vec', 'performance'],
memory_type: 'decision',
created_at_iso: '2025-08-19T10:00:00Z',
relevanceScore: 0.95
},
{
content: 'Implemented Claude Code hooks system for automatic memory awareness. Created session-start, session-end, and topic-change hooks.',
tags: ['claude-code', 'hooks', 'architecture', 'memory-awareness'],
memory_type: 'architecture',
created_at_iso: '2025-08-19T09:30:00Z',
relevanceScore: 0.87
},
{
content: 'Fixed critical bug in project detector - was not handling pyproject.toml files correctly',
tags: ['bug-fix', 'project-detector', 'python'],
memory_type: 'bug-fix',
created_at_iso: '2025-08-18T15:30:00Z',
relevanceScore: 0.72
},
{
content: 'Added new feature: Claude Code hooks with session lifecycle management',
tags: ['feature', 'claude-code', 'hooks'],
memory_type: 'feature',
created_at_iso: '2025-08-17T12:00:00Z',
relevanceScore: 0.85
},
{
content: 'Key insight: Memory deduplication prevents information overload in context',
tags: ['insight', 'memory-management', 'optimization'],
memory_type: 'insight',
created_at_iso: '2025-08-16T14:00:00Z',
relevanceScore: 0.78
}
];
// NOTE(review): createProjectSummary reads branch/lastCommit from
// `projectContext.git` (and only when `git.isRepo` is set), so the top-level
// `branch` and `lastCommit` fields below do not show up in the markdown
// project summary - confirm which shape the callers actually provide.
const mockProjectContext = {
name: 'mcp-memory-service',
language: 'JavaScript',
frameworks: ['Node.js'],
tools: ['npm'],
branch: 'main',
lastCommit: 'cdabc9a feat: enhance deduplication script'
};
console.log('\n=== CONTEXT FORMATTING TEST ===');
// With five memories (> 3) and groupByCategory enabled, the markdown path takes
// the grouped branch; when isCLIEnvironment() is true the call is delegated to
// formatMemoriesForCLI instead.
const formatted = formatMemoriesForContext(mockMemories, mockProjectContext, {
includeScore: true,
groupByCategory: true
});
console.log(formatted);
console.log('\n=== END TEST ===');
}
```
--------------------------------------------------------------------------------
/scripts/quality/phase2_complexity_analysis.md:
--------------------------------------------------------------------------------
```markdown
# Issue #240 Phase 2: Low-Hanging Complexity Reductions
## Executive Summary
**Current State:**
- Overall Health: 63/100 (Grade C)
- Cyclomatic Complexity Score: 40/100
- Average complexity: 9.5
- High-risk functions (>7): 28 functions
- Maximum complexity: 62 (install.py::main)
**Phase 2 Goals:**
- Target functions: 5 main targets (complexity 10-15) + 5 quick wins
- Target complexity improvement: +10-15 points (40 → 50-55)
- Expected overall health improvement: +3-5 points (63 → 66-68)
- Strategy: Extract methods, guard clauses, dict lookups (no architectural changes)
**Total Estimated Effort:** 12-15 hours
**Functions Analyzed:** 5 target functions + 5 quick wins
---
## Target Function 1: install.py::configure_paths() (Complexity: 15)
### Current Implementation
**Purpose:** Configure storage paths for memory service based on platform and backend type.
**Location:** Lines 1287-1445 (158 lines)
### Complexity Breakdown
```
Lines 1287-1306: +3 complexity (platform-specific path detection)
Lines 1306-1347: +5 complexity (storage backend conditional setup)
Lines 1349-1358: +2 complexity (backup directory test with error handling)
Lines 1359-1443: +5 complexity (Claude Desktop config update nested logic)
Total Base: 15
```
**Primary Contributors:**
1. Platform detection branching (macOS/Windows/Linux) - 3 branches
2. Storage backend type branching (sqlite_vec/hybrid/cloudflare/chromadb) - 4 branches
3. Nested Claude config file discovery and JSON manipulation
4. Error handling for directory creation and file operations
### Refactoring Proposal #1: Extract Platform Path Detection
**Risk:** Low | **Impact:** -3 complexity | **Time:** 1 hour
**Before:**
```python
def configure_paths(args):
print_step("4", "Configuring paths")
system_info = detect_system()
home_dir = Path.home()
# Determine base directory based on platform
if platform.system() == 'Darwin': # macOS
base_dir = home_dir / 'Library' / 'Application Support' / 'mcp-memory'
elif platform.system() == 'Windows': # Windows
base_dir = Path(os.environ.get('LOCALAPPDATA', '')) / 'mcp-memory'
else: # Linux and others
base_dir = home_dir / '.local' / 'share' / 'mcp-memory'
storage_backend = os.environ.get('MCP_MEMORY_STORAGE_BACKEND', 'chromadb')
...
```
**After:**
```python
def get_platform_base_dir() -> Path:
"""Get platform-specific base directory for MCP Memory storage.
Returns:
Path: Platform-appropriate base directory
"""
home_dir = Path.home()
PLATFORM_PATHS = {
'Darwin': home_dir / 'Library' / 'Application Support' / 'mcp-memory',
'Windows': Path(os.environ.get('LOCALAPPDATA', '')) / 'mcp-memory',
}
system = platform.system()
return PLATFORM_PATHS.get(system, home_dir / '.local' / 'share' / 'mcp-memory')
def configure_paths(args):
print_step("4", "Configuring paths")
system_info = detect_system()
base_dir = get_platform_base_dir()
storage_backend = os.environ.get('MCP_MEMORY_STORAGE_BACKEND', 'chromadb')
...
```
**Complexity Impact:** 15 → 12 (-3)
- Removes platform branching from main function
- Uses dict lookup instead of if/elif/else chain
### Refactoring Proposal #2: Extract Storage Path Setup
**Risk:** Low | **Impact:** -4 complexity | **Time:** 1.5 hours
**Before:**
```python
def configure_paths(args):
...
if storage_backend in ['sqlite_vec', 'hybrid', 'cloudflare']:
storage_path = args.chroma_path or (base_dir / 'sqlite_vec.db')
storage_dir = storage_path.parent if storage_path.name.endswith('.db') else storage_path
backups_path = args.backups_path or (base_dir / 'backups')
try:
os.makedirs(storage_dir, exist_ok=True)
os.makedirs(backups_path, exist_ok=True)
print_info(f"SQLite-vec database: {storage_path}")
print_info(f"Backups path: {backups_path}")
# Test if directory is writable
test_file = os.path.join(storage_dir, '.write_test')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
except Exception as e:
print_error(f"Failed to configure SQLite-vec paths: {e}")
return False
else:
chroma_path = args.chroma_path or (base_dir / 'chroma_db')
backups_path = args.backups_path or (base_dir / 'backups')
storage_path = chroma_path
...
```
**After:**
```python
def setup_storage_directories(backend: str, base_dir: Path, args) -> Tuple[Path, Path, bool]:
"""Setup storage and backup directories for the specified backend.
Args:
backend: Storage backend type
base_dir: Base directory for storage
args: Command line arguments
Returns:
Tuple of (storage_path, backups_path, success)
"""
if backend in ['sqlite_vec', 'hybrid', 'cloudflare']:
storage_path = args.chroma_path or (base_dir / 'sqlite_vec.db')
storage_dir = storage_path.parent if storage_path.name.endswith('.db') else storage_path
else: # chromadb
storage_path = args.chroma_path or (base_dir / 'chroma_db')
storage_dir = storage_path
backups_path = args.backups_path or (base_dir / 'backups')
try:
os.makedirs(storage_dir, exist_ok=True)
os.makedirs(backups_path, exist_ok=True)
# Test writability
test_file = storage_dir / '.write_test'
test_file.write_text('test')
test_file.unlink()
print_info(f"Storage path: {storage_path}")
print_info(f"Backups path: {backups_path}")
return storage_path, backups_path, True
except Exception as e:
print_error(f"Failed to configure storage paths: {e}")
return storage_path, backups_path, False
def configure_paths(args):
print_step("4", "Configuring paths")
system_info = detect_system()
base_dir = get_platform_base_dir()
storage_backend = os.environ.get('MCP_MEMORY_STORAGE_BACKEND', 'chromadb')
storage_path, backups_path, success = setup_storage_directories(
storage_backend, base_dir, args
)
if not success:
print_warning("Continuing with Claude Desktop configuration despite storage setup failure")
...
```
**Complexity Impact:** 12 → 8 (-4)
- Removes nested storage backend setup logic
- Early return pattern for error handling
### Refactoring Proposal #3: Extract Claude Config Update
**Risk:** Medium | **Impact:** -3 complexity | **Time:** 1.5 hours
**Before:**
```python
def configure_paths(args):
...
# Configure Claude Desktop if available
import json
claude_config_paths = [...]
for config_path in claude_config_paths:
if config_path.exists():
print_info(f"Found Claude Desktop config at {config_path}")
try:
config_text = config_path.read_text()
config = json.loads(config_text)
# Validate config structure
if not isinstance(config, dict):
print_warning(f"Invalid config format...")
continue
# Update or add MCP Memory configuration
if 'mcpServers' not in config:
config['mcpServers'] = {}
# Create environment configuration based on storage backend
env_config = {...}
if storage_backend in ['sqlite_vec', 'hybrid']:
env_config["MCP_MEMORY_SQLITE_PATH"] = str(storage_path)
...
```
**After:**
```python
def build_mcp_env_config(storage_backend: str, storage_path: Path,
                         backups_path: Path) -> Dict[str, str]:
    """Assemble the environment block written into the Claude Desktop config.

    Args:
        storage_backend: Type of storage backend
        storage_path: Path to storage directory/file
        backups_path: Path to backups directory

    Returns:
        Dict of environment variables for MCP configuration
    """
    uses_sqlite = storage_backend in ('sqlite_vec', 'hybrid')
    uses_cloudflare = storage_backend in ('hybrid', 'cloudflare')

    env = {
        "MCP_MEMORY_BACKUPS_PATH": str(backups_path),
        "MCP_MEMORY_STORAGE_BACKEND": storage_backend,
    }

    if uses_sqlite:
        env.update({
            "MCP_MEMORY_SQLITE_PATH": str(storage_path),
            "MCP_MEMORY_SQLITE_PRAGMAS": "busy_timeout=15000,cache_size=20000",
        })

    if uses_cloudflare:
        # Forward only the Cloudflare settings that are set and non-empty.
        for name in ('CLOUDFLARE_API_TOKEN',
                     'CLOUDFLARE_ACCOUNT_ID',
                     'CLOUDFLARE_D1_DATABASE_ID',
                     'CLOUDFLARE_VECTORIZE_INDEX'):
            current = os.environ.get(name)
            if current:
                env[name] = current

    if storage_backend == 'chromadb':
        env["MCP_MEMORY_CHROMA_PATH"] = str(storage_path)

    return env
def update_claude_config_file(config_path: Path, env_config: Dict[str, str],
                              project_root: Path, is_windows: bool) -> bool:
    """Update Claude Desktop configuration file with MCP Memory settings.

    Reads the JSON config, sets (or replaces) the ``mcpServers.memory`` entry
    and writes the file back with 2-space indentation.

    Args:
        config_path: Path to Claude config file
        env_config: Environment configuration dictionary
        project_root: Root directory of the project
        is_windows: Whether running on Windows

    Returns:
        bool: True if update succeeded, False if the file could not be read,
        parsed or written, or if its top level is not a JSON object
    """
    # Local import keeps this helper self-contained after extraction.
    import json

    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        config = json.loads(config_path.read_text(encoding="utf-8"))
        if not isinstance(config, dict):
            print_warning(f"Invalid config format in {config_path}")
            return False

        # A missing, null or otherwise non-object "mcpServers" would make the
        # assignment below fail with TypeError, so start from an empty mapping.
        servers = config.get('mcpServers')
        if not isinstance(servers, dict):
            servers = config['mcpServers'] = {}

        # Create server configuration
        if is_windows:
            script_path = str((project_root / "memory_wrapper.py").resolve())
            servers['memory'] = {
                "command": "python",
                "args": [script_path],
                "env": env_config
            }
        else:
            servers['memory'] = {
                "command": "uv",
                "args": ["--directory", str(project_root.resolve()), "run", "memory"],
                "env": env_config
            }

        config_path.write_text(json.dumps(config, indent=2), encoding="utf-8")
        print_success("Updated Claude Desktop configuration")
        return True
    except (OSError, ValueError) as e:
        # OSError covers PermissionError; ValueError covers json.JSONDecodeError
        # and UnicodeDecodeError for files that are not valid UTF-8.
        print_warning(f"Failed to update Claude Desktop configuration: {e}")
        return False
def configure_paths(args):
print_step("4", "Configuring paths")
system_info = detect_system()
base_dir = get_platform_base_dir()
storage_backend = os.environ.get('MCP_MEMORY_STORAGE_BACKEND', 'chromadb')
storage_path, backups_path, success = setup_storage_directories(
storage_backend, base_dir, args
)
if not success:
print_warning("Continuing with Claude Desktop configuration")
# Configure Claude Desktop
env_config = build_mcp_env_config(storage_backend, storage_path, backups_path)
project_root = Path(__file__).parent.parent.parent
claude_config_paths = [
Path.home() / 'Library' / 'Application Support' / 'Claude' / 'claude_desktop_config.json',
Path.home() / '.config' / 'Claude' / 'claude_desktop_config.json',
Path('claude_config') / 'claude_desktop_config.json'
]
for config_path in claude_config_paths:
if config_path.exists():
print_info(f"Found Claude Desktop config at {config_path}")
if update_claude_config_file(config_path, env_config, project_root,
system_info["is_windows"]):
break
return True
```
**Complexity Impact:** 8 → 5 (-3)
- Removes nested config update logic
- Separates env config building from file I/O
- Early return pattern in update function
### Implementation Plan
1. **Extract platform detection** (1 hour, low risk) - Simple dict lookup
2. **Extract storage setup** (1.5 hours, low risk) - Straightforward extraction
3. **Extract Claude config** (1.5 hours, medium risk) - Requires careful testing
**Total Complexity Reduction:** 15 → 5 (-10 points)
**Total Time:** 4 hours
---
## Target Function 2: cloudflare.py::_execute_batch() (Complexity: 14)
### Current Implementation
**Purpose:** Execute batched D1 SQL queries with retry logic.
**Note:** After examining the cloudflare.py file, I found that `_execute_batch()` does not exist. The complexity report may be outdated or the function was refactored. Instead, I'll analyze `_search_by_tags_internal()` which shows similar complexity patterns (lines 583-667, complexity ~13).
### Complexity Breakdown (\_search_by_tags_internal)
```
Lines 590-610: +4 complexity (tag normalization and operation validation)
Lines 612-636: +5 complexity (SQL query construction with time filtering)
Lines 638-667: +4 complexity (result processing with error handling)
Total: 13
```
### Refactoring Proposal #1: Extract Tag Normalization
**Risk:** Low | **Impact:** -2 complexity | **Time:** 45 minutes
**Before:**
```python
async def _search_by_tags_internal(self, tags, operation=None, time_start=None, time_end=None):
try:
if not tags:
return []
# Normalize tags (deduplicate, drop empty strings)
deduped_tags = list(dict.fromkeys([tag for tag in tags if tag]))
if not deduped_tags:
return []
if isinstance(operation, str):
normalized_operation = operation.strip().upper() or "AND"
else:
normalized_operation = "AND"
if normalized_operation not in {"AND", "OR"}:
logger.warning("Unsupported tag search operation '%s'; defaulting to AND", operation)
normalized_operation = "AND"
```
**After:**
```python
def normalize_tags_for_search(tags: List[str]) -> List[str]:
    """Drop falsy tags and duplicates while keeping first-seen order.

    Args:
        tags: List of tag strings (may contain duplicates or empty strings)

    Returns:
        Deduplicated list of non-empty tags
    """
    first_seen = {}
    for candidate in tags:
        if candidate:
            # dict keys keep insertion order, so the first occurrence wins
            first_seen.setdefault(candidate, None)
    return list(first_seen)
def normalize_operation(operation: Optional[str]) -> str:
    """Normalize tag search operation to AND or OR.

    Non-string values and blank strings default to "AND" silently. Any other
    unsupported string also defaults to "AND", but a warning is logged.

    Args:
        operation: Raw operation string (case-insensitive, surrounding
            whitespace ignored)

    Returns:
        Normalized operation: "AND" or "OR"
    """
    normalized = operation.strip().upper() if isinstance(operation, str) else ""
    if not normalized:
        return "AND"

    if normalized not in {"AND", "OR"}:
        # Same message and lazy %-style arguments as the pre-extraction code,
        # so existing log output does not change with the refactoring.
        logger.warning("Unsupported tag search operation '%s'; defaulting to AND", operation)
        return "AND"

    return normalized
async def _search_by_tags_internal(self, tags, operation=None, time_start=None, time_end=None):
try:
if not tags:
return []
deduped_tags = normalize_tags_for_search(tags)
if not deduped_tags:
return []
normalized_operation = normalize_operation(operation)
```
**Complexity Impact:** 13 → 11 (-2)
### Refactoring Proposal #2: Extract SQL Query Builder
**Risk:** Low | **Impact:** -3 complexity | **Time:** 1 hour
**Before:**
```python
async def _search_by_tags_internal(self, tags, operation=None, time_start=None, time_end=None):
...
placeholders = ",".join(["?"] * len(deduped_tags))
params: List[Any] = list(deduped_tags)
sql = (
"SELECT m.* FROM memories m "
"JOIN memory_tags mt ON m.id = mt.memory_id "
"JOIN tags t ON mt.tag_id = t.id "
f"WHERE t.name IN ({placeholders})"
)
if time_start is not None:
sql += " AND m.created_at >= ?"
params.append(time_start)
if time_end is not None:
sql += " AND m.created_at <= ?"
params.append(time_end)
sql += " GROUP BY m.id"
if normalized_operation == "AND":
sql += " HAVING COUNT(DISTINCT t.name) = ?"
params.append(len(deduped_tags))
sql += " ORDER BY m.created_at DESC"
```
**After:**
```python
def build_tag_search_query(tags: List[str], operation: str,
                           time_start: Optional[float] = None,
                           time_end: Optional[float] = None) -> Tuple[str, List[Any]]:
    """Compose the parameterized tag-search SQL and its bound values.

    Args:
        tags: List of deduplicated tags
        operation: Search operation ("AND" or "OR")
        time_start: Optional start timestamp filter
        time_end: Optional end timestamp filter

    Returns:
        Tuple of (sql_query, parameters_list)
    """
    bound_values: List[Any] = [*tags]
    in_list = ",".join("?" for _ in range(len(tags)))

    fragments = [
        "SELECT m.* FROM memories m "
        "JOIN memory_tags mt ON m.id = mt.memory_id "
        "JOIN tags t ON mt.tag_id = t.id "
        f"WHERE t.name IN ({in_list})"
    ]

    # Optional created_at bounds, appended in the same order as their parameters.
    optional_filters = (
        (" AND m.created_at >= ?", time_start),
        (" AND m.created_at <= ?", time_end),
    )
    for fragment, bound in optional_filters:
        if bound is not None:
            fragments.append(fragment)
            bound_values.append(bound)

    fragments.append(" GROUP BY m.id")

    if operation == "AND":
        # Keep only memories whose distinct matched tag names equal the tag count.
        fragments.append(" HAVING COUNT(DISTINCT t.name) = ?")
        bound_values.append(len(tags))

    fragments.append(" ORDER BY m.created_at DESC")
    return "".join(fragments), bound_values
async def _search_by_tags_internal(self, tags, operation=None, time_start=None, time_end=None):
try:
if not tags:
return []
deduped_tags = normalize_tags_for_search(tags)
if not deduped_tags:
return []
normalized_operation = normalize_operation(operation)
sql, params = build_tag_search_query(deduped_tags, normalized_operation,
time_start, time_end)
```
**Complexity Impact:** 11 → 8 (-3)
### Implementation Plan
1. **Extract tag normalization** (45 min, low risk) - Pure functions, easy to test
2. **Extract SQL builder** (1 hour, low risk) - Testable without database
**Total Complexity Reduction:** 13 → 8 (-5 points)
**Total Time:** 1.75 hours
---
## Target Function 3: consolidator.py::_compress_redundant_memories() (Complexity: 13)
### Current Implementation
**Purpose:** Identify and compress semantically similar memory clusters.
**Note:** After examining consolidator.py (556 lines), I found that `_compress_redundant_memories()` does not exist in the current codebase. The function was likely refactored into the consolidation pipeline. The most complex function in this file is `consolidate()` at lines 80-210 (complexity ~12).
### Complexity Breakdown (consolidate method)
```
Lines 99-110: +2 complexity (hybrid backend sync pause logic)
Lines 112-120: +2 complexity (memory retrieval and validation)
Lines 125-150: +4 complexity (association discovery conditional logic)
Lines 155-181: +4 complexity (compression and forgetting conditional logic)
Total: 12
```
### Refactoring Proposal #1: Extract Hybrid Sync Management
**Risk:** Low | **Impact:** -2 complexity | **Time:** 45 minutes
**Before:**
```python
async def consolidate(self, time_horizon: str, **kwargs) -> ConsolidationReport:
...
# Check if hybrid backend and pause sync during consolidation
sync_was_paused = False
is_hybrid = hasattr(self.storage, 'pause_sync') and hasattr(self.storage, 'resume_sync')
try:
self.logger.info(f"Starting {time_horizon} consolidation...")
# Pause hybrid sync to eliminate bottleneck during metadata updates
if is_hybrid:
self.logger.info("Pausing hybrid backend sync during consolidation")
await self.storage.pause_sync()
sync_was_paused = True
...
finally:
# Resume hybrid sync after consolidation
if sync_was_paused:
try:
self.logger.info("Resuming hybrid backend sync after consolidation")
await self.storage.resume_sync()
except Exception as e:
self.logger.error(f"Failed to resume sync after consolidation: {e}")
```
**After:**
```python
class SyncPauseContext:
"""Context manager for pausing hybrid backend sync during consolidation."""
def __init__(self, storage, logger):
self.storage = storage
self.logger = logger
self.is_hybrid = hasattr(storage, 'pause_sync') and hasattr(storage, 'resume_sync')
self.was_paused = False
async def __aenter__(self):
if self.is_hybrid:
self.logger.info("Pausing hybrid backend sync during consolidation")
await self.storage.pause_sync()
self.was_paused = True
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.was_paused:
try:
self.logger.info("Resuming hybrid backend sync")
await self.storage.resume_sync()
except Exception as e:
self.logger.error(f"Failed to resume sync: {e}")
async def consolidate(self, time_horizon: str, **kwargs) -> ConsolidationReport:
start_time = datetime.now()
report = ConsolidationReport(...)
async with SyncPauseContext(self.storage, self.logger):
try:
self.logger.info(f"Starting {time_horizon} consolidation...")
# ... rest of consolidation logic
```
**Complexity Impact:** 12 → 10 (-2)
- Removes nested sync management logic
- Async context manager handles cleanup automatically
### Refactoring Proposal #2: Extract Phase-Specific Processing Guard
**Risk:** Low | **Impact:** -2 complexity | **Time:** 30 minutes
**Before:**
```python
async def consolidate(self, time_horizon: str, **kwargs) -> ConsolidationReport:
...
# 3. Cluster by semantic similarity (if enabled and appropriate)
clusters = []
if self.config.clustering_enabled and time_horizon in ['weekly', 'monthly', 'quarterly']:
self.logger.info(f"🔗 Phase 2/6: Clustering memories...")
clusters = await self.clustering_engine.process(memories)
report.clusters_created = len(clusters)
# 4. Run creative associations (if enabled and appropriate)
associations = []
if self.config.associations_enabled and time_horizon in ['weekly', 'monthly']:
self.logger.info(f"🧠 Phase 3/6: Discovering associations...")
existing_associations = await self._get_existing_associations()
associations = await self.association_engine.process(memories, existing_associations)
report.associations_discovered = len(associations)
```
**After:**
```python
def should_run_clustering(self, time_horizon: str) -> bool:
"""Check if clustering should run for this time horizon."""
return self.config.clustering_enabled and time_horizon in ['weekly', 'monthly', 'quarterly']
def should_run_associations(self, time_horizon: str) -> bool:
"""Check if association discovery should run for this time horizon."""
return self.config.associations_enabled and time_horizon in ['weekly', 'monthly']
def should_run_compression(self, time_horizon: str) -> bool:
"""Check if compression should run for this time horizon."""
return self.config.compression_enabled
def should_run_forgetting(self, time_horizon: str) -> bool:
"""Check if controlled forgetting should run for this time horizon."""
return self.config.forgetting_enabled and time_horizon in ['monthly', 'quarterly', 'yearly']
async def consolidate(self, time_horizon: str, **kwargs) -> ConsolidationReport:
...
# 3. Cluster by semantic similarity
clusters = []
if self.should_run_clustering(time_horizon):
self.logger.info(f"🔗 Phase 2/6: Clustering memories...")
clusters = await self.clustering_engine.process(memories)
report.clusters_created = len(clusters)
# 4. Run creative associations
associations = []
if self.should_run_associations(time_horizon):
self.logger.info(f"🧠 Phase 3/6: Discovering associations...")
existing_associations = await self._get_existing_associations()
associations = await self.association_engine.process(memories, existing_associations)
report.associations_discovered = len(associations)
```
**Complexity Impact:** 10 → 8 (-2)
- Extracts multi-condition guards to named methods
- Improves readability and testability
### Implementation Plan
1. **Extract sync context manager** (45 min, low risk) - Standard async pattern
2. **Extract phase guards** (30 min, low risk) - Simple boolean methods
**Total Complexity Reduction:** 12 → 8 (-4 points)
**Total Time:** 1.25 hours
---
## Target Function 4: analytics.py::get_analytics() (Complexity: 12)
### Current Implementation
**Purpose:** Aggregate analytics overview from storage backend.
**Note:** After examining analytics.py, the `get_analytics()` function doesn't exist. The most complex function is `get_memory_growth()` at lines 267-363 (complexity ~11).
### Complexity Breakdown (get_memory_growth)
```
Lines 279-293: +4 complexity (period validation and interval calculation)
Lines 304-334: +5 complexity (date grouping and interval aggregation loops)
Lines 336-353: +2 complexity (label generation and data point creation)
Total: 11
```
### Refactoring Proposal #1: Extract Period Configuration
**Risk:** Low | **Impact:** -3 complexity | **Time:** 45 minutes
**Before:**
```python
@router.get("/memory-growth", response_model=MemoryGrowthData, tags=["analytics"])
async def get_memory_growth(period: str = Query("month", ...), ...):
try:
# Define the period
if period == "week":
days = 7
interval_days = 1
elif period == "month":
days = 30
interval_days = 7
elif period == "quarter":
days = 90
interval_days = 7
elif period == "year":
days = 365
interval_days = 30
else:
raise HTTPException(status_code=400, detail="Invalid period...")
```
**After:**
```python
@dataclass
class PeriodConfig:
"""Configuration for time period analysis."""
days: int
interval_days: int
label_format: str
PERIOD_CONFIGS = {
"week": PeriodConfig(days=7, interval_days=1, label_format="daily"),
"month": PeriodConfig(days=30, interval_days=7, label_format="weekly"),
"quarter": PeriodConfig(days=90, interval_days=7, label_format="weekly"),
"year": PeriodConfig(days=365, interval_days=30, label_format="monthly"),
}
def get_period_config(period: str) -> PeriodConfig:
"""Get configuration for the specified time period.
Args:
period: Time period identifier (week, month, quarter, year)
Returns:
PeriodConfig for the specified period
Raises:
HTTPException: If period is invalid
"""
config = PERIOD_CONFIGS.get(period)
if not config:
raise HTTPException(
status_code=400,
detail=f"Invalid period. Use: {', '.join(PERIOD_CONFIGS.keys())}"
)
return config
@router.get("/memory-growth", response_model=MemoryGrowthData, tags=["analytics"])
async def get_memory_growth(period: str = Query("month", ...), ...):
try:
config = get_period_config(period)
days = config.days
interval_days = config.interval_days
```
**Complexity Impact:** 11 → 8 (-3)
- Replaces if/elif chain with dict lookup
- Configuration is data-driven and easily extensible
### Refactoring Proposal #2: Extract Interval Aggregation
**Risk:** Low | **Impact:** -2 complexity | **Time:** 1 hour
**Before:**
```python
async def get_memory_growth(...):
...
# Create data points
current_date = start_date.date()
while current_date <= end_date.date():
# For intervals > 1 day, sum counts across the entire interval
interval_end = current_date + timedelta(days=interval_days)
count = 0
# Sum all memories within this interval
check_date = current_date
while check_date < interval_end and check_date <= end_date.date():
count += date_counts.get(check_date, 0)
check_date += timedelta(days=1)
cumulative += count
# Convert date to datetime for label generation
current_datetime = datetime.combine(current_date, datetime.min.time())
label = _generate_interval_label(current_datetime, period)
data_points.append(MemoryGrowthPoint(...))
current_date += timedelta(days=interval_days)
```
**After:**
```python
def aggregate_interval_counts(date_counts: Dict[date, int],
                              start_date: date,
                              end_date: date,
                              interval_days: int) -> List[Tuple[date, int]]:
    """Aggregate memory counts over time intervals.

    Intervals start at ``start_date`` and advance by ``interval_days``; the
    last interval is truncated so it never extends past ``end_date``.

    Args:
        date_counts: Map of dates to memory counts
        start_date: Start date for aggregation (inclusive)
        end_date: End date for aggregation (inclusive)
        interval_days: Number of days per interval (must be >= 1)

    Returns:
        List of (interval_start_date, count) tuples; empty when
        ``start_date`` is after ``end_date``

    Raises:
        ValueError: If ``interval_days`` is less than 1
    """
    # A zero or negative step would never move the cursor past end_date.
    if interval_days < 1:
        raise ValueError("interval_days must be at least 1")

    step = timedelta(days=interval_days)
    intervals = []
    current_date = start_date

    while current_date <= end_date:
        # Clamp the final interval to the days that remain up to end_date.
        days_in_interval = min(interval_days, (end_date - current_date).days + 1)
        count = sum(
            date_counts.get(current_date + timedelta(days=offset), 0)
            for offset in range(days_in_interval)
        )
        intervals.append((current_date, count))
        current_date += step

    return intervals
def build_growth_data_points(intervals: List[Tuple[date, int]],
period: str) -> List[MemoryGrowthPoint]:
"""Build MemoryGrowthPoint objects from interval data.
Args:
intervals: List of (date, count) tuples
period: Time period for label generation
Returns:
List of MemoryGrowthPoint objects with labels
"""
data_points = []
cumulative = 0
for current_date, count in intervals:
cumulative += count
current_datetime = datetime.combine(current_date, datetime.min.time())
label = _generate_interval_label(current_datetime, period)
data_points.append(MemoryGrowthPoint(
date=current_date.isoformat(),
count=count,
cumulative=cumulative,
label=label
))
return data_points
async def get_memory_growth(...):
...
intervals = aggregate_interval_counts(date_counts, start_date.date(),
end_date.date(), interval_days)
data_points = build_growth_data_points(intervals, period)
```
**Complexity Impact:** 8 → 6 (-2)
- Separates data aggregation from presentation
- Nested loops extracted to dedicated function
### Implementation Plan
1. **Extract period config** (45 min, low risk) - Dict lookup pattern
2. **Extract interval aggregation** (1 hour, low risk) - Pure function extraction
**Total Complexity Reduction:** 11 → 6 (-5 points)
**Total Time:** 1.75 hours
---
## Target Function 5: quality_gate.sh functions (Complexity: 10-12)
### Current Implementation
**Purpose:** Multiple bash functions for PR quality checks.
**Note:** After searching, I found bash scripts in `/scripts/pr/` but they don't contain individual functions with measurable cyclomatic complexity in the Python sense. Bash scripts typically have complexity from conditional branches and loops, but they're measured differently.
Instead, I'll analyze a Python function with a similar complexity pattern that would benefit from the same kind of refactoring: `get_tag_usage_analytics()` from analytics.py (lines 366-428, complexity ~10).
### Complexity Breakdown (get_tag_usage_analytics)
```
Lines 379-395: +3 complexity (storage method availability checks and fallbacks)
Lines 397-410: +4 complexity (tag data processing with total memory calculation)
Lines 412-421: +3 complexity (tag stats calculation loop)
Total: 10
```
### Refactoring Proposal #1: Extract Storage Stats Retrieval
**Risk:** Low | **Impact:** -2 complexity | **Time:** 30 minutes
**Before:**
```python
async def get_tag_usage_analytics(...):
try:
# Get all tags with counts
if hasattr(storage, 'get_all_tags_with_counts'):
tag_data = await storage.get_all_tags_with_counts()
else:
raise HTTPException(status_code=501, detail="Tag analytics not supported...")
# Get total memories for accurate percentage calculation
if hasattr(storage, 'get_stats'):
try:
stats = await storage.get_stats()
total_memories = stats.get("total_memories", 0)
except Exception as e:
logger.warning(f"Failed to retrieve storage stats: {e}")
stats = {}
total_memories = 0
else:
total_memories = 0
if total_memories == 0:
# Fallback: calculate from all tag data
all_tags = tag_data.copy()
total_memories = sum(tag["count"] for tag in all_tags)
```
**After:**
```python
async def get_total_memory_count(storage: MemoryStorage,
tag_data: List[Dict]) -> int:
"""Get total memory count from storage or calculate from tag data.
Args:
storage: Storage backend
tag_data: Tag count data for fallback calculation
Returns:
Total memory count
"""
if hasattr(storage, 'get_stats'):
try:
stats = await storage.get_stats()
total = stats.get("total_memories", 0)
if total > 0:
return total
except Exception as e:
logger.warning(f"Failed to retrieve storage stats: {e}")
# Fallback: calculate from tag data
return sum(tag["count"] for tag in tag_data)
async def get_tag_usage_analytics(...):
try:
# Get all tags with counts
if hasattr(storage, 'get_all_tags_with_counts'):
tag_data = await storage.get_all_tags_with_counts()
else:
raise HTTPException(status_code=501,
detail="Tag analytics not supported by storage backend")
total_memories = await get_total_memory_count(storage, tag_data)
```
**Complexity Impact:** 10 → 8 (-2)
### Refactoring Proposal #2: Extract Tag Stats Calculation
**Risk:** Low | **Impact:** -2 complexity | **Time:** 30 minutes
**Before:**
```python
async def get_tag_usage_analytics(...):
...
# Convert to response format
tags = []
for tag_item in tag_data:
percentage = (tag_item["count"] / total_memories * 100) if total_memories > 0 else 0
tags.append(TagUsageStats(
tag=tag_item["tag"],
count=tag_item["count"],
percentage=round(percentage, 1),
growth_rate=None # Would need historical data to calculate
))
return TagUsageData(
tags=tags,
total_memories=total_memories,
period=period
)
```
**After:**
```python
def calculate_tag_percentage(count: int, total: int) -> float:
    """Calculate percentage safely handling division by zero.

    Args:
        count: Tag usage count
        total: Total memory count

    Returns:
        Rounded percentage (1 decimal place); 0.0 when ``total`` is not positive
    """
    if total <= 0:
        # Return a float here too, so the result type matches the annotation.
        return 0.0
    return round(count / total * 100, 1)
def build_tag_usage_stats(tag_data: List[Dict], total_memories: int) -> List[TagUsageStats]:
"""Build TagUsageStats objects from raw tag data.
Args:
tag_data: Raw tag count data
total_memories: Total memory count for percentage calculation
Returns:
List of TagUsageStats objects
"""
return [
TagUsageStats(
tag=tag_item["tag"],
count=tag_item["count"],
percentage=calculate_tag_percentage(tag_item["count"], total_memories),
growth_rate=None # Would need historical data
)
for tag_item in tag_data
]
async def get_tag_usage_analytics(...):
...
tags = build_tag_usage_stats(tag_data, total_memories)
return TagUsageData(
tags=tags,
total_memories=total_memories,
period=period
)
```
**Complexity Impact:** 8 → 6 (-2)
### Implementation Plan
1. **Extract memory count retrieval** (30 min, low risk) - Simple extraction
2. **Extract tag stats calculation** (30 min, low risk) - Pure function
**Total Complexity Reduction:** 10 → 6 (-4 points)
**Total Time:** 1 hour
---
## Quick Wins Summary
### Quick Win 1: install.py::detect_gpu() (Complexity: 10 → 7)
**Refactoring:** Extract platform-specific GPU detection to separate functions
**Time:** 1 hour | **Risk:** Low
**Before:**
```python
def detect_gpu():
system_info = detect_system()
# Check for CUDA
has_cuda = False
cuda_version = None
if system_info["is_windows"]:
cuda_path = os.environ.get('CUDA_PATH')
if cuda_path and os.path.exists(cuda_path):
has_cuda = True
# ... 15 lines of version detection
elif system_info["is_linux"]:
cuda_paths = ['/usr/local/cuda', os.environ.get('CUDA_HOME')]
for path in cuda_paths:
if path and os.path.exists(path):
has_cuda = True
# ... 15 lines of version detection
```
**After:**
```python
def detect_cuda_windows() -> Tuple[bool, Optional[str]]:
"""Detect CUDA on Windows systems."""
cuda_path = os.environ.get('CUDA_PATH')
if not (cuda_path and os.path.exists(cuda_path)):
return False, None
# ... version detection logic
return True, cuda_version
def detect_cuda_linux() -> Tuple[bool, Optional[str]]:
"""Detect CUDA on Linux systems."""
cuda_paths = ['/usr/local/cuda', os.environ.get('CUDA_HOME')]
for path in cuda_paths:
if path and os.path.exists(path):
# ... version detection logic
return True, cuda_version
return False, None
CUDA_DETECTORS = {
'windows': detect_cuda_windows,
'linux': detect_cuda_linux,
}
def detect_gpu():
    system_info = detect_system()
    # Detect CUDA using platform-specific detector.
    # Only Windows and Linux have a detector; every other platform (e.g. macOS)
    # must fall through to the "no CUDA" default instead of running the Linux
    # probe, matching the original if/elif which skipped detection there.
    if system_info["is_windows"]:
        detector_key = 'windows'
    elif system_info["is_linux"]:
        detector_key = 'linux'
    else:
        detector_key = None
    detector = CUDA_DETECTORS.get(detector_key, lambda: (False, None))
    has_cuda, cuda_version = detector()
```
**Impact:** -3 complexity
- Platform-specific logic extracted
- Dict dispatch replaces if/elif chain
---
### Quick Win 2: cloudflare.py::get_memory_timestamps() (Complexity: 9 → 7)
**Refactoring:** Extract SQL query building and result processing
**Time:** 45 minutes | **Risk:** Low
**Before:**
```python
async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
try:
if days is not None:
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
cutoff_timestamp = cutoff.timestamp()
sql = "SELECT created_at FROM memories WHERE created_at >= ? ORDER BY created_at DESC"
payload = {"sql": sql, "params": [cutoff_timestamp]}
else:
sql = "SELECT created_at FROM memories ORDER BY created_at DESC"
payload = {"sql": sql, "params": []}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
timestamps = []
if result.get("success") and result.get("result", [{}])[0].get("results"):
for row in result["result"][0]["results"]:
if row.get("created_at") is not None:
timestamps.append(float(row["created_at"]))
```
**After:**
```python
def build_timestamp_query(days: Optional[int]) -> Tuple[str, List[Any]]:
    """Return the SQL and parameters used to fetch memory creation timestamps.

    Args:
        days: Optional day limit for filtering; None selects every memory

    Returns:
        Tuple of (sql_query, parameters)
    """
    if days is None:
        return ("SELECT created_at FROM memories ORDER BY created_at DESC", [])

    # Cutoff is computed from the current UTC time, as a Unix timestamp.
    oldest_allowed = (datetime.now(timezone.utc) - timedelta(days=days)).timestamp()
    filtered_sql = "SELECT created_at FROM memories WHERE created_at >= ? ORDER BY created_at DESC"
    return (filtered_sql, [oldest_allowed])
def extract_timestamps(result: Dict) -> List[float]:
    """Extract timestamp values from D1 query result.

    Reads ``result["result"][0]["results"]`` and converts every non-null
    ``created_at`` value to float. A response that is unsuccessful, or whose
    "result" entry is missing, null, empty or not shaped as expected, yields
    an empty list instead of raising.

    Args:
        result: D1 query response JSON

    Returns:
        List of Unix timestamps
    """
    if not result.get("success"):
        return []

    # Indexing [0] directly would raise IndexError on an empty list and
    # TypeError on null, so validate the shape before drilling in.
    batches = result.get("result")
    if not isinstance(batches, list) or not batches or not isinstance(batches[0], dict):
        return []

    rows = batches[0].get("results") or []
    return [
        float(row["created_at"])
        for row in rows
        if row.get("created_at") is not None
    ]
async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
try:
sql, params = build_timestamp_query(days)
payload = {"sql": sql, "params": params}
response = await self._retry_request("POST", f"{self.d1_url}/query", json=payload)
result = response.json()
timestamps = extract_timestamps(result)
```
**Impact:** -2 complexity
- Query building extracted
- Result processing extracted
---
### Quick Win 3: consolidator.py::_get_memories_for_horizon() (Complexity: 10 → 8)
**Refactoring:** Extract time range calculation and incremental mode sorting
**Time:** 45 minutes | **Risk:** Low
**Before:**
```python
async def _get_memories_for_horizon(self, time_horizon: str, **kwargs) -> List[Memory]:
now = datetime.now()
# Define time ranges for different horizons
time_ranges = {
'daily': timedelta(days=1),
'weekly': timedelta(days=7),
'monthly': timedelta(days=30),
'quarterly': timedelta(days=90),
'yearly': timedelta(days=365)
}
if time_horizon not in time_ranges:
raise ConsolidationError(f"Unknown time horizon: {time_horizon}")
# For daily processing, get recent memories (no change - already efficient)
if time_horizon == 'daily':
start_time = (now - timedelta(days=2)).timestamp()
end_time = now.timestamp()
memories = await self.storage.get_memories_by_time_range(start_time, end_time)
else:
# ... complex incremental logic
```
**After:**
```python
TIME_HORIZON_CONFIGS = {
'daily': {'days': 1, 'use_time_range': True, 'range_days': 2},
'weekly': {'days': 7, 'use_time_range': False},
'monthly': {'days': 30, 'use_time_range': False},
'quarterly': {'days': 90, 'use_time_range': False},
'yearly': {'days': 365, 'use_time_range': False}
}
def get_consolidation_sort_key(memory: Memory) -> float:
"""Get sort key for incremental consolidation (oldest first).
Args:
memory: Memory object to get sort key for
Returns:
Sort key (timestamp, lower = older)
"""
if memory.metadata and 'last_consolidated_at' in memory.metadata:
return float(memory.metadata['last_consolidated_at'])
return memory.created_at if memory.created_at else 0.0
async def _get_memories_for_horizon(self, time_horizon: str, **kwargs) -> List[Memory]:
config = TIME_HORIZON_CONFIGS.get(time_horizon)
if not config:
raise ConsolidationError(f"Unknown time horizon: {time_horizon}")
now = datetime.now()
if config['use_time_range']:
start_time = (now - timedelta(days=config['range_days'])).timestamp()
end_time = now.timestamp()
return await self.storage.get_memories_by_time_range(start_time, end_time)
# ... simplified incremental logic using extracted functions
```
**Impact:** -2 complexity
- Config-driven time range selection
- Sort key extraction to separate function
---
### Quick Win 4: analytics.py::get_activity_breakdown() (Complexity: 9 → 7)
**Refactoring:** Extract granularity-specific aggregation functions
**Time:** 1 hour | **Risk:** Low
**Before:**
```python
async def get_activity_breakdown(granularity: str = Query("daily", ...)):
...
if granularity == "hourly":
hour_counts = defaultdict(int)
for timestamp in timestamps:
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
hour_counts[dt.hour] += 1
active_days.add(dt.date())
activity_dates.append(dt.date())
# ... 10 lines of breakdown building
elif granularity == "daily":
day_counts = defaultdict(int)
day_names = ["Monday", "Tuesday", ...]
# ... 15 lines of breakdown building
else: # weekly
week_counts = defaultdict(int)
# ... 20 lines of breakdown building
```
**After:**
```python
def aggregate_hourly(timestamps: List[float]) -> Tuple[List[ActivityBreakdown], Set[date], List[date]]:
"""Aggregate activity data by hour."""
hour_counts = defaultdict(int)
active_days = set()
activity_dates = []
for timestamp in timestamps:
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
hour_counts[dt.hour] += 1
active_days.add(dt.date())
activity_dates.append(dt.date())
breakdown = [
ActivityBreakdown(period="hourly", count=hour_counts.get(hour, 0), label=f"{hour:02d}:00")
for hour in range(24)
]
return breakdown, active_days, activity_dates
GRANULARITY_AGGREGATORS = {
'hourly': aggregate_hourly,
'daily': aggregate_daily,
'weekly': aggregate_weekly
}
async def get_activity_breakdown(granularity: str = Query("daily", ...)):
...
aggregator = GRANULARITY_AGGREGATORS.get(granularity, aggregate_daily)
breakdown, active_days, activity_dates = aggregator(timestamps)
```
**Impact:** -2 complexity
- Granularity-specific logic extracted
- Dict dispatch replaces if/elif chain
---
### Quick Win 5: analytics.py::get_memory_type_distribution() (Complexity: 9 → 7)
**Refactoring:** Extract storage backend type detection and query building
**Time:** 45 minutes | **Risk:** Low
**Before:**
```python
async def get_memory_type_distribution(storage: MemoryStorage = Depends(get_storage), ...):
try:
# Try multiple approaches based on storage backend
if hasattr(storage, 'get_type_counts'):
type_counts_data = await storage.get_type_counts()
type_counts = dict(type_counts_data)
total_memories = sum(type_counts.values())
elif hasattr(storage, 'primary') and hasattr(storage.primary, 'conn'):
# Hybrid storage - access underlying SQLite
cursor = storage.primary.conn.cursor()
cursor.execute("""SELECT ... FROM memories GROUP BY mem_type""")
type_counts = {row[0]: row[1] for row in cursor.fetchall()}
...
elif hasattr(storage, 'conn') and storage.conn:
# Direct SQLite storage
cursor = storage.conn.cursor()
cursor.execute("""SELECT ... FROM memories GROUP BY mem_type""")
...
```
**After:**
```python
async def get_type_counts_from_storage(storage: MemoryStorage) -> Tuple[Dict[str, int], int]:
"""Get memory type counts from storage backend.
Returns:
Tuple of (type_counts_dict, total_memories)
"""
# Native support
if hasattr(storage, 'get_type_counts'):
type_counts_data = await storage.get_type_counts()
type_counts = dict(type_counts_data)
return type_counts, sum(type_counts.values())
# Direct SQLite query (hybrid or direct)
conn = None
if hasattr(storage, 'primary') and hasattr(storage.primary, 'conn'):
conn = storage.primary.conn
elif hasattr(storage, 'conn'):
conn = storage.conn
if conn:
cursor = conn.cursor()
cursor.execute("""
SELECT
CASE WHEN memory_type IS NULL OR memory_type = '' THEN 'untyped'
ELSE memory_type END as mem_type,
COUNT(*) as count
FROM memories GROUP BY mem_type
""")
type_counts = {row[0]: row[1] for row in cursor.fetchall()}
cursor.execute("SELECT COUNT(*) FROM memories")
return type_counts, cursor.fetchone()[0]
# Fallback to sampling
logger.warning("Using sampling approach - results may be incomplete")
memories = await storage.get_recent_memories(n=1000)
type_counts = defaultdict(int)
for memory in memories:
type_counts[memory.memory_type or "untyped"] += 1
return dict(type_counts), len(memories)
async def get_memory_type_distribution(storage: MemoryStorage = Depends(get_storage), ...):
try:
type_counts, total_memories = await get_type_counts_from_storage(storage)
# ... build response
```
**Impact:** -2 complexity
- Backend detection logic extracted
- Early return pattern in extraction function
---
## Implementation Roadmap
### Phase 2A: Core Functions (Week 1)
**Target:** configure_paths, cloudflare tag search, consolidator.consolidate
| Function | Priority | Time | Dependency | Parallel? |
|----------|----------|------|------------|-----------|
| install.py::configure_paths() | High | 4h | None | Yes |
| cloudflare.py::_search_by_tags_internal() | High | 1.75h | None | Yes |
| consolidator.py::consolidate() | High | 1.25h | None | Yes |
**Subtotal:** 7 hours (can be done in parallel)
### Phase 2B: Analytics Functions (Week 2)
**Target:** analytics endpoints optimization
| Function | Priority | Time | Dependency | Parallel? |
|----------|----------|------|------------|-----------|
| analytics.py::get_memory_growth() | Medium | 1.75h | None | Yes |
| analytics.py::get_tag_usage_analytics() | Medium | 1h | None | Yes |
**Subtotal:** 2.75 hours (can be done in parallel)
### Phase 2C: Quick Wins (Week 2-3)
**Target:** Low-risk, high-impact improvements
| Function | Priority | Time | Dependency | Parallel? |
|----------|----------|------|------------|-----------|
| install.py::detect_gpu() | Low | 1h | None | Yes |
| cloudflare.py::get_memory_timestamps() | Low | 45m | None | Yes |
| consolidator.py::_get_memories_for_horizon() | Low | 45m | None | Yes |
| analytics.py::get_activity_breakdown() | Low | 1h | None | Yes |
| analytics.py::get_memory_type_distribution() | Low | 45m | None | Yes |
**Subtotal:** 4.25 hours (can be done in parallel)
### Total Time Estimate
- **Sequential execution:** 14 hours
- **Parallel execution (with team):** 7 hours (Phase 2A) + 2.75h (Phase 2B) + 2h (Phase 2C) = **11.75 hours**
- **Recommended:** 12-15 hours (including testing and documentation)
---
## Expected Health Impact
### Complexity Score Improvement
**Current:** 40/100
- 5 main target functions: -28 complexity points total
- 5 quick wins: -11 complexity points total
- **Total reduction:** -39 complexity points across 10 functions
**Projected:** 50-55/100 (+10-15 points)
### Overall Health Score Improvement
**Current:** 63/100 (Grade C)
**Projected:** 66-68/100 (Grade C+)
**Calculation:**
- Phase 1 (dead code): +5-9 points → 68-72
- Phase 2 (complexity): +3 points → 71-75
---
## Success Criteria
### Quantitative
- [ ] All 5 main functions reduced by 3+ complexity points each
- [ ] All 5 quick wins implemented successfully
- [ ] Total complexity reduction: 30+ points
- [ ] No breaking changes (all tests passing)
- [ ] No performance regressions
### Qualitative
- [ ] Code readability improved (subjective review)
- [ ] Functions easier to understand and maintain
- [ ] Better separation of concerns
- [ ] Improved testability (isolated functions)
---
## Risk Assessment Matrix
| Function | Risk | Testing Requirements | Critical Path | Priority |
|----------|------|---------------------|---------------|----------|
| configure_paths | Low | Unit + integration | No (setup only) | High |
| _search_by_tags_internal | Low | Unit + DB tests | Yes (core search) | High |
| consolidate | Medium | Integration tests | Yes (consolidation) | High |
| get_memory_growth | Low | Unit + API tests | No (analytics) | Medium |
| get_tag_usage_analytics | Low | Unit + API tests | No (analytics) | Medium |
| detect_gpu | Low | Unit tests | No (setup only) | Low |
| get_memory_timestamps | Low | Unit + DB tests | No (analytics) | Low |
| _get_memories_for_horizon | Medium | Integration tests | Yes (consolidation) | Medium |
| get_activity_breakdown | Low | Unit + API tests | No (analytics) | Low |
| get_memory_type_distribution | Low | Unit + API tests | No (analytics) | Low |
**Critical Path Functions (require careful testing):**
1. _search_by_tags_internal - Core search functionality
2. consolidate - Memory consolidation pipeline
3. _get_memories_for_horizon - Consolidation memory selection
**Low-Risk Functions (easier to refactor):**
- All analytics endpoints (read-only, non-critical)
- Setup functions (configure_paths, detect_gpu)
---
## Testing Strategy
### Unit Tests (per function)
- Test extracted functions independently
- Verify input/output contracts
- Test edge cases and error handling
### Integration Tests
- Test critical path functions with real storage
- Verify no behavioral changes
- Performance benchmarks (before/after)
### Regression Tests
- Run full test suite after each refactoring
- Verify API contracts unchanged
- Check performance hasn't degraded
---
## Next Steps
1. **Review and approve** this Phase 2 analysis
2. **Select implementation approach:**
- Option A: Sequential (14 hours, single developer)
- Option B: Parallel (12 hours, multiple developers)
- Option C: Prioritized (7 hours for critical functions only)
3. **Set up tracking:**
- Create GitHub issues for each function
- Track complexity reduction progress
- Monitor test coverage
4. **Begin Phase 2A** (highest priority functions)
---
## Appendix: Refactoring Patterns Used
### Pattern 1: Extract Method
**Purpose:** Reduce function length and improve testability
**Used in:** All functions analyzed
**Example:** Platform detection, SQL query building
### Pattern 2: Guard Clause
**Purpose:** Reduce nesting and improve readability
**Used in:** Tag search, config updates
**Example:** Early returns for validation
### Pattern 3: Dict Lookup
**Purpose:** Replace if/elif chains with data-driven logic
**Used in:** Period configs, platform detection
**Example:** `PERIOD_CONFIGS[period]` instead of if/elif
### Pattern 4: Context Manager
**Purpose:** Simplify resource management and cleanup
**Used in:** Consolidation sync management
**Example:** `async with SyncPauseContext(...)`
### Pattern 5: Configuration Object
**Purpose:** Centralize related configuration data
**Used in:** Period analysis, time horizons
**Example:** `@dataclass PeriodConfig`
---
## Lessons from Phase 1
**What worked well:**
- Clear complexity scoring and prioritization
- Incremental approach (low-risk first)
- Automated testing validation
**Improvements for Phase 2:**
- More explicit refactoring examples (✅ done)
- Better risk assessment (✅ done)
- Parallel execution planning (✅ done)
---
**End of Phase 2 Analysis**
**Total Functions Analyzed:** 10 (5 main + 5 quick wins)
**Total Complexity Reduction:** -39 points
**Total Time Estimate:** 12-15 hours
```
--------------------------------------------------------------------------------
/claude-hooks/install_hooks.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unified Claude Code Memory Awareness Hooks Installer
====================================================
Cross-platform installer for Claude Code memory awareness hooks with support for:
- Basic memory awareness hooks (session-start, session-end)
- Natural Memory Triggers v7.1.3 (intelligent automatic memory awareness)
- Mid-conversation hooks for real-time memory injection
- Performance optimization and CLI management tools
- Smart MCP detection and DRY configuration
Replaces multiple platform-specific installers with a single Python solution.
Implements DRY principle by detecting and reusing existing Claude Code MCP configurations.
Version: Dynamically synced with main project version
"""
import os
import sys
import json
import shutil
import platform
import argparse
import subprocess
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Dynamic version detection from main project
def get_project_version() -> str:
    """Return the version string of the main ``mcp_memory_service`` package.

    The ``src`` directory that sits beside this script's parent folder is put
    at the front of ``sys.path`` (unless it is already listed) and the
    package's ``__version__`` is imported from there.

    Returns:
        The imported ``__version__``, or the hard-coded fallback ``"7.2.0"``
        when the package cannot be imported.
    """
    repo_src = str(Path(__file__).parent.parent / "src")
    if repo_src not in sys.path:
        sys.path.insert(0, repo_src)

    try:
        from mcp_memory_service import __version__
    except ImportError:
        # Fallback for standalone installations
        return "7.2.0"
    return __version__
class Colors:
    """ANSI escape sequences used to colorize terminal output."""

    # Foreground colors
    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[1;33m'
    BLUE = '\033[0;34m'
    CYAN = '\033[0;36m'

    # Reset sequence ("No Color") emitted after a colored span
    NC = '\033[0m'
class HookInstaller:
"""Unified hook installer for all platforms and feature levels."""
# Environment type constants
CLAUDE_CODE_ENV = "claude-code"
STANDALONE_ENV = "standalone"
def __init__(self):
self.script_dir = Path(__file__).parent.absolute()
self.platform_name = platform.system().lower()
self.claude_hooks_dir = self._detect_claude_hooks_directory()
self.backup_dir = None
def _detect_claude_hooks_directory(self) -> Path:
"""Detect the Claude Code hooks directory across platforms."""
home = Path.home()
# Primary paths by platform
primary_paths = {
'windows': [
home / 'AppData' / 'Roaming' / 'Claude' / 'hooks',
home / '.claude' / 'hooks'
],
'darwin': [ # macOS
home / '.claude' / 'hooks',
home / 'Library' / 'Application Support' / 'Claude' / 'hooks'
],
'linux': [
home / '.claude' / 'hooks',
home / '.config' / 'claude' / 'hooks'
]
}
# Check platform-specific paths first
platform_paths = primary_paths.get(self.platform_name, primary_paths['linux'])
for path in platform_paths:
if path.exists():
return path
# Check if Claude Code CLI can tell us the location
try:
result = subprocess.run(['claude', '--help'],
capture_output=True, text=True, timeout=5)
# Look for hooks directory info in help output
# This is a placeholder - actual Claude CLI might not provide this
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
pass
# Default to standard location
return home / '.claude' / 'hooks'
def info(self, message: str) -> None:
    """Print *message* to stdout behind a green ``[INFO]`` tag."""
    tag = f"{Colors.GREEN}[INFO]{Colors.NC}"
    print(f"{tag} {message}")
def warn(self, message: str) -> None:
    """Print *message* to stdout behind a yellow ``[WARN]`` tag."""
    tag = f"{Colors.YELLOW}[WARN]{Colors.NC}"
    print(f"{tag} {message}")
def error(self, message: str) -> None:
    """Print *message* to stdout behind a red ``[ERROR]`` tag."""
    tag = f"{Colors.RED}[ERROR]{Colors.NC}"
    print(f"{tag} {message}")
def success(self, message: str) -> None:
    """Print *message* to stdout behind a blue ``[SUCCESS]`` tag."""
    tag = f"{Colors.BLUE}[SUCCESS]{Colors.NC}"
    print(f"{tag} {message}")
def header(self, message: str) -> None:
    """Print *message* as a cyan banner framed by two 60-character rules.

    A blank line is emitted before the top rule and after the bottom rule.
    """
    rule = f"{Colors.CYAN}{'=' * 60}{Colors.NC}"
    print(f"\n{rule}")
    print(f"{Colors.CYAN} {message}{Colors.NC}")
    print(f"{rule}\n")
def check_prerequisites(self) -> bool:
"""Check system prerequisites for hook installation."""
self.info("Checking prerequisites...")
all_good = True
# Check Claude Code CLI
try:
result = subprocess.run(['claude', '--version'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
self.success(f"Claude Code CLI found: {result.stdout.strip()}")
else:
self.warn("Claude Code CLI found but version check failed")
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
self.warn("Claude Code CLI not found in PATH")
self.info("You can still install hooks, but some features may not work")
# Check Node.js
try:
result = subprocess.run(['node', '--version'],
capture_output=True, text=True, timeout=5)
if result.returncode == 0:
version = result.stdout.strip()
major_version = int(version.replace('v', '').split('.')[0])
if major_version >= 14:
self.success(f"Node.js found: {version} (compatible)")
else:
self.error(f"Node.js {version} found, but version 14+ required")
all_good = False
else:
self.error("Node.js found but version check failed")
all_good = False
except (subprocess.SubprocessError, FileNotFoundError, subprocess.TimeoutExpired):
self.error("Node.js not found - required for hook execution")
self.info("Please install Node.js 14+ from https://nodejs.org/")
all_good = False
# Check Python version
if sys.version_info < (3, 7):
self.error(f"Python {sys.version} found, but Python 3.7+ required")
all_good = False
else:
self.success(f"Python {sys.version_info.major}.{sys.version_info.minor} found (compatible)")
return all_good
def detect_claude_mcp_configuration(self) -> Optional[Dict]:
"""Detect existing Claude Code MCP memory server configuration."""
self.info("Detecting existing Claude Code MCP configuration...")
try:
# Check if memory server is configured in Claude Code
result = subprocess.run(['claude', 'mcp', 'get', 'memory'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
# Parse the output to extract configuration details
config_info = self._parse_mcp_get_output(result.stdout)
if config_info:
self.success(f"Found existing memory server: {config_info.get('command', 'Unknown')}")
self.success(f"Status: {config_info.get('status', 'Unknown')}")
self.success(f"Type: {config_info.get('type', 'Unknown')}")
return config_info
else:
self.warn("Memory server found but configuration could not be parsed")
else:
self.info("No existing memory server found in Claude Code MCP configuration")
except subprocess.TimeoutExpired:
self.warn("Claude MCP command timed out")
except FileNotFoundError:
self.warn("Claude Code CLI not found - cannot detect existing MCP configuration")
except Exception as e:
self.warn(f"Failed to detect MCP configuration: {e}")
return None
def _parse_mcp_get_output(self, output: str) -> Optional[Dict]:
"""Parse the output of 'claude mcp get memory' command."""
config = {}
try:
lines = output.strip().split('\n')
for line in lines:
line = line.strip()
if line.startswith('Status:'):
config['status'] = line.replace('Status:', '').strip()
elif line.startswith('Type:'):
config['type'] = line.replace('Type:', '').strip()
elif line.startswith('Command:'):
config['command'] = line.replace('Command:', '').strip()
elif line.startswith('Scope:'):
config['scope'] = line.replace('Scope:', '').strip()
elif line.startswith('Environment:'):
config['environment'] = line.replace('Environment:', '').strip()
# Only return config if we found essential information
if 'command' in config and 'status' in config:
return config
except Exception as e:
self.warn(f"Failed to parse MCP output: {e}")
return None
def detect_environment_type(self) -> str:
"""Detect if running in Claude Code vs standalone environment."""
self.info("Detecting environment type...")
# Check for Claude Code MCP server (indicates Claude Code is active)
mcp_config = self.detect_claude_mcp_configuration()
if mcp_config and 'Connected' in mcp_config.get('status', ''):
self.success("Claude Code environment detected (MCP server active)")
return self.CLAUDE_CODE_ENV
else:
self.success("Standalone environment detected (no active MCP server)")
return self.STANDALONE_ENV
def _detect_python_path(self) -> str:
"""Detect the appropriate Python executable path for the current platform.
Returns:
str: Python executable name/path ('python3' for Unix, 'python' for Windows)
"""
import sys
import platform
# Check Python version (must be 3.10+)
if sys.version_info < (3, 10):
self.warn(f"Python {sys.version_info.major}.{sys.version_info.minor} detected - code execution requires 3.10+")
# Platform-specific Python path
if platform.system() == 'Windows':
return 'python'
else:
return 'python3'
def configure_protocol_for_environment(self, env_type: str) -> Dict:
"""Configure optimal protocol based on detected environment."""
# Data-driven configuration map
config_map = {
self.CLAUDE_CODE_ENV: {
"protocol": "http",
"preferredProtocol": "http",
"fallbackEnabled": True,
"reason": "Claude Code environment - using HTTP to avoid MCP conflicts",
"log_title": "📋 Protocol Configuration: HTTP (recommended for Claude Code)",
"log_reason": "Avoids MCP server conflicts when Claude Code is active"
},
self.STANDALONE_ENV: {
"protocol": "auto",
"preferredProtocol": "mcp",
"fallbackEnabled": True,
"reason": "Standalone environment - MCP preferred for performance",
"log_title": "📋 Protocol Configuration: Auto (MCP preferred)",
"log_reason": "MCP provides best performance in standalone scenarios"
}
}
# Get configuration for environment type (default to standalone if unknown)
config = config_map.get(env_type, config_map[self.STANDALONE_ENV])
# Log the configuration
self.info(config["log_title"])
self.info(f" Reason: {config['log_reason']}")
# Return only the protocol configuration (excluding logging fields)
return {
"protocol": config["protocol"],
"preferredProtocol": config["preferredProtocol"],
"fallbackEnabled": config["fallbackEnabled"],
"reason": config["reason"]
}
def validate_mcp_prerequisites(self, detected_config: Optional[Dict] = None) -> Tuple[bool, List[str]]:
"""Validate that MCP memory service is properly configured."""
issues = []
if not detected_config:
detected_config = self.detect_claude_mcp_configuration()
if not detected_config:
issues.append("No memory server found in Claude Code MCP configuration")
return False, issues
# Check if server is connected
status = detected_config.get('status', '')
if '✓ Connected' not in status and 'Connected' not in status:
issues.append(f"Memory server is not connected. Status: {status}")
# Validate command format
command = detected_config.get('command', '')
if not command:
issues.append("Memory server command is empty")
elif 'mcp_memory_service' not in command:
issues.append(f"Unexpected memory server command: {command}")
# Check server type
server_type = detected_config.get('type', '')
if server_type not in ['stdio', 'http']:
issues.append(f"Unsupported server type: {server_type}")
return len(issues) == 0, issues
def generate_hooks_config_from_mcp(self, detected_config: Dict, env_type: str = "standalone") -> Dict:
"""Generate hooks configuration based on detected Claude Code MCP setup.
Args:
detected_config: Dictionary containing detected MCP configuration
env_type: Environment type ('claude-code' or 'standalone'), defaults to 'standalone'
Returns:
Dictionary containing complete hooks configuration
"""
command = detected_config.get('command', '')
server_type = detected_config.get('type', 'stdio')
# Get environment-appropriate protocol configuration
protocol_config = self.configure_protocol_for_environment(env_type)
if server_type == 'stdio':
# For stdio servers, we'll reference the existing server
mcp_config = {
"useExistingServer": True,
"serverName": "memory",
"connectionTimeout": 5000,
"toolCallTimeout": 10000
}
else:
# For HTTP servers, extract endpoint information
mcp_config = {
"useExistingServer": True,
"serverName": "memory",
"connectionTimeout": 5000,
"toolCallTimeout": 10000
}
# Detect Python path based on platform
python_path = self._detect_python_path()
config = {
"codeExecution": {
"enabled": True,
"timeout": 8000,
"fallbackToMCP": True,
"enableMetrics": True,
"pythonPath": python_path
},
"memoryService": {
"protocol": protocol_config["protocol"],
"preferredProtocol": protocol_config["preferredProtocol"],
"fallbackEnabled": protocol_config["fallbackEnabled"],
"http": {
"endpoint": "https://localhost:8443",
"apiKey": "auto-detect",
"healthCheckTimeout": 3000,
"useDetailedHealthCheck": True
},
"mcp": mcp_config,
"defaultTags": ["claude-code", "auto-generated"],
"maxMemoriesPerSession": 8,
"enableSessionConsolidation": True,
"injectAfterCompacting": False,
"recentFirstMode": True,
"recentMemoryRatio": 0.6,
"recentTimeWindow": "last-week",
"fallbackTimeWindow": "last-month",
"showStorageSource": True,
"sourceDisplayMode": "brief"
}
}
return config
def generate_basic_config(self, env_type: str = "standalone") -> Dict:
"""Generate basic configuration when no template is available.
Args:
env_type: Environment type ('claude-code' or 'standalone'), defaults to 'standalone'
Returns:
Dictionary containing basic hooks configuration
"""
# Get environment-appropriate protocol configuration
protocol_config = self.configure_protocol_for_environment(env_type)
# Detect Python path based on platform
python_path = self._detect_python_path()
return {
"codeExecution": {
"enabled": True,
"timeout": 8000,
"fallbackToMCP": True,
"enableMetrics": True,
"pythonPath": python_path
},
"memoryService": {
"protocol": protocol_config["protocol"],
"preferredProtocol": protocol_config["preferredProtocol"],
"fallbackEnabled": protocol_config["fallbackEnabled"],
"http": {
"endpoint": "https://localhost:8443",
"apiKey": "auto-detect",
"healthCheckTimeout": 3000,
"useDetailedHealthCheck": True
},
"mcp": {
"serverCommand": ["uv", "run", "python", "-m", "mcp_memory_service.server"],
"serverWorkingDir": str(self.script_dir.parent),
"connectionTimeout": 5000,
"toolCallTimeout": 10000
},
"defaultTags": ["claude-code", "auto-generated"],
"maxMemoriesPerSession": 8,
"enableSessionConsolidation": True,
"injectAfterCompacting": False,
"recentFirstMode": True,
"recentMemoryRatio": 0.6,
"recentTimeWindow": "last-week",
"fallbackTimeWindow": "last-month",
"showStorageSource": True,
"sourceDisplayMode": "brief"
},
"projectDetection": {
"gitRepository": True,
"packageFiles": ["package.json", "pyproject.toml", "Cargo.toml", "go.mod", "pom.xml"],
"frameworkDetection": True,
"languageDetection": True,
"confidenceThreshold": 0.3
},
"output": {
"verbose": True,
"showMemoryDetails": True,
"showProjectDetails": True,
"cleanMode": False
}
}
def enhance_config_for_natural_triggers(self, config: Dict) -> Dict:
"""Enhance configuration with Natural Memory Triggers settings."""
# Add natural triggers configuration
config["naturalTriggers"] = {
"enabled": True,
"triggerThreshold": 0.6,
"cooldownPeriod": 30000,
"maxMemoriesPerTrigger": 5
}
# Add performance configuration
config["performance"] = {
"defaultProfile": "balanced",
"enableMonitoring": True,
"autoAdjust": True,
"profiles": {
"speed_focused": {
"maxLatency": 100,
"enabledTiers": ["instant"],
"backgroundProcessing": False,
"degradeThreshold": 200,
"description": "Fastest response, minimal memory awareness"
},
"balanced": {
"maxLatency": 200,
"enabledTiers": ["instant", "fast"],
"backgroundProcessing": True,
"degradeThreshold": 400,
"description": "Moderate latency, smart memory triggers"
},
"memory_aware": {
"maxLatency": 500,
"enabledTiers": ["instant", "fast", "intensive"],
"backgroundProcessing": True,
"degradeThreshold": 1000,
"description": "Full memory awareness, accept higher latency"
}
}
}
# Add other advanced settings
config["gitAnalysis"] = {
"enabled": True,
"commitLookback": 14,
"maxCommits": 20,
"includeChangelog": True,
"maxGitMemories": 3,
"gitContextWeight": 1.2
}
return config
def create_backup(self) -> None:
"""Create backup of existing hooks installation."""
if not self.claude_hooks_dir.exists():
self.info("No existing hooks installation found - no backup needed")
return
timestamp = subprocess.run(['date', '+%Y%m%d-%H%M%S'],
capture_output=True, text=True).stdout.strip()
if not timestamp: # Fallback for Windows
import datetime
timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
self.backup_dir = self.claude_hooks_dir.parent / f"hooks-backup-{timestamp}"
try:
shutil.copytree(self.claude_hooks_dir, self.backup_dir)
self.success(f"Backup created: {self.backup_dir}")
except Exception as e:
self.warn(f"Failed to create backup: {e}")
self.warn("Continuing without backup...")
def install_basic_hooks(self) -> bool:
"""Install basic memory awareness hooks."""
self.info("Installing basic memory awareness hooks...")
try:
# Create necessary directories
(self.claude_hooks_dir / "core").mkdir(parents=True, exist_ok=True)
(self.claude_hooks_dir / "utilities").mkdir(parents=True, exist_ok=True)
(self.claude_hooks_dir / "tests").mkdir(parents=True, exist_ok=True)
# Core hooks
core_files = [
"session-start.js",
"session-end.js",
"memory-retrieval.js",
"topic-change.js"
]
for file in core_files:
src = self.script_dir / "core" / file
dst = self.claude_hooks_dir / "core" / file
if src.exists():
shutil.copy2(src, dst)
else:
self.warn(f"Core file not found: {file}")
# Copy ALL utility files to ensure updates are deployed
# This prevents stale versions when files are updated in the repo
utilities_dir = self.script_dir / "utilities"
if utilities_dir.exists():
utility_count = 0
for utility_file in utilities_dir.glob("*.js"):
dst = self.claude_hooks_dir / "utilities" / utility_file.name
shutil.copy2(utility_file, dst)
utility_count += 1
self.success(f"Copied {utility_count} utility files")
else:
self.warn("Utilities directory not found")
# Tests
test_files = ["integration-test.js"]
for file in test_files:
src = self.script_dir / "tests" / file
dst = self.claude_hooks_dir / "tests" / file
if src.exists():
shutil.copy2(src, dst)
# Documentation
readme_src = self.script_dir / "README.md"
if readme_src.exists():
shutil.copy2(readme_src, self.claude_hooks_dir / "README.md")
# StatusLine script (v8.5.7+)
statusline_src = self.script_dir / "statusline.sh"
if statusline_src.exists():
statusline_dst = self.claude_hooks_dir / "statusline.sh"
shutil.copy2(statusline_src, statusline_dst)
# Make executable on Unix-like systems
if self.platform_name != 'windows':
os.chmod(statusline_dst, 0o755)
self.success("StatusLine script installed")
# Check for jq dependency
jq_available = shutil.which('jq') is not None
if jq_available:
self.success("✓ jq is installed (required for statusLine)")
else:
self.warn("⚠ jq not found - statusLine requires jq for JSON parsing")
self.info(" Install jq:")
if self.platform_name == 'darwin':
self.info(" macOS: brew install jq")
elif self.platform_name == 'linux':
self.info(" Linux: sudo apt install jq (or equivalent)")
elif self.platform_name == 'windows':
self.info(" Windows: choco install jq (or download from https://jqlang.github.io/jq/)")
self.success("Basic hooks installed successfully")
return True
except Exception as e:
self.error(f"Failed to install basic hooks: {e}")
return False
def install_natural_triggers(self) -> bool:
"""Install Natural Memory Triggers v7.1.3 components."""
self.info("Installing Natural Memory Triggers v7.1.3...")
try:
# Ensure directories exist
(self.claude_hooks_dir / "core").mkdir(parents=True, exist_ok=True)
(self.claude_hooks_dir / "utilities").mkdir(parents=True, exist_ok=True)
# Mid-conversation hook
mid_conv_src = self.script_dir / "core" / "mid-conversation.js"
if mid_conv_src.exists():
shutil.copy2(mid_conv_src, self.claude_hooks_dir / "core" / "mid-conversation.js")
self.success("Installed mid-conversation hooks")
else:
self.warn("Mid-conversation hook not found")
# CRITICAL: Copy ALL utility files to ensure updates are deployed
# This prevents the issue where updated files like memory-scorer.js don't get copied
utilities_dir = self.script_dir / "utilities"
if utilities_dir.exists():
utility_count = 0
for utility_file in utilities_dir.glob("*.js"):
dst = self.claude_hooks_dir / "utilities" / utility_file.name
shutil.copy2(utility_file, dst)
utility_count += 1
self.success(f"Copied {utility_count} utility files (ensuring all updates are deployed)")
else:
self.warn("Utilities directory not found")
# CLI management tools
cli_tools = [
"memory-mode-controller.js",
"debug-pattern-test.js"
]
for file in cli_tools:
src = self.script_dir / file
dst = self.claude_hooks_dir / file
if src.exists():
shutil.copy2(src, dst)
# Test files
test_files = [
"test-natural-triggers.js",
"test-mcp-hook.js",
"test-dual-protocol-hook.js"
]
for file in test_files:
src = self.script_dir / file
dst = self.claude_hooks_dir / file
if src.exists():
shutil.copy2(src, dst)
self.success("Natural Memory Triggers v7.1.3 installed successfully")
return True
except Exception as e:
self.error(f"Failed to install Natural Memory Triggers: {e}")
return False
def install_configuration(self, install_natural_triggers: bool = False, detected_mcp: Optional[Dict] = None, env_type: str = "standalone") -> bool:
    """Install or update configuration files.

    Copies ``config.template.json`` (when shipped) into the hooks directory,
    backs up an existing ``config.json`` to ``config.json.backup`` and then
    writes a fresh ``config.json`` from, in order of preference: the detected
    MCP setup, the shipped ``config.json`` (with ``serverWorkingDir``
    rewritten), or a generated basic configuration. If generation fails the
    shipped ``config.json`` is copied verbatim as a fallback.

    Args:
        install_natural_triggers: Whether to include Natural Memory Triggers configuration
        detected_mcp: Optional detected MCP configuration to use
        env_type: Environment type ('claude-code' or 'standalone'), defaults to 'standalone'

    Returns:
        True if a configuration file is in place afterwards, False otherwise
    """
    self.info("Installing configuration...")
    try:
        # Install template configuration
        template_src = self.script_dir / "config.template.json"
        template_dst = self.claude_hooks_dir / "config.template.json"
        if template_src.exists():
            shutil.copy2(template_src, template_dst)

        # Install main configuration
        config_src = self.script_dir / "config.json"
        config_dst = self.claude_hooks_dir / "config.json"
        if config_dst.exists():
            # Backup existing config
            backup_config = config_dst.with_suffix('.json.backup')
            shutil.copy2(config_dst, backup_config)
            self.info("Existing configuration backed up")

        # Generate configuration based on detected MCP or fallback to template
        try:
            if detected_mcp:
                # Use smart configuration generation for existing MCP
                config = self.generate_hooks_config_from_mcp(detected_mcp, env_type)
                self.success("Generated configuration based on detected MCP setup")
            elif config_src.exists():
                # JSON files are UTF-8; the locale default (e.g. cp1252 on
                # Windows) would silently mangle non-ASCII values.
                with open(config_src, 'r', encoding='utf-8') as f:
                    config = json.load(f)
                # Update server working directory path for independent setup
                if 'memoryService' in config and 'mcp' in config['memoryService']:
                    config['memoryService']['mcp']['serverWorkingDir'] = str(self.script_dir.parent)
                self.success("Generated configuration using template with updated paths")
            else:
                # Generate basic configuration
                config = self.generate_basic_config(env_type)
                self.success("Generated basic configuration")

            # Add additional configuration based on installation options
            if install_natural_triggers:
                config = self.enhance_config_for_natural_triggers(config)

            # Serialize before opening the destination: opening with 'w'
            # truncates it, so a serialization error must surface first or
            # the user's existing config.json would be left half-written.
            serialized = json.dumps(config, indent=2)
            with open(config_dst, 'w', encoding='utf-8') as f:
                f.write(serialized)
            self.success("Configuration installed successfully")
        except Exception as e:
            self.warn(f"Failed to generate configuration: {e}")
            # Fallback to template copy if available
            if config_src.exists():
                shutil.copy2(config_src, config_dst)
                self.warn("Fell back to template configuration")
            elif not config_dst.exists():
                # Nothing generated, nothing to fall back to and nothing
                # pre-existing: reporting success here would be wrong.
                self.error("No configuration could be installed")
                return False
        return True
    except Exception as e:
        self.error(f"Failed to install configuration: {e}")
        return False
def configure_claude_settings(self, install_mid_conversation: bool = False) -> bool:
    """Configure Claude Code settings.json for hook integration.

    Builds the memory awareness hook entries (SessionStart except on
    Windows, SessionEnd, and UserPromptSubmit when requested) plus an
    optional ``statusLine`` entry, then either creates settings.json or
    merges the entries into the existing file. An existing file is copied to
    ``settings.json.backup`` before it is modified.

    The merge is idempotent and non-destructive: for each hook type this
    installer manages, previously installed memory hooks (recognised by
    their script name in the command) are replaced by the current ones,
    while every other hook in the same list is kept as-is.

    Args:
        install_mid_conversation: Also register the UserPromptSubmit hook
            that runs ``core/mid-conversation.js``.

    Returns:
        True if settings.json was written, False if an error occurred
        (reported through ``self.error``).
    """
    self.info("Configuring Claude Code settings...")
    try:
        # Determine settings path based on platform
        home = Path.home()
        if self.platform_name == 'windows':
            settings_dir = home / 'AppData' / 'Roaming' / 'Claude'
        else:
            settings_dir = home / '.claude'
        settings_dir.mkdir(parents=True, exist_ok=True)
        settings_file = settings_dir / 'settings.json'

        # Windows-specific warning for SessionStart hooks (issue #160)
        skip_session_start = False
        if self.platform_name == 'windows':
            self.warn("⚠️ Windows Platform Detected - SessionStart Hook Limitation")
            self.warn("SessionStart hooks cause Claude Code to hang on Windows (issue #160)")
            self.warn("Workaround: Use '/session-start' slash command instead")
            self.info("Skipping SessionStart hook configuration for Windows")
            self.info("See: https://github.com/doobidoo/mcp-memory-service/issues/160")
            skip_session_start = True

        def command_group(script_name, timeout):
            """Return the hook-group list that runs one core script via node."""
            return [
                {
                    "hooks": [
                        {
                            "type": "command",
                            "command": f'node "{self.claude_hooks_dir}/core/{script_name}"',
                            "timeout": timeout
                        }
                    ]
                }
            ]

        # Script names that identify a hook entry as one of ours.
        memory_scripts = ('session-start.js', 'session-end.js', 'mid-conversation.js')

        def is_memory_hook(hook):
            """Tell whether a hook entry runs one of the memory awareness scripts."""
            command = hook.get('command', '')
            return isinstance(command, str) and any(name in command for name in memory_scripts)

        # Create hook configuration
        hook_config = {
            "hooks": {}
        }
        # Add SessionStart only on non-Windows platforms
        if not skip_session_start:
            hook_config["hooks"]["SessionStart"] = command_group("session-start.js", 10)
        # SessionEnd works on all platforms
        hook_config["hooks"]["SessionEnd"] = command_group("session-end.js", 15)
        # Add mid-conversation hook if Natural Memory Triggers are installed
        if install_mid_conversation:
            hook_config["hooks"]["UserPromptSubmit"] = command_group("mid-conversation.js", 8)

        # Add statusLine configuration for v8.5.7+ (Unix/Linux/macOS only - requires bash)
        statusline_script = self.claude_hooks_dir / 'statusline.sh'
        if statusline_script.exists() and self.platform_name != 'windows':
            hook_config["statusLine"] = {
                "type": "command",
                "command": str(statusline_script),
                "padding": 0
            }
            self.info("Added statusLine configuration for memory awareness display")
        elif statusline_script.exists() and self.platform_name == 'windows':
            self.info("Skipping statusLine (requires bash - not available on Windows)")

        # Handle existing settings with intelligent merging
        final_config = hook_config
        if settings_file.exists():
            # Backup existing settings
            backup_settings = settings_file.with_suffix('.json.backup')
            shutil.copy2(settings_file, backup_settings)
            self.info("Existing settings.json backed up")
            try:
                # settings.json is UTF-8; decoding it with the locale default
                # would corrupt non-ASCII values when the file is rewritten.
                with open(settings_file, 'r', encoding='utf-8') as f:
                    existing_settings = json.load(f)
                if 'hooks' not in existing_settings:
                    existing_settings['hooks'] = {}
                existing_hooks = existing_settings['hooks']

                # Per hook type: strip our previously installed hooks, keep
                # everything else, then append the current memory hooks.
                # Replacing the whole list would delete user hooks; blindly
                # extending it would duplicate ours on every re-install.
                types_with_user_hooks = []
                for hook_type, memory_groups in hook_config['hooks'].items():
                    kept_groups = []
                    has_user_hooks = False
                    for group in existing_hooks.get(hook_type, []):
                        group_hooks = group.get('hooks', [])
                        user_hooks = [hook for hook in group_hooks if not is_memory_hook(hook)]
                        if group_hooks and not user_hooks:
                            # Group held only stale memory hooks: drop it.
                            continue
                        if len(user_hooks) != len(group_hooks):
                            # Mixed group: keep it (and its other keys) minus our hooks.
                            group = {**group, 'hooks': user_hooks}
                        kept_groups.append(group)
                        has_user_hooks = has_user_hooks or bool(user_hooks)
                    if has_user_hooks:
                        types_with_user_hooks.append(hook_type)
                    existing_hooks[hook_type] = kept_groups + memory_groups

                if types_with_user_hooks:
                    self.warn(f"Found existing non-memory hooks for: {', '.join(types_with_user_hooks)}")
                    self.warn("Memory awareness hooks will be added alongside existing hooks")
                else:
                    self.info("Updated memory awareness hooks without conflicts")

                # The statusLine entry lives outside 'hooks', so it has to be
                # merged separately; a statusLine the user already configured
                # is never overwritten.
                status_line = hook_config.get('statusLine')
                if status_line is not None:
                    if 'statusLine' not in existing_settings:
                        existing_settings['statusLine'] = status_line
                    elif existing_settings['statusLine'] != status_line:
                        self.info("Keeping existing statusLine setting")

                final_config = existing_settings
                self.success("Settings merged intelligently, preserving existing configuration")
            except json.JSONDecodeError as e:
                self.warn(f"Existing settings.json invalid, using backup and creating new: {e}")
                final_config = hook_config
            except Exception as e:
                self.warn(f"Error merging settings, creating new configuration: {e}")
                final_config = hook_config

        # Write final configuration
        with open(settings_file, 'w', encoding='utf-8') as f:
            json.dump(final_config, f, indent=2)
        self.success("Claude Code settings configured successfully")
        return True
    except Exception as e:
        self.error(f"Failed to configure Claude Code settings: {e}")
        return False
def run_tests(self, test_natural_triggers: bool = False) -> bool:
    """Verify an installation inside ``self.claude_hooks_dir``.

    Performs, in order: a presence check of the required files, a
    ``node --check`` syntax validation of ``core/session-start.js``, the
    optional ``tests/integration-test.js`` run and, when requested, the
    optional ``test-natural-triggers.js`` run.

    Args:
        test_natural_triggers: Also require the Natural Memory Triggers
            files and run their test script when it is installed.

    Returns:
        False if a required file is missing or the syntax check fails,
        True otherwise. Failing or unrunnable integration / Natural Memory
        Triggers tests only produce warnings.
    """
    self.info("Running installation tests...")
    hooks_root = self.claude_hooks_dir

    # Check required files exist
    expected = [
        "core/session-start.js",
        "core/session-end.js",
        "utilities/project-detector.js",
        "utilities/memory-scorer.js",
        "utilities/context-formatter.js",
        "config.json",
    ]
    if test_natural_triggers:
        expected += [
            "core/mid-conversation.js",
            "utilities/adaptive-pattern-detector.js",
            "utilities/performance-manager.js",
            "utilities/mcp-client.js",
        ]
    absent = [relative for relative in expected if not (hooks_root / relative).exists()]
    passed = not absent
    if passed:
        self.success("All required files installed correctly")
    else:
        self.error("Installation incomplete - missing files:")
        for relative in absent:
            self.error(f" - {relative}")

    # Test Node.js execution
    session_start = hooks_root / "core" / "session-start.js"
    if session_start.exists():
        try:
            syntax_check = subprocess.run(
                ['node', '--check', str(session_start)],
                capture_output=True, text=True, timeout=10,
            )
            if syntax_check.returncode != 0:
                self.error(f"Hook JavaScript syntax validation failed: {syntax_check.stderr}")
                passed = False
            else:
                self.success("Hook JavaScript syntax validation passed")
        except Exception as e:
            # A missing node binary or a timeout is not treated as a failure.
            self.warn(f"Could not validate JavaScript syntax: {e}")

    # Run integration tests if available
    integration_suite = hooks_root / "tests" / "integration-test.js"
    if integration_suite.exists():
        try:
            self.info("Running integration tests...")
            outcome = subprocess.run(
                ['node', str(integration_suite)],
                capture_output=True, text=True,
                timeout=30, cwd=str(hooks_root),
            )
            if outcome.returncode != 0:
                self.warn("Some integration tests failed - check configuration")
                if outcome.stdout:
                    self.info(f"Test output: {outcome.stdout}")
            else:
                self.success("Integration tests passed")
        except Exception as e:
            self.warn(f"Could not run integration tests: {e}")

    # Run Natural Memory Triggers tests if applicable
    if not test_natural_triggers:
        return passed
    trigger_suite = hooks_root / "test-natural-triggers.js"
    if trigger_suite.exists():
        try:
            self.info("Running Natural Memory Triggers tests...")
            outcome = subprocess.run(
                ['node', str(trigger_suite)],
                capture_output=True, text=True,
                timeout=30, cwd=str(hooks_root),
            )
            if outcome.returncode != 0:
                self.warn("Some Natural Memory Triggers tests failed")
            else:
                self.success("Natural Memory Triggers tests passed")
        except Exception as e:
            self.warn(f"Could not run Natural Memory Triggers tests: {e}")
    return passed
def _cleanup_empty_directories(self) -> None:
    """Delete the ``core``, ``utilities`` and ``tests`` hook directories when empty.

    A directory is removed when it has no entries at all, or when none of
    its entries is a file or a non-empty sub-directory (the empty
    sub-directories are removed first). ``OSError`` raised while handling a
    directory is ignored; any other exception is reported as a warning.
    """
    try:
        candidates = [
            self.claude_hooks_dir / folder_name
            for folder_name in ("core", "utilities", "tests")
        ]
        for folder in candidates:
            if not (folder.exists() and folder.is_dir()):
                continue
            try:
                children = list(folder.iterdir())
                if not children:
                    # Nothing inside at all.
                    folder.rmdir()
                    self.info(f"Removed empty directory: {folder.name}/")
                    continue

                # Any file, or any sub-directory that still has content,
                # means the tree must be left alone.
                has_content = any(
                    child.is_file() or (child.is_dir() and list(child.iterdir()))
                    for child in children
                )
                if has_content:
                    continue

                # Only empty sub-directories remain: remove them, then the parent.
                for child in children:
                    if child.is_dir():
                        child.rmdir()
                folder.rmdir()
                self.info(f"Removed empty directory tree: {folder.name}/")
            except OSError:
                # Directory not empty or permission issue, skip silently
                pass
    except Exception as e:
        self.warn(f"Could not cleanup empty directories: {e}")
def uninstall(self) -> bool:
    """Remove installed hooks.

    Deletes the hook, CLI, test and utility scripts this installer deploys
    into ``self.claude_hooks_dir`` and then removes the directories left
    empty. The utility list mirrors the installer: besides the fixed names,
    every ``utilities/*.js`` shipped in ``self.script_dir`` is removed,
    because installation copies that whole set. ``config.json`` is always
    kept since it may contain user customizations, and files that were not
    shipped by the installer are never touched.

    Returns:
        True if the uninstall completed (including when nothing was
        installed), False if an exception occurred.
    """
    self.info("Uninstalling Claude Code memory awareness hooks...")
    try:
        if not self.claude_hooks_dir.exists():
            self.info("No hooks installation found")
            return True

        # Remove hook files
        files_to_remove = [
            "core/session-start.js",
            "core/session-end.js",
            "core/mid-conversation.js",
            "core/memory-retrieval.js",
            "core/topic-change.js",
            "memory-mode-controller.js",
            "test-natural-triggers.js",
            "test-mcp-hook.js",
            # Installed alongside the other test scripts, so it must go too.
            "test-dual-protocol-hook.js",
            "debug-pattern-test.js"
        ]
        # Remove utilities
        utility_files = [
            "utilities/adaptive-pattern-detector.js",
            "utilities/performance-manager.js",
            "utilities/mcp-client.js",
            "utilities/memory-client.js",
            "utilities/tiered-conversation-monitor.js"
        ]
        files_to_remove.extend(utility_files)

        # Installation copies every shipped utilities/*.js, so remove the
        # same set. Skipped when source and target are the same directory,
        # where this would delete the shipped files themselves.
        shipped_utilities = self.script_dir / "utilities"
        installed_utilities = self.claude_hooks_dir / "utilities"
        if shipped_utilities.is_dir() and shipped_utilities.resolve() != installed_utilities.resolve():
            for shipped in sorted(shipped_utilities.glob("*.js")):
                relative = f"utilities/{shipped.name}"
                if relative not in files_to_remove:
                    files_to_remove.append(relative)

        removed_count = 0
        for file in files_to_remove:
            file_path = self.claude_hooks_dir / file
            if file_path.exists():
                file_path.unlink()
                removed_count += 1

        # Config files are kept unconditionally (no prompt): they may have
        # user customizations.
        config_file = self.claude_hooks_dir / "config.json"
        if config_file.exists():
            self.info("Configuration files preserved (contains user customizations)")

        # Clean up empty directories
        self._cleanup_empty_directories()
        self.success(f"Removed {removed_count} hook files and cleaned up empty directories")
        return True
    except Exception as e:
        self.error(f"Failed to uninstall hooks: {e}")
        return False
def main():
    """Command-line entry point of the hooks installer.

    Parses the CLI flags and runs exactly one mode: uninstall, test-only,
    dry-run, or the full install pipeline (prerequisite check, MCP and
    environment detection, backup, component installation, configuration,
    Claude Code settings, post-install tests).

    Exits with status 1 when uninstalling fails, the test-only run fails,
    prerequisites fail without ``--force``, or any installation step
    reports failure. Failing post-install tests only produce a warning.
    """
    parser = argparse.ArgumentParser(
        description="Unified Claude Code Memory Awareness Hooks Installer",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python install_hooks.py # Install all features (default)
python install_hooks.py --basic # Basic hooks only
python install_hooks.py --natural-triggers # Natural Memory Triggers only
python install_hooks.py --test # Run tests only
python install_hooks.py --uninstall # Remove hooks
Features:
Basic: Session-start and session-end hooks for memory awareness
Natural Triggers: v7.1.3 intelligent automatic memory awareness with
pattern detection, performance optimization, and CLI tools
"""
    )
    # Every option is a plain boolean switch; registration order is kept so
    # the generated --help output lists them in the same sequence.
    switches = (
        ('--basic', 'Install basic memory awareness hooks only'),
        ('--natural-triggers', 'Install Natural Memory Triggers v7.1.3 only'),
        ('--all', 'Install all features (default behavior)'),
        ('--test', 'Run tests only (do not install)'),
        ('--uninstall', 'Remove installed hooks'),
        ('--force', 'Force installation even if prerequisites fail'),
        ('--dry-run', 'Show what would be installed without making changes'),
    )
    for flag, description in switches:
        parser.add_argument(flag, action='store_true', help=description)
    args = parser.parse_args()

    # Create installer instance
    installer = HookInstaller()
    installer.header(f"Claude Code Memory Awareness Hooks Installer v{get_project_version()}")
    installer.info(f"Script location: {installer.script_dir}")
    installer.info(f"Target hooks directory: {installer.claude_hooks_dir}")
    installer.info(f"Platform: {installer.platform_name}")

    # Special modes are handled before any prerequisite check.
    if args.uninstall:
        if not installer.uninstall():
            installer.error("Uninstall failed")
            sys.exit(1)
        installer.success("Hooks uninstalled successfully")
        return

    if args.test:
        if not installer.run_tests(test_natural_triggers=not args.basic):
            installer.error("Some tests failed")
            sys.exit(1)
        installer.success("All tests passed")
        return

    # Check prerequisites
    if not (installer.check_prerequisites() or args.force):
        installer.error("Prerequisites check failed. Use --force to continue anyway.")
        sys.exit(1)

    # Enhanced MCP Detection and Configuration
    installer.header("MCP Configuration Detection")
    detected_mcp = installer.detect_claude_mcp_configuration()
    use_existing_mcp = False
    if not detected_mcp:
        installer.info("No existing MCP configuration found - using independent setup")
    else:
        # Validate MCP prerequisites
        is_valid, issues = installer.validate_mcp_prerequisites(detected_mcp)
        if not is_valid:
            installer.warn("⚠️ MCP configuration found but has issues:")
            for issue in issues:
                installer.warn(f" - {issue}")
            installer.info("Will use independent setup as fallback")
        else:
            installer.success("✅ Valid MCP configuration detected!")
            installer.info("📋 Configuration Options:")
            installer.info(" [1] Use existing MCP setup (recommended) - DRY principle ✨")
            installer.info(" [2] Create independent hooks setup - legacy fallback")
            # Not interactive (yet): option 1 is always selected.
            use_existing_mcp = True
            installer.info("Using existing MCP configuration (option 1)")

    # Environment Detection and Protocol Configuration
    installer.header("Environment Detection & Protocol Configuration")
    env_type = installer.detect_environment_type()

    # Determine what to install: without a component flag, everything.
    install_all = args.all or not (args.basic or args.natural_triggers)
    install_basic = args.basic or install_all
    install_natural_triggers = args.natural_triggers or install_all

    def yes_no(flag):
        return 'Yes' if flag else 'No'

    installer.info("Installation plan:")
    installer.info(f" Basic hooks: {yes_no(install_basic)}")
    installer.info(f" Natural Memory Triggers: {yes_no(install_natural_triggers)}")

    if args.dry_run:
        planned = ["DRY RUN - No changes will be made", "Would install:"]
        if install_basic:
            planned += [
                " - Basic memory awareness hooks",
                " - Core utilities and configuration",
            ]
        if install_natural_triggers:
            planned += [
                " - Natural Memory Triggers v7.1.3",
                " - Mid-conversation hooks",
                " - Performance optimization utilities",
                " - CLI management tools",
            ]
        for line in planned:
            installer.info(line)
        return

    # Create backup
    installer.create_backup()

    # Run every applicable step even if an earlier one failed, then
    # evaluate the collected results.
    step_results = []
    if install_basic:
        step_results.append(installer.install_basic_hooks())
    if install_natural_triggers:
        step_results.append(installer.install_natural_triggers())
    # Install configuration (always needed) with MCP awareness
    step_results.append(
        installer.install_configuration(
            install_natural_triggers=install_natural_triggers,
            detected_mcp=detected_mcp if use_existing_mcp else None,
            env_type=env_type,
        )
    )
    # Configure Claude Code settings
    step_results.append(
        installer.configure_claude_settings(install_mid_conversation=install_natural_triggers)
    )

    if not all(step_results):
        installer.error("Installation failed - some components could not be installed")
        sys.exit(1)

    # Run tests to verify installation
    installer.info("Running post-installation tests...")
    if not installer.run_tests(test_natural_triggers=install_natural_triggers):
        installer.warn("Installation completed but some tests failed")
        installer.info("Hooks may still work - check configuration manually")
        return

    installer.header("Installation Complete!")
    if install_basic and install_natural_triggers:
        installer.success("Complete Claude Code memory awareness system installed")
        for line in (
            "Features available:",
            " ✅ Session-start and session-end hooks",
            " ✅ Natural Memory Triggers with intelligent pattern detection",
            " ✅ Mid-conversation memory injection",
            " ✅ Performance optimization and CLI management",
            "",
            "CLI Management:",
            f" node {installer.claude_hooks_dir}/memory-mode-controller.js status",
            f" node {installer.claude_hooks_dir}/memory-mode-controller.js profile balanced",
        ):
            installer.info(line)
    elif install_natural_triggers:
        installer.success("Natural Memory Triggers v7.1.3 installed")
        installer.info("Advanced memory awareness features available")
    elif install_basic:
        installer.success("Basic memory awareness hooks installed")
        installer.info("Session-based memory awareness enabled")

    # Code execution enabled message (applies to all installation types)
    installer.info("")
    installer.success("Code Execution Interface enabled by default")
    for line in (
        " ✅ 75-90% token reduction",
        " ✅ Automatic MCP fallback",
        " ✅ Zero breaking changes",
        " ℹ️ Disable in ~/.claude/hooks/config.json if needed",
    ):
        installer.info(line)
if __name__ == "__main__":
    # Script entry point: run the installer and turn Ctrl-C or any
    # unexpected exception into a coloured one-line message plus exit
    # status 1. SystemExit raised by main() itself is not intercepted.
    failure_message = None
    try:
        main()
    except KeyboardInterrupt:
        failure_message = f"\n{Colors.YELLOW}Installation cancelled by user{Colors.NC}"
    except Exception as e:
        failure_message = f"\n{Colors.RED}Unexpected error: {e}{Colors.NC}"
    if failure_message is not None:
        print(failure_message)
        sys.exit(1)
```