#
tokens: 48317/50000 9/625 files (page 23/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 23 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── amp-bridge.md
│   │   ├── amp-pr-automator.md
│   │   ├── code-quality-guard.md
│   │   ├── gemini-pr-automator.md
│   │   └── github-release-manager.md
│   ├── settings.local.json.backup
│   └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── feature_request.yml
│   │   └── performance_issue.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── bridge-tests.yml
│       ├── CACHE_FIX.md
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── cleanup-images.yml.disabled
│       ├── dev-setup-validation.yml
│       ├── docker-publish.yml
│       ├── LATEST_FIXES.md
│       ├── main-optimized.yml.disabled
│       ├── main.yml
│       ├── publish-and-test.yml
│       ├── README_OPTIMIZATION.md
│       ├── release-tag.yml.disabled
│       ├── release.yml
│       ├── roadmap-review-reminder.yml
│       ├── SECRET_CONDITIONAL_FIX.md
│       └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│   ├── .gitignore
│   └── reports
│       └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│   ├── deployment
│   │   ├── deploy_fastmcp_fixed.sh
│   │   ├── deploy_http_with_mcp.sh
│   │   └── deploy_mcp_v4.sh
│   ├── deployment-configs
│   │   ├── empty_config.yml
│   │   └── smithery.yaml
│   ├── development
│   │   └── test_fastmcp.py
│   ├── docs-removed-2025-08-23
│   │   ├── authentication.md
│   │   ├── claude_integration.md
│   │   ├── claude-code-compatibility.md
│   │   ├── claude-code-integration.md
│   │   ├── claude-code-quickstart.md
│   │   ├── claude-desktop-setup.md
│   │   ├── complete-setup-guide.md
│   │   ├── database-synchronization.md
│   │   ├── development
│   │   │   ├── autonomous-memory-consolidation.md
│   │   │   ├── CLEANUP_PLAN.md
│   │   │   ├── CLEANUP_README.md
│   │   │   ├── CLEANUP_SUMMARY.md
│   │   │   ├── dream-inspired-memory-consolidation.md
│   │   │   ├── hybrid-slm-memory-consolidation.md
│   │   │   ├── mcp-milestone.md
│   │   │   ├── multi-client-architecture.md
│   │   │   ├── test-results.md
│   │   │   └── TIMESTAMP_FIX_SUMMARY.md
│   │   ├── distributed-sync.md
│   │   ├── invocation_guide.md
│   │   ├── macos-intel.md
│   │   ├── master-guide.md
│   │   ├── mcp-client-configuration.md
│   │   ├── multi-client-server.md
│   │   ├── service-installation.md
│   │   ├── sessions
│   │   │   └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│   │   ├── UBUNTU_SETUP.md
│   │   ├── ubuntu.md
│   │   ├── windows-setup.md
│   │   └── windows.md
│   ├── docs-root-cleanup-2025-08-23
│   │   ├── AWESOME_LIST_SUBMISSION.md
│   │   ├── CLOUDFLARE_IMPLEMENTATION.md
│   │   ├── DOCUMENTATION_ANALYSIS.md
│   │   ├── DOCUMENTATION_CLEANUP_PLAN.md
│   │   ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│   │   ├── LITESTREAM_SETUP_GUIDE.md
│   │   ├── lm_studio_system_prompt.md
│   │   ├── PYTORCH_DOWNLOAD_FIX.md
│   │   └── README-ORIGINAL-BACKUP.md
│   ├── investigations
│   │   └── MACOS_HOOKS_INVESTIGATION.md
│   ├── litestream-configs-v6.3.0
│   │   ├── install_service.sh
│   │   ├── litestream_master_config_fixed.yml
│   │   ├── litestream_master_config.yml
│   │   ├── litestream_replica_config_fixed.yml
│   │   ├── litestream_replica_config.yml
│   │   ├── litestream_replica_simple.yml
│   │   ├── litestream-http.service
│   │   ├── litestream.service
│   │   └── requirements-cloudflare.txt
│   ├── release-notes
│   │   └── release-notes-v7.1.4.md
│   └── setup-development
│       ├── README.md
│       ├── setup_consolidation_mdns.sh
│       ├── STARTUP_SETUP_GUIDE.md
│       └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│   ├── memory-context.md
│   ├── memory-health.md
│   ├── memory-ingest-dir.md
│   ├── memory-ingest.md
│   ├── memory-recall.md
│   ├── memory-search.md
│   ├── memory-store.md
│   ├── README.md
│   └── session-start.md
├── claude-hooks
│   ├── config.json
│   ├── config.template.json
│   ├── CONFIGURATION.md
│   ├── core
│   │   ├── memory-retrieval.js
│   │   ├── mid-conversation.js
│   │   ├── session-end.js
│   │   ├── session-start.js
│   │   └── topic-change.js
│   ├── debug-pattern-test.js
│   ├── install_claude_hooks_windows.ps1
│   ├── install_hooks.py
│   ├── memory-mode-controller.js
│   ├── MIGRATION.md
│   ├── README-NATURAL-TRIGGERS.md
│   ├── README-phase2.md
│   ├── README.md
│   ├── simple-test.js
│   ├── statusline.sh
│   ├── test-adaptive-weights.js
│   ├── test-dual-protocol-hook.js
│   ├── test-mcp-hook.js
│   ├── test-natural-triggers.js
│   ├── test-recency-scoring.js
│   ├── tests
│   │   ├── integration-test.js
│   │   ├── phase2-integration-test.js
│   │   ├── test-code-execution.js
│   │   ├── test-cross-session.json
│   │   ├── test-session-tracking.json
│   │   └── test-threading.json
│   ├── utilities
│   │   ├── adaptive-pattern-detector.js
│   │   ├── context-formatter.js
│   │   ├── context-shift-detector.js
│   │   ├── conversation-analyzer.js
│   │   ├── dynamic-context-updater.js
│   │   ├── git-analyzer.js
│   │   ├── mcp-client.js
│   │   ├── memory-client.js
│   │   ├── memory-scorer.js
│   │   ├── performance-manager.js
│   │   ├── project-detector.js
│   │   ├── session-tracker.js
│   │   ├── tiered-conversation-monitor.js
│   │   └── version-checker.js
│   └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│   ├── amp-cli-bridge.md
│   ├── api
│   │   ├── code-execution-interface.md
│   │   ├── memory-metadata-api.md
│   │   ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_REPORT.md
│   │   └── tag-standardization.md
│   ├── architecture
│   │   ├── search-enhancement-spec.md
│   │   └── search-examples.md
│   ├── architecture.md
│   ├── archive
│   │   └── obsolete-workflows
│   │       ├── load_memory_context.md
│   │       └── README.md
│   ├── assets
│   │   └── images
│   │       ├── dashboard-v3.3.0-preview.png
│   │       ├── memory-awareness-hooks-example.png
│   │       ├── project-infographic.svg
│   │       └── README.md
│   ├── CLAUDE_CODE_QUICK_REFERENCE.md
│   ├── cloudflare-setup.md
│   ├── deployment
│   │   ├── docker.md
│   │   ├── dual-service.md
│   │   ├── production-guide.md
│   │   └── systemd-service.md
│   ├── development
│   │   ├── ai-agent-instructions.md
│   │   ├── code-quality
│   │   │   ├── phase-2a-completion.md
│   │   │   ├── phase-2a-handle-get-prompt.md
│   │   │   ├── phase-2a-index.md
│   │   │   ├── phase-2a-install-package.md
│   │   │   └── phase-2b-session-summary.md
│   │   ├── code-quality-workflow.md
│   │   ├── dashboard-workflow.md
│   │   ├── issue-management.md
│   │   ├── pr-review-guide.md
│   │   ├── refactoring-notes.md
│   │   ├── release-checklist.md
│   │   └── todo-tracker.md
│   ├── docker-optimized-build.md
│   ├── document-ingestion.md
│   ├── DOCUMENTATION_AUDIT.md
│   ├── enhancement-roadmap-issue-14.md
│   ├── examples
│   │   ├── analysis-scripts.js
│   │   ├── maintenance-session-example.md
│   │   ├── memory-distribution-chart.jsx
│   │   └── tag-schema.json
│   ├── first-time-setup.md
│   ├── glama-deployment.md
│   ├── guides
│   │   ├── advanced-command-examples.md
│   │   ├── chromadb-migration.md
│   │   ├── commands-vs-mcp-server.md
│   │   ├── mcp-enhancements.md
│   │   ├── mdns-service-discovery.md
│   │   ├── memory-consolidation-guide.md
│   │   ├── migration.md
│   │   ├── scripts.md
│   │   └── STORAGE_BACKENDS.md
│   ├── HOOK_IMPROVEMENTS.md
│   ├── hooks
│   │   └── phase2-code-execution-migration.md
│   ├── http-server-management.md
│   ├── ide-compatability.md
│   ├── IMAGE_RETENTION_POLICY.md
│   ├── images
│   │   └── dashboard-placeholder.md
│   ├── implementation
│   │   ├── health_checks.md
│   │   └── performance.md
│   ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│   ├── integration
│   │   ├── homebrew.md
│   │   └── multi-client.md
│   ├── integrations
│   │   ├── gemini.md
│   │   ├── groq-bridge.md
│   │   ├── groq-integration-summary.md
│   │   └── groq-model-comparison.md
│   ├── integrations.md
│   ├── legacy
│   │   └── dual-protocol-hooks.md
│   ├── LM_STUDIO_COMPATIBILITY.md
│   ├── maintenance
│   │   └── memory-maintenance.md
│   ├── mastery
│   │   ├── api-reference.md
│   │   ├── architecture-overview.md
│   │   ├── configuration-guide.md
│   │   ├── local-setup-and-run.md
│   │   ├── testing-guide.md
│   │   └── troubleshooting.md
│   ├── migration
│   │   └── code-execution-api-quick-start.md
│   ├── natural-memory-triggers
│   │   ├── cli-reference.md
│   │   ├── installation-guide.md
│   │   └── performance-optimization.md
│   ├── oauth-setup.md
│   ├── pr-graphql-integration.md
│   ├── quick-setup-cloudflare-dual-environment.md
│   ├── README.md
│   ├── remote-configuration-wiki-section.md
│   ├── research
│   │   ├── code-execution-interface-implementation.md
│   │   └── code-execution-interface-summary.md
│   ├── ROADMAP.md
│   ├── sqlite-vec-backend.md
│   ├── statistics
│   │   ├── charts
│   │   │   ├── activity_patterns.png
│   │   │   ├── contributors.png
│   │   │   ├── growth_trajectory.png
│   │   │   ├── monthly_activity.png
│   │   │   └── october_sprint.png
│   │   ├── data
│   │   │   ├── activity_by_day.csv
│   │   │   ├── activity_by_hour.csv
│   │   │   ├── contributors.csv
│   │   │   └── monthly_activity.csv
│   │   ├── generate_charts.py
│   │   └── REPOSITORY_STATISTICS.md
│   ├── technical
│   │   ├── development.md
│   │   ├── memory-migration.md
│   │   ├── migration-log.md
│   │   ├── sqlite-vec-embedding-fixes.md
│   │   └── tag-storage.md
│   ├── testing
│   │   └── regression-tests.md
│   ├── testing-cloudflare-backend.md
│   ├── troubleshooting
│   │   ├── cloudflare-api-token-setup.md
│   │   ├── cloudflare-authentication.md
│   │   ├── general.md
│   │   ├── hooks-quick-reference.md
│   │   ├── pr162-schema-caching-issue.md
│   │   ├── session-end-hooks.md
│   │   └── sync-issues.md
│   └── tutorials
│       ├── advanced-techniques.md
│       ├── data-analysis.md
│       └── demo-session-walkthrough.md
├── examples
│   ├── claude_desktop_config_template.json
│   ├── claude_desktop_config_windows.json
│   ├── claude-desktop-http-config.json
│   ├── config
│   │   └── claude_desktop_config.json
│   ├── http-mcp-bridge.js
│   ├── memory_export_template.json
│   ├── README.md
│   ├── setup
│   │   └── setup_multi_client_complete.py
│   └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│   ├── .claude
│   │   └── settings.local.json
│   ├── archive
│   │   └── check_missing_timestamps.py
│   ├── backup
│   │   ├── backup_memories.py
│   │   ├── backup_sqlite_vec.sh
│   │   ├── export_distributable_memories.sh
│   │   └── restore_memories.py
│   ├── benchmarks
│   │   ├── benchmark_code_execution_api.py
│   │   ├── benchmark_hybrid_sync.py
│   │   └── benchmark_server_caching.py
│   ├── database
│   │   ├── analyze_sqlite_vec_db.py
│   │   ├── check_sqlite_vec_status.py
│   │   ├── db_health_check.py
│   │   └── simple_timestamp_check.py
│   ├── development
│   │   ├── debug_server_initialization.py
│   │   ├── find_orphaned_files.py
│   │   ├── fix_mdns.sh
│   │   ├── fix_sitecustomize.py
│   │   ├── remote_ingest.sh
│   │   ├── setup-git-merge-drivers.sh
│   │   ├── uv-lock-merge.sh
│   │   └── verify_hybrid_sync.py
│   ├── hooks
│   │   └── pre-commit
│   ├── installation
│   │   ├── install_linux_service.py
│   │   ├── install_macos_service.py
│   │   ├── install_uv.py
│   │   ├── install_windows_service.py
│   │   ├── install.py
│   │   ├── setup_backup_cron.sh
│   │   ├── setup_claude_mcp.sh
│   │   └── setup_cloudflare_resources.py
│   ├── linux
│   │   ├── service_status.sh
│   │   ├── start_service.sh
│   │   ├── stop_service.sh
│   │   ├── uninstall_service.sh
│   │   └── view_logs.sh
│   ├── maintenance
│   │   ├── assign_memory_types.py
│   │   ├── check_memory_types.py
│   │   ├── cleanup_corrupted_encoding.py
│   │   ├── cleanup_memories.py
│   │   ├── cleanup_organize.py
│   │   ├── consolidate_memory_types.py
│   │   ├── consolidation_mappings.json
│   │   ├── delete_orphaned_vectors_fixed.py
│   │   ├── fast_cleanup_duplicates_with_tracking.sh
│   │   ├── find_all_duplicates.py
│   │   ├── find_cloudflare_duplicates.py
│   │   ├── find_duplicates.py
│   │   ├── memory-types.md
│   │   ├── README.md
│   │   ├── recover_timestamps_from_cloudflare.py
│   │   ├── regenerate_embeddings.py
│   │   ├── repair_malformed_tags.py
│   │   ├── repair_memories.py
│   │   ├── repair_sqlite_vec_embeddings.py
│   │   ├── repair_zero_embeddings.py
│   │   ├── restore_from_json_export.py
│   │   └── scan_todos.sh
│   ├── migration
│   │   ├── cleanup_mcp_timestamps.py
│   │   ├── legacy
│   │   │   └── migrate_chroma_to_sqlite.py
│   │   ├── mcp-migration.py
│   │   ├── migrate_sqlite_vec_embeddings.py
│   │   ├── migrate_storage.py
│   │   ├── migrate_tags.py
│   │   ├── migrate_timestamps.py
│   │   ├── migrate_to_cloudflare.py
│   │   ├── migrate_to_sqlite_vec.py
│   │   ├── migrate_v5_enhanced.py
│   │   ├── TIMESTAMP_CLEANUP_README.md
│   │   └── verify_mcp_timestamps.py
│   ├── pr
│   │   ├── amp_collect_results.sh
│   │   ├── amp_detect_breaking_changes.sh
│   │   ├── amp_generate_tests.sh
│   │   ├── amp_pr_review.sh
│   │   ├── amp_quality_gate.sh
│   │   ├── amp_suggest_fixes.sh
│   │   ├── auto_review.sh
│   │   ├── detect_breaking_changes.sh
│   │   ├── generate_tests.sh
│   │   ├── lib
│   │   │   └── graphql_helpers.sh
│   │   ├── quality_gate.sh
│   │   ├── resolve_threads.sh
│   │   ├── run_pyscn_analysis.sh
│   │   ├── run_quality_checks.sh
│   │   ├── thread_status.sh
│   │   └── watch_reviews.sh
│   ├── quality
│   │   ├── fix_dead_code_install.sh
│   │   ├── phase1_dead_code_analysis.md
│   │   ├── phase2_complexity_analysis.md
│   │   ├── README_PHASE1.md
│   │   ├── README_PHASE2.md
│   │   ├── track_pyscn_metrics.sh
│   │   └── weekly_quality_review.sh
│   ├── README.md
│   ├── run
│   │   ├── run_mcp_memory.sh
│   │   ├── run-with-uv.sh
│   │   └── start_sqlite_vec.sh
│   ├── run_memory_server.py
│   ├── server
│   │   ├── check_http_server.py
│   │   ├── check_server_health.py
│   │   ├── memory_offline.py
│   │   ├── preload_models.py
│   │   ├── run_http_server.py
│   │   ├── run_memory_server.py
│   │   ├── start_http_server.bat
│   │   └── start_http_server.sh
│   ├── service
│   │   ├── deploy_dual_services.sh
│   │   ├── install_http_service.sh
│   │   ├── mcp-memory-http.service
│   │   ├── mcp-memory.service
│   │   ├── memory_service_manager.sh
│   │   ├── service_control.sh
│   │   ├── service_utils.py
│   │   └── update_service.sh
│   ├── sync
│   │   ├── check_drift.py
│   │   ├── claude_sync_commands.py
│   │   ├── export_memories.py
│   │   ├── import_memories.py
│   │   ├── litestream
│   │   │   ├── apply_local_changes.sh
│   │   │   ├── enhanced_memory_store.sh
│   │   │   ├── init_staging_db.sh
│   │   │   ├── io.litestream.replication.plist
│   │   │   ├── manual_sync.sh
│   │   │   ├── memory_sync.sh
│   │   │   ├── pull_remote_changes.sh
│   │   │   ├── push_to_remote.sh
│   │   │   ├── README.md
│   │   │   ├── resolve_conflicts.sh
│   │   │   ├── setup_local_litestream.sh
│   │   │   ├── setup_remote_litestream.sh
│   │   │   ├── staging_db_init.sql
│   │   │   ├── stash_local_changes.sh
│   │   │   ├── sync_from_remote_noconfig.sh
│   │   │   └── sync_from_remote.sh
│   │   ├── README.md
│   │   ├── safe_cloudflare_update.sh
│   │   ├── sync_memory_backends.py
│   │   └── sync_now.py
│   ├── testing
│   │   ├── run_complete_test.py
│   │   ├── run_memory_test.sh
│   │   ├── simple_test.py
│   │   ├── test_cleanup_logic.py
│   │   ├── test_cloudflare_backend.py
│   │   ├── test_docker_functionality.py
│   │   ├── test_installation.py
│   │   ├── test_mdns.py
│   │   ├── test_memory_api.py
│   │   ├── test_memory_simple.py
│   │   ├── test_migration.py
│   │   ├── test_search_api.py
│   │   ├── test_sqlite_vec_embeddings.py
│   │   ├── test_sse_events.py
│   │   ├── test-connection.py
│   │   └── test-hook.js
│   ├── utils
│   │   ├── claude_commands_utils.py
│   │   ├── generate_personalized_claude_md.sh
│   │   ├── groq
│   │   ├── groq_agent_bridge.py
│   │   ├── list-collections.py
│   │   ├── memory_wrapper_uv.py
│   │   ├── query_memories.py
│   │   ├── smithery_wrapper.py
│   │   ├── test_groq_bridge.sh
│   │   └── uv_wrapper.py
│   └── validation
│       ├── check_dev_setup.py
│       ├── check_documentation_links.py
│       ├── diagnose_backend_config.py
│       ├── validate_configuration_complete.py
│       ├── validate_memories.py
│       ├── validate_migration.py
│       ├── validate_timestamp_integrity.py
│       ├── verify_environment.py
│       ├── verify_pytorch_windows.py
│       └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│   └── mcp_memory_service
│       ├── __init__.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── operations.py
│       │   ├── sync_wrapper.py
│       │   └── types.py
│       ├── backup
│       │   ├── __init__.py
│       │   └── scheduler.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── ingestion.py
│       │   ├── main.py
│       │   └── utils.py
│       ├── config.py
│       ├── consolidation
│       │   ├── __init__.py
│       │   ├── associations.py
│       │   ├── base.py
│       │   ├── clustering.py
│       │   ├── compression.py
│       │   ├── consolidator.py
│       │   ├── decay.py
│       │   ├── forgetting.py
│       │   ├── health.py
│       │   └── scheduler.py
│       ├── dependency_check.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── client.py
│       │   └── mdns_service.py
│       ├── embeddings
│       │   ├── __init__.py
│       │   └── onnx_embeddings.py
│       ├── ingestion
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chunker.py
│       │   ├── csv_loader.py
│       │   ├── json_loader.py
│       │   ├── pdf_loader.py
│       │   ├── registry.py
│       │   ├── semtools_loader.py
│       │   └── text_loader.py
│       ├── lm_studio_compat.py
│       ├── mcp_server.py
│       ├── models
│       │   ├── __init__.py
│       │   └── memory.py
│       ├── server.py
│       ├── services
│       │   ├── __init__.py
│       │   └── memory_service.py
│       ├── storage
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloudflare.py
│       │   ├── factory.py
│       │   ├── http_client.py
│       │   ├── hybrid.py
│       │   └── sqlite_vec.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── exporter.py
│       │   ├── importer.py
│       │   └── litestream_config.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cache_manager.py
│       │   ├── content_splitter.py
│       │   ├── db_utils.py
│       │   ├── debug.py
│       │   ├── document_processing.py
│       │   ├── gpu_detection.py
│       │   ├── hashing.py
│       │   ├── http_server_manager.py
│       │   ├── port_detection.py
│       │   ├── system_detection.py
│       │   └── time_parser.py
│       └── web
│           ├── __init__.py
│           ├── api
│           │   ├── __init__.py
│           │   ├── analytics.py
│           │   ├── backup.py
│           │   ├── consolidation.py
│           │   ├── documents.py
│           │   ├── events.py
│           │   ├── health.py
│           │   ├── manage.py
│           │   ├── mcp.py
│           │   ├── memories.py
│           │   ├── search.py
│           │   └── sync.py
│           ├── app.py
│           ├── dependencies.py
│           ├── oauth
│           │   ├── __init__.py
│           │   ├── authorization.py
│           │   ├── discovery.py
│           │   ├── middleware.py
│           │   ├── models.py
│           │   ├── registration.py
│           │   └── storage.py
│           ├── sse.py
│           └── static
│               ├── app.js
│               ├── index.html
│               ├── README.md
│               ├── sse_test.html
│               └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── test_compact_types.py
│   │   └── test_operations.py
│   ├── bridge
│   │   ├── mock_responses.js
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   └── test_http_mcp_bridge.js
│   ├── conftest.py
│   ├── consolidation
│   │   ├── __init__.py
│   │   ├── conftest.py
│   │   ├── test_associations.py
│   │   ├── test_clustering.py
│   │   ├── test_compression.py
│   │   ├── test_consolidator.py
│   │   ├── test_decay.py
│   │   └── test_forgetting.py
│   ├── contracts
│   │   └── api-specification.yml
│   ├── integration
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── test_api_key_fallback.py
│   │   ├── test_api_memories_chronological.py
│   │   ├── test_api_tag_time_search.py
│   │   ├── test_api_with_memory_service.py
│   │   ├── test_bridge_integration.js
│   │   ├── test_cli_interfaces.py
│   │   ├── test_cloudflare_connection.py
│   │   ├── test_concurrent_clients.py
│   │   ├── test_data_serialization_consistency.py
│   │   ├── test_http_server_startup.py
│   │   ├── test_mcp_memory.py
│   │   ├── test_mdns_integration.py
│   │   ├── test_oauth_basic_auth.py
│   │   ├── test_oauth_flow.py
│   │   ├── test_server_handlers.py
│   │   └── test_store_memory.py
│   ├── performance
│   │   ├── test_background_sync.py
│   │   └── test_hybrid_live.py
│   ├── README.md
│   ├── smithery
│   │   └── test_smithery.py
│   ├── sqlite
│   │   └── simple_sqlite_vec_test.py
│   ├── test_client.py
│   ├── test_content_splitting.py
│   ├── test_database.py
│   ├── test_hybrid_cloudflare_limits.py
│   ├── test_hybrid_storage.py
│   ├── test_memory_ops.py
│   ├── test_semantic_search.py
│   ├── test_sqlite_vec_storage.py
│   ├── test_time_parser.py
│   ├── test_timestamp_preservation.py
│   ├── timestamp
│   │   ├── test_hook_vs_manual_storage.py
│   │   ├── test_issue99_final_validation.py
│   │   ├── test_search_retrieval_inconsistency.py
│   │   ├── test_timestamp_issue.py
│   │   └── test_timestamp_simple.py
│   └── unit
│       ├── conftest.py
│       ├── test_cloudflare_storage.py
│       ├── test_csv_loader.py
│       ├── test_fastapi_dependencies.py
│       ├── test_import.py
│       ├── test_json_loader.py
│       ├── test_mdns_simple.py
│       ├── test_mdns.py
│       ├── test_memory_service.py
│       ├── test_memory.py
│       ├── test_semtools_loader.py
│       ├── test_storage_interface_compatibility.py
│       └── test_tag_time_filtering.py
├── tools
│   ├── docker
│   │   ├── DEPRECATED.md
│   │   ├── docker-compose.http.yml
│   │   ├── docker-compose.pythonpath.yml
│   │   ├── docker-compose.standalone.yml
│   │   ├── docker-compose.uv.yml
│   │   ├── docker-compose.yml
│   │   ├── docker-entrypoint-persistent.sh
│   │   ├── docker-entrypoint-unified.sh
│   │   ├── docker-entrypoint.sh
│   │   ├── Dockerfile
│   │   ├── Dockerfile.glama
│   │   ├── Dockerfile.slim
│   │   ├── README.md
│   │   └── test-docker-modes.sh
│   └── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/docs/guides/advanced-command-examples.md:
--------------------------------------------------------------------------------

```markdown
# Advanced Claude Code Commands - Real-World Examples

This guide showcases advanced usage patterns and real-world workflows using MCP Memory Service Claude Code commands.

## Table of Contents
- [Development Workflows](#development-workflows)
- [Project Management](#project-management)
- [Learning & Knowledge Management](#learning--knowledge-management)
- [Team Collaboration](#team-collaboration)
- [Debugging & Troubleshooting](#debugging--troubleshooting)
- [Advanced Search Techniques](#advanced-search-techniques)
- [Automation & Scripting](#automation--scripting)

---

## Development Workflows

### Full-Stack Development Session

**Scenario**: Working on a web application with authentication

```bash
# Start development session with context capture
claude /memory-context --summary "Starting OAuth 2.0 integration for user authentication"

# Store architecture decisions as you make them
claude /memory-store --tags "architecture,oauth,security" \
  "Using Authorization Code flow with PKCE for mobile app security"

claude /memory-store --tags "database,schema" --type "reference" \
  "User table schema: id, email, oauth_provider, oauth_id, created_at, last_login"

# Store implementation details
claude /memory-store --tags "implementation,react" \
  "React auth context uses useReducer for state management with actions: LOGIN, LOGOUT, REFRESH_TOKEN"

# Store configuration details (marked as private)
claude /memory-store --tags "config,oauth" --type "reference" --private \
  "Auth0 configuration: domain=dev-xyz.auth0.com, audience=https://api.myapp.com"

# Later, recall decisions when working on related features
claude /memory-recall "what did we decide about OAuth implementation yesterday?"

# Search for specific implementation patterns
claude /memory-search --tags "react,auth" "state management patterns"

# End session with comprehensive context capture
claude /memory-context --summary "Completed OAuth integration - ready for testing" \
  --include-files --include-commits
```

### Bug Fixing Workflow

**Scenario**: Tracking and resolving a complex bug

```bash
# Store bug discovery
claude /memory-store --tags "bug,critical,payment" --type "task" \
  "Payment processing fails for amounts over $1000 - investigation needed"

# Store investigation findings
claude /memory-store --tags "bug,payment,stripe" \
  "Issue traced to Stripe API rate limiting on high-value transactions"

# Store attempted solutions
claude /memory-store --tags "bug,payment,attempted-fix" \
  "Tried increasing timeout from 5s to 30s - did not resolve issue"

# Store working solution
claude /memory-store --tags "bug,payment,solution" --type "decision" \
  "Fixed by implementing exponential backoff retry mechanism with max 3 attempts"

# Create searchable reference for future
claude /memory-store --tags "reference,stripe,best-practice" \
  "Stripe high-value transactions require retry logic - see payment-service.js line 245"

# Later, search for similar issues
claude /memory-search --tags "bug,stripe" "rate limiting payment"
```

### Code Review & Refactoring

**Scenario**: Systematic code improvement process

```bash
# Store code review insights
claude /memory-store --tags "code-review,performance,database" \
  "N+1 query problem in user dashboard - fetching posts individually instead of batch"

# Store refactoring decisions
claude /memory-store --tags "refactoring,database,optimization" --type "decision" \
  "Replaced individual queries with single JOIN query - reduced DB calls from 50+ to 1"

# Store before/after metrics
claude /memory-store --tags "performance,metrics,improvement" \
  "Dashboard load time: Before=2.3s, After=0.4s (83% improvement)"

# Track technical debt
claude /memory-store --tags "technical-debt,todo" --type "task" \
  "TODO: Extract user dashboard logic into dedicated service class"

# Review improvements over time
claude /memory-recall "what performance improvements did we make this month?"
```

---

## Project Management

### Sprint Planning & Tracking

**Scenario**: Agile development with memory-enhanced tracking

```bash
# Start of sprint
claude /memory-context --summary "Sprint 15 planning - Focus on user onboarding improvements"

# Store sprint goals
claude /memory-store --tags "sprint-15,goals,onboarding" --type "planning" \
  "Sprint 15 goals: Simplify signup flow, add email verification, implement welcome tour"

# Track daily progress
claude /memory-store --tags "sprint-15,progress,day-1" \
  "Completed signup form validation and error handling - 2 story points"

# Store blockers and risks
claude /memory-store --tags "sprint-15,blocker,email" --type "task" \
  "Email service integration blocked - waiting for IT to configure SendGrid account"

# Mid-sprint review
claude /memory-recall "what blockers did we identify this sprint?"
claude /memory-search --tags "sprint-15,progress"

# Sprint retrospective
claude /memory-store --tags "sprint-15,retrospective" --type "meeting" \
  "Sprint 15 retro: Delivered 18/20 points, email blocker resolved, team velocity improving"

# Cross-sprint analysis
claude /memory-search --tags "retrospective" --limit 5
claude /memory-recall "what patterns do we see in our sprint blockers?"
```

### Feature Development Lifecycle

**Scenario**: End-to-end feature development tracking

```bash
# Feature inception
claude /memory-store --tags "feature,user-profiles,inception" --type "planning" \
  "User profiles feature: Allow users to customize avatar, bio, social links, privacy settings"

# Requirements gathering
claude /memory-store --tags "feature,user-profiles,requirements" \
  "Requirements: Image upload (max 2MB), bio text (max 500 chars), 5 social links, public/private toggle"

# Technical design
claude /memory-store --tags "feature,user-profiles,design" --type "architecture" \
  "Design: New profiles table, S3 for image storage, React profile editor component"

# Implementation milestones
claude /memory-store --tags "feature,user-profiles,milestone" \
  "Milestone 1 complete: Database schema created and migrated to production"

# Testing notes
claude /memory-store --tags "feature,user-profiles,testing" \
  "Testing discovered: Large images cause timeout - need client-side compression"

# Launch preparation
claude /memory-store --tags "feature,user-profiles,launch" \
  "Launch checklist: DB migration ✓, S3 bucket ✓, feature flag ready ✓, docs updated ✓"

# Post-launch analysis
claude /memory-store --tags "feature,user-profiles,metrics" \
  "Week 1 metrics: 45% adoption rate, avg 3.2 social links per profile, 12% privacy toggle usage"

# Feature evolution tracking
claude /memory-search --tags "feature,user-profiles" --limit 20
```

---

## Learning & Knowledge Management

### Technology Research & Evaluation

**Scenario**: Evaluating new technologies for adoption

```bash
# Research session start
claude /memory-context --summary "Researching GraphQL vs REST API for mobile app backend"

# Store research findings
claude /memory-store --tags "research,graphql,pros" \
  "GraphQL benefits: Single endpoint, client-defined queries, strong typing, introspection"

claude /memory-store --tags "research,graphql,cons" \
  "GraphQL challenges: Learning curve, caching complexity, N+1 query risk, server complexity"

# Store comparison data
claude /memory-store --tags "research,performance,comparison" \
  "Performance test: GraphQL 340ms avg, REST 280ms avg for mobile app typical queries"

# Store team feedback
claude /memory-store --tags "research,team-feedback,graphql" \
  "Team survey: 60% excited about GraphQL, 30% prefer REST familiarity, 10% neutral"

# Store decision and rationale
claude /memory-store --tags "decision,architecture,graphql" --type "decision" \
  "Decision: Adopt GraphQL for new features, maintain REST for existing APIs during 6-month transition"

# Create reference documentation
claude /memory-store --tags "reference,graphql,implementation" \
  "GraphQL implementation guide: Use Apollo Server, implement DataLoader for N+1 prevention"

# Later research sessions
claude /memory-recall "what did we learn about GraphQL performance last month?"
claude /memory-search --tags "research,comparison" "technology evaluation"
```

### Personal Learning Journal

**Scenario**: Building a personal knowledge base

```bash
# Daily learning capture
claude /memory-store --tags "learning,javascript,async" \
  "Learned: Promise.allSettled() waits for all promises unlike Promise.all() which fails fast"

claude /memory-store --tags "learning,css,flexbox" \
  "CSS trick: flex-grow: 1 on middle item makes it expand to fill available space"

# Code snippets and examples
claude /memory-store --tags "snippet,react,custom-hook" --type "reference" \
  "Custom hook pattern: useLocalStorage - encapsulates localStorage with React state sync"

# Book and article insights
claude /memory-store --tags "book,clean-code,insight" \
  "Clean Code principle: Functions should do one thing well - if function has 'and' in description, split it"

# Conference and talk notes
claude /memory-store --tags "conference,react-conf,2024" \
  "React Conf 2024: New concurrent features in React 18.3, Server Components adoption patterns"

# Weekly knowledge review
claude /memory-recall "what did I learn about React this week?"
claude /memory-search --tags "learning,javascript" --limit 10

# Monthly learning patterns
claude /memory-search --tags "learning" --since "last month"
```

---

## Team Collaboration

### Cross-Team Communication

**Scenario**: Working with multiple teams on shared systems

```bash
# Store cross-team decisions
claude /memory-store --tags "team,frontend,backend,api-contract" --type "decision" \
  "API contract agreed: User service will return ISO 8601 timestamps, frontend will handle timezone conversion"

# Store meeting outcomes
claude /memory-store --tags "meeting,security-team,compliance" \
  "Security review outcome: Authentication service approved for production with rate limiting requirement"

# Store shared resource information
claude /memory-store --tags "shared,database,access" --type "reference" \
  "Shared analytics DB access: Use service account [email protected], read-only access"

# Track dependencies and blockers
claude /memory-store --tags "dependency,infrastructure,blocker" --type "task" \
  "Blocked on infrastructure team: Need production K8s namespace for user-service deployment"

# Store team contact information
claude /memory-store --tags "team,contacts,infrastructure" --type "reference" --private \
  "Infrastructure team: Primary contact Alex Chen ([email protected]), escalation Sarah Kim ([email protected])"

# Regular cross-team syncs
claude /memory-recall "what dependencies do we have on the infrastructure team?"
claude /memory-search --tags "team,backend" "shared decisions"
```

### Code Handoff & Documentation

**Scenario**: Preparing code handoff to another developer

```bash
# Store system overview for handoff
claude /memory-store --tags "handoff,system-overview,payment-service" \
  "Payment service architecture: Express.js API, PostgreSQL DB, Redis cache, Stripe integration"

# Document key implementation details
claude /memory-store --tags "handoff,implementation,payment-service" \
  "Key files: server.js (main app), routes/payments.js (API endpoints), services/stripe.js (integration logic)"

# Store operational knowledge
claude /memory-store --tags "handoff,operations,payment-service" \
  "Monitoring: Grafana dashboard 'Payment Service', alerts on Slack #payments-alerts channel"

# Document gotchas and edge cases
claude /memory-store --tags "handoff,gotchas,payment-service" \
  "Known issues: Webhook retries can cause duplicate processing - check payment_id before processing"

# Store testing information
claude /memory-store --tags "handoff,testing,payment-service" \
  "Testing: npm test (unit), npm run test:integration, Stripe test cards in test-data.md"

# Create comprehensive handoff package
claude /memory-search --tags "handoff,payment-service" --export
```

---

## Debugging & Troubleshooting

### Production Issue Investigation

**Scenario**: Investigating and resolving production incidents

```bash
# Store incident details
claude /memory-store --tags "incident,p1,database,performance" --type "task" \
  "P1 Incident: Database connection timeouts causing 504 errors, affecting 15% of users"

# Store investigation timeline
claude /memory-store --tags "incident,investigation,timeline" \
  "10:15 AM - First reports of timeouts, 10:22 AM - Confirmed DB connection pool exhaustion"

# Store root cause analysis
claude /memory-store --tags "incident,root-cause,connection-pool" \
  "Root cause: Connection pool size (10) insufficient for increased traffic, no connection recycling"

# Store immediate fixes applied
claude /memory-store --tags "incident,fix,immediate" \
  "Immediate fix: Increased connection pool to 50, enabled connection recycling, deployed at 11:30 AM"

# Store monitoring improvements
claude /memory-store --tags "incident,monitoring,improvement" \
  "Added monitoring: DB connection pool utilization alerts at 80% threshold"

# Store prevention measures
claude /memory-store --tags "incident,prevention,long-term" --type "task" \
  "Long-term prevention: Implement connection pool auto-scaling, add load testing to CI/CD"

# Post-incident review
claude /memory-store --tags "incident,postmortem,lessons" \
  "Lessons learned: Need proactive monitoring of resource utilization, not just error rates"

# Search for similar incidents
claude /memory-search --tags "incident,database" "connection timeout"
```

### Performance Optimization Tracking

**Scenario**: Systematic performance improvement initiative

```bash
# Baseline measurements
claude /memory-store --tags "performance,baseline,api-response" \
  "Baseline metrics: API avg response time 850ms, p99 2.1s, DB query avg 340ms"

# Store optimization experiments
claude /memory-store --tags "performance,experiment,caching" \
  "Experiment 1: Added Redis caching for user profiles - 30% response time improvement"

claude /memory-store --tags "performance,experiment,database" \
  "Experiment 2: Optimized N+1 queries in posts endpoint - 45% DB query time reduction"

# Track measurement methodology
claude /memory-store --tags "performance,methodology,testing" \
  "Load testing setup: k6 script, 100 VU, 5min ramp-up, 10min steady state, production-like data"

# Store optimization results
claude /memory-store --tags "performance,results,final" \
  "Final metrics: API avg response time 420ms (51% improvement), p99 980ms (53% improvement)"

# Document optimization techniques
claude /memory-store --tags "performance,techniques,reference" --type "reference" \
  "Optimization techniques applied: Redis caching, query optimization, connection pooling, response compression"

# Performance trend analysis  
claude /memory-recall "what performance improvements did we achieve this quarter?"
```

---

## Advanced Search Techniques

### Complex Query Patterns

**Scenario**: Advanced search strategies for large knowledge bases

```bash
# Multi-tag searches with boolean logic
claude /memory-search --tags "architecture,database" --content "performance"
# Finds memories tagged with both architecture AND database, containing performance-related content

# Time-constrained searches
claude /memory-search --tags "bug,critical" --since "last week" --limit 20
# Recent critical bugs only

# Project-specific technical searches
claude /memory-search --project "user-service" --type "decision" --content "authentication"
# Architecture decisions about authentication in specific project

# Minimum relevance threshold searches
claude /memory-search --min-score 0.8 "microservices communication patterns"
# Only highly relevant results about microservices communication

# Comprehensive metadata searches
claude /memory-search --include-metadata --tags "api,design" --export
# Full metadata export for API design memories
```

### Research and Analysis Queries

**Scenario**: Analyzing patterns and trends in stored knowledge

```bash
# Trend analysis across time periods
claude /memory-recall "what architectural decisions did we make in Q1?"
claude /memory-recall "what architectural decisions did we make in Q2?"
# Compare decision patterns across quarters

# Technology adoption tracking
claude /memory-search --tags "adoption" --content "react"
claude /memory-search --tags "adoption" --content "vue"
# Compare technology adoption discussions

# Problem pattern identification
claude /memory-search --tags "bug,database" --limit 50
# Identify common database-related issues

# Team learning velocity analysis
claude /memory-search --tags "learning" --since "last month"
# Recent learning activities

# Decision outcome tracking
claude /memory-search --tags "decision" --content "outcome"
# Find decisions with documented outcomes
```

---

## Automation & Scripting

### Automated Memory Capture

**Scenario**: Scripting common memory operations

```bash
# Daily standup automation
#!/bin/bash
# daily-standup.sh
DATE=$(date +"%Y-%m-%d")
echo "What did you accomplish yesterday?" | read YESTERDAY
echo "What are you working on today?" | read TODAY
echo "Any blockers?" | read BLOCKERS

claude /memory-store --tags "standup,daily,$DATE" \
  "Yesterday: $YESTERDAY. Today: $TODAY. Blockers: $BLOCKERS"

# Git commit message enhancement
#!/bin/bash
# enhanced-commit.sh
COMMIT_MSG="$1"
BRANCH=$(git branch --show-current)
FILES=$(git diff --name-only --cached)

claude /memory-store --tags "commit,$BRANCH,development" \
  "Commit: $COMMIT_MSG. Files: $FILES. Branch: $BRANCH"

git commit -m "$COMMIT_MSG"

# End-of-day summary
#!/bin/bash
# eod-summary.sh
claude /memory-context --summary "End of day summary - $(date +%Y-%m-%d)" \
  --include-files --include-commits
```

### Batch Operations and Analysis

**Scenario**: Processing multiple memories for analysis

```bash
# Export all architectural decisions for documentation
claude /memory-search --tags "architecture,decision" --limit 100 --export
# Creates exportable report of all architectural decisions

# Weekly learning summary
claude /memory-search --tags "learning" --since "last week" --export
# Export week's learning for review

# Project retrospective data gathering
claude /memory-search --project "user-service" --tags "bug,issue" --export
claude /memory-search --project "user-service" --tags "success,milestone" --export
# Gather both problems and successes for retrospective

# Technical debt analysis
claude /memory-search --tags "technical-debt,todo" --include-metadata --export
# Comprehensive technical debt report

# Performance trend analysis
claude /memory-search --tags "performance,metrics" --limit 50 --export
# Historical performance data for trend analysis
```

---

## Best Practices Summary

### Effective Tagging Strategies
- **Hierarchical tags**: Use `team`, `team-frontend`, `team-frontend-react`
- **Temporal tags**: Include sprint numbers, quarters, years
- **Context tags**: Project names, feature names, team names
- **Type tags**: `decision`, `bug`, `learning`, `reference`, `todo`
- **Priority tags**: `critical`, `important`, `nice-to-have`

### Content Organization
- **Specific titles**: Instead of "Fixed bug", use "Fixed payment timeout bug in Stripe integration"
- **Context inclusion**: Always include relevant project and time context
- **Outcome documentation**: Store not just decisions but their outcomes
- **Link related memories**: Reference related decisions and implementations

### Search Optimization
- **Use multiple search strategies**: Combine tags, content, and time-based searches
- **Iterate searches**: Start broad, then narrow down with additional filters
- **Export important results**: Save comprehensive analyses for documentation
- **Regular reviews**: Weekly/monthly searches to identify patterns

### Workflow Integration
- **Start sessions with context**: Use `/memory-context` to set session scope
- **Real-time capture**: Store decisions and insights as they happen
- **End sessions with summary**: Capture session outcomes and next steps
- **Regular retrospectives**: Use search to analyze patterns and improvements

---

**These advanced patterns will transform your development workflow with persistent, searchable memory that grows with your expertise!** 🚀
```

--------------------------------------------------------------------------------
/archive/docs-removed-2025-08-23/development/hybrid-slm-memory-consolidation.md:
--------------------------------------------------------------------------------

```markdown
# Hybrid Memory Consolidation with On-Device SLMs

## Overview

This document extends the [Autonomous Memory Consolidation](./autonomous-memory-consolidation.md) system by selectively incorporating on-device Small Language Models (SLMs) to enhance natural language capabilities while maintaining privacy, efficiency, and autonomous operation.

> **Note**: This is an optional enhancement. The memory consolidation system works fully autonomously without SLMs, but can provide richer insights when enhanced with local AI models.

## Why Hybrid?

The autonomous system excels at:
- ✅ Mathematical operations (similarity, clustering)
- ✅ Deterministic behavior
- ✅ Zero dependencies
- ❌ Natural language summaries
- ❌ Complex reasoning about connections

On-device SLMs add:
- ✅ Eloquent prose summaries
- ✅ Nuanced understanding
- ✅ Creative insights
- ✅ Still completely private (local processing)

## Recommended On-Device SLMs

### Tier 1: Ultra-Lightweight (< 2GB RAM)

#### **Llama 3.2 1B-Instruct**
- **Size**: ~1.2GB quantized (Q4_K_M)
- **Performance**: 50-100 tokens/sec on CPU
- **Best for**: Basic summarization, keyword expansion
- **Install**: `ollama pull llama3.2:1b`

```python
import ollama

def generate_summary_with_llama(cluster_data):
    """Generate natural language summary for memory cluster."""
    prompt = f"""Summarize these key themes from related memories:
    Keywords: {', '.join(cluster_data['keywords'])}
    Time span: {cluster_data['time_span']}
    Number of memories: {cluster_data['count']}
    
    Provide a concise, insightful summary:"""
    
    response = ollama.generate(model='llama3.2:1b', prompt=prompt)
    return response['response']
```

#### **Phi-3-mini (3.8B)**
- **Size**: ~2.3GB quantized
- **Strengths**: Exceptional reasoning for size
- **Best for**: Analyzing creative connections
- **Install**: `ollama pull phi3:mini`

### Tier 2: Balanced Performance (4-8GB RAM)

#### **Mistral 7B-Instruct v0.3**
- **Size**: ~4GB quantized (Q4_K_M)
- **Performance**: 20-40 tokens/sec on modern CPU
- **Best for**: Full consolidation narratives
- **Install**: `ollama pull mistral:7b-instruct-q4_K_M`

```python
class MistralEnhancedConsolidator:
    def __init__(self):
        self.model = "mistral:7b-instruct-q4_K_M"
    
    async def create_consolidation_narrative(self, clusters, associations):
        """Create a narrative summary of the consolidation results."""
        prompt = f"""Based on memory consolidation analysis:
        
        Found {len(clusters)} memory clusters and {len(associations)} creative connections.
        
        Key themes: {self.extract_themes(clusters)}
        Surprising connections: {self.format_associations(associations[:3])}
        
        Write a brief narrative summary highlighting the most important insights 
        and patterns discovered during this consolidation cycle."""
        
        response = await ollama.generate(
            model=self.model,
            prompt=prompt,
            options={"temperature": 0.7, "max_tokens": 200}
        )
        return response['response']
```

#### **Gemma 2B**
- **Size**: ~1.5GB quantized
- **Strengths**: Google's training quality
- **Best for**: Classification and scoring
- **Install**: `ollama pull gemma:2b`

### Tier 3: High-Performance (8-16GB RAM)

#### **Qwen 2.5 7B-Instruct**
- **Size**: ~4-5GB quantized
- **Strengths**: Multilingual, complex reasoning
- **Best for**: International users, detailed analysis
- **Install**: `ollama pull qwen2.5:7b-instruct`

## Hybrid Implementation Architecture

```python
from enum import Enum
from typing import List, Dict, Optional
import numpy as np
from datetime import datetime

class ProcessingMode(Enum):
    AUTONOMOUS_ONLY = "autonomous"
    HYBRID_SELECTIVE = "hybrid_selective"
    HYBRID_FULL = "hybrid_full"

class HybridMemoryConsolidator:
    """
    Combines autonomous processing with selective SLM enhancement.
    
    The system always runs autonomous processing first, then selectively
    enhances results with SLM-generated insights where valuable.
    """
    
    def __init__(self, storage, config):
        # Core autonomous system (always available)
        self.autonomous = AutonomousMemoryConsolidator(storage, config)
        
        # SLM configuration (optional enhancement)
        self.mode = ProcessingMode(config.get('processing_mode', 'autonomous'))
        self.slm_model = config.get('slm_model', 'llama3.2:1b')
        self.slm_available = self._check_slm_availability()
        
        # Enhancement thresholds
        self.min_cluster_size = config.get('slm_min_cluster_size', 5)
        self.min_importance = config.get('slm_min_importance', 0.7)
        self.enhancement_horizons = config.get(
            'slm_time_horizons', 
            ['weekly', 'monthly', 'quarterly', 'yearly']
        )
    
    def _check_slm_availability(self) -> bool:
        """Check if SLM is available for enhancement."""
        if self.mode == ProcessingMode.AUTONOMOUS_ONLY:
            return False
            
        try:
            import ollama
            # Check if model is available
            models = ollama.list()
            return any(m['name'].startswith(self.slm_model) for m in models['models'])
        except:
            return False
    
    async def consolidate(self, time_horizon: str) -> Dict:
        """
        Run consolidation with optional SLM enhancement.
        
        Always performs autonomous processing first, then selectively
        enhances based on configuration and context.
        """
        # Step 1: Always run autonomous processing
        auto_results = await self.autonomous.consolidate(time_horizon)
        
        # Step 2: Determine if SLM enhancement should be applied
        if not self._should_enhance(time_horizon, auto_results):
            return auto_results
        
        # Step 3: Selective SLM enhancement
        enhanced_results = await self._enhance_with_slm(
            auto_results, 
            time_horizon
        )
        
        return enhanced_results
    
    def _should_enhance(self, time_horizon: str, results: Dict) -> bool:
        """Determine if SLM enhancement would add value."""
        # Check if SLM is available
        if not self.slm_available:
            return False
        
        # Check if time horizon warrants enhancement
        if time_horizon not in self.enhancement_horizons:
            return False
        
        # Check if results are significant enough
        significant_clusters = sum(
            1 for cluster in results.get('clusters', [])
            if len(cluster) >= self.min_cluster_size
        )
        
        return significant_clusters > 0
    
    async def _enhance_with_slm(self, auto_results: Dict, time_horizon: str) -> Dict:
        """Selectively enhance autonomous results with SLM insights."""
        enhanced = auto_results.copy()
        
        # Enhance cluster summaries
        if 'clusters' in enhanced:
            enhanced['narrative_summaries'] = []
            for i, cluster in enumerate(enhanced['clusters']):
                if len(cluster) >= self.min_cluster_size:
                    narrative = await self._generate_cluster_narrative(
                        cluster, 
                        auto_results.get('compressed_summaries', [])[i]
                    )
                    enhanced['narrative_summaries'].append({
                        'cluster_id': i,
                        'narrative': narrative,
                        'memory_count': len(cluster)
                    })
        
        # Enhance creative associations
        if 'associations' in enhanced and len(enhanced['associations']) > 0:
            insights = await self._generate_association_insights(
                enhanced['associations'][:5]  # Top 5 associations
            )
            enhanced['association_insights'] = insights
        
        # Generate consolidation overview
        enhanced['consolidation_narrative'] = await self._generate_overview(
            enhanced, 
            time_horizon
        )
        
        enhanced['processing_mode'] = 'hybrid'
        enhanced['slm_model'] = self.slm_model
        
        return enhanced
    
    async def _generate_cluster_narrative(
        self, 
        cluster: List, 
        compressed_summary: Dict
    ) -> str:
        """Generate natural language narrative for a memory cluster."""
        prompt = f"""Based on this memory cluster analysis:
        
        Keywords: {', '.join(compressed_summary['keywords'][:10])}
        Time span: {compressed_summary['temporal_range']['start']} to {compressed_summary['temporal_range']['end']}
        Common tags: {', '.join(compressed_summary['common_tags'][:5])}
        Number of memories: {len(cluster)}
        
        Create a brief, insightful summary that captures the essence of these 
        related memories and any patterns or themes you notice:"""
        
        response = await self._call_slm(prompt, max_tokens=150)
        return response
    
    async def _generate_association_insights(
        self, 
        associations: List[Dict]
    ) -> List[Dict]:
        """Generate insights about creative associations discovered."""
        insights = []
        
        for assoc in associations:
            prompt = f"""Two memories were found to have an interesting connection 
            (similarity: {assoc['similarity']:.2f}).
            
            Memory 1: {assoc['memory_1_preview'][:100]}...
            Memory 2: {assoc['memory_2_preview'][:100]}...
            
            What insight or pattern might this connection reveal?
            Be concise and focus on the non-obvious relationship:"""
            
            insight = await self._call_slm(prompt, max_tokens=80)
            insights.append({
                'association_id': assoc['id'],
                'insight': insight,
                'similarity': assoc['similarity']
            })
        
        return insights
    
    async def _generate_overview(
        self, 
        results: Dict, 
        time_horizon: str
    ) -> str:
        """Generate a narrative overview of the consolidation cycle."""
        prompt = f"""Memory consolidation {time_horizon} summary:
        
        - Processed {results.get('total_memories', 0)} memories
        - Found {len(results.get('clusters', []))} memory clusters
        - Discovered {len(results.get('associations', []))} creative connections
        - Archived {results.get('archived_count', 0)} low-relevance memories
        
        Key themes: {self._extract_top_themes(results)}
        
        Write a brief executive summary of this consolidation cycle, 
        highlighting the most important patterns and any surprising discoveries:"""
        
        response = await self._call_slm(prompt, max_tokens=200)
        return response
    
    async def _call_slm(self, prompt: str, max_tokens: int = 100) -> str:
        """Call the SLM with error handling."""
        try:
            import ollama
            response = ollama.generate(
                model=self.slm_model,
                prompt=prompt,
                options={
                    "temperature": 0.7,
                    "max_tokens": max_tokens,
                    "stop": ["\n\n", "###"]
                }
            )
            return response['response'].strip()
        except Exception as e:
            # Fallback to autonomous summary
            return f"[SLM unavailable: {str(e)}]"
    
    def _extract_top_themes(self, results: Dict) -> str:
        """Extract top themes from results."""
        all_keywords = []
        for summary in results.get('compressed_summaries', []):
            all_keywords.extend(summary.get('keywords', []))
        
        # Count frequency
        from collections import Counter
        theme_counts = Counter(all_keywords)
        top_themes = [theme for theme, _ in theme_counts.most_common(5)]
        
        return ', '.join(top_themes) if top_themes else 'various topics'
```

## Smart Enhancement Strategy

```python
class SmartEnhancementStrategy:
    """
    Intelligently decide when and how to use SLM enhancement.
    
    Principles:
    1. Autonomous processing is always the foundation
    2. SLM enhancement only when it adds significant value
    3. Resource usage scales with importance
    """
    
    def __init__(self, config):
        self.config = config
        
        # Enhancement criteria
        self.criteria = {
            'min_cluster_size': 5,
            'min_importance_score': 0.7,
            'min_association_similarity': 0.4,
            'max_association_similarity': 0.7,
            'enhancement_time_horizons': ['weekly', 'monthly', 'quarterly', 'yearly'],
            'daily_enhancement': False,  # Too frequent
            'require_user_request': False
        }
    
    def should_enhance_cluster(self, cluster_info: Dict) -> bool:
        """Decide if a cluster warrants SLM enhancement."""
        # Size check
        if cluster_info['size'] < self.criteria['min_cluster_size']:
            return False
        
        # Importance check
        avg_importance = np.mean([m.importance_score for m in cluster_info['memories']])
        if avg_importance < self.criteria['min_importance_score']:
            return False
        
        # Complexity check (high variance suggests interesting cluster)
        embedding_variance = np.var([m.embedding for m in cluster_info['memories']], axis=0).mean()
        if embedding_variance < 0.1:  # Too homogeneous
            return False
        
        return True
    
    def select_model_for_task(self, task_type: str, resource_limit: str) -> str:
        """Select appropriate model based on task and resources."""
        model_selection = {
            'basic_summary': {
                'low': 'llama3.2:1b',
                'medium': 'phi3:mini',
                'high': 'mistral:7b-instruct'
            },
            'creative_insights': {
                'low': 'phi3:mini',  # Good reasoning even when small
                'medium': 'mistral:7b-instruct',
                'high': 'qwen2.5:7b-instruct'
            },
            'technical_analysis': {
                'low': 'gemma:2b',
                'medium': 'mistral:7b-instruct',
                'high': 'qwen2.5:7b-instruct'
            }
        }
        
        return model_selection.get(task_type, {}).get(resource_limit, 'llama3.2:1b')
```

## Configuration Examples

### Minimal Enhancement (Low Resources)
```yaml
hybrid_consolidation:
  processing_mode: "hybrid_selective"
  slm_model: "llama3.2:1b"
  slm_min_cluster_size: 10  # Only largest clusters
  slm_min_importance: 0.8   # Only most important
  slm_time_horizons: ["monthly", "quarterly"]  # Less frequent
  max_tokens_per_summary: 100
```

### Balanced Enhancement (Recommended)
```yaml
hybrid_consolidation:
  processing_mode: "hybrid_selective"
  slm_model: "mistral:7b-instruct-q4_K_M"
  slm_min_cluster_size: 5
  slm_min_importance: 0.7
  slm_time_horizons: ["weekly", "monthly", "quarterly", "yearly"]
  max_tokens_per_summary: 150
  enable_creative_insights: true
  enable_narrative_summaries: true
```

### Full Enhancement (High Resources)
```yaml
hybrid_consolidation:
  processing_mode: "hybrid_full"
  slm_model: "qwen2.5:7b-instruct"
  slm_min_cluster_size: 3
  slm_min_importance: 0.5
  slm_time_horizons: ["daily", "weekly", "monthly", "quarterly", "yearly"]
  max_tokens_per_summary: 200
  enable_creative_insights: true
  enable_narrative_summaries: true
  enable_predictive_insights: true
  parallel_processing: true
```

## Installation Guide

### Using Ollama (Recommended)
```bash
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh

# Pull models based on your resources
# Minimal (2GB)
ollama pull llama3.2:1b

# Balanced (8GB)
ollama pull mistral:7b-instruct-q4_K_M

# High-performance (16GB)
ollama pull qwen2.5:7b-instruct

# Test the model
ollama run llama3.2:1b "Summarize: AI helps organize memories"
```

### Using llama.cpp
```python
from llama_cpp import Llama

# Initialize with specific model
llm = Llama(
    model_path="./models/llama-3.2-1b-instruct.Q4_K_M.gguf",
    n_ctx=2048,
    n_threads=4,
    n_gpu_layers=-1  # Use GPU if available
)

# Generate summary
response = llm(
    prompt="Summarize these themes: productivity, learning, coding",
    max_tokens=100,
    temperature=0.7
)
```

## Performance Considerations

### Resource Usage by Model

| Model | RAM Usage | CPU Tokens/sec | GPU Tokens/sec | Quality |
|-------|-----------|----------------|----------------|---------|
| Llama 3.2 1B | 1.2GB | 50-100 | 200-400 | Good |
| Phi-3 mini | 2.3GB | 30-60 | 150-300 | Excellent |
| Mistral 7B Q4 | 4GB | 20-40 | 100-200 | Excellent |
| Gemma 2B | 1.5GB | 40-80 | 180-350 | Good |
| Qwen 2.5 7B | 5GB | 15-30 | 80-150 | Best |

### Optimization Strategies

1. **Batch Processing**: Process multiple summaries in one call
2. **Caching**: Cache SLM responses for similar inputs
3. **Progressive Enhancement**: Start with fast model, upgrade if needed
4. **Time-based Scheduling**: Run SLM enhancement during off-hours

## Benefits of Hybrid Approach

### ✅ **Advantages**
1. **Best of Both Worlds**: Mathematical precision + natural language eloquence
2. **Flexible Deployment**: Can disable SLM without breaking system
3. **Privacy Preserved**: Everything runs locally
4. **Resource Efficient**: SLM only when valuable
5. **Progressive Enhancement**: Better with SLM, functional without

### 📊 **Comparison**

| Feature | Autonomous Only | Hybrid with SLM |
|---------|----------------|-----------------|
| Natural summaries | ❌ Structured data | ✅ Eloquent prose |
| Creative insights | ❌ Statistical only | ✅ Nuanced understanding |
| Resource usage | ✅ Minimal | 🔶 Moderate |
| Speed | ✅ Very fast | 🔶 Task-dependent |
| Deterministic | ✅ Always | 🔶 Core operations only |
| Privacy | ✅ Complete | ✅ Complete |

## Example Output Comparison

### Autonomous Only
```json
{
  "cluster_summary": {
    "keywords": ["python", "debugging", "memory", "optimization"],
    "memory_count": 8,
    "time_span": "2025-07-21 to 2025-07-28",
    "representative_memory": "Fixed memory leak in consolidation engine"
  }
}
```

### Hybrid with SLM
```json
{
  "cluster_summary": {
    "keywords": ["python", "debugging", "memory", "optimization"],
    "memory_count": 8,
    "time_span": "2025-07-21 to 2025-07-28",
    "representative_memory": "Fixed memory leak in consolidation engine",
    "narrative": "This week focused on resolving critical performance issues in the memory consolidation system. The memory leak in the clustering algorithm was traced to improper cleanup of embedding vectors, resulting in a 40% performance improvement after the fix. These debugging sessions revealed important patterns about resource management in long-running consolidation processes.",
    "key_insight": "Proper lifecycle management of vector embeddings is crucial for maintaining performance in continuous consolidation systems."
  }
}
```

## Future Enhancements

1. **Fine-tuned Models**: Train small models specifically for memory consolidation
2. **Multi-Model Ensemble**: Use different models for different tasks
3. **Adaptive Model Selection**: Automatically choose model based on task complexity
4. **Streaming Generation**: Process summaries as they generate
5. **Quantization Optimization**: Test various quantization levels for best trade-offs

## Conclusion

The hybrid approach with on-device SLMs provides the perfect balance between the reliability of autonomous processing and the expressiveness of natural language AI. By running everything locally and using SLMs selectively, we maintain privacy, control costs, and ensure the system remains functional even without AI enhancement.

This transforms the dream-inspired memory consolidation from a purely algorithmic system into an intelligent assistant that can provide genuine insights while respecting user privacy and system resources.

---

**Related Documents:**
- 🔧 [Autonomous Memory Consolidation Guide](./autonomous-memory-consolidation.md)
- 💭 [Dream-Inspired Memory Consolidation System](./dream-inspired-memory-consolidation.md)
- 📋 [Issue #11: Multi-Layered Memory Consolidation](https://github.com/doobidoo/mcp-memory-service/issues/11)

*Created: July 28, 2025*
```

--------------------------------------------------------------------------------
/claude-hooks/utilities/adaptive-pattern-detector.js:
--------------------------------------------------------------------------------

```javascript
/**
 * Adaptive Pattern Detector
 * Detects natural language patterns that suggest memory triggers needed
 */

class AdaptivePatternDetector {
    constructor(config = {}, performanceManager = null) {
        this.config = config;
        this.performanceManager = performanceManager;

        // Pattern sensitivity (0-1, where 1 is most sensitive)
        this.sensitivity = config.sensitivity || 0.7;

        // Pattern categories with performance tiers
        this.patterns = {
            // Tier 1: Instant patterns (regex-based, < 50ms)
            instant: {
                explicitMemoryRequests: [
                    {
                        pattern: /what (did|do) we (decide|choose|do|discuss) (about|regarding|for|with)/i,
                        confidence: 0.9,
                        description: 'Direct memory request'
                    },
                    {
                        pattern: /remind me (about|how|what|of|regarding)/i,
                        confidence: 0.9,
                        description: 'Explicit reminder request'
                    },
                    {
                        pattern: /remember (when|how|what|that) we/i,
                        confidence: 0.8,
                        description: 'Memory recall request'
                    },
                    {
                        pattern: /according to (our|the) (previous|earlier|last)/i,
                        confidence: 0.8,
                        description: 'Reference to past decisions'
                    }
                ],

                pastWorkReferences: [
                    {
                        pattern: /similar to (what|how) we (did|used|implemented)/i,
                        confidence: 0.7,
                        description: 'Comparison to past work'
                    },
                    {
                        pattern: /like (we|the) (discussed|decided|implemented|chose) (before|earlier|previously)/i,
                        confidence: 0.7,
                        description: 'Reference to past implementation'
                    },
                    {
                        pattern: /the (same|approach|solution|pattern) (we|that) (used|implemented|chose)/i,
                        confidence: 0.6,
                        description: 'Reuse of past solutions'
                    }
                ],

                questionPatterns: [
                    {
                        pattern: /^(how do|how did|how should|how can) we/i,
                        confidence: 0.5,
                        description: 'Implementation question'
                    },
                    {
                        pattern: /^(what is|what was|what should be) (our|the) (approach|strategy|pattern)/i,
                        confidence: 0.6,
                        description: 'Strategy question'
                    },
                    {
                        pattern: /^(why did|why do|why should) we (choose|use|implement)/i,
                        confidence: 0.5,
                        description: 'Rationale question'
                    }
                ]
            },

            // Tier 2: Fast patterns (contextual analysis, < 150ms)
            fast: {
                technicalDiscussions: [
                    {
                        pattern: /\b(architecture|design|pattern|approach|strategy|implementation)\b/i,
                        context: ['technical', 'decision'],
                        confidence: 0.4,
                        description: 'Technical architecture discussion'
                    },
                    {
                        pattern: /\b(authentication|authorization|security|oauth|jwt)\b/i,
                        context: ['security', 'implementation'],
                        confidence: 0.5,
                        description: 'Security implementation discussion'
                    },
                    {
                        pattern: /\b(database|storage|persistence|schema|migration)\b/i,
                        context: ['data', 'implementation'],
                        confidence: 0.5,
                        description: 'Data layer discussion'
                    }
                ],

                projectContinuity: [
                    {
                        pattern: /\b(continue|continuing|resume|pick up where)\b/i,
                        context: ['continuation'],
                        confidence: 0.6,
                        description: 'Project continuation'
                    },
                    {
                        pattern: /\b(next step|next phase|moving forward|proceed with)\b/i,
                        context: ['progression'],
                        confidence: 0.4,
                        description: 'Project progression'
                    }
                ],

                problemSolving: [
                    {
                        pattern: /\b(issue|problem|bug|error|failure) (with|in|regarding)/i,
                        context: ['troubleshooting'],
                        confidence: 0.6,
                        description: 'Problem solving discussion'
                    },
                    {
                        pattern: /\b(fix|resolve|solve|debug|troubleshoot)\b/i,
                        context: ['troubleshooting'],
                        confidence: 0.4,
                        description: 'Problem resolution'
                    }
                ]
            },

            // Tier 3: Intensive patterns (semantic analysis, < 500ms)
            intensive: {
                contextualReferences: [
                    {
                        semantic: ['previous discussion', 'earlier conversation', 'past decision'],
                        confidence: 0.7,
                        description: 'Contextual reference to past'
                    },
                    {
                        semantic: ['established pattern', 'agreed approach', 'standard practice'],
                        confidence: 0.6,
                        description: 'Reference to established practices'
                    }
                ],

                complexQuestions: [
                    {
                        semantic: ['best practice', 'recommended approach', 'optimal solution'],
                        confidence: 0.5,
                        description: 'Best practice inquiry'
                    }
                ]
            }
        };

        // Pattern matching statistics
        this.stats = {
            totalMatches: 0,
            patternHits: new Map(),
            falsePositives: 0,
            userFeedback: []
        };

        // Adaptive learning
        this.adaptiveSettings = {
            learningEnabled: config.adaptiveLearning !== false,
            confidenceAdjustments: new Map(),
            userPreferences: new Map()
        };
    }

    /**
     * Detect patterns in user message with tiered approach
     */
    async detectPatterns(message, context = {}) {
        const results = {
            matches: [],
            confidence: 0,
            processingTier: 'none',
            triggerRecommendation: false
        };

        // Tier 1: Instant pattern detection
        if (this.shouldRunTier('instant')) {
            const timing = this.performanceManager?.startTiming('pattern_detection_instant', 'instant');

            const instantMatches = this.detectInstantPatterns(message);
            results.matches.push(...instantMatches);
            results.processingTier = 'instant';

            if (timing) this.performanceManager.endTiming(timing);

            // Early return if high confidence instant match
            const maxConfidence = Math.max(...instantMatches.map(m => m.confidence), 0);
            if (maxConfidence > 0.8) {
                results.confidence = maxConfidence;
                results.triggerRecommendation = true;
                return results;
            }
        }

        // Tier 2: Fast contextual analysis
        if (this.shouldRunTier('fast')) {
            const timing = this.performanceManager?.startTiming('pattern_detection_fast', 'fast');

            const fastMatches = this.detectFastPatterns(message, context);
            results.matches.push(...fastMatches);
            results.processingTier = 'fast';

            if (timing) this.performanceManager.endTiming(timing);
        }

        // Tier 3: Intensive semantic analysis
        if (this.shouldRunTier('intensive') && this.shouldRunIntensiveAnalysis(results.matches)) {
            const timing = this.performanceManager?.startTiming('pattern_detection_intensive', 'intensive');

            const intensiveMatches = await this.detectIntensivePatterns(message, context);
            results.matches.push(...intensiveMatches);
            results.processingTier = 'intensive';

            if (timing) this.performanceManager.endTiming(timing);
        }

        // Calculate overall confidence and recommendation
        results.confidence = this.calculateOverallConfidence(results.matches);
        results.triggerRecommendation = this.shouldRecommendTrigger(results);

        // Record statistics
        this.recordPatternMatch(results);

        return results;
    }

    /**
     * Detect instant patterns using regex
     */
    detectInstantPatterns(message) {
        const matches = [];

        for (const [category, patterns] of Object.entries(this.patterns.instant)) {
            for (const patternDef of patterns) {
                if (patternDef.pattern.test(message)) {
                    const adjustedConfidence = this.adjustConfidence(patternDef.confidence, category);

                    matches.push({
                        type: 'instant',
                        category,
                        pattern: patternDef.description,
                        confidence: adjustedConfidence,
                        message: patternDef.description
                    });

                    // Record pattern hit
                    const key = `${category}:${patternDef.description}`;
                    this.stats.patternHits.set(key, (this.stats.patternHits.get(key) || 0) + 1);
                }
            }
        }

        return matches;
    }

    /**
     * Detect fast patterns with context analysis
     */
    detectFastPatterns(message, context) {
        const matches = [];

        for (const [category, patterns] of Object.entries(this.patterns.fast)) {
            for (const patternDef of patterns) {
                if (patternDef.pattern.test(message)) {
                    // Check if context matches
                    const contextMatch = this.checkContextMatch(patternDef.context, context);
                    const contextBoost = contextMatch ? 0.2 : 0;

                    const adjustedConfidence = this.adjustConfidence(
                        patternDef.confidence + contextBoost,
                        category
                    );

                    matches.push({
                        type: 'fast',
                        category,
                        pattern: patternDef.description,
                        confidence: adjustedConfidence,
                        contextMatch,
                        message: patternDef.description
                    });
                }
            }
        }

        return matches;
    }

    /**
     * Detect intensive patterns with semantic analysis
     */
    async detectIntensivePatterns(message, context) {
        const matches = [];

        // This would integrate with semantic analysis libraries if available
        // For now, we'll do enhanced keyword matching with semantic similarity

        for (const [category, patterns] of Object.entries(this.patterns.intensive)) {
            for (const patternDef of patterns) {
                if (patternDef.semantic) {
                    const semanticMatch = this.checkSemanticMatch(message, patternDef.semantic);
                    if (semanticMatch.isMatch) {
                        const adjustedConfidence = this.adjustConfidence(
                            patternDef.confidence * semanticMatch.similarity,
                            category
                        );

                        matches.push({
                            type: 'intensive',
                            category,
                            pattern: patternDef.description,
                            confidence: adjustedConfidence,
                            similarity: semanticMatch.similarity,
                            message: patternDef.description
                        });
                    }
                }
            }
        }

        return matches;
    }

    /**
     * Check if semantic patterns match (simplified implementation)
     */
    checkSemanticMatch(message, semanticTerms) {
        const messageLower = message.toLowerCase();
        let matchCount = 0;
        let totalTerms = 0;

        for (const term of semanticTerms) {
            totalTerms++;
            // Simple keyword matching - in practice, this would use semantic similarity
            const words = term.toLowerCase().split(' ');
            const termMatches = words.every(word => messageLower.includes(word));

            if (termMatches) {
                matchCount++;
            }
        }

        // Prevent division by zero
        if (totalTerms === 0) {
            return {
                isMatch: false,
                similarity: 0
            };
        }

        const similarity = matchCount / totalTerms;
        return {
            isMatch: similarity > 0.3, // Threshold for semantic match
            similarity
        };
    }

    /**
     * Check if context matches pattern requirements
     */
    checkContextMatch(requiredContext, actualContext) {
        if (!requiredContext || !actualContext) return false;

        return requiredContext.some(reqCtx =>
            Object.keys(actualContext).some(key =>
                key.toLowerCase().includes(reqCtx.toLowerCase()) ||
                (typeof actualContext[key] === 'string' &&
                 actualContext[key].toLowerCase().includes(reqCtx.toLowerCase()))
            )
        );
    }

    /**
     * Adjust confidence based on learning and user preferences
     */
    adjustConfidence(baseConfidence, category) {
        let adjusted = baseConfidence;

        // Apply sensitivity adjustment
        adjusted = adjusted * this.sensitivity;

        // Apply learned adjustments
        if (this.adaptiveSettings.confidenceAdjustments.has(category)) {
            const adjustment = this.adaptiveSettings.confidenceAdjustments.get(category);
            adjusted = Math.max(0, Math.min(1, adjusted + adjustment));
        }

        return adjusted;
    }

    /**
     * Calculate overall confidence from all matches
     */
    calculateOverallConfidence(matches) {
        if (matches.length === 0) return 0;

        // Weight by match type (instant > fast > intensive for reliability)
        const weights = { instant: 1.0, fast: 0.8, intensive: 0.6 };
        let weightedSum = 0;
        let totalWeight = 0;

        for (const match of matches) {
            const weight = weights[match.type] || 0.5;
            weightedSum += match.confidence * weight;
            totalWeight += weight;
        }

        return totalWeight > 0 ? weightedSum / totalWeight : 0;
    }

    /**
     * Determine if we should recommend triggering memory hooks
     */
    shouldRecommendTrigger(results) {
        const confidence = results.confidence;
        const matchCount = results.matches.length;

        // High confidence single match
        if (confidence > 0.8) return true;

        // Medium confidence with multiple matches
        if (confidence > 0.6 && matchCount > 1) return true;

        // Lower threshold for explicit memory requests
        const hasExplicitRequest = results.matches.some(m =>
            m.category === 'explicitMemoryRequests' && m.confidence > 0.5
        );
        if (hasExplicitRequest) return true;

        // Any match with reasonable confidence should trigger
        if (matchCount > 0 && confidence > 0.4) return true;

        return false;
    }

    /**
     * Determine if we should run intensive analysis
     */
    shouldRunIntensiveAnalysis(currentMatches) {
        // Only run intensive if we have some matches but low confidence
        const hasMatches = currentMatches.length > 0;
        const lowConfidence = Math.max(...currentMatches.map(m => m.confidence), 0) < 0.7;

        return hasMatches && lowConfidence;
    }

    /**
     * Check if a processing tier should run
     */
    shouldRunTier(tier) {
        if (!this.performanceManager) return true;

        const tierMap = {
            instant: 'instant',
            fast: 'fast',
            intensive: 'intensive'
        };

        try {
            return this.performanceManager.shouldRunHook(`pattern_detection_${tier}`, tierMap[tier]);
        } catch (error) {
            // If performance manager fails, allow the tier to run
            console.warn(`[Pattern Detector] Performance check failed for ${tier}: ${error.message}`);
            return true;
        }
    }

    /**
     * Record pattern match for statistics and learning
     */
    recordPatternMatch(results) {
        this.stats.totalMatches++;

        // Update pattern hit statistics
        for (const match of results.matches) {
            const key = `${match.category}:${match.pattern}`;
            this.stats.patternHits.set(key, (this.stats.patternHits.get(key) || 0) + 1);
        }
    }

    /**
     * Learn from user feedback
     */
    recordUserFeedback(isPositive, patternResults, context = {}) {
        if (!this.adaptiveSettings.learningEnabled) return;

        const feedback = {
            positive: isPositive,
            patterns: patternResults.matches.map(m => ({
                category: m.category,
                pattern: m.pattern,
                confidence: m.confidence
            })),
            overallConfidence: patternResults.confidence,
            triggerRecommendation: patternResults.triggerRecommendation,
            timestamp: Date.now(),
            context
        };

        this.stats.userFeedback.push(feedback);

        // Adjust confidence for patterns based on feedback
        this.adjustPatternsFromFeedback(feedback);

        // Keep feedback history manageable
        if (this.stats.userFeedback.length > 100) {
            this.stats.userFeedback.splice(0, 20);
        }
    }

    /**
     * Adjust pattern confidence based on user feedback
     */
    adjustPatternsFromFeedback(feedback) {
        const adjustmentFactor = feedback.positive ? 0.05 : -0.05;

        for (const patternInfo of feedback.patterns) {
            const currentAdjustment = this.adaptiveSettings.confidenceAdjustments.get(patternInfo.category) || 0;
            const newAdjustment = Math.max(-0.3, Math.min(0.3, currentAdjustment + adjustmentFactor));
            this.adaptiveSettings.confidenceAdjustments.set(patternInfo.category, newAdjustment);
        }
    }

    /**
     * Get pattern detection statistics
     */
    getStatistics() {
        const recentFeedback = this.stats.userFeedback.slice(-20);
        const positiveRate = recentFeedback.length > 0 ?
            recentFeedback.filter(f => f.positive).length / recentFeedback.length : 0;

        return {
            totalMatches: this.stats.totalMatches,
            patternHitCounts: Object.fromEntries(this.stats.patternHits),
            positiveRate: Math.round(positiveRate * 100),
            confidenceAdjustments: Object.fromEntries(this.adaptiveSettings.confidenceAdjustments),
            sensitivity: this.sensitivity,
            learningEnabled: this.adaptiveSettings.learningEnabled
        };
    }

    /**
     * Update sensitivity setting
     */
    updateSensitivity(newSensitivity) {
        this.sensitivity = Math.max(0, Math.min(1, newSensitivity));
    }

    /**
     * Reset learning data (useful for testing)
     */
    resetLearning() {
        this.adaptiveSettings.confidenceAdjustments.clear();
        this.stats.userFeedback = [];
        this.stats.patternHits.clear();
        this.stats.totalMatches = 0;
    }
}

module.exports = { AdaptivePatternDetector };
```

--------------------------------------------------------------------------------
/tests/unit/test_memory_service.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for MemoryService business logic.

These tests verify the MemoryService class centralizes memory operations
correctly and provides consistent behavior across all interfaces.
"""

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from datetime import datetime
from typing import List

from mcp_memory_service.services.memory_service import MemoryService
from mcp_memory_service.models.memory import Memory
from mcp_memory_service.storage.base import MemoryStorage


# Test Fixtures

@pytest.fixture
def mock_storage():
    """Create a mock storage backend."""
    storage = AsyncMock(spec=MemoryStorage)
    # Add required properties
    storage.max_content_length = 1000
    storage.supports_chunking = True
    # Setup method return values to avoid AttributeError
    storage.store.return_value = (True, "Success")
    storage.delete.return_value = (True, "Deleted")
    storage.get_stats.return_value = {
        "backend": "mock",
        "total_memories": 0
    }
    return storage


@pytest.fixture
def memory_service(mock_storage):
    """Create a MemoryService instance with mock storage."""
    return MemoryService(storage=mock_storage)


@pytest.fixture
def sample_memory():
    """Create a sample memory object for testing."""
    return Memory(
        content="Test memory content",
        content_hash="test_hash_123",
        tags=["test", "sample"],
        memory_type="note",
        metadata={"source": "test"},
        created_at=1698765432.0,
        updated_at=1698765432.0
    )


@pytest.fixture
def sample_memories():
    """Create a list of sample memories."""
    memories = []
    for i in range(5):
        memories.append(Memory(
            content=f"Test memory {i+1}",
            content_hash=f"hash_{i+1}",
            tags=[f"tag{i+1}", "test"],
            memory_type="note",
            metadata={"index": i+1},
            created_at=1698765432.0 + i * 100,
            updated_at=1698765432.0 + i * 100
        ))
    return memories


# Test list_memories method

@pytest.mark.asyncio
async def test_list_memories_basic_pagination(memory_service, mock_storage, sample_memories):
    """Test basic pagination functionality."""
    # Setup mock
    mock_storage.get_all_memories.return_value = sample_memories[:2]
    mock_storage.count_all_memories.return_value = 5

    # Execute
    result = await memory_service.list_memories(page=1, page_size=2)

    # Verify
    assert result["page"] == 1
    assert result["page_size"] == 2
    assert result["total"] == 5
    assert result["has_more"] is True
    assert len(result["memories"]) == 2

    # Verify storage called with correct parameters
    mock_storage.get_all_memories.assert_called_once_with(
        limit=2,
        offset=0,
        memory_type=None,
        tags=None
    )


@pytest.mark.asyncio
async def test_list_memories_with_tag_filter(memory_service, mock_storage, sample_memories):
    """Test filtering by tag."""
    filtered_memories = [m for m in sample_memories if "tag1" in m.tags]
    mock_storage.get_all_memories.return_value = filtered_memories
    mock_storage.count_all_memories.return_value = len(filtered_memories)

    result = await memory_service.list_memories(page=1, page_size=10, tag="tag1")

    # Verify tag passed to storage as list
    mock_storage.get_all_memories.assert_called_once()
    call_kwargs = mock_storage.get_all_memories.call_args.kwargs
    assert call_kwargs["tags"] == ["tag1"]


@pytest.mark.asyncio
async def test_list_memories_with_type_filter(memory_service, mock_storage, sample_memories):
    """Test filtering by memory type."""
    mock_storage.get_all_memories.return_value = sample_memories
    mock_storage.count_all_memories.return_value = 5

    result = await memory_service.list_memories(page=1, page_size=10, memory_type="note")

    # Verify type passed to storage
    mock_storage.get_all_memories.assert_called_once()
    call_kwargs = mock_storage.get_all_memories.call_args.kwargs
    assert call_kwargs["memory_type"] == "note"


@pytest.mark.asyncio
async def test_list_memories_offset_calculation(memory_service, mock_storage):
    """Test correct offset calculation for different pages."""
    mock_storage.get_all_memories.return_value = []
    mock_storage.count_all_memories.return_value = 0

    # Page 3 with page_size 10 should have offset 20
    await memory_service.list_memories(page=3, page_size=10)

    call_kwargs = mock_storage.get_all_memories.call_args.kwargs
    assert call_kwargs["offset"] == 20
    assert call_kwargs["limit"] == 10


@pytest.mark.asyncio
async def test_list_memories_has_more_false(memory_service, mock_storage, sample_memories):
    """Test has_more is False when no more results."""
    mock_storage.get_all_memories.return_value = sample_memories
    mock_storage.count_all_memories.return_value = 5

    # Requesting page that includes last item
    result = await memory_service.list_memories(page=1, page_size=10)

    assert result["has_more"] is False


@pytest.mark.asyncio
async def test_list_memories_error_handling(memory_service, mock_storage):
    """Test error handling in list_memories."""
    mock_storage.get_all_memories.side_effect = Exception("Database error")

    result = await memory_service.list_memories(page=1, page_size=10)

    assert result["success"] is False
    assert "error" in result
    assert "Database error" in result["error"]
    assert result["memories"] == []


# Test store_memory method

@pytest.mark.asyncio
async def test_store_memory_basic(memory_service, mock_storage):
    """Test basic memory storage."""
    mock_storage.store.return_value = (True, "Success")

    result = await memory_service.store_memory(
        content="Test content",
        tags=["test"],
        memory_type="note"
    )

    assert result["success"] is True
    assert "memory" in result
    assert result["memory"]["content"] == "Test content"

    # Verify storage.store was called
    mock_storage.store.assert_called_once()
    stored_memory = mock_storage.store.call_args.args[0]
    assert stored_memory.content == "Test content"
    assert stored_memory.tags == ["test"]


@pytest.mark.asyncio
async def test_store_memory_with_hostname_tagging(memory_service, mock_storage):
    """Test hostname tagging is applied correctly."""
    mock_storage.store.return_value = None

    result = await memory_service.store_memory(
        content="Test content",
        tags=["test"],
        client_hostname="my-machine"
    )

    # Verify hostname tag added
    stored_memory = mock_storage.store.call_args.args[0]
    assert "source:my-machine" in stored_memory.tags
    assert stored_memory.metadata["hostname"] == "my-machine"


@pytest.mark.asyncio
async def test_store_memory_hostname_not_duplicated(memory_service, mock_storage):
    """Test hostname tag is not duplicated if already present."""
    mock_storage.store.return_value = None

    result = await memory_service.store_memory(
        content="Test content",
        tags=["test", "source:my-machine"],
        client_hostname="my-machine"
    )

    stored_memory = mock_storage.store.call_args.args[0]
    # Count occurrences of hostname tag
    hostname_tags = [t for t in stored_memory.tags if t.startswith("source:")]
    assert len(hostname_tags) == 1


@pytest.mark.asyncio
@patch('mcp_memory_service.services.memory_service.ENABLE_AUTO_SPLIT', True)
async def test_store_memory_with_chunking(memory_service, mock_storage):
    """Test content chunking when enabled and content is large."""
    mock_storage.store.return_value = (True, "Success")
    # Set max_content_length to trigger chunking
    mock_storage.max_content_length = 100

    # Create content larger than max_content_length
    long_content = "x" * 200

    with patch('mcp_memory_service.services.memory_service.split_content') as mock_split:
        mock_split.return_value = ["chunk1", "chunk2"]

        result = await memory_service.store_memory(content=long_content)

        assert result["success"] is True
        assert "memories" in result
        assert result["total_chunks"] == 2
        assert "original_hash" in result

        # Verify storage.store called twice (once per chunk)
        assert mock_storage.store.call_count == 2


@pytest.mark.asyncio
async def test_store_memory_validation_error(memory_service, mock_storage):
    """Test ValueError is caught and returned as error."""
    mock_storage.store.side_effect = ValueError("Invalid content")

    result = await memory_service.store_memory(content="Test")

    assert result["success"] is False
    assert "error" in result
    assert "Invalid memory data" in result["error"]


@pytest.mark.asyncio
async def test_store_memory_connection_error(memory_service, mock_storage):
    """Test ConnectionError is caught and handled."""
    mock_storage.store.side_effect = ConnectionError("Storage unavailable")

    result = await memory_service.store_memory(content="Test")

    assert result["success"] is False
    assert "error" in result
    assert "Storage connection failed" in result["error"]


@pytest.mark.asyncio
async def test_store_memory_unexpected_error(memory_service, mock_storage):
    """Test unexpected exceptions are caught."""
    mock_storage.store.side_effect = RuntimeError("Unexpected error")

    result = await memory_service.store_memory(content="Test")

    assert result["success"] is False
    assert "error" in result
    assert "Failed to store memory" in result["error"]


# Test retrieve_memories method

@pytest.mark.asyncio
async def test_retrieve_memories_basic(memory_service, mock_storage, sample_memories):
    """Test basic semantic search retrieval."""
    mock_storage.retrieve.return_value = sample_memories[:3]

    result = await memory_service.retrieve_memories(query="test query", n_results=3)

    assert result["query"] == "test query"
    assert result["count"] == 3
    assert len(result["memories"]) == 3

    # After fix: storage.retrieve() only accepts query and n_results
    mock_storage.retrieve.assert_called_once_with(
        query="test query",
        n_results=3
    )


@pytest.mark.asyncio
async def test_retrieve_memories_with_filters(memory_service, mock_storage, sample_memories):
    """Test retrieval with tag and type filters."""
    # Return memories that will be filtered by MemoryService
    mock_storage.retrieve.return_value = sample_memories

    # Create a memory with matching tags for filtering
    from mcp_memory_service.models.memory import Memory
    import hashlib
    content_hash = hashlib.sha256("test content".encode()).hexdigest()
    matching_memory = Memory(
        content="test content",
        content_hash=content_hash,
        tags=["tag1"],
        memory_type="note",
        created_at=1234567890.0
    )
    matching_memory.metadata = {"tags": ["tag1"], "memory_type": "note"}
    mock_storage.retrieve.return_value = [matching_memory]

    result = await memory_service.retrieve_memories(
        query="test",
        n_results=5,
        tags=["tag1"],
        memory_type="note"
    )

    # After fix: storage.retrieve() only accepts query and n_results
    # Filtering is done by MemoryService after retrieval
    mock_storage.retrieve.assert_called_once_with(
        query="test",
        n_results=5
    )


@pytest.mark.asyncio
async def test_retrieve_memories_error_handling(memory_service, mock_storage):
    """Test error handling in retrieve_memories."""
    mock_storage.retrieve.side_effect = Exception("Retrieval failed")

    result = await memory_service.retrieve_memories(query="test")

    assert "error" in result
    assert result["memories"] == []
    assert "Failed to retrieve memories" in result["error"]


# Test search_by_tag method

@pytest.mark.asyncio
async def test_search_by_tag_single_tag(memory_service, mock_storage, sample_memories):
    """Test searching by a single tag."""
    mock_storage.search_by_tag.return_value = sample_memories[:2]

    result = await memory_service.search_by_tag(tags="test")

    assert result["tags"] == ["test"]
    assert result["match_type"] == "ANY"
    assert result["count"] == 2

    mock_storage.search_by_tag.assert_called_once_with(
        tags=["test"]
    )


@pytest.mark.asyncio
async def test_search_by_tag_multiple_tags(memory_service, mock_storage, sample_memories):
    """Test searching by multiple tags."""
    mock_storage.search_by_tag.return_value = sample_memories

    result = await memory_service.search_by_tag(tags=["tag1", "tag2"])

    assert result["tags"] == ["tag1", "tag2"]
    assert result["match_type"] == "ANY"


@pytest.mark.asyncio
async def test_search_by_tag_match_all(memory_service, mock_storage, sample_memories):
    """Test searching with match_all=True."""
    mock_storage.search_by_tag.return_value = sample_memories[:1]

    result = await memory_service.search_by_tag(tags=["tag1", "tag2"], match_all=True)

    assert result["match_type"] == "ALL"
    mock_storage.search_by_tag.assert_called_once_with(
        tags=["tag1", "tag2"]
    )


@pytest.mark.asyncio
async def test_search_by_tag_error_handling(memory_service, mock_storage):
    """Test error handling in search_by_tag."""
    mock_storage.search_by_tag.side_effect = Exception("Search failed")

    result = await memory_service.search_by_tag(tags="test")

    assert "error" in result
    assert result["memories"] == []
    assert "Failed to search by tags" in result["error"]


# Test get_memory_by_hash method

@pytest.mark.asyncio
async def test_get_memory_by_hash_found(memory_service, mock_storage, sample_memory):
    """Test getting memory by hash when found."""
    mock_storage.get_by_hash.return_value = sample_memory

    result = await memory_service.get_memory_by_hash("test_hash_123")

    assert result["found"] is True
    assert "memory" in result
    assert result["memory"]["content_hash"] == "test_hash_123"
    mock_storage.get_by_hash.assert_called_once_with("test_hash_123")


@pytest.mark.asyncio
async def test_get_memory_by_hash_not_found(memory_service, mock_storage):
    """Test getting memory by hash when not found."""
    mock_storage.get_by_hash.return_value = None

    result = await memory_service.get_memory_by_hash("nonexistent_hash")

    assert result["found"] is False
    assert result["content_hash"] == "nonexistent_hash"
    mock_storage.get_by_hash.assert_called_once_with("nonexistent_hash")


@pytest.mark.asyncio
async def test_get_memory_by_hash_error(memory_service, mock_storage):
    """Test error handling in get_memory_by_hash."""
    mock_storage.get_by_hash.side_effect = Exception("Database error")

    result = await memory_service.get_memory_by_hash("test_hash")

    assert result["found"] is False
    assert "error" in result
    assert "Failed to get memory" in result["error"]


# Test delete_memory method

@pytest.mark.asyncio
async def test_delete_memory_success(memory_service, mock_storage):
    """Test successful memory deletion."""
    mock_storage.delete.return_value = (True, "Deleted successfully")

    result = await memory_service.delete_memory("test_hash")

    assert result["success"] is True
    assert result["content_hash"] == "test_hash"
    mock_storage.delete.assert_called_once_with("test_hash")


@pytest.mark.asyncio
async def test_delete_memory_not_found(memory_service, mock_storage):
    """Test deleting non-existent memory."""
    mock_storage.delete.return_value = (False, "Not found")

    result = await memory_service.delete_memory("nonexistent_hash")

    assert result["success"] is False


@pytest.mark.asyncio
async def test_delete_memory_error(memory_service, mock_storage):
    """Test error handling in delete_memory."""
    mock_storage.delete.side_effect = Exception("Delete failed")

    result = await memory_service.delete_memory("test_hash")

    assert result["success"] is False
    assert "error" in result
    assert "Failed to delete memory" in result["error"]


# Test health_check method

@pytest.mark.asyncio
async def test_health_check_success(memory_service, mock_storage):
    """Test successful health check."""
    mock_storage.get_stats.return_value = {
        "backend": "sqlite-vec",
        "total_memories": 100,
        "database_size": "5MB"
    }

    result = await memory_service.health_check()

    assert result["healthy"] is True
    assert result["storage_type"] == "sqlite-vec"
    assert result["total_memories"] == 100
    assert "last_updated" in result


@pytest.mark.asyncio
async def test_health_check_failure(memory_service, mock_storage):
    """Test health check when storage fails."""
    mock_storage.get_stats.side_effect = Exception("Health check failed")

    result = await memory_service.health_check()

    assert result["healthy"] is False
    assert "error" in result
    assert "Health check failed" in result["error"]


# Test _format_memory_response method

def test_format_memory_response(memory_service, sample_memory):
    """Test memory formatting for API responses."""
    formatted = memory_service._format_memory_response(sample_memory)

    assert formatted["content"] == sample_memory.content
    assert formatted["content_hash"] == sample_memory.content_hash
    assert formatted["tags"] == sample_memory.tags
    assert formatted["memory_type"] == sample_memory.memory_type
    assert formatted["metadata"] == sample_memory.metadata
    assert "created_at" in formatted
    assert "updated_at" in formatted
    assert "created_at_iso" in formatted
    assert "updated_at_iso" in formatted


def test_format_memory_response_preserves_all_fields(memory_service, sample_memory):
    """Test that formatting preserves all memory fields."""
    formatted = memory_service._format_memory_response(sample_memory)

    # Verify all TypedDict fields are present
    required_fields = [
        "content", "content_hash", "tags", "memory_type", "metadata",
        "created_at", "updated_at", "created_at_iso", "updated_at_iso"
    ]

    for field in required_fields:
        assert field in formatted


# Integration-style tests (still using mocks but testing workflows)

@pytest.mark.asyncio
async def test_store_and_retrieve_workflow(memory_service, mock_storage, sample_memory):
    """Test complete workflow: store then retrieve."""
    # Setup mocks
    mock_storage.store.return_value = (True, "Success")
    mock_storage.retrieve.return_value = [sample_memory]

    # Store memory
    store_result = await memory_service.store_memory(
        content="Test workflow",
        tags=["workflow"],
        memory_type="test"
    )
    assert store_result["success"] is True

    # Retrieve memory
    retrieve_result = await memory_service.retrieve_memories(query="workflow")
    assert len(retrieve_result["memories"]) > 0


@pytest.mark.asyncio
async def test_list_memories_database_level_filtering(memory_service, mock_storage):
    """Test that list_memories uses database-level filtering (not loading all)."""
    mock_storage.get_all_memories.return_value = []
    mock_storage.count_all_memories.return_value = 1000

    # Request page 1 with 10 items from 1000 total
    result = await memory_service.list_memories(page=1, page_size=10)

    # Verify we only requested 10 items, not all 1000
    call_kwargs = mock_storage.get_all_memories.call_args.kwargs
    assert call_kwargs["limit"] == 10
    assert call_kwargs["offset"] == 0

    # This proves we're using database-level filtering, not O(n) memory loading
    mock_storage.get_all_memories.assert_called_once()


@pytest.mark.asyncio
async def test_empty_tags_list_stored_correctly(memory_service, mock_storage):
    """Test that empty or None tags are handled correctly."""
    mock_storage.store.return_value = None

    # Store with None tags
    result = await memory_service.store_memory(content="Test", tags=None)

    stored_memory = mock_storage.store.call_args.args[0]
    assert isinstance(stored_memory.tags, list)
    assert len(stored_memory.tags) == 0


@pytest.mark.asyncio
async def test_metadata_preserved_through_storage(memory_service, mock_storage):
    """Test that metadata is preserved correctly."""
    mock_storage.store.return_value = None

    custom_metadata = {"key1": "value1", "key2": 123}
    result = await memory_service.store_memory(
        content="Test",
        metadata=custom_metadata
    )

    stored_memory = mock_storage.store.call_args.args[0]
    assert "key1" in stored_memory.metadata
    assert stored_memory.metadata["key1"] == "value1"

```

--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/health.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp  
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Health monitoring and error handling for consolidation system."""

import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from enum import Enum
import traceback

from .base import ConsolidationError


class HealthStatus(Enum):
    """Health status levels."""
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    CRITICAL = "critical"


@dataclass
class HealthMetric:
    """Represents a health metric."""
    name: str
    value: Any
    status: HealthStatus
    message: str = ""
    timestamp: datetime = field(default_factory=datetime.now)
    threshold_warning: Optional[float] = None
    threshold_critical: Optional[float] = None


@dataclass
class HealthAlert:
    """Represents a health alert."""
    alert_id: str
    component: str
    severity: HealthStatus
    message: str
    timestamp: datetime = field(default_factory=datetime.now)
    resolved: bool = False
    resolution_timestamp: Optional[datetime] = None


class ConsolidationHealthMonitor:
    """Monitors health of the consolidation system."""
    
    def __init__(self, config=None):
        self.config = config
        self.logger = logging.getLogger(__name__)
        
        # Health metrics storage
        self.metrics: Dict[str, HealthMetric] = {}
        self.alerts: List[HealthAlert] = []
        self.error_history: List[Dict[str, Any]] = []
        
        # Health thresholds
        self.thresholds = {
            'consolidation_success_rate': {'warning': 0.8, 'critical': 0.6},
            'average_duration_seconds': {'warning': 300, 'critical': 600},
            'memory_processing_rate': {'warning': 0.1, 'critical': 0.05},
            'error_rate': {'warning': 0.1, 'critical': 0.2},
            'storage_response_time': {'warning': 5.0, 'critical': 10.0}
        }
        
        # Performance tracking
        self.performance_history: List[Dict[str, Any]] = []
        self.max_history_entries = 1000
        
        # Component health cache
        self.component_health_cache: Dict[str, Dict[str, Any]] = {}
        self.cache_ttl = timedelta(minutes=5)
        self.last_health_check = {}
    
    async def check_overall_health(self) -> Dict[str, Any]:
        """Check overall consolidation system health."""
        try:
            health = {
                'status': HealthStatus.HEALTHY.value,
                'timestamp': datetime.now().isoformat(),
                'components': {},
                'metrics': {},
                'alerts': [],
                'recommendations': []
            }
            
            # Check individual components
            components = [
                'decay_calculator',
                'association_engine',
                'clustering_engine', 
                'compression_engine',
                'forgetting_engine',
                'scheduler',
                'storage_backend'
            ]
            
            overall_status = HealthStatus.HEALTHY
            
            for component in components:
                component_health = await self._check_component_health(component)
                health['components'][component] = component_health
                
                # Update overall status based on component health
                component_status = HealthStatus(component_health.get('status', 'healthy'))
                if component_status == HealthStatus.CRITICAL:
                    overall_status = HealthStatus.CRITICAL
                elif component_status == HealthStatus.UNHEALTHY and overall_status != HealthStatus.CRITICAL:
                    overall_status = HealthStatus.UNHEALTHY
                elif component_status == HealthStatus.DEGRADED and overall_status == HealthStatus.HEALTHY:
                    overall_status = HealthStatus.DEGRADED
            
            # Add current metrics
            health['metrics'] = {name: {
                'value': metric.value,
                'status': metric.status.value,
                'message': metric.message,
                'timestamp': metric.timestamp.isoformat()
            } for name, metric in self.metrics.items()}
            
            # Add active alerts
            active_alerts = [alert for alert in self.alerts if not alert.resolved]
            health['alerts'] = [{
                'alert_id': alert.alert_id,
                'component': alert.component,
                'severity': alert.severity.value,
                'message': alert.message,
                'timestamp': alert.timestamp.isoformat()
            } for alert in active_alerts[-10:]]  # Last 10 alerts
            
            # Add recommendations
            health['recommendations'] = await self._generate_health_recommendations()
            
            health['status'] = overall_status.value
            
            return health
            
        except Exception as e:
            self.logger.error(f"Error checking overall health: {e}")
            return {
                'status': HealthStatus.CRITICAL.value,
                'timestamp': datetime.now().isoformat(),
                'error': str(e),
                'components': {},
                'metrics': {},
                'alerts': [],
                'recommendations': []
            }
    
    async def _check_component_health(self, component: str) -> Dict[str, Any]:
        """Check health of a specific component."""
        # Check cache first
        now = datetime.now()
        if (component in self.component_health_cache and 
            component in self.last_health_check and
            now - self.last_health_check[component] < self.cache_ttl):
            return self.component_health_cache[component]
        
        try:
            health = {
                'status': HealthStatus.HEALTHY.value,
                'timestamp': now.isoformat(),
                'checks': {},
                'metrics': {}
            }
            
            if component == 'decay_calculator':
                health.update(await self._check_decay_calculator_health())
            elif component == 'association_engine':
                health.update(await self._check_association_engine_health())
            elif component == 'clustering_engine':
                health.update(await self._check_clustering_engine_health())
            elif component == 'compression_engine':
                health.update(await self._check_compression_engine_health())
            elif component == 'forgetting_engine':
                health.update(await self._check_forgetting_engine_health())
            elif component == 'scheduler':
                health.update(await self._check_scheduler_health())
            elif component == 'storage_backend':
                health.update(await self._check_storage_backend_health())
            
            # Cache the result
            self.component_health_cache[component] = health
            self.last_health_check[component] = now
            
            return health
            
        except Exception as e:
            self.logger.error(f"Error checking {component} health: {e}")
            return {
                'status': HealthStatus.UNHEALTHY.value,
                'timestamp': now.isoformat(),
                'error': str(e),
                'checks': {},
                'metrics': {}
            }
    
    async def _check_decay_calculator_health(self) -> Dict[str, Any]:
        """Check decay calculator health."""
        return {
            'checks': {
                'configuration': 'valid',
                'retention_periods': 'configured',
                'decay_algorithm': 'functional'
            },
            'metrics': {
                'recent_calculations': len([h for h in self.performance_history 
                                          if h.get('component') == 'decay_calculator'
                                          and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)])
            }
        }
    
    async def _check_association_engine_health(self) -> Dict[str, Any]:
        """Check association engine health."""
        recent_associations = len([h for h in self.performance_history 
                                 if h.get('component') == 'association_engine'
                                 and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)])
        
        return {
            'checks': {
                'similarity_thresholds': 'configured',
                'concept_extraction': 'functional',
                'association_discovery': 'active'
            },
            'metrics': {
                'recent_associations_discovered': recent_associations,
                'similarity_range': '0.3-0.7'
            }
        }
    
    async def _check_clustering_engine_health(self) -> Dict[str, Any]:
        """Check clustering engine health."""
        return {
            'checks': {
                'clustering_algorithm': 'available',
                'minimum_cluster_size': 'configured',
                'embedding_processing': 'functional'
            },
            'metrics': {
                'recent_clusters_created': len([h for h in self.performance_history 
                                              if h.get('component') == 'clustering_engine'
                                              and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)])
            }
        }
    
    async def _check_compression_engine_health(self) -> Dict[str, Any]:
        """Check compression engine health."""
        return {
            'checks': {
                'summary_generation': 'functional',
                'concept_extraction': 'active',
                'compression_ratio': 'optimal'
            },
            'metrics': {
                'recent_compressions': len([h for h in self.performance_history 
                                          if h.get('component') == 'compression_engine'
                                          and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)])
            }
        }
    
    async def _check_forgetting_engine_health(self) -> Dict[str, Any]:
        """Check forgetting engine health."""
        return {
            'checks': {
                'archive_storage': 'accessible',
                'relevance_thresholds': 'configured',
                'controlled_forgetting': 'safe'
            },
            'metrics': {
                'recent_archival_operations': len([h for h in self.performance_history 
                                                 if h.get('component') == 'forgetting_engine'
                                                 and h.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=1)])
            }
        }
    
    async def _check_scheduler_health(self) -> Dict[str, Any]:
        """Check scheduler health."""
        return {
            'checks': {
                'scheduler_running': 'active',
                'job_scheduling': 'functional',
                'cron_expressions': 'valid'
            },
            'metrics': {
                'scheduled_jobs': 'configured',
                'last_execution': 'recent'
            }
        }
    
    async def _check_storage_backend_health(self) -> Dict[str, Any]:
        """Check storage backend health."""
        return {
            'checks': {
                'storage_connection': 'connected',
                'read_operations': 'functional', 
                'write_operations': 'functional',
                'backup_integrity': 'verified'
            },
            'metrics': {
                'response_time_ms': 'normal',
                'storage_utilization': 'optimal'
            }
        }
    
    async def _generate_health_recommendations(self) -> List[str]:
        """Generate health recommendations based on current system state."""
        recommendations = []
        
        # Check error rates
        recent_errors = len([e for e in self.error_history 
                           if e.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=24)])
        
        if recent_errors > 10:
            recommendations.append("High error rate detected. Consider reviewing consolidation configuration.")
        
        # Check performance metrics
        if 'average_duration_seconds' in self.metrics:
            duration = self.metrics['average_duration_seconds'].value
            if duration > 300:
                recommendations.append("Consolidation operations are taking longer than expected. Consider optimizing memory processing.")
        
        # Check active alerts
        critical_alerts = [a for a in self.alerts if not a.resolved and a.severity == HealthStatus.CRITICAL]
        if critical_alerts:
            recommendations.append("Critical alerts detected. Immediate attention required.")
        
        # Check storage health
        if 'storage_response_time' in self.metrics:
            response_time = self.metrics['storage_response_time'].value
            if response_time > 5.0:
                recommendations.append("Storage backend response time is elevated. Check database performance.")
        
        return recommendations
    
    def record_consolidation_performance(self, time_horizon: str, duration: float, 
                                       memories_processed: int, success: bool, 
                                       errors: List[str] = None):
        """Record performance metrics from a consolidation run."""
        entry = {
            'timestamp': datetime.now(),
            'time_horizon': time_horizon,
            'duration_seconds': duration,
            'memories_processed': memories_processed,
            'success': success,
            'errors': errors or [],
            'memories_per_second': memories_processed / duration if duration > 0 else 0
        }
        
        self.performance_history.append(entry)
        
        # Trim history to max size
        if len(self.performance_history) > self.max_history_entries:
            self.performance_history = self.performance_history[-self.max_history_entries:]
        
        # Update metrics
        self._update_performance_metrics()
        
        # Check for alerts
        if not success or (errors and len(errors) > 0):
            self._create_alert(
                component='consolidator',
                severity=HealthStatus.DEGRADED if success else HealthStatus.UNHEALTHY,
                message=f"Consolidation issues detected: {', '.join(errors[:3])}"
            )
    
    def record_error(self, component: str, error: Exception, context: Dict[str, Any] = None):
        """Record an error in the consolidation system.""" 
        error_entry = {
            'timestamp': datetime.now(),
            'component': component,
            'error_type': type(error).__name__,
            'error_message': str(error),
            'traceback': traceback.format_exc(),
            'context': context or {}
        }
        
        self.error_history.append(error_entry)
        
        # Trim error history
        if len(self.error_history) > self.max_history_entries:
            self.error_history = self.error_history[-self.max_history_entries:]
        
        # Create alert for serious errors
        if isinstance(error, ConsolidationError):
            severity = HealthStatus.UNHEALTHY
        else:
            severity = HealthStatus.DEGRADED
        
        self._create_alert(
            component=component,
            severity=severity,
            message=f"{type(error).__name__}: {str(error)}"
        )
        
        self.logger.error(f"Error in {component}: {error}", exc_info=True)
    
    def _update_performance_metrics(self):
        """Update performance metrics based on recent data."""
        now = datetime.now()
        recent_cutoff = now - timedelta(hours=24)
        
        # Get recent performance data
        recent_runs = [r for r in self.performance_history if r['timestamp'] > recent_cutoff]
        
        if not recent_runs:
            return
        
        # Calculate success rate
        successful_runs = [r for r in recent_runs if r['success']]
        success_rate = len(successful_runs) / len(recent_runs)
        
        self.metrics['consolidation_success_rate'] = HealthMetric(
            name='consolidation_success_rate',
            value=success_rate,
            status=self._get_status_for_metric('consolidation_success_rate', success_rate),
            message=f"{len(successful_runs)}/{len(recent_runs)} consolidations successful"
        )
        
        # Calculate average duration
        avg_duration = sum(r['duration_seconds'] for r in recent_runs) / len(recent_runs)
        
        self.metrics['average_duration_seconds'] = HealthMetric(
            name='average_duration_seconds',
            value=avg_duration,
            status=self._get_status_for_metric('average_duration_seconds', avg_duration),
            message=f"Average consolidation duration: {avg_duration:.1f}s"
        )
        
        # Calculate processing rate
        total_memories = sum(r['memories_processed'] for r in recent_runs)
        total_duration = sum(r['duration_seconds'] for r in recent_runs)
        processing_rate = total_memories / total_duration if total_duration > 0 else 0
        
        self.metrics['memory_processing_rate'] = HealthMetric(
            name='memory_processing_rate',
            value=processing_rate,
            status=self._get_status_for_metric('memory_processing_rate', processing_rate),
            message=f"Processing rate: {processing_rate:.2f} memories/second"
        )
        
        # Calculate error rate
        recent_error_cutoff = now - timedelta(hours=1)
        recent_errors = [e for e in self.error_history if e['timestamp'] > recent_error_cutoff]
        error_rate = len(recent_errors) / max(len(recent_runs), 1)
        
        self.metrics['error_rate'] = HealthMetric(
            name='error_rate',
            value=error_rate,
            status=self._get_status_for_metric('error_rate', error_rate),
            message=f"Error rate: {error_rate:.2f} errors per consolidation"
        )
    
    def _get_status_for_metric(self, metric_name: str, value: float) -> HealthStatus:
        """Determine health status for a metric value."""
        if metric_name not in self.thresholds:
            return HealthStatus.HEALTHY
        
        thresholds = self.thresholds[metric_name]
        
        # For error rate and duration, higher is worse
        if metric_name in ['error_rate', 'average_duration_seconds', 'storage_response_time']:
            if value >= thresholds['critical']:
                return HealthStatus.CRITICAL
            elif value >= thresholds['warning']:
                return HealthStatus.DEGRADED
            else:
                return HealthStatus.HEALTHY
        
        # For success rate and processing rate, lower is worse
        else:
            if value <= thresholds['critical']:
                return HealthStatus.CRITICAL
            elif value <= thresholds['warning']:
                return HealthStatus.DEGRADED
            else:
                return HealthStatus.HEALTHY
    
    def _create_alert(self, component: str, severity: HealthStatus, message: str):
        """Create a new health alert."""
        alert_id = f"{component}_{severity.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        alert = HealthAlert(
            alert_id=alert_id,
            component=component,
            severity=severity,
            message=message
        )
        
        self.alerts.append(alert)
        
        # Trim alerts to reasonable size
        if len(self.alerts) > 100:
            self.alerts = self.alerts[-100:]
        
        self.logger.warning(f"Health alert [{severity.value}] for {component}: {message}")
    
    def resolve_alert(self, alert_id: str):
        """Mark an alert as resolved."""
        for alert in self.alerts:
            if alert.alert_id == alert_id and not alert.resolved:
                alert.resolved = True
                alert.resolution_timestamp = datetime.now()
                self.logger.info(f"Alert {alert_id} resolved")
                break
    
    async def get_health_summary(self) -> Dict[str, Any]:
        """Get a summary of consolidation system health."""
        health = await self.check_overall_health()
        
        return {
            'overall_status': health['status'],
            'timestamp': health['timestamp'],
            'component_count': len(health['components']),
            'healthy_components': len([c for c in health['components'].values() 
                                     if c.get('status') == 'healthy']),
            'active_alerts': len([a for a in health['alerts'] if not a.get('resolved', False)]),
            'critical_alerts': len([a for a in health['alerts'] 
                                  if a.get('severity') == 'critical' and not a.get('resolved', False)]),
            'recommendations_count': len(health.get('recommendations', [])),
            'recent_errors': len([e for e in self.error_history 
                                if e.get('timestamp', datetime.min) > datetime.now() - timedelta(hours=24)])
        }
```

--------------------------------------------------------------------------------
/tests/consolidation/test_forgetting.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the controlled forgetting engine."""

import pytest
import os
import json
import tempfile
from datetime import datetime, timedelta
from pathlib import Path

from mcp_memory_service.consolidation.forgetting import (
    ControlledForgettingEngine, 
    ForgettingCandidate,
    ForgettingResult
)
from mcp_memory_service.consolidation.decay import RelevanceScore
from mcp_memory_service.models.memory import Memory


@pytest.mark.unit
class TestControlledForgettingEngine:
    """Test the controlled forgetting system."""
    
    @pytest.fixture
    def forgetting_engine(self, consolidation_config):
        return ControlledForgettingEngine(consolidation_config)
    
    @pytest.fixture
    def sample_relevance_scores(self, sample_memories):
        """Create sample relevance scores for memories."""
        scores = []
        for i, memory in enumerate(sample_memories):
            # Create varied relevance scores
            if "critical" in memory.tags:
                total_score = 1.5
            elif "temporary" in memory.tags:
                total_score = 0.05  # Very low relevance
            elif "test" in memory.content:
                total_score = 0.02  # Low quality content
            else:
                total_score = 0.8
            
            score = RelevanceScore(
                memory_hash=memory.content_hash,
                total_score=total_score,
                base_importance=1.0,
                decay_factor=0.8,
                connection_boost=1.0,
                access_boost=1.0,
                metadata={"test_score": True}
            )
            scores.append(score)
        
        return scores
    
    @pytest.mark.asyncio
    async def test_basic_forgetting_process(self, forgetting_engine, sample_memories, sample_relevance_scores):
        """Test basic forgetting process functionality."""
        access_patterns = {
            sample_memories[0].content_hash: datetime.now() - timedelta(days=100)  # Old access
        }
        
        results = await forgetting_engine.process(
            sample_memories, 
            sample_relevance_scores,
            access_patterns=access_patterns,
            time_horizon="monthly"
        )
        
        assert isinstance(results, list)
        assert all(isinstance(result, ForgettingResult) for result in results)
        
        # Check that some memories were processed for forgetting
        actions = [result.action_taken for result in results]
        valid_actions = {"archived", "compressed", "deleted", "skipped"}
        assert all(action in valid_actions for action in actions)
    
    @pytest.mark.asyncio
    async def test_identify_forgetting_candidates(self, forgetting_engine, sample_memories, sample_relevance_scores):
        """Test identification of forgetting candidates."""
        access_patterns = {}
        
        candidates = await forgetting_engine._identify_forgetting_candidates(
            sample_memories,
            {score.memory_hash: score for score in sample_relevance_scores},
            access_patterns,
            "monthly"
        )
        
        assert isinstance(candidates, list)
        assert all(isinstance(candidate, ForgettingCandidate) for candidate in candidates)
        
        # Check candidate properties
        for candidate in candidates:
            assert isinstance(candidate.memory, Memory)
            assert isinstance(candidate.relevance_score, RelevanceScore)
            assert isinstance(candidate.forgetting_reasons, list)
            assert len(candidate.forgetting_reasons) > 0
            assert candidate.archive_priority in [1, 2, 3]
            assert isinstance(candidate.can_be_deleted, bool)
    
    @pytest.mark.asyncio
    async def test_protected_memory_exclusion(self, forgetting_engine, sample_memories, sample_relevance_scores):
        """Test that protected memories are excluded from forgetting."""
        # Find critical memory (should be protected)
        critical_memory = next((m for m in sample_memories if "critical" in m.tags), None)
        
        if critical_memory:
            candidates = await forgetting_engine._identify_forgetting_candidates(
                [critical_memory],
                {critical_memory.content_hash: sample_relevance_scores[0]},  # Use first score
                {},
                "yearly"
            )
            
            # Critical memory should not be a candidate for forgetting
            assert len(candidates) == 0
    
    @pytest.mark.asyncio
    async def test_low_relevance_identification(self, forgetting_engine):
        """Test identification of low relevance memories."""
        now = datetime.now()
        
        low_relevance_memory = Memory(
            content="Low relevance test content",
            content_hash="low_relevance",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=now.timestamp(),
            updated_at=(now - timedelta(days=100)).timestamp()  # Old access
        )
        
        low_score = RelevanceScore(
            memory_hash="low_relevance",
            total_score=0.05,  # Below threshold
            base_importance=1.0,
            decay_factor=0.1,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidates = await forgetting_engine._identify_forgetting_candidates(
            [low_relevance_memory],
            {"low_relevance": low_score},
            {},
            "monthly"
        )
        
        assert len(candidates) > 0
        candidate = candidates[0]
        assert "low_relevance" in candidate.forgetting_reasons
    
    @pytest.mark.asyncio
    async def test_old_access_identification(self, forgetting_engine):
        """Test identification of memories with old access patterns."""
        now = datetime.now()
        
        old_access_memory = Memory(
            content="Memory with old access",
            content_hash="old_access",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=now.timestamp()
        )
        
        score = RelevanceScore(
            memory_hash="old_access",
            total_score=0.5,  # Decent relevance
            base_importance=1.0,
            decay_factor=0.8,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        # Very old access
        old_access_patterns = {
            "old_access": now - timedelta(days=120)  # Older than threshold
        }
        
        candidates = await forgetting_engine._identify_forgetting_candidates(
            [old_access_memory],
            {"old_access": score},
            old_access_patterns,
            "monthly"
        )
        
        assert len(candidates) > 0
        candidate = candidates[0]
        assert "old_access" in candidate.forgetting_reasons
    
    @pytest.mark.asyncio
    async def test_temporary_memory_expiration(self, forgetting_engine):
        """Test identification of expired temporary memories."""
        now = datetime.now()
        
        expired_temp_memory = Memory(
            content="Expired temporary memory",
            content_hash="expired_temp",
            tags=["temporary"],
            memory_type="temporary",
            embedding=[0.1] * 320,
            created_at=(now - timedelta(days=10)).timestamp()  # Older than 7 days
        )
        
        score = RelevanceScore(
            memory_hash="expired_temp",
            total_score=0.8,  # Good relevance, but temporary
            base_importance=1.0,
            decay_factor=0.8,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidates = await forgetting_engine._identify_forgetting_candidates(
            [expired_temp_memory],
            {"expired_temp": score},
            {},
            "monthly"
        )
        
        assert len(candidates) > 0
        candidate = candidates[0]
        assert "expired_temporary" in candidate.forgetting_reasons
        assert candidate.can_be_deleted is True
    
    @pytest.mark.asyncio
    async def test_low_quality_content_detection(self, forgetting_engine):
        """Test detection of low quality content."""
        # Very short content
        short_memory = Memory(
            content="test",
            content_hash="short",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        # Repetitive content
        repetitive_memory = Memory(
            content="test test test test test test",
            content_hash="repetitive",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        # Mostly non-alphabetic
        non_alpha_memory = Memory(
            content="!@#$%^&*()_+{}|<>?",
            content_hash="non_alpha",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        test_memories = [short_memory, repetitive_memory, non_alpha_memory]
        
        for memory in test_memories:
            is_low_quality = forgetting_engine._is_low_quality_content(memory)
            assert is_low_quality is True
    
    @pytest.mark.asyncio
    async def test_duplicate_detection(self, forgetting_engine, sample_memories):
        """Test detection of potential duplicate content."""
        # Create a memory that's very similar to an existing one
        existing_memory = sample_memories[0]
        duplicate_memory = Memory(
            content=existing_memory.content + " duplicate",  # Very similar
            content_hash="duplicate_test",
            tags=existing_memory.tags,
            embedding=existing_memory.embedding,
            created_at=datetime.now().timestamp()
        )
        
        test_memories = sample_memories + [duplicate_memory]
        
        is_duplicate = forgetting_engine._appears_to_be_duplicate(duplicate_memory, test_memories)
        # This might not always detect as duplicate due to the simple algorithm
        # Just ensure the method runs without error
        assert isinstance(is_duplicate, bool)
    
    @pytest.mark.asyncio
    async def test_archive_memory(self, forgetting_engine):
        """Test archiving a memory to filesystem."""
        memory = Memory(
            content="Memory to archive",
            content_hash="archive_test",
            tags=["test", "archive"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        score = RelevanceScore(
            memory_hash="archive_test",
            total_score=0.3,
            base_importance=1.0,
            decay_factor=0.5,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidate = ForgettingCandidate(
            memory=memory,
            relevance_score=score,
            forgetting_reasons=["test_archival"],
            archive_priority=2,
            can_be_deleted=False
        )
        
        result = await forgetting_engine._archive_memory(candidate)
        
        assert isinstance(result, ForgettingResult)
        assert result.action_taken == "archived"
        assert result.archive_path is not None
        assert os.path.exists(result.archive_path)
        
        # Check archive file content
        with open(result.archive_path, 'r') as f:
            archive_data = json.load(f)
        
        assert "memory" in archive_data
        assert "relevance_score" in archive_data
        assert "forgetting_metadata" in archive_data
        assert archive_data["memory"]["content_hash"] == "archive_test"
    
    @pytest.mark.asyncio
    async def test_compress_memory(self, forgetting_engine):
        """Test compressing a memory."""
        memory = Memory(
            content="This is a longer memory content that should be compressed to preserve key information while reducing size",
            content_hash="compress_test",
            tags=["test", "compression"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        score = RelevanceScore(
            memory_hash="compress_test",
            total_score=0.4,
            base_importance=1.0,
            decay_factor=0.6,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidate = ForgettingCandidate(
            memory=memory,
            relevance_score=score,
            forgetting_reasons=["test_compression"],
            archive_priority=3,
            can_be_deleted=False
        )
        
        result = await forgetting_engine._compress_memory(candidate)
        
        assert isinstance(result, ForgettingResult)
        assert result.action_taken == "compressed"
        assert result.compressed_version is not None
        assert result.archive_path is not None
        
        # Check compressed memory
        compressed = result.compressed_version
        assert compressed.memory_type == "compressed"
        assert "compressed" in compressed.tags
        assert len(compressed.content) <= len(memory.content)
        assert "original_hash" in compressed.metadata
        assert "compression_ratio" in compressed.metadata
    
    @pytest.mark.asyncio
    async def test_delete_memory(self, forgetting_engine):
        """Test deleting a memory with backup."""
        memory = Memory(
            content="Memory to delete",
            content_hash="delete_test",
            tags=["test", "delete"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        score = RelevanceScore(
            memory_hash="delete_test",
            total_score=0.01,
            base_importance=1.0,
            decay_factor=0.1,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidate = ForgettingCandidate(
            memory=memory,
            relevance_score=score,
            forgetting_reasons=["potential_duplicate"],
            archive_priority=1,
            can_be_deleted=True
        )
        
        result = await forgetting_engine._delete_memory(candidate)
        
        assert isinstance(result, ForgettingResult)
        assert result.action_taken == "deleted"
        assert result.archive_path is not None  # Backup should exist
        assert os.path.exists(result.archive_path)
        
        # Check backup file
        with open(result.archive_path, 'r') as f:
            backup_data = json.load(f)
        
        assert "memory" in backup_data
        assert "deletion_metadata" in backup_data
        assert backup_data["memory"]["content_hash"] == "delete_test"
    
    @pytest.mark.asyncio
    async def test_memory_recovery(self, forgetting_engine):
        """Test recovery of forgotten memories."""
        # First archive a memory
        memory = Memory(
            content="Memory for recovery test",
            content_hash="recovery_test",
            tags=["test", "recovery"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        score = RelevanceScore(
            memory_hash="recovery_test",
            total_score=0.2,
            base_importance=1.0,
            decay_factor=0.4,
            connection_boost=1.0,
            access_boost=1.0,
            metadata={}
        )
        
        candidate = ForgettingCandidate(
            memory=memory,
            relevance_score=score,
            forgetting_reasons=["test_recovery"],
            archive_priority=2,
            can_be_deleted=False
        )
        
        # Archive the memory
        await forgetting_engine._archive_memory(candidate)
        
        # Now try to recover it
        recovered_memory = await forgetting_engine.recover_memory("recovery_test")
        
        assert recovered_memory is not None
        assert isinstance(recovered_memory, Memory)
        assert recovered_memory.content_hash == "recovery_test"
        assert recovered_memory.content == memory.content
    
    @pytest.mark.asyncio
    async def test_forgetting_statistics(self, forgetting_engine, sample_memories, sample_relevance_scores):
        """Test getting forgetting statistics."""
        # Process some memories to generate statistics
        access_patterns = {
            sample_memories[0].content_hash: datetime.now() - timedelta(days=100)
        }
        
        await forgetting_engine.process(
            sample_memories[:3],  # Use subset for faster test
            sample_relevance_scores[:3],
            access_patterns=access_patterns,
            time_horizon="monthly"
        )
        
        stats = await forgetting_engine.get_forgetting_statistics()
        
        assert isinstance(stats, dict)
        assert "total_archived" in stats
        assert "total_compressed" in stats
        assert "total_deleted" in stats
        assert "archive_size_bytes" in stats
        
        # Values should be non-negative
        assert stats["total_archived"] >= 0
        assert stats["total_compressed"] >= 0
        assert stats["total_deleted"] >= 0
        assert stats["archive_size_bytes"] >= 0
    
    @pytest.mark.asyncio
    async def test_create_compressed_content(self, forgetting_engine):
        """Test creation of compressed content."""
        original_content = """
        This is a longer piece of content that contains multiple sentences. 
        It has important information in the first sentence. 
        The middle part contains additional details and context.
        The final sentence wraps up the content nicely.
        """
        
        compressed = forgetting_engine._create_compressed_content(original_content)
        
        assert isinstance(compressed, str)
        assert len(compressed) <= len(original_content)
        assert len(compressed) > 0
        
        # Should contain compression indicator if significantly compressed
        if len(compressed) < len(original_content) * 0.8:
            assert "[Compressed]" in compressed
    
    @pytest.mark.asyncio
    async def test_extract_important_terms(self, forgetting_engine):
        """Test extraction of important terms from text."""
        text = """
        The CamelCaseVariable is used with the API_ENDPOINT.
        Visit https://example.com for documentation.
        Contact [email protected] for help.
        The temperature is 25.5 degrees.
        See "important documentation" for details.
        Use snake_case_variables appropriately.
        """
        
        terms = forgetting_engine._extract_important_terms(text)
        
        assert isinstance(terms, list)
        assert len(terms) <= 10  # Should be limited
        
        # Should extract various types of important terms
        terms_lower = [term.lower() for term in terms]
        term_str = " ".join(terms_lower)
        
        # Should find some patterns (exact matches may vary)
        assert len(terms) > 0  # At least some terms should be found
    
    @pytest.mark.asyncio
    async def test_archive_directories_creation(self, temp_archive_path):
        """Test that archive directories are created properly."""
        config = type('Config', (), {
            'relevance_threshold': 0.1,
            'access_threshold_days': 90,
            'archive_location': temp_archive_path
        })()
        
        engine = ControlledForgettingEngine(config)
        
        # Check that directories were created
        assert engine.archive_path.exists()
        assert engine.daily_archive.exists()
        assert engine.compressed_archive.exists()
        assert engine.metadata_archive.exists()
    
    @pytest.mark.asyncio
    async def test_empty_input_handling(self, forgetting_engine):
        """Test handling of empty inputs."""
        results = await forgetting_engine.process([], [])
        assert results == []
    
    @pytest.mark.asyncio
    async def test_time_horizon_filtering(self, forgetting_engine, sample_memories, sample_relevance_scores):
        """Test that time horizon affects forgetting behavior."""
        access_patterns = {
            sample_memories[0].content_hash: datetime.now() - timedelta(days=100)
        }
        
        # Test with different time horizons
        daily_results = await forgetting_engine.process(
            sample_memories[:2],
            sample_relevance_scores[:2],
            access_patterns=access_patterns,
            time_horizon="daily"
        )
        
        yearly_results = await forgetting_engine.process(
            sample_memories[:2],
            sample_relevance_scores[:2],
            access_patterns=access_patterns,
            time_horizon="yearly"
        )
        
        # Different time horizons may produce different results
        # At minimum, both should handle the input without errors
        assert isinstance(daily_results, list)
        assert isinstance(yearly_results, list)
    
    @pytest.mark.asyncio
    async def test_metadata_entry_creation(self, forgetting_engine):
        """Test creation of metadata log entries."""
        memory = Memory(
            content="Test memory for metadata",
            content_hash="metadata_test",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )
        
        archive_path = forgetting_engine.metadata_archive / "test_archive.json"
        
        await forgetting_engine._create_metadata_entry(memory, archive_path, "archived")
        
        # Check that log file was created
        log_file = forgetting_engine.metadata_archive / "forgetting_log.jsonl"
        assert log_file.exists()
        
        # Check log content
        with open(log_file, 'r') as f:
            log_content = f.read().strip()
        
        assert len(log_content) > 0
        
        # Parse the JSON line
        log_entry = json.loads(log_content.split('\n')[-1])  # Get last line
        assert log_entry["memory_hash"] == "metadata_test"
        assert log_entry["action"] == "archived"
```

--------------------------------------------------------------------------------
/claude-hooks/tests/phase2-integration-test.js:
--------------------------------------------------------------------------------

```javascript
#!/usr/bin/env node

/**
 * Phase 2 Integration Tests
 * Comprehensive testing for intelligent context updates and conversation awareness
 */

const path = require('path');

// Import Phase 2 components
const { analyzeConversation, detectTopicChanges } = require('../utilities/conversation-analyzer');
const { scoreMemoryRelevance } = require('../utilities/memory-scorer');
const { SessionTracker } = require('../utilities/session-tracker');
const { DynamicContextUpdater } = require('../utilities/dynamic-context-updater');

// Test utilities
function createMockMemory(content, tags = [], createdDaysAgo = 0) {
    const created = new Date();
    created.setDate(created.getDate() - createdDaysAgo);
    
    return {
        content: content,
        content_hash: `hash-${Math.random().toString(36).substr(2, 9)}`,
        tags: tags,
        created_at: created.toISOString(),
        memory_type: 'note'
    };
}

function createMockProjectContext() {
    return {
        name: 'mcp-memory-service',
        type: 'Multi-language Project',
        languages: ['javascript', 'python'],
        frameworks: ['node.js', 'fastapi'],
        tools: ['git', 'npm', 'pip'],
        confidence: 0.95
    };
}

// Test suite
class Phase2TestSuite {
    constructor() {
        this.testResults = [];
        this.totalTests = 0;
        this.passedTests = 0;
    }

    async runTest(testName, testFunction) {
        console.log(`\n🧪 Testing: ${testName}`);
        this.totalTests++;

        try {
            const result = await testFunction();
            if (result === true || result === undefined) {
                console.log(`✅ PASS: ${testName}`);
                this.passedTests++;
                this.testResults.push({ name: testName, status: 'PASS' });
            } else {
                console.log(`❌ FAIL: ${testName} - ${result}`);
                this.testResults.push({ name: testName, status: 'FAIL', reason: result });
            }
        } catch (error) {
            console.log(`❌ ERROR: ${testName} - ${error.message}`);
            this.testResults.push({ name: testName, status: 'ERROR', error: error.message });
        }
    }

    async runAllTests() {
        console.log('🚀 Phase 2 Integration Tests - Intelligent Context Updates');
        console.log('Testing conversation awareness, dynamic memory loading, and cross-session intelligence\n');

        // Conversation Analysis Tests
        await this.runTest('Conversation Analysis - Topic Detection', this.testTopicDetection);
        await this.runTest('Conversation Analysis - Entity Extraction', this.testEntityExtraction);
        await this.runTest('Conversation Analysis - Intent Detection', this.testIntentDetection);
        await this.runTest('Conversation Analysis - Code Context Detection', this.testCodeContextDetection);

        // Topic Change Detection Tests
        await this.runTest('Topic Change Detection - Significant Changes', this.testSignificantTopicChanges);
        await this.runTest('Topic Change Detection - Minor Changes', this.testMinorTopicChanges);

        // Enhanced Memory Scoring Tests
        await this.runTest('Enhanced Memory Scoring - Conversation Context', this.testConversationContextScoring);
        await this.runTest('Enhanced Memory Scoring - Weight Adjustment', this.testWeightAdjustment);

        // Session Tracking Tests  
        await this.runTest('Session Tracking - Session Creation', this.testSessionCreation);
        await this.runTest('Session Tracking - Conversation Threading', this.testConversationThreading);
        await this.runTest('Session Tracking - Cross-session Context', this.testCrossSessionContext);

        // Dynamic Context Update Tests
        await this.runTest('Dynamic Context Update - Update Triggering', this.testUpdateTriggering);
        await this.runTest('Dynamic Context Update - Rate Limiting', this.testRateLimiting);
        await this.runTest('Dynamic Context Update - Context Formatting', this.testContextFormatting);

        // Integration Tests
        await this.runTest('Full Integration - Conversation Flow', this.testFullConversationFlow);

        this.printSummary();
    }

    // Test implementations
    async testTopicDetection() {
        const conversationText = `
        I'm having issues with the database performance. The SQLite queries are running slowly
        and I think we need to optimize the memory service. Let's debug this architecture problem
        and implement a better caching solution.
        `;

        const analysis = analyzeConversation(conversationText);
        
        const topicNames = analysis.topics.map(t => t.name);
        const hasDbTopic = topicNames.includes('database');
        const hasDebuggingTopic = topicNames.includes('debugging');
        const hasArchTopic = topicNames.includes('architecture');

        if (!hasDbTopic) return 'Database topic not detected';
        if (!hasDebuggingTopic) return 'Debugging topic not detected';
        if (!hasArchTopic) return 'Architecture topic not detected';
        if (analysis.topics.length === 0) return 'No topics detected';

        console.log(`  Detected ${analysis.topics.length} topics: ${topicNames.join(', ')}`);
        return true;
    }

    async testEntityExtraction() {
        const conversationText = `
        We're using JavaScript with React for the frontend and Python with FastAPI for the backend.
        The database is PostgreSQL and we're deploying on AWS with Docker containers.
        `;

        const analysis = analyzeConversation(conversationText);
        
        const entityNames = analysis.entities.map(e => e.name);
        const hasJS = entityNames.includes('javascript');
        const hasReact = entityNames.includes('react');
        const hasPython = entityNames.includes('python');
        const hasFastAPI = entityNames.includes('fastapi');

        if (!hasJS) return 'JavaScript entity not detected';
        if (!hasReact) return 'React entity not detected'; 
        if (!hasPython) return 'Python entity not detected';

        console.log(`  Detected ${analysis.entities.length} entities: ${entityNames.join(', ')}`);
        return true;
    }

    async testIntentDetection() {
        const conversationText = `
        How do I fix this error in the authentication system? The JWT tokens are not validating
        properly and users can't log in. I need to solve this problem quickly.
        `;

        const analysis = analyzeConversation(conversationText);
        
        if (!analysis.intent) return 'Intent not detected';
        if (analysis.intent.name !== 'problem-solving') {
            return `Expected 'problem-solving' intent, got '${analysis.intent.name}'`;
        }
        if (analysis.intent.confidence < 0.5) {
            return `Intent confidence too low: ${analysis.intent.confidence}`;
        }

        console.log(`  Detected intent: ${analysis.intent.name} (${(analysis.intent.confidence * 100).toFixed(1)}%)`);
        return true;
    }

    async testCodeContextDetection() {
        const conversationText = `
        Here's the function that's causing issues:
        
        \`\`\`javascript
        function validateToken(token) {
            return jwt.verify(token, secret);
        }
        \`\`\`
        
        The error message is: "TokenExpiredError: jwt expired"
        Can you help me fix this in auth.js?
        `;

        const analysis = analyzeConversation(conversationText);
        
        if (!analysis.codeContext) return 'Code context not detected';
        if (!analysis.codeContext.isCodeRelated) return 'Code relationship not detected';
        if (!analysis.codeContext.hasCodeBlocks) return 'Code blocks not detected';
        if (!analysis.codeContext.hasErrorMessages) return 'Error messages not detected';
        if (!analysis.codeContext.hasFilePaths) return 'File paths not detected';

        console.log(`  Code context detected: languages=[${analysis.codeContext.languages.join(', ')}]`);
        return true;
    }

    async testSignificantTopicChanges() {
        const previousAnalysis = analyzeConversation('We are implementing a new authentication system using JWT tokens.');
        const currentAnalysis = analyzeConversation('Now I need to debug a database performance issue with slow queries.');

        const changes = detectTopicChanges(previousAnalysis, currentAnalysis);

        if (!changes.hasTopicShift) return 'Topic shift not detected';
        if (changes.significanceScore < 0.3) {
            return `Significance score too low: ${changes.significanceScore}`;
        }
        if (changes.newTopics.length === 0) return 'New topics not detected';

        console.log(`  Topic shift detected: score=${changes.significanceScore.toFixed(2)}, new topics=${changes.newTopics.length}`);
        return true;
    }

    async testMinorTopicChanges() {
        const previousAnalysis = analyzeConversation('We are implementing JWT authentication.');
        const currentAnalysis = analyzeConversation('Let me add better error handling to the authentication code.');

        const changes = detectTopicChanges(previousAnalysis, currentAnalysis);

        // Minor changes should have lower significance
        if (changes.hasTopicShift && changes.significanceScore > 0.5) {
            return `Significance score too high for minor change: ${changes.significanceScore}`;
        }

        console.log(`  Minor change detected correctly: score=${changes.significanceScore.toFixed(2)}`);
        return true;
    }

    async testConversationContextScoring() {
        const memories = [
            createMockMemory('Database optimization techniques for SQLite', ['database', 'optimization'], 1),
            createMockMemory('JWT authentication implementation guide', ['auth', 'jwt'], 2),
            createMockMemory('React component debugging tips', ['react', 'debugging'], 3)
        ];

        const projectContext = createMockProjectContext();
        const conversationAnalysis = analyzeConversation('I need help optimizing database queries for better performance');

        const scoredMemories = scoreMemoryRelevance(memories, projectContext, {
            includeConversationContext: true,
            conversationAnalysis: conversationAnalysis
        });

        // Database memory should score highest due to conversation context
        const dbMemory = scoredMemories.find(m => m.content.includes('Database optimization'));
        if (!dbMemory) return 'Database memory not found in results';
        if (dbMemory.relevanceScore < 0.35) {
            return `Database memory score too low: ${dbMemory.relevanceScore}`;
        }

        // Verify conversation context was used
        if (!dbMemory.scoreBreakdown.conversationRelevance) {
            return 'Conversation relevance not calculated';
        }

        console.log(`  Database memory scored highest: ${dbMemory.relevanceScore.toFixed(3)} (conversation: ${dbMemory.scoreBreakdown.conversationRelevance.toFixed(3)})`);
        return true;
    }

    async testWeightAdjustment() {
        const memory = createMockMemory('Authentication system implementation', ['auth'], 1);
        const projectContext = createMockProjectContext();
        const conversationAnalysis = analyzeConversation('How to implement authentication?');

        // Test with conversation context enabled
        const withContext = scoreMemoryRelevance([memory], projectContext, {
            includeConversationContext: true,
            conversationAnalysis: conversationAnalysis
        })[0];

        // Test without conversation context
        const withoutContext = scoreMemoryRelevance([memory], projectContext, {
            includeConversationContext: false
        })[0];

        if (!withContext.hasConversationContext) return 'Conversation context not enabled';
        if (withContext.hasConversationContext === withoutContext.hasConversationContext) {
            return 'Weight adjustment not applied';
        }

        console.log(`  Weight adjustment applied: with context=${withContext.relevanceScore.toFixed(3)}, without=${withoutContext.relevanceScore.toFixed(3)}`);
        return true;
    }

    async testSessionCreation() {
        const sessionTracker = new SessionTracker({
            trackingDataPath: path.join(__dirname, 'test-session-tracking.json')
        });

        await sessionTracker.initialize();

        const sessionId = 'test-session-' + Date.now();
        const context = {
            projectContext: createMockProjectContext(),
            workingDirectory: '/test/directory'
        };

        const session = await sessionTracker.startSession(sessionId, context);

        if (!session) return 'Session not created';
        if (session.id !== sessionId) return 'Session ID mismatch';
        if (session.status !== 'active') return 'Session status not active';
        if (!session.projectContext) return 'Project context not stored';

        console.log(`  Session created: ${session.id} for project ${session.projectContext.name}`);
        return true;
    }

    async testConversationThreading() {
        const sessionTracker = new SessionTracker({
            trackingDataPath: path.join(__dirname, 'test-threading.json')
        });

        await sessionTracker.initialize();

        const context = {
            projectContext: createMockProjectContext(),
            workingDirectory: '/test/directory'
        };

        // Create first session
        const session1 = await sessionTracker.startSession('session-1', context);
        await sessionTracker.endSession('session-1', { type: 'completed', summary: 'Implemented auth' });

        // Create related session
        const session2 = await sessionTracker.startSession('session-2', context);

        if (!session1.threadId) return 'Thread ID not created for first session';
        if (!session2.threadId) return 'Thread ID not created for second session';

        // Sessions should be linked if they're related
        const areLinked = session1.threadId === session2.threadId || 
                         session2.parentSessionId === session1.id;

        console.log(`  Threading: session1=${session1.threadId}, session2=${session2.threadId}, linked=${areLinked}`);
        return true;
    }

    async testCrossSessionContext() {
        const sessionTracker = new SessionTracker({
            trackingDataPath: path.join(__dirname, 'test-cross-session.json')
        });

        await sessionTracker.initialize();

        const projectContext = createMockProjectContext();

        // Create and end a session with outcome
        const session1 = await sessionTracker.startSession('cross-session-1', { projectContext });
        await sessionTracker.endSession('cross-session-1', {
            type: 'implementation',
            summary: 'Implemented user authentication',
            topics: ['auth', 'jwt']
        });

        // Get conversation context for new session
        const context = await sessionTracker.getConversationContext(projectContext);

        if (!context) return 'Cross-session context not retrieved';
        if (context.recentSessions.length === 0) return 'No recent sessions found';
        if (!context.projectName) return 'Project name not in context';

        console.log(`  Cross-session context: ${context.recentSessions.length} recent sessions for ${context.projectName}`);
        return true;
    }

    async testUpdateTriggering() {
        const updater = new DynamicContextUpdater({
            updateThreshold: 0.3,
            maxMemoriesPerUpdate: 2
        });

        await updater.initialize({
            projectContext: createMockProjectContext()
        });

        // Mock memory service config
        const mockConfig = {
            endpoint: 'https://mock.local:8443',
            apiKey: 'mock-key'
        };

        // Mock context injector
        let injectedContext = null;
        const mockInjector = (context) => {
            injectedContext = context;
        };

        // Simulate conversation with significant topic change
        const conversationText = 'I need help debugging this authentication error in the JWT validation';

        // This would normally trigger an update, but we'll simulate the decision logic
        const analysis = analyzeConversation(conversationText);
        const changes = detectTopicChanges(null, analysis);

        if (!changes.hasTopicShift) return 'Topic shift not detected for significant conversation change';
        if (changes.significanceScore < 0.3) return 'Significance score below threshold';

        console.log(`  Update would be triggered: significance=${changes.significanceScore.toFixed(2)}`);
        return true;
    }

    async testRateLimiting() {
        const updater = new DynamicContextUpdater({
            updateCooldownMs: 1000,  // 1 second cooldown
            maxUpdatesPerSession: 3
        });

        await updater.initialize({
            projectContext: createMockProjectContext()
        });

        // First update should be allowed
        if (!updater.shouldProcessUpdate()) return 'First update not allowed';

        // Simulate update
        updater.lastUpdateTime = Date.now();
        updater.updateCount = 1;

        // Immediate second update should be blocked by cooldown
        if (updater.shouldProcessUpdate()) return 'Cooldown not enforced';

        // After cooldown, should be allowed
        updater.lastUpdateTime = Date.now() - 2000; // 2 seconds ago
        if (!updater.shouldProcessUpdate()) return 'Update after cooldown not allowed';

        // But not if max updates reached
        updater.updateCount = 10; // Exceed max
        if (updater.shouldProcessUpdate()) return 'Max updates limit not enforced';

        console.log('  Rate limiting working correctly');
        return true;
    }

    async testContextFormatting() {
        const memories = [
            createMockMemory('Database optimization completed successfully', ['database', 'optimization'], 1),
            createMockMemory('JWT implementation guide for auth', ['auth', 'jwt'], 2)
        ];

        memories.forEach(memory => {
            memory.relevanceScore = 0.8;
        });

        const updater = new DynamicContextUpdater();
        const analysis = analyzeConversation('Working on database performance improvements');
        const changes = { newTopics: [{ name: 'database' }], changedIntents: false };

        const formatted = updater.formatContextUpdate(memories, analysis, changes, null);

        if (!formatted.includes('Dynamic Context Update')) return 'Header not found';
        if (!formatted.includes('New topics detected')) return 'Topic change not mentioned';
        if (!formatted.includes('Database optimization')) return 'Memory content not included';

        console.log('  Context formatting working correctly');
        return true;
    }

    async testFullConversationFlow() {
        // This test simulates a full conversation flow with topic changes
        const sessionTracker = new SessionTracker({
            trackingDataPath: path.join(__dirname, 'test-full-flow.json')
        });

        const updater = new DynamicContextUpdater({
            updateThreshold: 0.15,  // Even lower threshold for testing
            enableCrossSessionContext: true
        });

        await sessionTracker.initialize();
        await updater.initialize({
            projectContext: createMockProjectContext()
        });

        // Simulate conversation evolution
        const conversations = [
            'Starting work on authentication system implementation',
            'Now debugging database performance issues with slow queries and errors',
            'Switching focus to frontend React component optimization and testing framework'
        ];

        let lastAnalysis = null;
        let significantChanges = 0;

        for (let i = 0; i < conversations.length; i++) {
            const analysis = analyzeConversation(conversations[i]);
            
            if (lastAnalysis) {
                const changes = detectTopicChanges(lastAnalysis, analysis);
                if (changes.hasTopicShift && changes.significanceScore > 0.15) {
                    significantChanges++;
                }
            }
            
            lastAnalysis = analysis;
        }

        if (significantChanges < 2) {
            return `Expected at least 2 significant changes, got ${significantChanges}`;
        }

        console.log(`  Full conversation flow: ${significantChanges} significant topic changes detected`);
        return true;
    }

    printSummary() {
        console.log('\n============================================================');
        console.log('🎯 PHASE 2 TEST SUMMARY');
        console.log('============================================================');
        console.log(`Total Tests: ${this.totalTests}`);
        console.log(`✅ Passed: ${this.passedTests}`);
        console.log(`❌ Failed: ${this.totalTests - this.passedTests}`);
        console.log(`Success Rate: ${((this.passedTests / this.totalTests) * 100).toFixed(1)}%`);
        console.log('============================================================');

        if (this.passedTests === this.totalTests) {
            console.log('🎉 ALL PHASE 2 TESTS PASSED! Intelligent context updates ready.');
        } else {
            console.log('\n❌ Failed Tests:');
            this.testResults
                .filter(result => result.status !== 'PASS')
                .forEach(result => {
                    console.log(`  - ${result.name}: ${result.reason || result.error || 'Unknown error'}`);
                });
        }

        console.log('\n📋 Phase 2 Features Tested:');
        console.log('  ✅ Conversation Analysis & Topic Detection');
        console.log('  ✅ Dynamic Context Updates & Memory Loading');  
        console.log('  ✅ Enhanced Memory Scoring with Conversation Context');
        console.log('  ✅ Session Tracking & Cross-Session Intelligence');
        console.log('  ✅ Rate Limiting & Update Management');
        console.log('  ✅ Full Conversation Flow Integration');
    }
}

// Run tests if called directly
if (require.main === module) {
    const testSuite = new Phase2TestSuite();
    testSuite.runAllTests().catch(error => {
        console.error('Test suite failed:', error);
        process.exit(1);
    });
}

module.exports = Phase2TestSuite;
```

--------------------------------------------------------------------------------
/scripts/maintenance/consolidate_memory_types.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Memory Type Consolidation Script

Consolidates fragmented memory types into a standardized taxonomy.
Run with --dry-run to preview changes before executing.

⚠️ IMPORTANT SAFETY NOTES:
- Creates automatic backup before execution
- Stop HTTP server before running: systemctl --user stop mcp-memory-http.service
- Disconnect MCP clients (use /mcp in Claude Code)
- Database must not be locked or in use

Usage:
    python consolidate_memory_types.py --dry-run  # Preview changes (safe)
    python consolidate_memory_types.py            # Execute consolidation
    python consolidate_memory_types.py --config custom_mappings.json  # Use custom mappings
"""

import sqlite3
import sys
import os
import subprocess
import shutil
from pathlib import Path
from typing import Dict, Tuple, Optional
from collections import defaultdict
from datetime import datetime

# Database path (platform-aware)
import platform
if platform.system() == "Darwin":  # macOS
    DB_PATH = Path.home() / "Library/Application Support/mcp-memory/sqlite_vec.db"
elif platform.system() == "Windows":
    DB_PATH = Path(os.getenv('LOCALAPPDATA')) / "mcp-memory" / "sqlite_vec.db"
else:  # Linux and other Unix-like systems
    DB_PATH = Path.home() / ".local/share/mcp-memory/sqlite_vec.db"

# Version
VERSION = "1.0.0"

# Consolidation mapping: old_type -> new_type
# Special handling for empty strings, NULL values, and pattern-based consolidation
CONSOLIDATION_MAP: Dict[str, str] = {
    # Empty type and NULL -> note
    "": "note",
    None: "note",

    # Session variants -> session
    "session-summary": "session",
    "session-checkpoint": "session",
    "session-completion": "session",
    "session-context": "session",
    "analysis-session": "session",
    "development-session": "session",
    "development_session": "session",
    "maintenance-session": "session",
    "project-session": "session",

    # Special sessions -> troubleshooting
    "troubleshooting-session": "troubleshooting",
    "diagnostic-session": "troubleshooting",
    "technical-session": "troubleshooting",

    # Milestone and completion variants -> milestone
    "project-milestone": "milestone",
    "development-milestone": "milestone",
    "major-milestone": "milestone",
    "major_milestone": "milestone",
    "documentation-milestone": "milestone",
    "release-milestone": "milestone",
    "deployment-milestone": "milestone",
    "final-milestone": "milestone",
    "progress-milestone": "milestone",
    "mission-accomplished": "milestone",

    # Completion types -> milestone
    "completion": "milestone",
    "project-completion": "milestone",
    "work-completion": "milestone",
    "completion-summary": "milestone",
    "milestone-completion": "milestone",
    "release-completion": "milestone",
    "development-completion": "milestone",
    "documentation-completion": "milestone",
    "feature-completion": "milestone",
    "final-completion": "milestone",
    "implementation-completion": "milestone",
    "merge-completion": "milestone",
    "session-completion": "milestone",
    "workflow-complete": "milestone",

    # Technical prefix removal - documentation
    "technical-documentation": "documentation",

    # Technical prefix removal - implementation
    "technical-implementation": "implementation",

    # Technical prefix removal - solution
    "technical-solution": "solution",
    "technical solution": "solution",

    # Technical prefix removal - fix
    "technical-fix": "fix",

    # Technical prefix removal - analysis
    "technical-analysis": "analysis",

    # Technical prefix removal - reference
    "technical-reference": "reference",

    # Technical prefix removal - note
    "technical-note": "note",
    "technical-notes": "note",

    # Technical prefix removal - guide
    "technical-guide": "guide",
    "technical-guidance": "guide",
    "technical-howto": "guide",

    # Technical prefix removal - other
    "technical-specification": "architecture",
    "technical-decision": "architecture",
    "technical-design": "architecture",
    "technical-knowledge": "reference",
    "technical_knowledge": "reference",
    "technical-finding": "analysis",
    "technical-pattern": "architecture",
    "technical-rule": "process",
    "technical-process": "process",
    "technical-achievement": "achievement",
    "technical_achievement": "achievement",
    "technical-data": "document",
    "technical-diagram": "document",
    "technical-enhancement": "feature",
    "technical-problem": "troubleshooting",
    "technical-setup": "configuration",
    "technical-summary": "note",
    "technical-todo": "note",

    # Project prefix removal
    "project-documentation": "documentation",
    "project-status": "status",
    "project-summary": "note",
    "project-update": "status",
    "project-management": "process",
    "project-improvement": "feature",
    "project-action": "note",
    "project-event": "note",
    "project-final-update": "status",
    "project-goals": "note",
    "project-implementation": "implementation",
    "project-outcome": "milestone",
    "project-overview": "note",
    "project-policy": "process",
    "project-requirement": "note",
    "project-planning": "process",
    "project-plan": "process",
    "project-roadmap": "process",
    "project-strategy": "process",
    "project-task": "note",
    "project-timeline": "process",
    "project-tracker": "status",
    "project-workflow": "process",
    "project-issue": "troubleshooting",
    "project-problem": "troubleshooting",
    "project-challenge": "troubleshooting",
    "project-risk": "note",
    "project-solution": "solution",
    "project-result": "milestone",
    "project-success": "achievement",
    "project-failure": "note",
    "project-learning": "reference",
    "project-lesson": "reference",
    "project-feedback": "note",
    "project-review": "analysis",
    "project-assessment": "analysis",
    "project-evaluation": "analysis",
    "project-analysis": "analysis",
    "project-report": "analysis",
    "project-metrics": "analysis",
    "project-performance": "analysis",
    "project-impact": "analysis",
    "project-outcome-analysis": "analysis",
    "project-benefit": "achievement",
    "project-achievement": "achievement",

    # System and configuration
    "system-config": "configuration",
    "system-setup": "configuration",
    "server-config": "configuration",
    "server-configuration": "configuration",
    "system-configuration": "configuration",
    "infrastructure_setup": "configuration",
    "setup": "configuration",
    "setup-guide": "guide",
    "setup-memo": "configuration",
    "configuration-guide": "guide",
    "user-preference": "configuration",

    # Infrastructure
    "infrastructure-change": "infrastructure",
    "infrastructure-analysis": "infrastructure",
    "infrastructure-report": "infrastructure",

    # Process and workflow
    "workflow": "process",
    "procedure": "process",
    "workflow-guide": "guide",
    "process-guide": "guide",
    "process-improvement": "process",
    "action-plan": "process",
    "detailed-process": "process",
    "development-plan": "process",
    "comprehensive-plan": "process",
    "cleanup": "process",
    "maintenance": "process",

    # Installation and features
    "installation-guide": "guide",
    "feature-specification": "feature",
    "feature-summary": "feature",

    # General mappings
    "summary": "note",
    "memo": "note",
    "reminder": "note",
    "clarification": "note",
    "checkpoint": "note",
    "finding": "analysis",
    "report": "analysis",
    "analysis-summary": "analysis",
    "analysis-report": "analysis",
    "financial-analysis": "analysis",
    "security-analysis": "analysis",
    "verification": "test",
    "correction": "fix",
    "enhancement": "feature",
    "improvement": "feature",
    "improvement-summary": "feature",
    "fix-summary": "fix",
    "user-feedback": "note",
    "user-identity": "note",
    "user-account": "configuration",

    # Extended mappings from database analysis
    "marketing": "note",
    "support": "note",
    "integration": "implementation",
    "strategy-integration": "implementation",
    "methodology": "process",
    "guideline": "guide",
    "critical-lesson": "reference",
    "security-reminder": "security",
    "security-recovery": "security",
    "security-resolution": "security",
    "security-update": "security",
    "workflow-rule": "process",
    "professional_story": "note",

    # Additional types found in database
    "applescript-template": "document",
    "project": "note",
    "test-document": "test",
    "documentation-summary": "documentation",
    "documentation-final": "documentation",
    "fact": "note",
    "development-summary": "note",
    "lesson-learned": "reference",
    "reference-guide": "guide",
    "task": "note",
    "update": "status",

    # German terms (found in database)
    "Bankzahlung": "note",
    "Betrugsschema": "note",
    "Finanzbeweis": "note",
    "Strafanzeige": "note",

    # Analysis and investigation types
    "analysis-finding": "analysis",
    "analysis-start": "analysis",
    "comprehensive-analysis": "analysis",
    "final-analysis": "analysis",
    "investigation": "analysis",
    "issue-identification": "analysis",
    "issue_investigation": "analysis",
    "temporal-analysis": "analysis",
    "testing-insights": "analysis",

    # Architecture types
    "architecture-decision": "architecture",
    "architecture-visualization": "architecture",
    "concept-design": "architecture",
    "design-decision": "architecture",
    "tool-decision": "architecture",

    # Status and reporting
    "backup-record": "status",
    "maintenance-report": "status",
    "maintenance-summary": "status",
    "progress": "status",
    "progress-tracking": "status",
    "system-health-report": "status",
    "system-report": "status",

    # Reference types
    "best-practice": "reference",
    "learning": "reference",
    "lesson": "reference",
    "network-info": "reference",
    "principle": "reference",

    # Fix types
    "bug": "fix",
    "critical-fix": "fix",

    # Troubleshooting types
    "configuration-issue": "troubleshooting",
    "critical-issue": "troubleshooting",
    "debugging": "troubleshooting",
    "error": "troubleshooting",
    "problem-escalation": "troubleshooting",

    # Solution types
    "final-resolution": "solution",
    "solution-complete": "solution",
    "solution-design": "solution",
    "solution-implemented": "solution",

    # Test types
    "compatibility-test": "test",
    "debug-test": "test",
    "functionality-test": "test",
    "healthcheck_test": "test",
    "post-fix-test": "test",
    "post-restart-test": "test",
    "string-format-test": "test",
    "system_test": "test",
    "test-case": "test",
    "test-result": "test",
    "testing": "test",
    "validation": "test",
    "validation-results": "test",
    "verification-test": "test",

    # Guide types
    "comprehensive-guide": "guide",
    "tutorial_resource": "guide",

    # Document types
    "comprehensive_collection": "document",
    "strategy-document": "document",

    # Achievement types
    "success": "achievement",
    "work-achievement": "achievement",

    # Note types (various)
    "concept": "note",
    "contribution": "note",
    "critical-discovery": "note",
    "design-note": "note",
    "discovery": "note",
    "important-note": "note",
    "imported": "note",
    "network-limitation": "note",
    "reflection": "note",
    "server-behavior": "note",
    "system": "note",
    "tool": "note",
    "user-input": "note",
    "user-question": "note",
    "user-request": "note",
    "issue_creation": "note",
}

def check_http_server_running() -> bool:
    """Check if HTTP server is running (Linux only)."""
    try:
        # Check systemd service
        result = subprocess.run(
            ["systemctl", "--user", "is-active", "mcp-memory-http.service"],
            capture_output=True,
            text=True
        )
        return result.returncode == 0
    except (subprocess.SubprocessError, FileNotFoundError):
        # Not Linux or systemctl not available
        return False


def check_database_locked(db_path: Path) -> bool:
    """Check if database is currently locked."""
    try:
        # Try to open with a very short timeout
        conn = sqlite3.connect(db_path, timeout=0.1)
        cursor = conn.cursor()
        cursor.execute("BEGIN IMMEDIATE")
        conn.rollback()
        conn.close()
        return False
    except sqlite3.OperationalError:
        return True


def create_backup(db_path: Path, dry_run: bool = False) -> Optional[Path]:
    """Create a timestamped backup of the database."""
    if dry_run:
        return None

    timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    backup_path = db_path.parent / f"{db_path.stem}.backup-{timestamp}{db_path.suffix}"

    try:
        shutil.copy2(db_path, backup_path)

        # Verify backup
        if not backup_path.exists():
            raise FileNotFoundError(f"Backup file not created: {backup_path}")

        if backup_path.stat().st_size != db_path.stat().st_size:
            raise ValueError(f"Backup size mismatch: {backup_path.stat().st_size} != {db_path.stat().st_size}")

        return backup_path
    except Exception as e:
        print(f"\n❌ Error creating backup: {e}")
        raise


def perform_safety_checks(db_path: Path, dry_run: bool = False) -> bool:
    """Perform all safety checks before consolidation."""
    print("\n" + "="*80)
    print("Safety Checks")
    print("="*80)

    all_passed = True

    # Check 1: Database exists
    if not db_path.exists():
        print("❌ Database not found at:", db_path)
        return False
    print(f"✓ Database found: {db_path}")

    # Check 2: Database is not locked
    if check_database_locked(db_path):
        print("❌ Database is currently locked (in use by another process)")
        print("   Stop HTTP server: systemctl --user stop mcp-memory-http.service")
        print("   Disconnect MCP: Use /mcp command in Claude Code")
        all_passed = False
    else:
        print("✓ Database is not locked")

    # Check 3: HTTP server status (Linux only)
    if os.name != 'nt':  # Not Windows
        if check_http_server_running():
            print("⚠️  HTTP server is running")
            print("   Recommended: systemctl --user stop mcp-memory-http.service")
            if not dry_run:
                response = input("   Continue anyway? (yes/no): ")
                if response.lower() != "yes":
                    all_passed = False
        else:
            print("✓ HTTP server is not running")

    # Check 4: Sufficient disk space
    stat = os.statvfs(db_path.parent)
    free_space = stat.f_bavail * stat.f_frsize
    db_size = db_path.stat().st_size
    if free_space < db_size * 2:  # Need at least 2x database size
        print(f"⚠️  Low disk space: {free_space / 1024**2:.1f} MB free, need {db_size * 2 / 1024**2:.1f} MB")
        all_passed = False
    else:
        print(f"✓ Sufficient disk space: {free_space / 1024**2:.1f} MB free")

    print("="*80)

    return all_passed


def analyze_database(conn: sqlite3.Connection) -> Tuple[Dict[str, int], int]:
    """Analyze current state of memory types."""
    cursor = conn.cursor()

    # Get type distribution
    cursor.execute("SELECT memory_type, COUNT(*) FROM memories GROUP BY memory_type")
    type_counts = {row[0]: row[1] for row in cursor.fetchall()}

    # Get total count
    cursor.execute("SELECT COUNT(*) FROM memories")
    total = cursor.fetchone()[0]

    return type_counts, total


def preview_consolidation(type_counts: Dict[str, int]) -> Dict[str, Dict[str, int]]:
    """Preview what the consolidation will do."""
    # Group by target type
    consolidation_preview = defaultdict(lambda: {"old_count": 0, "sources": []})
    unchanged = {}

    for old_type, count in type_counts.items():
        if old_type in CONSOLIDATION_MAP:
            new_type = CONSOLIDATION_MAP[old_type]
            consolidation_preview[new_type]["old_count"] += count
            consolidation_preview[new_type]["sources"].append(f"{old_type} ({count})")
        else:
            unchanged[old_type] = count

    return dict(consolidation_preview), unchanged


def execute_consolidation(conn: sqlite3.Connection, dry_run: bool = True) -> Tuple[int, Dict[str, int]]:
    """Execute the consolidation."""
    cursor = conn.cursor()
    total_updated = 0
    updates_by_type = defaultdict(int)

    if dry_run:
        print("\n" + "="*80)
        print("DRY RUN MODE - No changes will be made")
        print("="*80 + "\n")

    # Process each mapping
    for old_type, new_type in CONSOLIDATION_MAP.items():
        # Handle None/NULL specially
        if old_type is None:
            if dry_run:
                cursor.execute("SELECT COUNT(*) FROM memories WHERE memory_type IS NULL")
                count = cursor.fetchone()[0]
                if count > 0:
                    print(f"Would update {count:4d} memories: (None/NULL) → {new_type}")
                    total_updated += count
                    updates_by_type[new_type] += count
            else:
                cursor.execute("UPDATE memories SET memory_type = ? WHERE memory_type IS NULL", (new_type,))
                count = cursor.rowcount
                if count > 0:
                    print(f"Updated {count:4d} memories: (None/NULL) → {new_type}")
                    total_updated += count
                    updates_by_type[new_type] += count
        else:
            if dry_run:
                cursor.execute(
                    "SELECT COUNT(*) FROM memories WHERE memory_type = ?",
                    (old_type,)
                )
                count = cursor.fetchone()[0]
                if count > 0:
                    print(f"Would update {count:4d} memories: {old_type!r:40s} → {new_type}")
                    total_updated += count
                    updates_by_type[new_type] += count
            else:
                cursor.execute(
                    "UPDATE memories SET memory_type = ? WHERE memory_type = ?",
                    (new_type, old_type)
                )
                count = cursor.rowcount
                if count > 0:
                    print(f"Updated {count:4d} memories: {old_type!r:40s} → {new_type}")
                    total_updated += count
                    updates_by_type[new_type] += count

    return total_updated, dict(updates_by_type)


def main():
    """Main execution."""
    dry_run = "--dry-run" in sys.argv

    print(f"\nMemory Type Consolidation Script v{VERSION}")
    print(f"Database: {DB_PATH}")
    print(f"Mode: {'DRY RUN (preview only)' if dry_run else 'LIVE EXECUTION'}")
    print("="*80)

    # Perform safety checks
    if not perform_safety_checks(DB_PATH, dry_run):
        print("\n❌ Safety checks failed. Aborting.")
        sys.exit(1)

    # Create backup (unless dry-run)
    if not dry_run:
        print("\nCreating backup...")
        try:
            backup_path = create_backup(DB_PATH, dry_run)
            if backup_path:
                print(f"✓ Backup created: {backup_path}")
                print(f"  Size: {backup_path.stat().st_size / 1024**2:.2f} MB")
        except Exception as e:
            print(f"❌ Failed to create backup: {e}")
            sys.exit(1)

    # Connect to database
    conn = sqlite3.connect(DB_PATH, timeout=30)

    try:
        # Analyze current state
        print("\nAnalyzing current state...")
        type_counts, total = analyze_database(conn)
        unique_types = len(type_counts)

        print(f"\nCurrent State:")
        print(f"  Total memories: {total:,}")
        print(f"  Unique types: {unique_types}")
        print(f"  Empty type: {type_counts.get('', 0)}")

        # Preview consolidation
        print("\nConsolidation Preview:")
        consolidation_preview, unchanged = preview_consolidation(type_counts)

        print(f"\nTypes that will be consolidated:")
        for new_type in sorted(consolidation_preview.keys()):
            info = consolidation_preview[new_type]
            print(f"\n  {new_type}: {info['old_count']} memories from {len(info['sources'])} sources")
            for source in sorted(info['sources']):
                print(f"    ← {source}")

        print(f"\nTypes that will remain unchanged: {len(unchanged)}")
        for old_type, count in sorted(unchanged.items(), key=lambda x: -x[1])[:20]:
            type_display = old_type if old_type is not None else "(None/NULL)"
            print(f"  {type_display:40s} {count:4d}")
        if len(unchanged) > 20:
            print(f"  ... and {len(unchanged) - 20} more")

        # Execute consolidation
        print("\n" + "="*80)
        if not dry_run:
            response = input("\nProceed with consolidation? (yes/no): ")
            if response.lower() != "yes":
                print("Consolidation cancelled.")
                return

        total_updated, updates_by_type = execute_consolidation(conn, dry_run)

        if not dry_run:
            conn.commit()
            print(f"\n✓ Consolidation complete!")

        print(f"\nTotal memories updated: {total_updated:,}")
        print(f"\nBreakdown by target type:")
        for new_type in sorted(updates_by_type.keys(), key=lambda x: -updates_by_type[x]):
            print(f"  {new_type:30s} +{updates_by_type[new_type]:4d}")

        # Show final state
        if not dry_run:
            print("\nAnalyzing final state...")
            final_type_counts, final_total = analyze_database(conn)
            final_unique_types = len(final_type_counts)

            print(f"\nFinal State:")
            print(f"  Total memories: {final_total:,}")
            print(f"  Unique types: {final_unique_types}")
            print(f"  Reduction: {unique_types} → {final_unique_types} types ({unique_types - final_unique_types} removed)")

            print(f"\nTop types by count:")
            for memory_type, count in sorted(final_type_counts.items(), key=lambda x: -x[1])[:25]:
                pct = (count / final_total) * 100
                print(f"  {memory_type:30s} {count:4d} ({pct:5.1f}%)")

    except Exception as e:
        print(f"\nError: {e}")
        if not dry_run:
            conn.rollback()
            print("Changes rolled back.")
        raise

    finally:
        conn.close()

    if dry_run:
        print("\n" + "="*80)
        print("DRY RUN COMPLETE - Run without --dry-run to execute")
        print("="*80)


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/tests/unit/test_cloudflare_storage.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for Cloudflare storage backend."""

import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from typing import List

from src.mcp_memory_service.storage.cloudflare import CloudflareStorage
from src.mcp_memory_service.models.memory import Memory
from src.mcp_memory_service.utils.hashing import generate_content_hash


@pytest.fixture
def cloudflare_storage():
    """Create a CloudflareStorage instance for testing."""
    return CloudflareStorage(
        api_token="test-token",
        account_id="test-account",
        vectorize_index="test-index",
        d1_database_id="test-db",
        r2_bucket="test-bucket",
        embedding_model="@cf/baai/bge-base-en-v1.5"
    )


@pytest.fixture
def sample_memory():
    """Create a sample memory for testing."""
    content = "This is a test memory"
    return Memory(
        content=content,
        content_hash=generate_content_hash(content),
        tags=["test", "memory"],
        memory_type="standard"
    )


class TestCloudflareStorage:
    """Test suite for CloudflareStorage."""
    
    def test_initialization(self, cloudflare_storage):
        """Test CloudflareStorage initialization."""
        assert cloudflare_storage.api_token == "test-token"
        assert cloudflare_storage.account_id == "test-account"
        assert cloudflare_storage.vectorize_index == "test-index"
        assert cloudflare_storage.d1_database_id == "test-db"
        assert cloudflare_storage.r2_bucket == "test-bucket"
        assert not cloudflare_storage._initialized
        
    @pytest.mark.asyncio
    async def test_get_client(self, cloudflare_storage):
        """Test HTTP client creation."""
        client = await cloudflare_storage._get_client()
        assert client is not None
        assert cloudflare_storage.client == client
        
        # Verify headers are set correctly
        assert "Authorization" in client.headers
        assert client.headers["Authorization"] == "Bearer test-token"
        assert client.headers["Content-Type"] == "application/json"
        
    @pytest.mark.asyncio
    async def test_generate_embedding_cache(self, cloudflare_storage):
        """Test embedding generation and caching."""
        test_text = "Test content for embedding"
        
        # Mock the API call
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": True,
            "result": {"data": [[0.1, 0.2, 0.3, 0.4, 0.5]]}
        }
        
        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response):
            # First call should make API request
            embedding1 = await cloudflare_storage._generate_embedding(test_text)
            assert embedding1 == [0.1, 0.2, 0.3, 0.4, 0.5]
            
            # Second call should use cache
            embedding2 = await cloudflare_storage._generate_embedding(test_text)
            assert embedding2 == [0.1, 0.2, 0.3, 0.4, 0.5]
            assert embedding1 == embedding2
            
            # Verify cache is populated
            assert len(cloudflare_storage._embedding_cache) == 1
    
    @pytest.mark.asyncio
    async def test_embedding_api_failure(self, cloudflare_storage):
        """Test handling of embedding API failures."""
        test_text = "Test content"
        
        # Mock failed API response
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": False,
            "errors": ["API error"]
        }
        
        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response):
            with pytest.raises(ValueError, match="Workers AI embedding failed"):
                await cloudflare_storage._generate_embedding(test_text)
    
    @pytest.mark.asyncio
    async def test_retry_logic(self, cloudflare_storage):
        """Test retry logic with rate limiting."""
        import httpx
        
        # Mock rate limited response followed by success
        responses = [
            Mock(status_code=429, raise_for_status=Mock(side_effect=httpx.HTTPStatusError("Rate limited", request=Mock(), response=Mock()))),
            Mock(status_code=200, raise_for_status=Mock(), json=Mock(return_value={"success": True}))
        ]
        
        with patch('httpx.AsyncClient.request', side_effect=responses):
            with patch('asyncio.sleep'):  # Speed up test
                response = await cloudflare_storage._retry_request("GET", "https://test.com")
                assert response.status_code == 200
    
    @pytest.mark.asyncio
    async def test_initialization_schema_creation(self, cloudflare_storage):
        """Test D1 schema initialization."""
        mock_response = Mock()
        mock_response.json.return_value = {"success": True}
        
        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response) as mock_request:
            with patch.object(cloudflare_storage, '_verify_vectorize_index'):
                with patch.object(cloudflare_storage, '_verify_r2_bucket'):
                    await cloudflare_storage.initialize()
                    
                    # Verify D1 schema creation was called
                    assert any("CREATE TABLE" in str(call) for call in mock_request.call_args_list)
                    assert cloudflare_storage._initialized
    
    @pytest.mark.asyncio
    async def test_store_memory_small_content(self, cloudflare_storage, sample_memory):
        """Test storing memory with small content (no R2)."""
        # Mock successful responses
        mock_embedding = [0.1, 0.2, 0.3]
        mock_d1_response = Mock()
        mock_d1_response.json.return_value = {
            "success": True,
            "result": [{"meta": {"last_row_id": 123}}]
        }

        with patch.object(cloudflare_storage, '_generate_embedding', return_value=mock_embedding):
            # Mock Vectorize storage (bypasses HTTP client) - must be AsyncMock for async method
            with patch.object(cloudflare_storage, '_store_vectorize_vector', new_callable=AsyncMock):
                with patch.object(cloudflare_storage, '_retry_request') as mock_request:
                    # Need 5 responses: 1 for memory insert + 2 tags * 2 calls each (insert + link)
                    mock_request.side_effect = [
                        mock_d1_response,  # Insert memory
                        mock_d1_response,  # Insert tag "test"
                        mock_d1_response,  # Link tag "test"
                        mock_d1_response,  # Insert tag "memory"
                        mock_d1_response   # Link tag "memory"
                    ]

                    success, message = await cloudflare_storage.store(sample_memory)

                    assert success
                    assert "successfully" in message.lower()

                    # Verify all D1 calls were made
                    assert mock_request.call_count == 5
    
    @pytest.mark.asyncio
    async def test_store_memory_large_content(self, cloudflare_storage):
        """Test storing memory with large content (uses R2)."""
        # Create memory with large content
        large_content = "x" * (2 * 1024 * 1024)  # 2MB content
        memory = Memory(
            content=large_content,
            content_hash=generate_content_hash(large_content),
            tags=["large"],
            memory_type="standard"
        )

        mock_embedding = [0.1, 0.2, 0.3]
        mock_response = Mock()
        mock_response.json.return_value = {"success": True, "result": [{"meta": {"last_row_id": 123}}]}
        mock_response.status_code = 200

        with patch.object(cloudflare_storage, '_generate_embedding', return_value=mock_embedding):
            # Mock Vectorize storage (bypasses HTTP client) - must be AsyncMock for async method
            with patch.object(cloudflare_storage, '_store_vectorize_vector', new_callable=AsyncMock):
                with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response):
                    success, message = await cloudflare_storage.store(memory)

                    assert success
                assert "successfully" in message.lower()
    
    @pytest.mark.asyncio
    async def test_retrieve_memories(self, cloudflare_storage):
        """Test retrieving memories by semantic search."""
        mock_embedding = [0.1, 0.2, 0.3]
        mock_vectorize_response = Mock()
        mock_vectorize_response.json.return_value = {
            "success": True,
            "result": {
                "matches": [{
                    "id": "mem_test123",
                    "score": 0.95,
                    "metadata": {"content_hash": "test123"}
                }]
            }
        }
        
        mock_d1_response = Mock()
        mock_d1_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "id": 1,
                    "content_hash": "test123",
                    "content": "Test memory content",
                    "memory_type": "standard",
                    "created_at": 1234567890,
                    "metadata_json": "{}"
                }]
            }]
        }
        
        # Mock tag loading
        mock_tags_response = Mock()
        mock_tags_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"name": "test"}, {"name": "memory"}]}]
        }
        
        with patch.object(cloudflare_storage, '_generate_embedding', return_value=mock_embedding):
            with patch.object(cloudflare_storage, '_retry_request') as mock_request:
                mock_request.side_effect = [mock_vectorize_response, mock_d1_response, mock_tags_response]
                
                results = await cloudflare_storage.retrieve("test query", 5)
                
                assert len(results) == 1
                assert results[0].similarity_score == 0.95
                assert results[0].memory.content == "Test memory content"
                assert results[0].memory.content_hash == "test123"
    
    @pytest.mark.asyncio
    async def test_search_by_tag(self, cloudflare_storage):
        """Test searching memories by tags."""
        mock_d1_response = Mock()
        mock_d1_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "id": 1,
                    "content_hash": "test123",
                    "content": "Tagged memory",
                    "memory_type": "standard"
                }]
            }]
        }
        
        mock_tags_response = Mock()
        mock_tags_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"name": "test"}]}]
        }

        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_d1_response, mock_tags_response]

            memories = await cloudflare_storage.search_by_tag(["test"])

            assert len(memories) == 1
            assert memories[0].content == "Tagged memory"
            assert memories[0].content_hash == "test123"

    @pytest.mark.asyncio
    async def test_search_by_tags_or_operation(self, cloudflare_storage):
        """Test search_by_tags uses OR semantics by default."""
        mock_d1_response = Mock()
        mock_d1_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "id": 2,
                    "content_hash": "abc123",
                    "content": "Multi tag memory",
                    "memory_type": "standard"
                }]
            }]
        }

        mock_tags_response = Mock()
        mock_tags_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"name": "alpha"}, {"name": "beta"}]}]
        }

        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_d1_response, mock_tags_response]

            memories = await cloudflare_storage.search_by_tags(["alpha", "beta"], operation="OR")

            assert len(memories) == 1
            assert memories[0].content_hash == "abc123"

            query_payload = mock_request.call_args_list[0].kwargs["json"]
            assert "HAVING" not in query_payload["sql"].upper()
            assert query_payload["params"] == ["alpha", "beta"]

    @pytest.mark.asyncio
    async def test_search_by_tags_and_operation(self, cloudflare_storage):
        """Test search_by_tags enforces AND semantics when requested."""
        mock_d1_response = Mock()
        mock_d1_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "id": 3,
                    "content_hash": "def456",
                    "content": "All tag match",
                    "memory_type": "standard"
                }]
            }]
        }

        mock_tags_response = Mock()
        mock_tags_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"name": "alpha"}, {"name": "beta"}]}]
        }

        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_d1_response, mock_tags_response]

            memories = await cloudflare_storage.search_by_tags(["alpha", "beta"], operation="AND")

            assert len(memories) == 1
            assert memories[0].content_hash == "def456"

            query_payload = mock_request.call_args_list[0].kwargs["json"]
            assert "HAVING COUNT(DISTINCT T.NAME) = ?" in query_payload["sql"].upper()
            assert query_payload["params"] == ["alpha", "beta", 2]

    @pytest.mark.asyncio
    async def test_get_all_tags(self, cloudflare_storage):
        """Test retrieving all distinct tags from Cloudflare storage."""
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [
                    {"name": "alpha"},
                    {"name": "beta"}
                ]
            }]
        }

        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response) as mock_request:
            tags = await cloudflare_storage.get_all_tags()
            assert tags == ["alpha", "beta"]
            assert mock_request.called

    @pytest.mark.asyncio
    async def test_get_all_tags_with_counts(self, cloudflare_storage):
        """Test retrieving tag usage counts using memory_tags join."""
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [
                    {"tag": "alpha", "count": 5},
                    {"tag": "beta", "count": 2}
                ]
            }]
        }

        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response) as mock_request:
            tags_with_counts = await cloudflare_storage.get_all_tags_with_counts()
            assert tags_with_counts == [
                {"tag": "alpha", "count": 5},
                {"tag": "beta", "count": 2}
            ]
            sql_payload = mock_request.call_args.kwargs["json"]["sql"].lower()
            assert "left join" in sql_payload and "group by" in sql_payload

    @pytest.mark.asyncio
    async def test_get_all_memories_with_tag_filter(self, cloudflare_storage):
        """Ensure list_memories SQL filters through memory_tags join."""
        memory_row = {
            "id": 1,
            "content_hash": "abc123",
            "content": "filtered content",
            "memory_type": "standard",
            "metadata_json": "{}",
            "created_at": 123,
            "created_at_iso": "2025-11-17T00:00:00Z",
            "updated_at": None,
            "updated_at_iso": None
        }

        mock_memories_response = Mock()
        mock_memories_response.json.return_value = {
            "success": True,
            "result": [{"results": [memory_row]}]
        }

        mock_tags_response = Mock()
        mock_tags_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"name": "status:initialized"}]}]
        }

        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_memories_response, mock_tags_response]

            memories = await cloudflare_storage.get_all_memories(limit=10, tags=["status:initialized"])

            assert len(memories) == 1
            sql_payload = mock_request.call_args_list[0].kwargs["json"]
            sql_text = sql_payload["sql"].upper()
            assert "JOIN MEMORY_TAGS" in sql_text and "HAVING COUNT(DISTINCT T.NAME) = ?" in sql_text
            assert sql_payload["params"] == ["status:initialized", 1, 10]

    @pytest.mark.asyncio
    async def test_count_all_memories_with_tag_filter(self, cloudflare_storage):
        """Ensure count uses distinct memory IDs and tag filtering."""
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": True,
            "result": [{"results": [{"count": 3}]}]
        }

        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response) as mock_request:
            count = await cloudflare_storage.count_all_memories(tags=["alpha", "beta"])

            assert count == 3
            sql_payload = mock_request.call_args.kwargs["json"]
            sql_text = sql_payload["sql"].upper()
            assert "COUNT(*) AS COUNT FROM" in sql_text and "HAVING COUNT(DISTINCT T.NAME) = ?" in sql_text
            assert sql_payload["params"] == ["alpha", "beta", 2]

    @pytest.mark.asyncio
    async def test_delete_memory(self, cloudflare_storage):
        """Test deleting a memory."""
        mock_find_response = Mock()
        mock_find_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "id": 1,
                    "vector_id": "mem_test123",
                    "r2_key": None
                }]
            }]
        }
        
        mock_delete_response = Mock()
        mock_delete_response.json.return_value = {"success": True}
        
        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_find_response, mock_delete_response, mock_delete_response]
            
            success, message = await cloudflare_storage.delete("test123")
            
            assert success
            assert "successfully" in message.lower()
    
    @pytest.mark.asyncio
    async def test_get_stats(self, cloudflare_storage):
        """Test getting storage statistics."""
        mock_response = Mock()
        mock_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "total_memories": 10,
                    "total_content_size": 1024,
                    "total_vectors": 10,
                    "r2_stored_count": 2
                }]
            }]
        }
        
        with patch.object(cloudflare_storage, '_retry_request', return_value=mock_response):
            stats = await cloudflare_storage.get_stats()
            
            assert stats["total_memories"] == 10
            assert stats["total_content_size_bytes"] == 1024
            assert stats["storage_backend"] == "cloudflare"
            assert stats["vectorize_index"] == "test-index"
            assert stats["status"] == "operational"
    
    @pytest.mark.asyncio
    async def test_cleanup_duplicates(self, cloudflare_storage):
        """Test cleaning up duplicate memories."""
        mock_find_response = Mock()
        mock_find_response.json.return_value = {
            "success": True,
            "result": [{
                "results": [{
                    "content_hash": "duplicate123",
                    "count": 3,
                    "keep_id": 1
                }]
            }]
        }
        
        mock_delete_response = Mock()
        mock_delete_response.json.return_value = {
            "success": True,
            "result": [{"meta": {"changes": 2}}]
        }
        
        with patch.object(cloudflare_storage, '_retry_request') as mock_request:
            mock_request.side_effect = [mock_find_response, mock_delete_response]
            
            count, message = await cloudflare_storage.cleanup_duplicates()
            
            assert count == 2
            assert "2 duplicates" in message
    
    @pytest.mark.asyncio
    async def test_close(self, cloudflare_storage):
        """Test closing the storage backend."""
        # Create a mock client
        mock_client = AsyncMock()
        cloudflare_storage.client = mock_client
        cloudflare_storage._embedding_cache = {"test": [1, 2, 3]}

        await cloudflare_storage.close()

        # Verify client was closed and cache cleared
        mock_client.aclose.assert_called_once()
        assert cloudflare_storage.client is None
        assert len(cloudflare_storage._embedding_cache) == 0

    def test_sanitized_method(self, cloudflare_storage):
        """Test tag sanitization method."""
        # Test with None
        assert cloudflare_storage.sanitized(None) == "[]"

        # Test with string
        assert cloudflare_storage.sanitized("tag1,tag2,tag3") == '["tag1", "tag2", "tag3"]'

        # Test with list
        assert cloudflare_storage.sanitized(["tag1", "tag2"]) == '["tag1", "tag2"]'

        # Test with empty string
        assert cloudflare_storage.sanitized("") == "[]"

        # Test with empty list
        assert cloudflare_storage.sanitized([]) == "[]"

        # Test with mixed types in list
        assert cloudflare_storage.sanitized([1, "tag2", 3.14]) == '["1", "tag2", "3.14"]'

```
Page 23/35FirstPrevNextLast