#
tokens: 49321/50000 13/625 files (page 18/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 18 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── amp-bridge.md
│   │   ├── amp-pr-automator.md
│   │   ├── code-quality-guard.md
│   │   ├── gemini-pr-automator.md
│   │   └── github-release-manager.md
│   ├── settings.local.json.backup
│   └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── feature_request.yml
│   │   └── performance_issue.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── bridge-tests.yml
│       ├── CACHE_FIX.md
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── cleanup-images.yml.disabled
│       ├── dev-setup-validation.yml
│       ├── docker-publish.yml
│       ├── LATEST_FIXES.md
│       ├── main-optimized.yml.disabled
│       ├── main.yml
│       ├── publish-and-test.yml
│       ├── README_OPTIMIZATION.md
│       ├── release-tag.yml.disabled
│       ├── release.yml
│       ├── roadmap-review-reminder.yml
│       ├── SECRET_CONDITIONAL_FIX.md
│       └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│   ├── .gitignore
│   └── reports
│       └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│   ├── deployment
│   │   ├── deploy_fastmcp_fixed.sh
│   │   ├── deploy_http_with_mcp.sh
│   │   └── deploy_mcp_v4.sh
│   ├── deployment-configs
│   │   ├── empty_config.yml
│   │   └── smithery.yaml
│   ├── development
│   │   └── test_fastmcp.py
│   ├── docs-removed-2025-08-23
│   │   ├── authentication.md
│   │   ├── claude_integration.md
│   │   ├── claude-code-compatibility.md
│   │   ├── claude-code-integration.md
│   │   ├── claude-code-quickstart.md
│   │   ├── claude-desktop-setup.md
│   │   ├── complete-setup-guide.md
│   │   ├── database-synchronization.md
│   │   ├── development
│   │   │   ├── autonomous-memory-consolidation.md
│   │   │   ├── CLEANUP_PLAN.md
│   │   │   ├── CLEANUP_README.md
│   │   │   ├── CLEANUP_SUMMARY.md
│   │   │   ├── dream-inspired-memory-consolidation.md
│   │   │   ├── hybrid-slm-memory-consolidation.md
│   │   │   ├── mcp-milestone.md
│   │   │   ├── multi-client-architecture.md
│   │   │   ├── test-results.md
│   │   │   └── TIMESTAMP_FIX_SUMMARY.md
│   │   ├── distributed-sync.md
│   │   ├── invocation_guide.md
│   │   ├── macos-intel.md
│   │   ├── master-guide.md
│   │   ├── mcp-client-configuration.md
│   │   ├── multi-client-server.md
│   │   ├── service-installation.md
│   │   ├── sessions
│   │   │   └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│   │   ├── UBUNTU_SETUP.md
│   │   ├── ubuntu.md
│   │   ├── windows-setup.md
│   │   └── windows.md
│   ├── docs-root-cleanup-2025-08-23
│   │   ├── AWESOME_LIST_SUBMISSION.md
│   │   ├── CLOUDFLARE_IMPLEMENTATION.md
│   │   ├── DOCUMENTATION_ANALYSIS.md
│   │   ├── DOCUMENTATION_CLEANUP_PLAN.md
│   │   ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│   │   ├── LITESTREAM_SETUP_GUIDE.md
│   │   ├── lm_studio_system_prompt.md
│   │   ├── PYTORCH_DOWNLOAD_FIX.md
│   │   └── README-ORIGINAL-BACKUP.md
│   ├── investigations
│   │   └── MACOS_HOOKS_INVESTIGATION.md
│   ├── litestream-configs-v6.3.0
│   │   ├── install_service.sh
│   │   ├── litestream_master_config_fixed.yml
│   │   ├── litestream_master_config.yml
│   │   ├── litestream_replica_config_fixed.yml
│   │   ├── litestream_replica_config.yml
│   │   ├── litestream_replica_simple.yml
│   │   ├── litestream-http.service
│   │   ├── litestream.service
│   │   └── requirements-cloudflare.txt
│   ├── release-notes
│   │   └── release-notes-v7.1.4.md
│   └── setup-development
│       ├── README.md
│       ├── setup_consolidation_mdns.sh
│       ├── STARTUP_SETUP_GUIDE.md
│       └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│   ├── memory-context.md
│   ├── memory-health.md
│   ├── memory-ingest-dir.md
│   ├── memory-ingest.md
│   ├── memory-recall.md
│   ├── memory-search.md
│   ├── memory-store.md
│   ├── README.md
│   └── session-start.md
├── claude-hooks
│   ├── config.json
│   ├── config.template.json
│   ├── CONFIGURATION.md
│   ├── core
│   │   ├── memory-retrieval.js
│   │   ├── mid-conversation.js
│   │   ├── session-end.js
│   │   ├── session-start.js
│   │   └── topic-change.js
│   ├── debug-pattern-test.js
│   ├── install_claude_hooks_windows.ps1
│   ├── install_hooks.py
│   ├── memory-mode-controller.js
│   ├── MIGRATION.md
│   ├── README-NATURAL-TRIGGERS.md
│   ├── README-phase2.md
│   ├── README.md
│   ├── simple-test.js
│   ├── statusline.sh
│   ├── test-adaptive-weights.js
│   ├── test-dual-protocol-hook.js
│   ├── test-mcp-hook.js
│   ├── test-natural-triggers.js
│   ├── test-recency-scoring.js
│   ├── tests
│   │   ├── integration-test.js
│   │   ├── phase2-integration-test.js
│   │   ├── test-code-execution.js
│   │   ├── test-cross-session.json
│   │   ├── test-session-tracking.json
│   │   └── test-threading.json
│   ├── utilities
│   │   ├── adaptive-pattern-detector.js
│   │   ├── context-formatter.js
│   │   ├── context-shift-detector.js
│   │   ├── conversation-analyzer.js
│   │   ├── dynamic-context-updater.js
│   │   ├── git-analyzer.js
│   │   ├── mcp-client.js
│   │   ├── memory-client.js
│   │   ├── memory-scorer.js
│   │   ├── performance-manager.js
│   │   ├── project-detector.js
│   │   ├── session-tracker.js
│   │   ├── tiered-conversation-monitor.js
│   │   └── version-checker.js
│   └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│   ├── amp-cli-bridge.md
│   ├── api
│   │   ├── code-execution-interface.md
│   │   ├── memory-metadata-api.md
│   │   ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_REPORT.md
│   │   └── tag-standardization.md
│   ├── architecture
│   │   ├── search-enhancement-spec.md
│   │   └── search-examples.md
│   ├── architecture.md
│   ├── archive
│   │   └── obsolete-workflows
│   │       ├── load_memory_context.md
│   │       └── README.md
│   ├── assets
│   │   └── images
│   │       ├── dashboard-v3.3.0-preview.png
│   │       ├── memory-awareness-hooks-example.png
│   │       ├── project-infographic.svg
│   │       └── README.md
│   ├── CLAUDE_CODE_QUICK_REFERENCE.md
│   ├── cloudflare-setup.md
│   ├── deployment
│   │   ├── docker.md
│   │   ├── dual-service.md
│   │   ├── production-guide.md
│   │   └── systemd-service.md
│   ├── development
│   │   ├── ai-agent-instructions.md
│   │   ├── code-quality
│   │   │   ├── phase-2a-completion.md
│   │   │   ├── phase-2a-handle-get-prompt.md
│   │   │   ├── phase-2a-index.md
│   │   │   ├── phase-2a-install-package.md
│   │   │   └── phase-2b-session-summary.md
│   │   ├── code-quality-workflow.md
│   │   ├── dashboard-workflow.md
│   │   ├── issue-management.md
│   │   ├── pr-review-guide.md
│   │   ├── refactoring-notes.md
│   │   ├── release-checklist.md
│   │   └── todo-tracker.md
│   ├── docker-optimized-build.md
│   ├── document-ingestion.md
│   ├── DOCUMENTATION_AUDIT.md
│   ├── enhancement-roadmap-issue-14.md
│   ├── examples
│   │   ├── analysis-scripts.js
│   │   ├── maintenance-session-example.md
│   │   ├── memory-distribution-chart.jsx
│   │   └── tag-schema.json
│   ├── first-time-setup.md
│   ├── glama-deployment.md
│   ├── guides
│   │   ├── advanced-command-examples.md
│   │   ├── chromadb-migration.md
│   │   ├── commands-vs-mcp-server.md
│   │   ├── mcp-enhancements.md
│   │   ├── mdns-service-discovery.md
│   │   ├── memory-consolidation-guide.md
│   │   ├── migration.md
│   │   ├── scripts.md
│   │   └── STORAGE_BACKENDS.md
│   ├── HOOK_IMPROVEMENTS.md
│   ├── hooks
│   │   └── phase2-code-execution-migration.md
│   ├── http-server-management.md
│   ├── ide-compatability.md
│   ├── IMAGE_RETENTION_POLICY.md
│   ├── images
│   │   └── dashboard-placeholder.md
│   ├── implementation
│   │   ├── health_checks.md
│   │   └── performance.md
│   ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│   ├── integration
│   │   ├── homebrew.md
│   │   └── multi-client.md
│   ├── integrations
│   │   ├── gemini.md
│   │   ├── groq-bridge.md
│   │   ├── groq-integration-summary.md
│   │   └── groq-model-comparison.md
│   ├── integrations.md
│   ├── legacy
│   │   └── dual-protocol-hooks.md
│   ├── LM_STUDIO_COMPATIBILITY.md
│   ├── maintenance
│   │   └── memory-maintenance.md
│   ├── mastery
│   │   ├── api-reference.md
│   │   ├── architecture-overview.md
│   │   ├── configuration-guide.md
│   │   ├── local-setup-and-run.md
│   │   ├── testing-guide.md
│   │   └── troubleshooting.md
│   ├── migration
│   │   └── code-execution-api-quick-start.md
│   ├── natural-memory-triggers
│   │   ├── cli-reference.md
│   │   ├── installation-guide.md
│   │   └── performance-optimization.md
│   ├── oauth-setup.md
│   ├── pr-graphql-integration.md
│   ├── quick-setup-cloudflare-dual-environment.md
│   ├── README.md
│   ├── remote-configuration-wiki-section.md
│   ├── research
│   │   ├── code-execution-interface-implementation.md
│   │   └── code-execution-interface-summary.md
│   ├── ROADMAP.md
│   ├── sqlite-vec-backend.md
│   ├── statistics
│   │   ├── charts
│   │   │   ├── activity_patterns.png
│   │   │   ├── contributors.png
│   │   │   ├── growth_trajectory.png
│   │   │   ├── monthly_activity.png
│   │   │   └── october_sprint.png
│   │   ├── data
│   │   │   ├── activity_by_day.csv
│   │   │   ├── activity_by_hour.csv
│   │   │   ├── contributors.csv
│   │   │   └── monthly_activity.csv
│   │   ├── generate_charts.py
│   │   └── REPOSITORY_STATISTICS.md
│   ├── technical
│   │   ├── development.md
│   │   ├── memory-migration.md
│   │   ├── migration-log.md
│   │   ├── sqlite-vec-embedding-fixes.md
│   │   └── tag-storage.md
│   ├── testing
│   │   └── regression-tests.md
│   ├── testing-cloudflare-backend.md
│   ├── troubleshooting
│   │   ├── cloudflare-api-token-setup.md
│   │   ├── cloudflare-authentication.md
│   │   ├── general.md
│   │   ├── hooks-quick-reference.md
│   │   ├── pr162-schema-caching-issue.md
│   │   ├── session-end-hooks.md
│   │   └── sync-issues.md
│   └── tutorials
│       ├── advanced-techniques.md
│       ├── data-analysis.md
│       └── demo-session-walkthrough.md
├── examples
│   ├── claude_desktop_config_template.json
│   ├── claude_desktop_config_windows.json
│   ├── claude-desktop-http-config.json
│   ├── config
│   │   └── claude_desktop_config.json
│   ├── http-mcp-bridge.js
│   ├── memory_export_template.json
│   ├── README.md
│   ├── setup
│   │   └── setup_multi_client_complete.py
│   └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│   ├── .claude
│   │   └── settings.local.json
│   ├── archive
│   │   └── check_missing_timestamps.py
│   ├── backup
│   │   ├── backup_memories.py
│   │   ├── backup_sqlite_vec.sh
│   │   ├── export_distributable_memories.sh
│   │   └── restore_memories.py
│   ├── benchmarks
│   │   ├── benchmark_code_execution_api.py
│   │   ├── benchmark_hybrid_sync.py
│   │   └── benchmark_server_caching.py
│   ├── database
│   │   ├── analyze_sqlite_vec_db.py
│   │   ├── check_sqlite_vec_status.py
│   │   ├── db_health_check.py
│   │   └── simple_timestamp_check.py
│   ├── development
│   │   ├── debug_server_initialization.py
│   │   ├── find_orphaned_files.py
│   │   ├── fix_mdns.sh
│   │   ├── fix_sitecustomize.py
│   │   ├── remote_ingest.sh
│   │   ├── setup-git-merge-drivers.sh
│   │   ├── uv-lock-merge.sh
│   │   └── verify_hybrid_sync.py
│   ├── hooks
│   │   └── pre-commit
│   ├── installation
│   │   ├── install_linux_service.py
│   │   ├── install_macos_service.py
│   │   ├── install_uv.py
│   │   ├── install_windows_service.py
│   │   ├── install.py
│   │   ├── setup_backup_cron.sh
│   │   ├── setup_claude_mcp.sh
│   │   └── setup_cloudflare_resources.py
│   ├── linux
│   │   ├── service_status.sh
│   │   ├── start_service.sh
│   │   ├── stop_service.sh
│   │   ├── uninstall_service.sh
│   │   └── view_logs.sh
│   ├── maintenance
│   │   ├── assign_memory_types.py
│   │   ├── check_memory_types.py
│   │   ├── cleanup_corrupted_encoding.py
│   │   ├── cleanup_memories.py
│   │   ├── cleanup_organize.py
│   │   ├── consolidate_memory_types.py
│   │   ├── consolidation_mappings.json
│   │   ├── delete_orphaned_vectors_fixed.py
│   │   ├── fast_cleanup_duplicates_with_tracking.sh
│   │   ├── find_all_duplicates.py
│   │   ├── find_cloudflare_duplicates.py
│   │   ├── find_duplicates.py
│   │   ├── memory-types.md
│   │   ├── README.md
│   │   ├── recover_timestamps_from_cloudflare.py
│   │   ├── regenerate_embeddings.py
│   │   ├── repair_malformed_tags.py
│   │   ├── repair_memories.py
│   │   ├── repair_sqlite_vec_embeddings.py
│   │   ├── repair_zero_embeddings.py
│   │   ├── restore_from_json_export.py
│   │   └── scan_todos.sh
│   ├── migration
│   │   ├── cleanup_mcp_timestamps.py
│   │   ├── legacy
│   │   │   └── migrate_chroma_to_sqlite.py
│   │   ├── mcp-migration.py
│   │   ├── migrate_sqlite_vec_embeddings.py
│   │   ├── migrate_storage.py
│   │   ├── migrate_tags.py
│   │   ├── migrate_timestamps.py
│   │   ├── migrate_to_cloudflare.py
│   │   ├── migrate_to_sqlite_vec.py
│   │   ├── migrate_v5_enhanced.py
│   │   ├── TIMESTAMP_CLEANUP_README.md
│   │   └── verify_mcp_timestamps.py
│   ├── pr
│   │   ├── amp_collect_results.sh
│   │   ├── amp_detect_breaking_changes.sh
│   │   ├── amp_generate_tests.sh
│   │   ├── amp_pr_review.sh
│   │   ├── amp_quality_gate.sh
│   │   ├── amp_suggest_fixes.sh
│   │   ├── auto_review.sh
│   │   ├── detect_breaking_changes.sh
│   │   ├── generate_tests.sh
│   │   ├── lib
│   │   │   └── graphql_helpers.sh
│   │   ├── quality_gate.sh
│   │   ├── resolve_threads.sh
│   │   ├── run_pyscn_analysis.sh
│   │   ├── run_quality_checks.sh
│   │   ├── thread_status.sh
│   │   └── watch_reviews.sh
│   ├── quality
│   │   ├── fix_dead_code_install.sh
│   │   ├── phase1_dead_code_analysis.md
│   │   ├── phase2_complexity_analysis.md
│   │   ├── README_PHASE1.md
│   │   ├── README_PHASE2.md
│   │   ├── track_pyscn_metrics.sh
│   │   └── weekly_quality_review.sh
│   ├── README.md
│   ├── run
│   │   ├── run_mcp_memory.sh
│   │   ├── run-with-uv.sh
│   │   └── start_sqlite_vec.sh
│   ├── run_memory_server.py
│   ├── server
│   │   ├── check_http_server.py
│   │   ├── check_server_health.py
│   │   ├── memory_offline.py
│   │   ├── preload_models.py
│   │   ├── run_http_server.py
│   │   ├── run_memory_server.py
│   │   ├── start_http_server.bat
│   │   └── start_http_server.sh
│   ├── service
│   │   ├── deploy_dual_services.sh
│   │   ├── install_http_service.sh
│   │   ├── mcp-memory-http.service
│   │   ├── mcp-memory.service
│   │   ├── memory_service_manager.sh
│   │   ├── service_control.sh
│   │   ├── service_utils.py
│   │   └── update_service.sh
│   ├── sync
│   │   ├── check_drift.py
│   │   ├── claude_sync_commands.py
│   │   ├── export_memories.py
│   │   ├── import_memories.py
│   │   ├── litestream
│   │   │   ├── apply_local_changes.sh
│   │   │   ├── enhanced_memory_store.sh
│   │   │   ├── init_staging_db.sh
│   │   │   ├── io.litestream.replication.plist
│   │   │   ├── manual_sync.sh
│   │   │   ├── memory_sync.sh
│   │   │   ├── pull_remote_changes.sh
│   │   │   ├── push_to_remote.sh
│   │   │   ├── README.md
│   │   │   ├── resolve_conflicts.sh
│   │   │   ├── setup_local_litestream.sh
│   │   │   ├── setup_remote_litestream.sh
│   │   │   ├── staging_db_init.sql
│   │   │   ├── stash_local_changes.sh
│   │   │   ├── sync_from_remote_noconfig.sh
│   │   │   └── sync_from_remote.sh
│   │   ├── README.md
│   │   ├── safe_cloudflare_update.sh
│   │   ├── sync_memory_backends.py
│   │   └── sync_now.py
│   ├── testing
│   │   ├── run_complete_test.py
│   │   ├── run_memory_test.sh
│   │   ├── simple_test.py
│   │   ├── test_cleanup_logic.py
│   │   ├── test_cloudflare_backend.py
│   │   ├── test_docker_functionality.py
│   │   ├── test_installation.py
│   │   ├── test_mdns.py
│   │   ├── test_memory_api.py
│   │   ├── test_memory_simple.py
│   │   ├── test_migration.py
│   │   ├── test_search_api.py
│   │   ├── test_sqlite_vec_embeddings.py
│   │   ├── test_sse_events.py
│   │   ├── test-connection.py
│   │   └── test-hook.js
│   ├── utils
│   │   ├── claude_commands_utils.py
│   │   ├── generate_personalized_claude_md.sh
│   │   ├── groq
│   │   ├── groq_agent_bridge.py
│   │   ├── list-collections.py
│   │   ├── memory_wrapper_uv.py
│   │   ├── query_memories.py
│   │   ├── smithery_wrapper.py
│   │   ├── test_groq_bridge.sh
│   │   └── uv_wrapper.py
│   └── validation
│       ├── check_dev_setup.py
│       ├── check_documentation_links.py
│       ├── diagnose_backend_config.py
│       ├── validate_configuration_complete.py
│       ├── validate_memories.py
│       ├── validate_migration.py
│       ├── validate_timestamp_integrity.py
│       ├── verify_environment.py
│       ├── verify_pytorch_windows.py
│       └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│   └── mcp_memory_service
│       ├── __init__.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── operations.py
│       │   ├── sync_wrapper.py
│       │   └── types.py
│       ├── backup
│       │   ├── __init__.py
│       │   └── scheduler.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── ingestion.py
│       │   ├── main.py
│       │   └── utils.py
│       ├── config.py
│       ├── consolidation
│       │   ├── __init__.py
│       │   ├── associations.py
│       │   ├── base.py
│       │   ├── clustering.py
│       │   ├── compression.py
│       │   ├── consolidator.py
│       │   ├── decay.py
│       │   ├── forgetting.py
│       │   ├── health.py
│       │   └── scheduler.py
│       ├── dependency_check.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── client.py
│       │   └── mdns_service.py
│       ├── embeddings
│       │   ├── __init__.py
│       │   └── onnx_embeddings.py
│       ├── ingestion
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chunker.py
│       │   ├── csv_loader.py
│       │   ├── json_loader.py
│       │   ├── pdf_loader.py
│       │   ├── registry.py
│       │   ├── semtools_loader.py
│       │   └── text_loader.py
│       ├── lm_studio_compat.py
│       ├── mcp_server.py
│       ├── models
│       │   ├── __init__.py
│       │   └── memory.py
│       ├── server.py
│       ├── services
│       │   ├── __init__.py
│       │   └── memory_service.py
│       ├── storage
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloudflare.py
│       │   ├── factory.py
│       │   ├── http_client.py
│       │   ├── hybrid.py
│       │   └── sqlite_vec.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── exporter.py
│       │   ├── importer.py
│       │   └── litestream_config.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cache_manager.py
│       │   ├── content_splitter.py
│       │   ├── db_utils.py
│       │   ├── debug.py
│       │   ├── document_processing.py
│       │   ├── gpu_detection.py
│       │   ├── hashing.py
│       │   ├── http_server_manager.py
│       │   ├── port_detection.py
│       │   ├── system_detection.py
│       │   └── time_parser.py
│       └── web
│           ├── __init__.py
│           ├── api
│           │   ├── __init__.py
│           │   ├── analytics.py
│           │   ├── backup.py
│           │   ├── consolidation.py
│           │   ├── documents.py
│           │   ├── events.py
│           │   ├── health.py
│           │   ├── manage.py
│           │   ├── mcp.py
│           │   ├── memories.py
│           │   ├── search.py
│           │   └── sync.py
│           ├── app.py
│           ├── dependencies.py
│           ├── oauth
│           │   ├── __init__.py
│           │   ├── authorization.py
│           │   ├── discovery.py
│           │   ├── middleware.py
│           │   ├── models.py
│           │   ├── registration.py
│           │   └── storage.py
│           ├── sse.py
│           └── static
│               ├── app.js
│               ├── index.html
│               ├── README.md
│               ├── sse_test.html
│               └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── test_compact_types.py
│   │   └── test_operations.py
│   ├── bridge
│   │   ├── mock_responses.js
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   └── test_http_mcp_bridge.js
│   ├── conftest.py
│   ├── consolidation
│   │   ├── __init__.py
│   │   ├── conftest.py
│   │   ├── test_associations.py
│   │   ├── test_clustering.py
│   │   ├── test_compression.py
│   │   ├── test_consolidator.py
│   │   ├── test_decay.py
│   │   └── test_forgetting.py
│   ├── contracts
│   │   └── api-specification.yml
│   ├── integration
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── test_api_key_fallback.py
│   │   ├── test_api_memories_chronological.py
│   │   ├── test_api_tag_time_search.py
│   │   ├── test_api_with_memory_service.py
│   │   ├── test_bridge_integration.js
│   │   ├── test_cli_interfaces.py
│   │   ├── test_cloudflare_connection.py
│   │   ├── test_concurrent_clients.py
│   │   ├── test_data_serialization_consistency.py
│   │   ├── test_http_server_startup.py
│   │   ├── test_mcp_memory.py
│   │   ├── test_mdns_integration.py
│   │   ├── test_oauth_basic_auth.py
│   │   ├── test_oauth_flow.py
│   │   ├── test_server_handlers.py
│   │   └── test_store_memory.py
│   ├── performance
│   │   ├── test_background_sync.py
│   │   └── test_hybrid_live.py
│   ├── README.md
│   ├── smithery
│   │   └── test_smithery.py
│   ├── sqlite
│   │   └── simple_sqlite_vec_test.py
│   ├── test_client.py
│   ├── test_content_splitting.py
│   ├── test_database.py
│   ├── test_hybrid_cloudflare_limits.py
│   ├── test_hybrid_storage.py
│   ├── test_memory_ops.py
│   ├── test_semantic_search.py
│   ├── test_sqlite_vec_storage.py
│   ├── test_time_parser.py
│   ├── test_timestamp_preservation.py
│   ├── timestamp
│   │   ├── test_hook_vs_manual_storage.py
│   │   ├── test_issue99_final_validation.py
│   │   ├── test_search_retrieval_inconsistency.py
│   │   ├── test_timestamp_issue.py
│   │   └── test_timestamp_simple.py
│   └── unit
│       ├── conftest.py
│       ├── test_cloudflare_storage.py
│       ├── test_csv_loader.py
│       ├── test_fastapi_dependencies.py
│       ├── test_import.py
│       ├── test_json_loader.py
│       ├── test_mdns_simple.py
│       ├── test_mdns.py
│       ├── test_memory_service.py
│       ├── test_memory.py
│       ├── test_semtools_loader.py
│       ├── test_storage_interface_compatibility.py
│       └── test_tag_time_filtering.py
├── tools
│   ├── docker
│   │   ├── DEPRECATED.md
│   │   ├── docker-compose.http.yml
│   │   ├── docker-compose.pythonpath.yml
│   │   ├── docker-compose.standalone.yml
│   │   ├── docker-compose.uv.yml
│   │   ├── docker-compose.yml
│   │   ├── docker-entrypoint-persistent.sh
│   │   ├── docker-entrypoint-unified.sh
│   │   ├── docker-entrypoint.sh
│   │   ├── Dockerfile
│   │   ├── Dockerfile.glama
│   │   ├── Dockerfile.slim
│   │   ├── README.md
│   │   └── test-docker-modes.sh
│   └── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/scripts/testing/test_installation.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Installation Verification Script for MCP Memory Service.

This script tests all critical components of MCP Memory Service to verify
that the installation is working correctly on the current platform.
"""
import os
import sys
import platform
import subprocess
import traceback
import importlib
from pathlib import Path

# ANSI color codes for terminal output
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
BOLD = "\033[1m"

def print_header(text):
    """Print a formatted header."""
    print(f"\n{BOLD}{'=' * 80}{RESET}")
    print(f"{BOLD} {text}{RESET}")
    print(f"{BOLD}{'=' * 80}{RESET}")

def print_success(text):
    """Print a success message."""
    print(f"{GREEN}✅ {text}{RESET}")

def print_warning(text):
    """Print a warning message."""
    print(f"{YELLOW}⚠️  {text}{RESET}")

def print_error(text):
    """Print an error message."""
    print(f"{RED}❌ {text}{RESET}")

def print_info(text):
    """Print an info message."""
    print(f"➔ {text}")

def check_python_version():
    """Check if Python version is compatible."""
    print_info(f"Python version: {sys.version}")
    major, minor, _ = platform.python_version_tuple()
    major, minor = int(major), int(minor)
    
    if major < 3 or (major == 3 and minor < 10):
        print_error(f"Python version {major}.{minor} is too old. MCP Memory Service requires Python 3.10+")
        return False
    else:
        print_success(f"Python version {major}.{minor} is compatible")
        return True

def check_dependencies():
    """Check if all required dependencies are installed and compatible."""
    required_packages = [
        "torch",
        "sentence_transformers",
        "chromadb",
        "mcp",
        "websockets",
        "numpy"
    ]
    
    success = True
    
    for package in required_packages:
        try:
            module = importlib.import_module(package)
            if hasattr(module, "__version__"):
                print_success(f"{package} is installed (version: {module.__version__})")
            else:
                print_success(f"{package} is installed")
                
            # Specific checks for critical packages
            if package == "torch":
                # Check PyTorch on different platforms
                check_torch_compatibility()
            elif package == "sentence_transformers":
                # Check sentence-transformers compatibility
                check_sentence_transformers_compatibility()
            elif package == "chromadb":
                # Check ChromaDB
                check_chromadb()
                
        except ImportError:
            print_error(f"{package} is not installed")
            success = False
        except Exception as e:
            print_error(f"Error checking {package}: {str(e)}")
            success = False
            
    return success

def check_torch_compatibility():
    """Check if PyTorch is compatible with the system."""
    import torch
    
    # Get system info
    system = platform.system().lower()
    machine = platform.machine().lower()
    is_windows = system == "windows"
    is_macos = system == "darwin"
    is_linux = system == "linux"
    is_arm = machine in ("arm64", "aarch64")
    is_x86 = machine in ("x86_64", "amd64", "x64")
    
    # Display torch info
    print_info(f"PyTorch version: {torch.__version__}")
    
    # Check CUDA availability
    if torch.cuda.is_available():
        device_count = torch.cuda.device_count()
        device_name = torch.cuda.get_device_name(0) if device_count > 0 else "Unknown"
        print_success(f"CUDA is available (version: {torch.version.cuda})")
        print_info(f"GPU Device: {device_name}")
        print_info(f"Device Count: {device_count}")
    # Check MPS availability (Apple Silicon)
    elif hasattr(torch.backends, "mps") and torch.backends.mps.is_available():
        print_success("MPS (Metal Performance Shaders) is available")
        if not torch.backends.mps.is_built():
            print_warning("PyTorch is not built with MPS support")
    # Check DirectML (Windows)
    elif is_windows:
        try:
            import torch_directml
            print_success(f"DirectML is available (version: {torch_directml.__version__})")
        except ImportError:
            print_info("DirectML is not available, using CPU only")
    else:
        print_info("Using CPU only")
    
    # Special check for macOS Intel
    if is_macos and is_x86:
        torch_version = [int(x) for x in torch.__version__.split('.')[:2]]
        
        if torch_version[0] == 2 and torch_version[1] == 0:
            print_success("PyTorch 2.0.x detected, which is optimal for macOS Intel")
        elif torch_version[0] == 1 and torch_version[1] >= 13:
            print_success("PyTorch 1.13.x detected, which is compatible for macOS Intel")
        elif torch_version[0] == 1 and torch_version[1] < 11:
            print_warning("PyTorch version is below 1.11.0, which may be too old for sentence-transformers")
        elif torch_version[0] > 2 or (torch_version[0] == 2 and torch_version[1] > 0):
            print_warning("PyTorch version is newer than 2.0.x, which may have compatibility issues on macOS Intel")

def check_sentence_transformers_compatibility():
    """Check if sentence-transformers is compatible with the system and PyTorch."""
    import torch
    import sentence_transformers
    
    # Check compatibility
    torch_version = [int(x) for x in torch.__version__.split('.')[:2]]
    st_version = [int(x) for x in sentence_transformers.__version__.split('.')[:2]]
    
    print_info(f"sentence-transformers version: {sentence_transformers.__version__}")
    
    # Critical compatibility check
    system = platform.system().lower()
    machine = platform.machine().lower()
    is_macos = system == "darwin"
    is_x86 = machine in ("x86_64", "amd64", "x64")
    
    if is_macos and is_x86:
        if st_version[0] >= 3 and (torch_version[0] < 1 or (torch_version[0] == 1 and torch_version[1] < 11)):
            print_error("Incompatible versions: sentence-transformers 3.x+ requires torch>=1.11.0")
            return False
        elif st_version[0] == 2 and st_version[1] == 2 and (torch_version[0] == 2 and torch_version[1] == 0):
            print_success("Optimal combination: sentence-transformers 2.2.x with torch 2.0.x")
        elif st_version[0] == 2 and st_version[1] == 2 and (torch_version[0] == 1 and torch_version[1] >= 13):
            print_success("Compatible combination: sentence-transformers 2.2.x with torch 1.13.x")
        else:
            print_warning("Untested version combination. May work but not officially supported.")
    
    # Test sentence-transformers with a small model
    try:
        print_info("Testing model loading (paraphrase-MiniLM-L3-v2)...")
        start_time = __import__('time').time()
        model = sentence_transformers.SentenceTransformer('paraphrase-MiniLM-L3-v2')
        load_time = __import__('time').time() - start_time
        print_success(f"Model loaded successfully in {load_time:.2f}s")
        
        # Test encoding
        print_info("Testing encoding...")
        start_time = __import__('time').time()
        _ = model.encode("This is a test sentence")
        encode_time = __import__('time').time() - start_time
        print_success(f"Encoding successful in {encode_time:.2f}s")
        
        return True
    except Exception as e:
        print_error(f"Error testing sentence-transformers: {str(e)}")
        print(traceback.format_exc())
        return False

def check_chromadb():
    """Check if ChromaDB works correctly."""
    import chromadb
    
    print_info(f"ChromaDB version: {chromadb.__version__}")
    
    # Test in-memory client
    try:
        print_info("Testing in-memory ChromaDB client...")
        client = chromadb.Client()
        collection = client.create_collection("test_collection")
        collection.add(
            documents=["This is a test document"],
            metadatas=[{"source": "test"}],
            ids=["id1"]
        )
        results = collection.query(
            query_texts=["test document"],
            n_results=1
        )
        
        if results and len(results["ids"]) > 0:
            print_success("ChromaDB in-memory test successful")
        else:
            print_warning("ChromaDB query returned empty results")
            
        return True
    except Exception as e:
        print_error(f"Error testing ChromaDB: {str(e)}")
        print(traceback.format_exc())
        return False

def check_mcp_protocol():
    """Check if MCP protocol handler is working correctly."""
    try:
        import mcp
        from mcp.types import TextContent
        from mcp.server import Server
        
        print_info(f"MCP version: {mcp.__version__}")
        
        # Basic protocol functionality check
        server = Server("test_server")
        
        # Check if we can register handlers
        @server.list_tools()
        async def handle_list_tools():
            return []
            
        print_success("MCP protocol handler initialized successfully")
        return True
    except Exception as e:
        print_error(f"Error testing MCP protocol: {str(e)}")
        return False

def check_memory_service_installation():
    """Check if the MCP Memory Service package is installed correctly."""
    try:
        from mcp_memory_service import __file__ as package_path
        print_success(f"MCP Memory Service installed at: {package_path}")
        
        # Check if important modules are importable
        from mcp_memory_service.storage.chroma import ChromaMemoryStorage
        from mcp_memory_service.models.memory import Memory
        from mcp_memory_service.utils.time_parser import parse_time_expression
        
        print_success("All required MCP Memory Service modules imported successfully")
        return True
    except ImportError:
        print_error("MCP Memory Service package is not installed or importable")
        return False
    except Exception as e:
        print_error(f"Error importing MCP Memory Service modules: {str(e)}")
        return False

def check_system_paths():
    """Check if system paths are set up correctly."""
    print_info(f"System: {platform.system()} {platform.release()}")
    print_info(f"Architecture: {platform.machine()}")
    print_info(f"Python executable: {sys.executable}")
    
    # Check virtual environment
    in_venv = sys.prefix != sys.base_prefix
    if in_venv:
        print_success(f"Running in virtual environment: {sys.prefix}")
    else:
        print_warning("Not running in a virtual environment")
    
    # Check if 'memory' command is in PATH
    try:
        memory_cmd = subprocess.check_output(
            ["which", "memory"] if platform.system() != "Windows" else ["where", "memory"],
            stderr=subprocess.PIPE,
            text=True
        ).strip()
        print_success(f"'memory' command found at: {memory_cmd}")
    except subprocess.SubprocessError:
        print_warning("'memory' command not found in PATH")
    
    # Check for ChromaDB and backup paths
    chroma_path = os.environ.get("MCP_MEMORY_CHROMA_PATH")
    backups_path = os.environ.get("MCP_MEMORY_BACKUPS_PATH")
    
    if chroma_path:
        print_info(f"ChromaDB path: {chroma_path}")
        path = Path(chroma_path)
        if path.exists():
            print_success("ChromaDB path exists")
        else:
            print_warning("ChromaDB path does not exist yet")
    else:
        print_info("ChromaDB path not set in environment")
    
    if backups_path:
        print_info(f"Backups path: {backups_path}")
        path = Path(backups_path)
        if path.exists():
            print_success("Backups path exists")
        else:
            print_warning("Backups path does not exist yet")
    else:
        print_info("Backups path not set in environment")
        
    return True

def check_torch_operations():
    """Perform basic PyTorch operations to verify functionality."""
    try:
        import torch
        
        # Create a simple tensor
        print_info("Creating and manipulating tensors...")
        x = torch.rand(5, 3)
        y = torch.rand(5, 3)
        z = x + y
        
        # Try a basic neural network
        from torch import nn
        
        class SimpleNet(nn.Module):
            def __init__(self):
                super().__init__()
                self.fc1 = nn.Linear(10, 5)
                self.fc2 = nn.Linear(5, 2)
                
            def forward(self, x):
                x = torch.relu(self.fc1(x))
                x = self.fc2(x)
                return x
        
        model = SimpleNet()
        input_tensor = torch.rand(1, 10)
        output = model(input_tensor)
        
        print_success("PyTorch operations completed successfully")
        return True
    except Exception as e:
        print_error(f"Error in PyTorch operations: {str(e)}")
        return False

def run_verification():
    """Run all verification tests."""
    print_header("MCP Memory Service Installation Verification")
    
    # Track overall success
    success = True
    
    # Check Python version
    print_header("1. Python Environment")
    if not check_python_version():
        success = False
    
    check_system_paths()
    
    # Check dependencies
    print_header("2. Dependency Verification")
    if not check_dependencies():
        success = False
        
    # Check MCP Memory Service
    print_header("3. MCP Memory Service Installation")
    if not check_memory_service_installation():
        success = False
    
    if not check_mcp_protocol():
        success = False
    
    # Check PyTorch operations
    print_header("4. PyTorch Operations")
    if not check_torch_operations():
        success = False
    
    # Overall result
    print_header("Verification Results")
    if success:
        print_success("All verification tests passed! The installation appears to be working correctly.")
    else:
        print_warning("Some verification tests failed. Check the errors above for details.")
    
    return success

if __name__ == "__main__":
    success = run_verification()
    sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/Development-Sprint-November-2025.md:
--------------------------------------------------------------------------------

```markdown
# Development Sprint - November 2025

**Two Weeks. Seven Releases. Extraordinary Results.**

Between November 12-26, 2025, the MCP Memory Service project achieved a remarkable development sprint combining performance breakthroughs, code quality milestones, and workflow automation at unprecedented speed.

---

## 📊 Sprint Overview

| Metric | Achievement |
|--------|-------------|
| **Releases Shipped** | 7 major/minor versions |
| **Performance Gains** | 10x to 534,628x improvements |
| **Code Quality** | Grade D → Grade B (68-72/100) |
| **Fastest Release Cycle** | 35 minutes (issue → production) |
| **Lines of Duplicate Code Eliminated** | 176-186 lines |
| **Critical Bugs Prevented** | 2 (caught by AI review) |

---

## 🚀 Performance Breakthroughs

### v8.39.0 - Storage-Layer Date-Range Filtering (Nov 26)
**10x performance improvement** by moving analytics queries from application layer to database layer.

#### The Problem
Analytics endpoints were fetching ALL memories (10,000+) into Python, then filtering by date range in application code:
```python
# Old approach - inefficient
memories = await storage.get_all_memories(limit=10000)
for memory in memories:
    if start_time <= memory.created_at <= end_time:
        # Process memory
```

#### The Solution
Push filtering to SQL database layer with indexed WHERE clauses:
```python
# New approach - 10x faster
async def get_memories_by_time_range(self, start_time: float, end_time: float):
    sql = """
        SELECT m.*
        FROM memories m
        WHERE m.created_at BETWEEN ? AND ?
        ORDER BY m.created_at DESC
    """
    # Database handles filtering with indexes
```

#### Performance Impact
| Backend | Before | After | Improvement |
|---------|--------|-------|-------------|
| **SQLite-vec** | ~500ms | ~50ms | **10x faster** |
| **Cloudflare D1** | ~2-3s | ~200ms | **10-15x faster** |
| **Data Transfer** | 50MB | 1.5MB | **97% reduction** |

**Scalability**: Now handles databases with unlimited memories efficiently (previously hard-limited to 10,000).

**Development Speed**: Issue #238 → Production release in **35 minutes** using automated workflows.

---

### v8.26.0 - MCP Global Caching Breakthrough (Nov 16)
**MCP tools transformed from slowest to FASTEST** method for memory operations.

#### Revolutionary Achievement
**534,628x speedup** on cache hits - the most dramatic performance improvement in project history.

#### Before v8.26.0
- MCP Tools: ~1,810ms (slowest method)
- HTTP API: ~479ms (fastest method)

#### After v8.26.0
- **MCP Tools (cached)**: ~0.01ms ← **NEW FASTEST**
- MCP Tools (first call): ~2,485ms (one-time cost)
- HTTP API: ~479ms

#### Technical Implementation
Created `CacheManager` class with global storage/service caching:

```python
# Module-level cache persists across HTTP calls
_storage_cache: Dict[str, Any] = {}
_memory_service_cache: Dict[str, MemoryService] = {}

async def get_or_create_storage(backend: str, path: str):
    cache_key = f"{backend}:{path}"
    if cache_key not in _storage_cache:
        _storage_cache[cache_key] = await create_storage(backend, path)
    return _storage_cache[cache_key]
```

#### Real-World Results
- **90%+ cache hit rate** in production
- **41x faster than HTTP API** after warm-up
- **99.9996% latency reduction** on cached operations

**Impact**: Sub-millisecond response times transform the user experience for Claude Desktop and Claude Code users.

---

## 🎯 Code Quality Journey: Grade D → Grade B

### Three-Release Sprint (Nov 22-24)
Achieved **100% of Phase 2 complexity reduction targets** across three coordinated releases.

#### v8.34.0 - First Function (Nov 22)
**40 minutes**: Analysis → PR → Review → Merge → Release

- `analytics.py::get_memory_growth()` complexity: 11 → 6-7 (-4 to -5 points)
- Pattern: PeriodType Enum + data-driven approach
- gemini-pr-automator: 3 review iterations, exceeded target

#### v8.35.0 - Batch 1 High Priority (Nov 24)
**45 minutes**: 2 high-priority functions

- `install.py::configure_paths()` 15 → 5 (**-10 points**)
  - Extracted 4 helpers: `get_platform_base_dir()`, `setup_storage_directories()`, `build_mcp_env_config()`, `update_claude_config_file()`
- `cloudflare.py::_search_by_tags_internal()` 13 → 8 (-5 points)
  - Extracted 3 helpers for tag normalization and query building

#### v8.36.0 - Completion (Nov 24)
**60 minutes**: Remaining 7 functions (100% complete!)

- **2 consolidation functions** (-8 points): Context managers + config-driven patterns
- **3 analytics functions** (-8 points): 70+ lines extracted
- **1 GPU detection** (-2 points): Platform-specific checks unified
- **1 Cloudflare helper** (-1 point): Timestamp fetching

**CRITICAL**: Gemini Code Assist caught 2 bugs before release:
1. ❌→✅ Timezone bug: `datetime.now()` → `datetime.now(timezone.utc)` (would have caused incorrect consolidation timestamps)
2. ❌→✅ Analytics double-counting: Fixed total_memories calculation (would have shown incorrect percentages)

#### Final Metrics - 100% Achievement

| Metric | Target | Achieved | Result |
|--------|--------|----------|--------|
| Functions Refactored | 10 | 10 | ✅ 100% |
| Complexity Points Reduced | -39 | -39 | ✅ 100% |
| Complexity Score Gain | +10 | +11 | ✅ 110% |
| Health Score | 66-70 | 68-72 | ✅ **Grade B** |

**Before Phase 2**: Health 63/100 (Grade D)
**After Phase 2**: Health 68-72/100 (Grade B) ← **Full grade improvement**

---

### v8.38.0 - Phase 2b Duplication Reduction (Nov 25)
**176-186 lines of duplicate code eliminated** across 10 consolidation commits.

#### Helper Extraction Pattern
Consistently applied methodology across all consolidations:

```python
def _helper_function_name(param1, param2, optional=None):
    """
    Brief description of consolidation purpose.

    Args:
        param1: Varying parameter between original blocks
        param2: Another variation point
        optional: Optional parameter with sensible default

    Returns:
        Result type
    """
    # Consolidated logic with parameterized differences
    pass
```

#### Key Consolidations
1. **`parse_mcp_response()`** - MCP protocol error handling (3 blocks, 47 lines)
2. **`_get_or_create_memory_service()`** - Two-tier cache management (3 blocks, 65 lines)
3. **`_calculate_season_date_range()`** - Winter boundary logic (2 blocks, 24 lines)
4. **`_process_and_store_chunk()`** - Document processing (3 blocks, ~40-50 lines)

#### Strategic Decisions
**4 groups intentionally deferred** with documented rationale:
- High-risk backend logic (60 lines, critical startup code)
- Different semantic contexts (error handling patterns)
- Low-priority test/script duplication

**Key Insight**: Quality over arbitrary metrics - pursuing <3% duplication target would require high-risk, low-benefit consolidations.

#### Results
- **Duplication**: 5.5% → 4.5-4.7% (approaching <3% target)
- **Test Coverage**: 100% maintained throughout
- **Breaking Changes**: Zero - complete backward compatibility

---

## 🤖 AI-Assisted Development Workflow

### Agent Ecosystem
Three specialized agents orchestrated the development workflow:

#### 1. github-release-manager
**Complete release automation** - Zero manual steps

**Workflow**:
1. Four-file version bump (\_\_init\_\_.py, pyproject.toml, README.md, uv.lock)
2. CHANGELOG.md updates with detailed metrics
3. Git operations (commit, tag, push)
4. GitHub Release creation with release notes
5. CI/CD verification (Docker Publish, PyPI Publish, HTTP-MCP Bridge)

**Impact**: 3 complete releases in Phase 2 sprint with consistent documentation quality.

#### 2. gemini-pr-automator
**Automated PR review cycles** - Eliminates "Wait 1min → /gemini review" loops

**Features**:
- Automated Gemini Code Assist review iteration
- Breaking change detection
- Test generation for new code
- Quality gate checks

**v8.36.0 Example**:
- 5 review iterations
- Caught 2 CRITICAL bugs before release
- Saved 2-3 hours of manual review

**Time Savings**: 10-30 minutes per PR across 9 total review iterations in Phase 2.

#### 3. amp-bridge
**Complete code generation** - Not just analysis

**Usage**:
- Provided full implementations (not just suggestions)
- Zero syntax errors in generated code
- Strategic token conservation (~50-60K tokens saved)

**User Feedback**: "way faster than claude code"

---

## 📈 Development Velocity Metrics

### Release Cycle Times

| Release | Date | Development Time | Notable |
|---------|------|------------------|---------|
| **v8.39.0** | Nov 26 | **35 minutes** | Issue → Production (fastest ever) |
| v8.38.0 | Nov 25 | ~90 minutes | 10 consolidation commits |
| v8.36.0 | Nov 24 | 60 minutes | 7 functions, 2 critical bugs caught |
| v8.35.0 | Nov 24 | 45 minutes | 2 high-priority functions |
| v8.34.0 | Nov 22 | 40 minutes | First Phase 2 function |

### Phase 2 Complete Sprint
**Total Time**: ~4 hours across 3 days for 10-function refactoring
**vs Manual Estimate**: 8-12 hours
**Time Savings**: 50-67% with AI agents

### Critical Bug Prevention
**2 bugs caught by Gemini Code Assist before release**:
- Timezone handling in consolidation scheduler
- Analytics calculation errors

**Impact**: Would have required emergency hotfixes if shipped to production.

---

## 🔧 Technical Patterns Established

### 1. Database-Layer Filtering
**Pattern**: Push filtering to SQL WHERE clauses instead of application code
```python
# Bad: Application-layer filtering
memories = await get_all_memories(limit=10000)
filtered = [m for m in memories if start <= m.created_at <= end]

# Good: Database-layer filtering
memories = await get_memories_by_time_range(start, end)
```
**Benefit**: 10x performance, leverages indexes, scales to unlimited data

### 2. Global Caching Strategy
**Pattern**: Module-level cache dictionaries for stateless HTTP environments
```python
_cache: Dict[str, Any] = {}

def get_or_create(key: str):
    if key not in _cache:
        _cache[key] = create_expensive_resource()
    return _cache[key]
```
**Benefit**: 534,628x speedup, 90%+ hit rate, sub-millisecond response

### 3. Helper Extraction for Duplication
**Pattern**: Parameterize differences, extract to helper function
```python
# Before: 3 duplicate blocks
# After: 1 helper function with 3 callers
def _helper(varying_param, optional=default):
    # Consolidated logic
    pass
```
**Benefit**: 176-186 lines eliminated, improved maintainability

### 4. Configuration-Driven Logic
**Pattern**: Replace if/elif chains with dictionary lookups
```python
# Before
if horizon == 'daily':
    days = 1
elif horizon == 'weekly':
    days = 7
# ... more elif

# After
HORIZON_CONFIGS = {
    'daily': {'days': 1, ...},
    'weekly': {'days': 7, ...},
}
config = HORIZON_CONFIGS[horizon]
```
**Benefit**: Reduced complexity, easier to extend, config-as-data

---

## 📚 Key Lessons Learned

### What Worked Excellently

1. **Agent-First Approach**
   - Using specialized agents (amp-bridge, github-release-manager, gemini-pr-automator) dramatically improved efficiency
   - 50-67% time savings vs manual workflows

2. **Small Batch Releases**
   - v8.34.0 (1 function) had deepest review quality
   - Easier to reason about changes, faster iteration

3. **Gemini Code Assist Integration**
   - Caught 2 critical bugs before release
   - Provided portability fixes and API modernization suggestions
   - Iterative review cycles improved code quality

4. **Pattern Consistency**
   - Establishing helper extraction pattern early made subsequent work systematic
   - 10 consolidation commits followed same methodology

### Process Improvements Demonstrated

1. **Token Conservation**
   - Strategic use of amp-bridge for heavy work saved ~50-60K tokens
   - Allowed more complex work within context limits

2. **Quality Over Metrics**
   - Deferring high-risk groups showed mature engineering judgment
   - Grade B achieved without compromising stability

3. **Release Automation**
   - github-release-manager ensured no documentation steps missed
   - Consistent release quality across 7 versions

4. **Test Coverage**
   - 100% coverage throughout maintained confidence in changes
   - All changes backward compatible (zero breaking changes)

---

## 🎉 Sprint Highlights

### By The Numbers
- **7 releases** in 14 days
- **10x to 534,628x** performance improvements
- **35-minute** fastest release cycle
- **176-186 lines** of duplicate code eliminated
- **Grade D → Grade B** health score improvement
- **2 critical bugs** prevented before release
- **50-67% time savings** with AI agents
- **100% test coverage** maintained
- **0 breaking changes** across all releases

### Most Impressive Achievement
**v8.39.0 in 35 minutes**: From issue analysis (#238) to production release with 10x performance improvement, comprehensive tests, and full documentation - all in half an hour.

### Innovation Breakthrough
**MCP Global Caching (v8.26.0)**: Transformed MCP tools from slowest (1,810ms) to fastest (0.01ms) method - a 534,628x improvement that sets new standards for MCP server performance.

### Quality Milestone
**Phase 2 Complete (v8.34-36)**: Achieved 100% of complexity reduction targets across three coordinated releases in 4 hours, with AI code review catching critical bugs before production.

---

## 🔮 Future Implications

### Performance Standards
- Database-layer filtering now standard for all analytics endpoints
- Global caching pattern applicable to all stateless HTTP environments
- Sub-millisecond response times set user experience baseline

### Code Quality Foundation
- Helper extraction pattern established for future consolidations
- Configuration-driven logic reduces complexity systematically
- 100% test coverage requirement proven sustainable

### Development Velocity
- 35-minute release cycles achievable with agent automation
- AI code review preventing bugs before production
- Agent-first workflows becoming default approach

---

## 📖 Related Resources

**GitHub Releases**:
- [v8.39.0 - Storage-Layer Date-Range Filtering](https://github.com/doobidoo/mcp-memory-service/releases/tag/v8.39.0)
- [v8.38.0 - Phase 2b Duplication Reduction](https://github.com/doobidoo/mcp-memory-service/releases/tag/v8.38.0)
- [v8.36.0 - Phase 2 Complete](https://github.com/doobidoo/mcp-memory-service/releases/tag/v8.36.0)
- [v8.26.0 - MCP Global Caching](https://github.com/doobidoo/mcp-memory-service/releases/tag/v8.26.0)

**Project Repository**: https://github.com/doobidoo/mcp-memory-service

**Issues**:
- [#238 - Analytics Performance Optimization](https://github.com/doobidoo/mcp-memory-service/issues/238)
- [#240 - Phase 2 Code Quality](https://github.com/doobidoo/mcp-memory-service/issues/240)
- [#246 - Phase 2b Duplication Reduction](https://github.com/doobidoo/mcp-memory-service/issues/246)

---

**Last Updated**: November 26, 2025
**Sprint Duration**: November 12-26, 2025 (14 days)
**Total Releases**: 7 major/minor versions

```

--------------------------------------------------------------------------------
/docs/api/tag-standardization.md:
--------------------------------------------------------------------------------

```markdown
# Tag Standardization Guide

A comprehensive guide to creating and maintaining a consistent, professional tag system for optimal knowledge organization in the MCP Memory Service.

## 🎯 Overview

Effective tag standardization is the foundation of a powerful knowledge management system. This guide establishes proven tag schemas, naming conventions, and organizational patterns that transform chaotic information into searchable, structured knowledge.

## 📋 Core Principles

### 1. Consistency
- Use standardized naming conventions
- Apply tags systematically across similar content
- Maintain format consistency (lowercase, hyphens, etc.)

### 2. Hierarchy
- Organize tags from general to specific
- Use multiple category levels for comprehensive organization
- Create logical groupings that reflect actual usage patterns

### 3. Utility
- Tags should enhance discoverability
- Focus on how information will be retrieved
- Balance detail with practical searchability

### 4. Evolution
- Tag schemas should adapt to changing needs
- Regular review and refinement process
- Documentation of changes and rationale

## 🏷️ Standardized Tag Schema

### Category 1: Projects & Repositories

**Primary Projects:**
```
mcp-memory-service     # Core memory service development
memory-dashboard       # Dashboard application
github-integration     # GitHub connectivity and automation
mcp-protocol          # Protocol-level development
cloudflare-workers     # Edge computing integration
```

**Project Components:**
```
frontend               # User interface components
backend               # Server-side development
api                   # API design and implementation
database              # Data storage and management
infrastructure        # Deployment and DevOps
```

**Usage Example:**
```javascript
{
  "tags": ["mcp-memory-service", "backend", "database", "chromadb"]
}
```

### Category 2: Technologies & Tools

**Programming Languages:**
```
python                # Python development
typescript            # TypeScript development
javascript            # JavaScript development
bash                  # Shell scripting
sql                   # Database queries
```

**Frameworks & Libraries:**
```
react                 # React development
fastapi               # FastAPI framework
chromadb              # ChromaDB vector database
sentence-transformers # Embedding models
pytest                # Testing framework
```

**Tools & Platforms:**
```
git                   # Version control
docker                # Containerization
github                # Repository management
aws                   # Amazon Web Services
npm                   # Node package management
```

**Usage Example:**
```javascript
{
  "tags": ["python", "chromadb", "sentence-transformers", "pytest"]
}
```

### Category 3: Activities & Processes

**Development Activities:**
```
development           # General development work
implementation        # Feature implementation
debugging             # Bug investigation and fixing
testing               # Quality assurance activities
refactoring           # Code improvement
optimization          # Performance enhancement
```

**Documentation Activities:**
```
documentation         # Writing documentation
tutorial              # Creating tutorials
guide                 # Step-by-step guides
reference             # Reference materials
examples              # Code examples
```

**Operational Activities:**
```
deployment            # Application deployment
monitoring            # System monitoring
backup                # Data backup processes
migration             # Data or system migration
maintenance           # System maintenance
troubleshooting       # Problem resolution
```

**Usage Example:**
```javascript
{
  "tags": ["debugging", "troubleshooting", "testing", "verification"]
}
```

### Category 4: Content Types & Formats

**Knowledge Types:**
```
concept               # Conceptual information
architecture          # System architecture
design                # Design decisions and patterns
best-practices        # Proven methodologies
methodology           # Systematic approaches
workflow              # Process workflows
```

**Documentation Formats:**
```
tutorial              # Step-by-step instructions
reference             # Quick reference materials
example               # Code or process examples
template              # Reusable templates
checklist             # Verification checklists
summary               # Condensed information
```

**Technical Content:**
```
configuration         # System configuration
specification         # Technical specifications
analysis              # Technical analysis
research              # Research findings
review                # Code or process reviews
```

**Usage Example:**
```javascript
{
  "tags": ["architecture", "design", "best-practices", "reference"]
}
```

### Category 5: Status & Progress

**Development Status:**
```
resolved              # Completed and verified
in-progress           # Currently being worked on
blocked               # Waiting for external dependencies
needs-investigation   # Requires further analysis
planned               # Scheduled for future work
cancelled             # No longer being pursued
```

**Quality Status:**
```
verified              # Tested and confirmed working
tested                # Has undergone testing
reviewed              # Has been peer reviewed
approved              # Officially approved
experimental          # Proof of concept stage
deprecated            # No longer recommended
```

**Priority Levels:**
```
urgent                # Immediate attention required
high-priority         # Important, should be addressed soon
normal-priority       # Standard priority
low-priority          # Can be addressed when time allows
nice-to-have          # Enhancement, not critical
```

**Usage Example:**
```javascript
{
  "tags": ["resolved", "verified", "high-priority", "production-ready"]
}
```

### Category 6: Context & Temporal

**Temporal Markers:**
```
january-2025          # Specific month context
q1-2025               # Quarterly context
milestone-v1          # Version milestones
release-candidate     # Release stages
sprint-3              # Development sprints
```

**Environmental Context:**
```
development           # Development environment
staging               # Staging environment
production            # Production environment
testing               # Testing environment
local                 # Local development
```

**Scope & Impact:**
```
breaking-change       # Introduces breaking changes
feature               # New feature development
enhancement           # Improvement to existing feature
hotfix                # Critical fix
security              # Security-related
performance           # Performance-related
```

**Usage Example:**
```javascript
{
  "tags": ["june-2025", "production", "security", "hotfix", "critical"]
}
```

## 🎨 Tag Naming Conventions

### Format Standards

**Basic Rules:**
- Use lowercase letters
- Replace spaces with hyphens: `memory-service` not `memory service`
- Use descriptive but concise terms
- Avoid abbreviations unless widely understood
- Use singular form when possible: `bug` not `bugs`

**Multi-word Tags:**
```
✅ Good: memory-service, github-integration, best-practices
❌ Bad: memoryservice, GitHub_Integration, bestPractices
```

**Version and Date Tags:**
```
✅ Good: v1-2-0, january-2025, q1-2025
❌ Bad: v1.2.0, Jan2025, Q1/2025
```

**Status and State Tags:**
```
✅ Good: in-progress, needs-investigation, high-priority
❌ Bad: inProgress, needsInvestigation, highPriority
```

### Hierarchical Naming

**Use progressive specificity:**
```
General → Specific
project → mcp-memory-service → backend → database
testing → integration-testing → api-testing
issue → bug → critical-bug → data-corruption
```

**Example Progression:**
```javascript
// General testing memory
{"tags": ["testing", "verification"]}

// Specific test type
{"tags": ["testing", "unit-testing", "python", "pytest"]}

// Very specific test
{"tags": ["testing", "unit-testing", "memory-storage", "chromadb", "pytest"]}
```

## 📊 Tag Application Patterns

### Multi-Category Tagging

**Recommended Pattern:**
Apply tags from 3-6 categories for comprehensive organization:

```javascript
{
  "tags": [
    // Project/Repository (1-2 tags)
    "mcp-memory-service", "backend",
    
    // Technology (1-3 tags)
    "python", "chromadb",
    
    // Activity (1-2 tags)
    "debugging", "troubleshooting",
    
    // Content Type (1 tag)
    "troubleshooting-guide",
    
    // Status (1 tag)
    "resolved",
    
    // Context (0-2 tags)
    "june-2025", "production"
  ]
}
```

### Content-Specific Patterns

**Bug Reports and Issues:**
```javascript
{
  "tags": [
    "issue-7",                    // Specific issue reference
    "timestamp-corruption",       // Problem description
    "critical-bug",              // Severity
    "mcp-memory-service",        // Project
    "chromadb",                  // Technology
    "resolved"                   // Status
  ]
}
```

**Documentation:**
```javascript
{
  "tags": [
    "documentation",             // Content type
    "memory-maintenance",        // Topic
    "best-practices",           // Knowledge type
    "tutorial",                 // Format
    "mcp-memory-service",       // Project
    "reference"                 // Usage type
  ]
}
```

**Development Milestones:**
```javascript
{
  "tags": [
    "milestone",                // Event type
    "v1-2-0",                  // Version
    "production-ready",        // Status
    "mcp-memory-service",      // Project
    "feature-complete",        // Achievement
    "june-2025"               // Timeline
  ]
}
```

**Research and Concepts:**
```javascript
{
  "tags": [
    "concept",                 // Content type
    "memory-consolidation",    // Topic
    "architecture",           // Category
    "research",               // Activity
    "cognitive-processing",   // Domain
    "system-design"           // Application
  ]
}
```

## 🔍 Tag Selection Guidelines

### Step-by-Step Tag Selection

**1. Start with Primary Context**
- What project or domain does this relate to?
- What's the main subject matter?

**2. Add Technical Details**
- What technologies are involved?
- What tools or platforms?

**3. Describe the Activity**
- What was being done?
- What type of work or process?

**4. Classify the Content**
- What kind of information is this?
- How will it be used in the future?

**5. Add Status Information**
- What's the current state?
- What's the priority or urgency?

**6. Include Temporal Context**
- When is this relevant?
- What timeline or milestone?

### Tag Selection Examples

**Example 1: Debug Session Memory**

Content: "Fixed issue with ChromaDB connection timeout in production"

**Analysis:**
- Primary Context: MCP Memory Service, backend
- Technical: ChromaDB, connection issues, production
- Activity: Debugging, troubleshooting, problem resolution
- Content: Troubleshooting solution, fix documentation
- Status: Resolved, production issue
- Temporal: Current work, immediate fix

**Selected Tags:**
```javascript
{
  "tags": [
    "mcp-memory-service", "backend",
    "chromadb", "connection-timeout", "production",
    "debugging", "troubleshooting",
    "solution", "hotfix",
    "resolved", "critical"
  ]
}
```

**Example 2: Planning Document**

Content: "Q2 2025 roadmap for memory service improvements"

**Analysis:**
- Primary Context: MCP Memory Service, planning
- Technical: General service improvements
- Activity: Planning, roadmap development
- Content: Strategic document, planning guide
- Status: Planning phase, future work
- Temporal: Q2 2025, quarterly planning

**Selected Tags:**
```javascript
{
  "tags": [
    "mcp-memory-service", "planning",
    "roadmap", "improvements",
    "strategy", "planning-document",
    "q2-2025", "quarterly",
    "future-work", "enhancement"
  ]
}
```

## 🛠️ Tag Management Tools

### Quality Control Queries

**Find inconsistent tagging:**
```javascript
// Look for similar content with different tag patterns
retrieve_memory({"query": "debugging troubleshooting", "n_results": 10})
search_by_tag({"tags": ["debug"]})  // vs search_by_tag({"tags": ["debugging"]})
```

**Identify tag standardization opportunities:**
```javascript
// Find memories that might need additional tags
retrieve_memory({"query": "issue bug problem", "n_results": 15})
search_by_tag({"tags": ["test"]})  // Check if generic tags need specificity
```

### Tag Analysis Scripts

**Tag frequency analysis:**
```javascript
// Analyze which tags are most/least used
check_database_health()  // Get overall statistics
search_by_tag({"tags": ["frequent-tag"]})  // Count instances
```

**Pattern consistency check:**
```javascript
// Verify similar content has similar tagging
const patterns = [
  "mcp-memory-service",
  "debugging",
  "issue-",
  "resolved"
];
// Check each pattern for consistency
```

## 📈 Tag Schema Evolution

### Regular Review Process

**Monthly Review Questions:**
1. Are there new tag categories needed?
2. Are existing tags being used consistently?
3. Should any tags be merged or split?
4. Are there emerging patterns that need standardization?

**Quarterly Schema Updates:**
1. Analyze tag usage statistics
2. Identify inconsistencies or gaps
3. Propose schema improvements
4. Document rationale for changes
5. Implement updates systematically

### Schema Version Control

**Track changes with metadata:**
```javascript
store_memory({
  "content": "Tag Schema Update v2.1: Added security-related tags, consolidated testing categories...",
  "metadata": {
    "tags": ["tag-schema", "version-2-1", "schema-update", "documentation"],
    "type": "schema-documentation"
  }
})
```

## 🎯 Best Practices Summary

### Do's

✅ **Be Consistent**: Use the same tag patterns for similar content
✅ **Use Multiple Categories**: Apply tags from different categories for comprehensive organization
✅ **Follow Naming Conventions**: Stick to lowercase, hyphenated format
✅ **Think About Retrieval**: Tag based on how you'll search for information
✅ **Document Decisions**: Record rationale for tag choices
✅ **Review Regularly**: Update and improve tag schemas over time

### Don'ts

❌ **Over-tag**: Don't add too many tags; focus on the most relevant
❌ **Under-tag**: Don't use too few tags; aim for 4-8 well-chosen tags
❌ **Use Inconsistent Formats**: Avoid mixing naming conventions
❌ **Create Redundant Tags**: Don't duplicate information already in content
❌ **Ignore Context**: Don't forget temporal or project context
❌ **Set and Forget**: Don't create tags without ongoing maintenance

---

*This standardization guide provides the foundation for creating a professional, searchable, and maintainable knowledge management system. Consistent application of these standards will dramatically improve the value and usability of your MCP Memory Service.*
```

--------------------------------------------------------------------------------
/scripts/validation/verify_environment.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Enhanced environment verification script for MCP Memory Service.
This script checks the system environment, hardware capabilities,
and installed dependencies to ensure compatibility.
"""
import os
import sys
import platform
import subprocess
import json
import importlib
import pkg_resources
from pathlib import Path
import traceback
import ctypes

# Import shared GPU detection utilities
try:
    from mcp_memory_service.utils.gpu_detection import detect_gpu as shared_detect_gpu
except ImportError:
    # Fallback for scripts directory context
    sys.path.insert(0, str(Path(__file__).parent.parent.parent))
    from src.mcp_memory_service.utils.gpu_detection import detect_gpu as shared_detect_gpu

class EnvironmentVerifier:
    def __init__(self):
        self.verification_results = []
        self.critical_failures = []
        self.warnings = []
        self.system_info = self.detect_system()
        self.gpu_info = self.detect_gpu()
        self.claude_config = self.load_claude_config()

    def detect_system(self):
        """Detect system architecture and platform."""
        system_info = {
            "os_name": platform.system().lower(),
            "os_version": platform.version(),
            "architecture": platform.machine().lower(),
            "python_version": platform.python_version(),
            "cpu_count": os.cpu_count() or 1,
            "memory_gb": self.get_system_memory(),
            "in_virtual_env": sys.prefix != sys.base_prefix
        }
        
        self.verification_results.append(
            f"[OK] System: {platform.system()} {platform.version()}"
        )
        self.verification_results.append(
            f"[OK] Architecture: {system_info['architecture']}"
        )
        self.verification_results.append(
            f"[OK] Python: {system_info['python_version']}"
        )
        
        if system_info["in_virtual_env"]:
            self.verification_results.append(
                f"[OK] Virtual environment: {sys.prefix}"
            )
        else:
            self.warnings.append(
                "Not running in a virtual environment"
            )
        
        return system_info

    def get_system_memory(self):
        """Get the total system memory in GB."""
        try:
            if self.system_info["os_name"] == "linux":
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            memory_kb = int(line.split()[1])
                            return round(memory_kb / (1024 * 1024), 2)
                            
            elif self.system_info["os_name"] == "darwin":
                output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8').strip()
                memory_bytes = int(output)
                return round(memory_bytes / (1024**3), 2)
                
            elif self.system_info["os_name"] == "windows":
                class MEMORYSTATUSEX(ctypes.Structure):
                    _fields_ = [
                        ('dwLength', ctypes.c_ulong),
                        ('dwMemoryLoad', ctypes.c_ulong),
                        ('ullTotalPhys', ctypes.c_ulonglong),
                        ('ullAvailPhys', ctypes.c_ulonglong),
                        ('ullTotalPageFile', ctypes.c_ulonglong),
                        ('ullAvailPageFile', ctypes.c_ulonglong),
                        ('ullTotalVirtual', ctypes.c_ulonglong),
                        ('ullAvailVirtual', ctypes.c_ulonglong),
                        ('ullAvailExtendedVirtual', ctypes.c_ulonglong),
                    ]
                    
                memoryStatus = MEMORYSTATUSEX()
                memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
                ctypes.windll.kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus))
                return round(memoryStatus.ullTotalPhys / (1024**3), 2)
                
        except Exception as e:
            self.warnings.append(f"Failed to get system memory: {e}")
            
        return 4.0  # Conservative default

    def detect_gpu(self):
        """Detect GPU and acceleration capabilities.

        Uses shared GPU detection module for platform detection.
        """
        # Adapt system info format for shared module
        adapted_system_info = {
            "is_windows": self.system_info["os_name"] == "windows",
            "is_linux": self.system_info["os_name"] == "linux",
            "is_macos": self.system_info["os_name"] == "darwin",
            "is_arm": self.system_info["architecture"] in ("arm64", "aarch64")
        }

        # Use shared GPU detection module
        gpu_info = shared_detect_gpu(adapted_system_info)

        # Append verification results (maintain verifier output format)
        if gpu_info.get("has_cuda"):
            cuda_version = gpu_info.get("cuda_version")
            self.verification_results.append(
                f"[OK] CUDA detected: {cuda_version or 'Unknown version'}"
            )
        elif gpu_info.get("has_rocm"):
            rocm_version = gpu_info.get("rocm_version")
            self.verification_results.append(
                f"[OK] ROCm detected: {rocm_version or 'Unknown version'}"
            )
        elif gpu_info.get("has_mps"):
            self.verification_results.append(
                "[OK] Apple Metal Performance Shaders (MPS) detected"
            )
        elif gpu_info.get("has_directml"):
            directml_version = gpu_info.get("directml_version")
            if directml_version:
                self.verification_results.append(f"[OK] DirectML detected: {directml_version}")
            else:
                self.verification_results.append("[OK] DirectML detected")
        else:
            self.verification_results.append(
                "[OK] Using CPU-only mode (no GPU acceleration detected)"
            )

        return gpu_info

    def load_claude_config(self):
        """Load configuration from Claude Desktop config."""
        try:
            home_dir = Path.home()
            possible_paths = [
                home_dir / "Library/Application Support/Claude/claude_desktop_config.json",
                home_dir / ".config/Claude/claude_desktop_config.json",
                Path(__file__).parent.parent / "claude_config/claude_desktop_config.json"
            ]

            for config_path in possible_paths:
                if config_path.exists():
                    with open(config_path) as f:
                        config = json.load(f)
                        self.verification_results.append(
                            f"[OK] Found Claude Desktop config at {config_path}"
                        )
                        return config

            self.warnings.append(
                "Could not find Claude Desktop config file in any standard location"
            )
            return None

        except Exception as e:
            self.critical_failures.append(
                f"Error loading Claude Desktop config: {str(e)}"
            )
            return None

    def verify_python_version(self):
        """Verify Python interpreter version matches production requirements."""
        try:
            python_version = sys.version.split()[0]
            required_version = "3.10"  # Updated to match current requirements
            
            if not python_version.startswith(required_version):
                self.critical_failures.append(
                    f"Python version mismatch: Found {python_version}, required {required_version}"
                )
            else:
                self.verification_results.append(
                    f"[OK] Python version verified: {python_version}"
                )
        except Exception as e:
            self.critical_failures.append(f"Failed to verify Python version: {str(e)}")

    def verify_virtual_environment(self):
        """Verify we're running in a virtual environment."""
        try:
            if sys.prefix == sys.base_prefix:
                self.critical_failures.append(
                    "Not running in a virtual environment!"
                )
            else:
                self.verification_results.append(
                    f"[OK] Virtual environment verified: {sys.prefix}"
                )
        except Exception as e:
            self.critical_failures.append(
                f"Failed to verify virtual environment: {str(e)}"
            )

    def verify_critical_packages(self):
        """Verify critical packages are installed with correct versions."""
        required_packages = {
            'chromadb': '0.5.23',
            'sentence-transformers': '2.2.2',
            'urllib3': '1.26.6',
            'python-dotenv': '1.0.0'
        }

        for package, required_version in required_packages.items():
            try:
                installed_version = pkg_resources.get_distribution(package).version
                if required_version and installed_version != required_version:
                    self.critical_failures.append(
                        f"Package version mismatch: {package} "
                        f"(found {installed_version}, required {required_version})"
                    )
                else:
                    self.verification_results.append(
                        f"[OK] Package verified: {package} {installed_version}"
                    )
            except pkg_resources.DistributionNotFound:
                self.critical_failures.append(f"Required package not found: {package}")
            except Exception as e:
                self.critical_failures.append(
                    f"Failed to verify package {package}: {str(e)}"
                )

    def verify_claude_paths(self):
        """Verify paths from Claude Desktop config."""
        if not self.claude_config:
            return

        try:
            chroma_path = self.claude_config.get('mcp-memory', {}).get('chroma_db')
            backup_path = self.claude_config.get('mcp-memory', {}).get('backup_path')

            if chroma_path:
                os.environ['CHROMA_DB_PATH'] = str(chroma_path)
                self.verification_results.append(
                    f"[OK] Set CHROMA_DB_PATH from config: {chroma_path}"
                )
            else:
                self.critical_failures.append("CHROMA_DB_PATH not found in Claude config")

            if backup_path:
                os.environ['MCP_MEMORY_BACKUP_PATH'] = str(backup_path)
                self.verification_results.append(
                    f"[OK] Set MCP_MEMORY_BACKUP_PATH from config: {backup_path}"
                )
            else:
                self.critical_failures.append("MCP_MEMORY_BACKUP_PATH not found in Claude config")

        except Exception as e:
            self.critical_failures.append(f"Failed to verify Claude paths: {str(e)}")

    def verify_import_functionality(self):
        """Verify critical imports work correctly."""
        critical_imports = [
            'chromadb',
            'sentence_transformers',
        ]

        for module_name in critical_imports:
            try:
                module = importlib.import_module(module_name)
                self.verification_results.append(f"[OK] Successfully imported {module_name}")
            except ImportError as e:
                self.critical_failures.append(f"Failed to import {module_name}: {str(e)}")

    def verify_paths(self):
        """Verify critical paths exist and are accessible."""
        critical_paths = [
            os.environ.get('CHROMA_DB_PATH', ''),
            os.environ.get('MCP_MEMORY_BACKUP_PATH', '')
        ]

        for path in critical_paths:
            if not path:
                continue
            try:
                path_obj = Path(path)
                if not path_obj.exists():
                    self.critical_failures.append(f"Critical path does not exist: {path}")
                elif not os.access(path, os.R_OK | os.W_OK):
                    self.critical_failures.append(f"Insufficient permissions for path: {path}")
                else:
                    self.verification_results.append(f"[OK] Path verified: {path}")
            except Exception as e:
                self.critical_failures.append(f"Failed to verify path {path}: {str(e)}")

    def run_verifications(self):
        """Run all verifications."""
        self.verify_python_version()
        self.verify_virtual_environment()
        self.verify_critical_packages()
        self.verify_claude_paths()
        self.verify_import_functionality()
        self.verify_paths()

    def print_results(self):
        """Print verification results."""
        print("\n=== Environment Verification Results ===\n")
        
        if self.verification_results:
            print("Successful Verifications:")
            for result in self.verification_results:
                print(f"  {result}")
        
        if self.warnings:
            print("\nWarnings:")
            for warning in self.warnings:
                print(f"  [!] {warning}")
        
        if self.critical_failures:
            print("\nCritical Failures:")
            for failure in self.critical_failures:
                print(f"  [X] {failure}")
        
        print("\nSummary:")
        print(f"  Passed: {len(self.verification_results)}")
        print(f"  Warnings: {len(self.warnings)}")
        print(f"  Failed: {len(self.critical_failures)}")
        
        if self.critical_failures:
            print("\nTo fix these issues:")
            print("1. Create a new virtual environment:")
            print("   conda create -n mcp-env python=3.10")
            print("   conda activate mcp-env")
            print("\n2. Install requirements:")
            print("   pip install -r requirements.txt")
            print("\n3. Ensure Claude Desktop config is properly set up with required paths")
            
        return len(self.critical_failures) == 0

def main():
    verifier = EnvironmentVerifier()
    verifier.run_verifications()
    environment_ok = verifier.print_results()
    
    if not environment_ok:
        print("\n[WARNING]  Environment verification failed! Please fix the issues above.")
        sys.exit(1)
    else:
        print("\n[OK] Environment verification passed! Safe to proceed.")
        sys.exit(0)

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/mcp_memory_service/api/operations.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Core operations for code execution interface.

Provides token-efficient functions for memory operations:
    - search: Semantic search with compact results
    - store: Store new memories with minimal parameters
    - health: Service health and status check

Token Efficiency:
    - search(5 results): ~385 tokens (vs ~2,625, 85% reduction)
    - store(): ~15 tokens (vs ~150, 90% reduction)
    - health(): ~20 tokens (vs ~125, 84% reduction)

Performance:
    - Cold call: ~50ms (storage initialization)
    - Warm call: ~5-10ms (connection reused)
    - Memory overhead: <10MB
"""

import logging
import time
from typing import Optional, Union, List
from .types import (
    CompactMemory, CompactSearchResult, CompactHealthInfo,
    CompactConsolidationResult, CompactSchedulerStatus
)
from .client import get_storage_async, get_consolidator, get_scheduler
from .sync_wrapper import sync_wrapper
from ..models.memory import Memory
from ..utils.hashing import generate_content_hash

logger = logging.getLogger(__name__)


@sync_wrapper
async def search(
    query: str,
    limit: int = 5,
    tags: Optional[List[str]] = None
) -> CompactSearchResult:
    """
    Search memories using semantic similarity.

    Token efficiency: ~25 tokens (query + params) + ~73 tokens per result
    Example (5 results): ~385 tokens vs ~2,625 tokens (85% reduction)

    Args:
        query: Search query text (natural language)
        limit: Maximum number of results to return (default: 5)
        tags: Optional list of tags to filter results

    Returns:
        CompactSearchResult with minimal memory representations

    Raises:
        RuntimeError: If storage backend is not available
        ValueError: If query is empty or limit is invalid

    Example:
        >>> from mcp_memory_service.api import search
        >>> results = search("recent architecture changes", limit=3)
        >>> print(results)
        SearchResult(found=3, shown=3)
        >>> for m in results.memories:
        ...     print(f"{m.hash}: {m.preview[:50]}...")
        abc12345: Implemented OAuth 2.1 authentication for...
        def67890: Refactored storage backend to support...
        ghi11121: Added hybrid mode for Cloudflare sync...

    Performance:
        - First call: ~50ms (includes storage initialization)
        - Subsequent calls: ~5-10ms (connection reused)
        - Scales linearly with limit (5ms + 1ms per result)
    """
    # Validate input
    if not query or not query.strip():
        raise ValueError("Query cannot be empty")
    if limit < 1:
        raise ValueError("Limit must be at least 1")
    if limit > 100:
        logger.warning(f"Large limit ({limit}) may impact performance")

    # Get storage instance
    storage = await get_storage_async()

    # Perform semantic search
    query_results = await storage.retrieve(query, n_results=limit)

    # Filter by tags if specified
    if tags:
        tag_set = set(tags)
        query_results = [
            r for r in query_results
            if any(tag in tag_set for tag in r.memory.tags)
        ]

    # Convert to compact format
    compact_memories = tuple(
        CompactMemory(
            hash=r.memory.content_hash[:8],  # 8-char hash
            preview=r.memory.content[:200],   # First 200 chars
            tags=tuple(r.memory.tags),        # Immutable tuple
            created=r.memory.created_at,      # Unix timestamp
            score=r.relevance_score           # Relevance score
        )
        for r in query_results
    )

    return CompactSearchResult(
        memories=compact_memories,
        total=len(compact_memories),
        query=query
    )


@sync_wrapper
async def store(
    content: str,
    tags: Optional[Union[str, List[str]]] = None,
    memory_type: str = "note"
) -> str:
    """
    Store a new memory.

    Token efficiency: ~15 tokens (params only)
    vs ~150 tokens for MCP tool call with schema (90% reduction)

    Args:
        content: Memory content text
        tags: Single tag or list of tags (optional)
        memory_type: Memory type classification (default: "note")

    Returns:
        8-character content hash of stored memory

    Raises:
        RuntimeError: If storage operation fails
        ValueError: If content is empty

    Example:
        >>> from mcp_memory_service.api import store
        >>> hash = store(
        ...     "Implemented OAuth 2.1 authentication",
        ...     tags=["authentication", "security", "feature"]
        ... )
        >>> print(f"Stored: {hash}")
        Stored: abc12345

    Performance:
        - First call: ~50ms (includes storage initialization)
        - Subsequent calls: ~10-20ms (includes embedding generation)
        - Scales with content length (20ms + 0.5ms per 100 chars)
    """
    # Validate input
    if not content or not content.strip():
        raise ValueError("Content cannot be empty")

    # Normalize tags to list
    if tags is None:
        tag_list = []
    elif isinstance(tags, str):
        tag_list = [tags]
    else:
        tag_list = list(tags)

    # Generate content hash
    content_hash = generate_content_hash(content)

    # Create memory object
    memory = Memory(
        content=content,
        content_hash=content_hash,
        tags=tag_list,
        memory_type=memory_type,
        metadata={}
    )

    # Get storage instance
    storage = await get_storage_async()

    # Store memory
    success, message = await storage.store(memory)

    if not success:
        raise RuntimeError(f"Failed to store memory: {message}")

    # Return short hash (8 chars)
    return content_hash[:8]


@sync_wrapper
async def health() -> CompactHealthInfo:
    """
    Get service health and status.

    Token efficiency: ~20 tokens
    vs ~125 tokens for MCP health check tool (84% reduction)

    Returns:
        CompactHealthInfo with backend, count, and ready status

    Raises:
        RuntimeError: If unable to retrieve health information

    Example:
        >>> from mcp_memory_service.api import health
        >>> info = health()
        >>> print(f"Status: {info.status}")
        Status: healthy
        >>> print(f"Backend: {info.backend}, Count: {info.count}")
        Backend: sqlite_vec, Count: 1247

    Performance:
        - First call: ~50ms (includes storage initialization)
        - Subsequent calls: ~5ms (cached stats)
    """
    try:
        # Get storage instance
        storage = await get_storage_async()

        # Get storage statistics
        stats = await storage.get_stats()

        # Determine status
        status = "healthy"
        if stats.get("status") == "degraded":
            status = "degraded"
        elif stats.get("status") == "error":
            status = "error"
        elif not stats.get("initialized", True):
            status = "error"

        # Extract backend type
        backend = stats.get("storage_backend", "unknown")

        # Extract memory count
        count = stats.get("total_memories", 0)

        return CompactHealthInfo(
            status=status,
            count=count,
            backend=backend
        )

    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return CompactHealthInfo(
            status="error",
            count=0,
            backend="unknown"
        )


async def _consolidate_async(time_horizon: str) -> CompactConsolidationResult:
    """
    Internal async implementation of consolidation.

    This function contains the core consolidation logic and is used by both
    the sync-wrapped API function and the FastAPI endpoint to avoid duplication.
    """
    # Validate time horizon
    valid_horizons = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly']
    if time_horizon not in valid_horizons:
        raise ValueError(
            f"Invalid time_horizon: {time_horizon}. "
            f"Must be one of: {', '.join(valid_horizons)}"
        )

    # Get consolidator instance
    consolidator = get_consolidator()
    if consolidator is None:
        raise RuntimeError(
            "Consolidator not available. "
            "Consolidation requires HTTP server with MCP_CONSOLIDATION_ENABLED=true. "
            "Start the HTTP server first."
        )

    try:
        # Record start time
        start_time = time.time()

        # Run consolidation
        logger.info(f"Running {time_horizon} consolidation...")
        result = await consolidator.consolidate(time_horizon)

        # Calculate duration
        duration = time.time() - start_time

        # Extract metrics from result (ConsolidationReport object)
        processed = result.memories_processed
        compressed = result.memories_compressed
        forgotten = result.memories_archived
        status = 'completed' if not result.errors else 'completed_with_errors'

        logger.info(
        f"🎉 Consolidation completed successfully! Processed: {processed}, Compressed: {compressed}, Forgotten: {forgotten} (Total time: {duration:.1f}s)"
        )

        return CompactConsolidationResult(
            status=status,
            horizon=time_horizon,
            processed=processed,
            compressed=compressed,
            forgotten=forgotten,
            duration=duration
        )

    except Exception as e:
        logger.error(f"Consolidation failed: {e}")
        return CompactConsolidationResult(
            status="failed",
            horizon=time_horizon,
            processed=0,
            compressed=0,
            forgotten=0,
            duration=0.0
        )


@sync_wrapper
async def consolidate(time_horizon: str = "weekly") -> CompactConsolidationResult:
    """
    Trigger memory consolidation for a specific time horizon.

    Token efficiency: ~40 tokens (result only)
    vs ~250 tokens for MCP consolidation result (84% reduction)

    Args:
        time_horizon: Time horizon for consolidation
            ('daily' | 'weekly' | 'monthly' | 'quarterly' | 'yearly')

    Returns:
        CompactConsolidationResult with operation metrics

    Raises:
        RuntimeError: If consolidation fails or consolidator not available
        ValueError: If time_horizon is invalid

    Example:
        >>> from mcp_memory_service.api import consolidate
        >>> result = consolidate('weekly')
        >>> print(result)
        Consolidation(completed, weekly, 2418 processed)
        >>> print(f"Compressed: {result.compressed}, Forgotten: {result.forgotten}")
        Compressed: 156, Forgotten: 43

    Performance:
        - Typical duration: 10-30 seconds (depends on memory count)
        - Scales linearly with total memories (~10ms per memory)
        - Background operation (non-blocking in HTTP server context)

    Note:
        Requires HTTP server with consolidation enabled. If called when
        HTTP server is not running, will raise RuntimeError.
    """
    return await _consolidate_async(time_horizon)


async def _scheduler_status_async() -> CompactSchedulerStatus:
    """
    Internal async implementation of scheduler status.

    This function contains the core status logic and is used by both
    the sync-wrapped API function and the FastAPI endpoint to avoid duplication.
    """
    # Get scheduler instance
    scheduler = get_scheduler()
    if scheduler is None:
        logger.warning("Scheduler not available")
        return CompactSchedulerStatus(
            running=False,
            next_daily=None,
            next_weekly=None,
            next_monthly=None,
            jobs_executed=0,
            jobs_failed=0
        )

    try:
        # Get scheduler status
        if hasattr(scheduler, 'scheduler') and scheduler.scheduler is not None:
            # Scheduler is running
            jobs = scheduler.scheduler.get_jobs()

            # Extract next run times for each horizon
            next_daily = None
            next_weekly = None
            next_monthly = None

            for job in jobs:
                if job.next_run_time:
                    timestamp = job.next_run_time.timestamp()
                    if 'daily' in job.id.lower():
                        next_daily = timestamp
                    elif 'weekly' in job.id.lower():
                        next_weekly = timestamp
                    elif 'monthly' in job.id.lower():
                        next_monthly = timestamp

            # Get execution statistics
            jobs_executed = scheduler.execution_stats.get('successful_jobs', 0)
            jobs_failed = scheduler.execution_stats.get('failed_jobs', 0)

            return CompactSchedulerStatus(
                running=True,
                next_daily=next_daily,
                next_weekly=next_weekly,
                next_monthly=next_monthly,
                jobs_executed=jobs_executed,
                jobs_failed=jobs_failed
            )
        else:
            # Scheduler exists but not running
            return CompactSchedulerStatus(
                running=False,
                next_daily=None,
                next_weekly=None,
                next_monthly=None,
                jobs_executed=0,
                jobs_failed=0
            )

    except Exception as e:
        logger.error(f"Failed to get scheduler status: {e}")
        return CompactSchedulerStatus(
            running=False,
            next_daily=None,
            next_weekly=None,
            next_monthly=None,
            jobs_executed=0,
            jobs_failed=0
        )


@sync_wrapper
async def scheduler_status() -> CompactSchedulerStatus:
    """
    Get consolidation scheduler status and next run times.

    Token efficiency: ~25 tokens
    vs ~150 tokens for MCP scheduler_status tool (83% reduction)

    Returns:
        CompactSchedulerStatus with scheduler state and job statistics

    Raises:
        RuntimeError: If scheduler not available

    Example:
        >>> from mcp_memory_service.api import scheduler_status
        >>> status = scheduler_status()
        >>> print(status)
        Scheduler(running, executed=42, failed=0)
        >>> if status.next_daily:
        ...     from datetime import datetime
        ...     next_run = datetime.fromtimestamp(status.next_daily)
        ...     print(f"Next daily: {next_run}")

    Performance:
        - Execution time: <5ms (reads cached state)
        - No storage access required
        - Lightweight status query

    Note:
        Requires HTTP server with consolidation scheduler enabled.
        Returns STOPPED status if scheduler not running.
    """
    return await _scheduler_status_async()

```

--------------------------------------------------------------------------------
/src/mcp_memory_service/utils/db_utils.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Utilities for database validation and health checks."""
from typing import Dict, Any, Tuple
import logging
import os
import json
from datetime import datetime
import importlib

logger = logging.getLogger(__name__)

async def validate_database(storage) -> Tuple[bool, str]:
    """Validate database health and configuration."""
    try:
        # Check if storage is properly initialized
        if storage is None:
            return False, "Storage is not initialized"
        
        # Special case for direct access without checking for attribute 'collection'
        # This fixes compatibility issues with SQLite-vec and other storage backends
        storage_type = storage.__class__.__name__
        
        # First, use the 'is_initialized' method if available (preferred)
        if hasattr(storage, 'is_initialized') and callable(storage.is_initialized):
            try:
                init_status = storage.is_initialized()
                if not init_status:
                    # Get detailed status for debugging
                    if hasattr(storage, 'get_initialization_status') and callable(storage.get_initialization_status):
                        status = storage.get_initialization_status()
                        return False, f"Storage not fully initialized: {status}"
                    else:
                        return False, "Storage initialization incomplete"
            except Exception as init_error:
                logger.warning(f"Error checking initialization status: {init_error}")
                # Continue with alternative checks
        
        # SQLite-vec backend validation
        if storage_type == "SqliteVecMemoryStorage":
            if not hasattr(storage, 'conn') or storage.conn is None:
                return False, "SQLite database connection is not initialized"
            
            # Check for database health
            try:
                # Make sure the tables exist
                try:
                    cursor = storage.conn.execute('SELECT name FROM sqlite_master WHERE type="table" AND name="memories"')
                    if not cursor.fetchone():
                        return False, "SQLite database is missing required tables"
                except Exception as table_error:
                    return False, f"Failed to check for tables: {str(table_error)}"
                
                # Try a simple query to verify database connection
                cursor = storage.conn.execute('SELECT COUNT(*) FROM memories')
                memory_count = cursor.fetchone()[0]
                logger.info(f"SQLite-vec database contains {memory_count} memories")
                
                # Test if embedding generation works (if model is available)
                if hasattr(storage, 'embedding_model') and storage.embedding_model:
                    test_text = "Database validation test"
                    embedding = storage._generate_embedding(test_text)
                    if not embedding or len(embedding) != storage.embedding_dimension:
                        logger.warning("Embedding generation may not be working properly")
                else:
                    logger.warning("No embedding model available, some functionality may be limited")
                
                return True, "SQLite-vec database validation successful"
                
            except Exception as e:
                return False, f"SQLite database access error: {str(e)}"

        # Cloudflare storage validation
        elif storage_type == "CloudflareStorage":
            try:
                # Check if storage is properly initialized
                if not hasattr(storage, 'client') or storage.client is None:
                    return False, "Cloudflare storage client is not initialized"

                # Check basic connectivity by getting stats
                stats = await storage.get_stats()
                memory_count = stats.get("total_memories", 0)
                logger.info(f"Cloudflare storage contains {memory_count} memories")

                # Test embedding generation if available
                test_text = "Database validation test"
                try:
                    embedding = await storage._generate_embedding(test_text)
                    if not embedding or not isinstance(embedding, list):
                        logger.warning("Embedding generation may not be working properly")
                except Exception as embed_error:
                    logger.warning(f"Embedding test failed: {str(embed_error)}")

                return True, "Cloudflare storage validation successful"

            except Exception as e:
                return False, f"Cloudflare storage access error: {str(e)}"

        else:
            return False, f"Unknown storage type: {storage_type}"
            
    except Exception as e:
        logger.error(f"Database validation failed: {str(e)}")
        return False, f"Database validation failed: {str(e)}"

async def get_database_stats(storage) -> Dict[str, Any]:
    """Get detailed database statistics with proper error handling."""
    try:
        # Check if storage is properly initialized
        if storage is None:
            return {
                "status": "error",
                "error": "Storage is not initialized"
            }
        
        # Determine storage type
        storage_type = storage.__class__.__name__
        
        # SQLite-vec backend stats
        if storage_type == "SqliteVecMemoryStorage":
            # Use the storage's own stats method if available
            if hasattr(storage, 'get_stats') and callable(storage.get_stats):
                try:
                    stats = storage.get_stats()
                    stats["status"] = "healthy"
                    return stats
                except Exception as stats_error:
                    logger.warning(f"Error calling get_stats method: {stats_error}")
                    # Fall back to our implementation
            
            # Otherwise, gather basic stats
            if not hasattr(storage, 'conn') or storage.conn is None:
                return {
                    "status": "error",
                    "error": "SQLite database connection is not initialized"
                }
            
            try:
                # Check if tables exist
                cursor = storage.conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
                tables = [row[0] for row in cursor.fetchall()]
                
                # Count memories if the table exists
                memory_count = 0
                if 'memories' in tables:
                    cursor = storage.conn.execute('SELECT COUNT(*) FROM memories')
                    memory_count = cursor.fetchone()[0]
                
                # Get unique tags if the table exists
                unique_tags = 0
                if 'memories' in tables:
                    cursor = storage.conn.execute('SELECT COUNT(DISTINCT tags) FROM memories WHERE tags != ""')
                    unique_tags = cursor.fetchone()[0]
                
                # Get database file size
                db_path = storage.db_path if hasattr(storage, 'db_path') else "unknown"
                file_size = os.path.getsize(db_path) if isinstance(db_path, str) and os.path.exists(db_path) else 0
                
                # Get embedding model info
                embedding_model = "unknown"
                embedding_dimension = 0
                
                if hasattr(storage, 'embedding_model_name'):
                    embedding_model = storage.embedding_model_name
                
                if hasattr(storage, 'embedding_dimension'):
                    embedding_dimension = storage.embedding_dimension
                
                # Gather tables information
                tables_info = {}
                for table in tables:
                    try:
                        cursor = storage.conn.execute(f"SELECT COUNT(*) FROM {table}")
                        count = cursor.fetchone()[0]
                        tables_info[table] = {"count": count}
                    except Exception:
                        tables_info[table] = {"count": "unknown"}
                
                return {
                    "backend": "sqlite-vec",
                    "status": "healthy",
                    "total_memories": memory_count,
                    "unique_tags": unique_tags,
                    "database_size_bytes": file_size,
                    "database_size_mb": round(file_size / (1024 * 1024), 2) if file_size > 0 else 0,
                    "embedding_model": embedding_model,
                    "embedding_dimension": embedding_dimension,
                    "tables": tables,
                    "tables_info": tables_info
                }
            except Exception as e:
                return {
                    "status": "error",
                    "error": f"Error getting SQLite-vec stats: {str(e)}"
                }
        
        # Cloudflare storage stats
        elif storage_type == "CloudflareStorage":
            try:
                # Get storage stats from the Cloudflare storage implementation
                storage_stats = await storage.get_stats()

                # Add cloudflare-specific info
                cloudflare_info = {
                    "vectorize_index": storage.vectorize_index,
                    "d1_database_id": storage.d1_database_id,
                    "r2_bucket": storage.r2_bucket,
                    "embedding_model": storage.embedding_model,
                    "large_content_threshold": storage.large_content_threshold
                }

                return {
                    **storage_stats,
                    "cloudflare": cloudflare_info,
                    "backend": "cloudflare",
                    "status": "healthy"
                }

            except Exception as stats_error:
                return {
                    "status": "error",
                    "error": f"Error getting Cloudflare stats: {str(stats_error)}",
                    "backend": "cloudflare"
                }

        else:
            return {
                "status": "error",
                "error": f"Unknown storage type: {storage_type}"
            }
            
    except Exception as e:
        logger.error(f"Error getting database stats: {str(e)}")
        return {
            "status": "error",
            "error": str(e)
        }

async def repair_database(storage) -> Tuple[bool, str]:
    """Attempt to repair database issues."""
    try:
        # Determine storage type
        storage_type = storage.__class__.__name__
        
        # SQLite-vec backend repair
        if storage_type == "SqliteVecMemoryStorage":
            # For SQLite, we'll try to check and recreate the tables if needed
            if not hasattr(storage, 'conn') or storage.conn is None:
                # Try to reconnect
                try:
                    storage.conn = storage.conn or __import__('sqlite3').connect(storage.db_path)
                    
                    # Try to reload the extension
                    if importlib.util.find_spec('sqlite_vec'):
                        import sqlite_vec
                        storage.conn.enable_load_extension(True)
                        sqlite_vec.load(storage.conn)
                        storage.conn.enable_load_extension(False)
                    
                    # Recreate tables if needed
                    storage.conn.execute('''
                        CREATE TABLE IF NOT EXISTS memories (
                            id INTEGER PRIMARY KEY AUTOINCREMENT,
                            content_hash TEXT UNIQUE NOT NULL,
                            content TEXT NOT NULL,
                            tags TEXT,
                            memory_type TEXT,
                            metadata TEXT,
                            created_at REAL,
                            updated_at REAL,
                            created_at_iso TEXT,
                            updated_at_iso TEXT
                        )
                    ''')
                    
                    # Create virtual table for vector embeddings
                    embedding_dimension = getattr(storage, 'embedding_dimension', 384)
                    storage.conn.execute(f'''
                        CREATE VIRTUAL TABLE IF NOT EXISTS memory_embeddings USING vec0(
                            content_embedding FLOAT[{embedding_dimension}]
                        )
                    ''')
                    
                    # Create indexes for better performance
                    storage.conn.execute('CREATE INDEX IF NOT EXISTS idx_content_hash ON memories(content_hash)')
                    storage.conn.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON memories(created_at)')
                    storage.conn.execute('CREATE INDEX IF NOT EXISTS idx_memory_type ON memories(memory_type)')
                    
                    storage.conn.commit()
                    return True, "SQLite-vec database repaired"
                    
                except Exception as e:
                    return False, f"SQLite-vec repair failed: {str(e)}"
        
        # Cloudflare storage repair
        elif storage_type == "CloudflareStorage":
            # For Cloudflare storage, we can't repair infrastructure (Vectorize, D1, R2)
            # but we can validate the connection and re-initialize if needed
            try:
                # Validate current state
                is_valid, message = await validate_database(storage)
                if is_valid:
                    return True, "Cloudflare storage is already healthy"

                # Try to re-initialize the storage connection
                await storage.initialize()

                # Validate repair
                is_valid, message = await validate_database(storage)
                if is_valid:
                    return True, "Cloudflare storage connection successfully repaired"
                else:
                    return False, f"Cloudflare storage repair failed: {message}"

            except Exception as repair_error:
                return False, f"Cloudflare storage repair failed: {str(repair_error)}"

        else:
            return False, f"Unknown storage type: {storage_type}, cannot repair"
                
    except Exception as e:
        logger.error(f"Error repairing database: {str(e)}")
        return False, f"Error repairing database: {str(e)}"
```

--------------------------------------------------------------------------------
/tests/integration/test_data_serialization_consistency.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script to examine data serialization differences that could cause Issue #99.
This focuses on how Memory objects are serialized/deserialized in different contexts
without requiring the full MCP server stack.
"""

import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'src'))

import asyncio
import json
import tempfile
from typing import Dict, List, Any
from datetime import datetime
import time

from mcp_memory_service.models.memory import Memory
from mcp_memory_service.utils.hashing import generate_content_hash
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage

class DataSerializationTest:
    """Test suite examining memory data serialization consistency."""

    def __init__(self):
        self.storage = None
        self.test_memories = []

    async def setup(self):
        """Set up test environment."""
        print("=== Setting up data serialization test environment ===")

        self.temp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
        self.temp_db.close()

        self.storage = SqliteVecMemoryStorage(
            db_path=self.temp_db.name,
            embedding_model="all-MiniLM-L6-v2"
        )
        await self.storage.initialize()
        print(f"✅ Storage initialized: {self.temp_db.name}")

    async def cleanup(self):
        """Clean up test environment."""
        self.storage = None
        if hasattr(self, 'temp_db') and os.path.exists(self.temp_db.name):
            os.unlink(self.temp_db.name)
            print("✅ Test database cleaned up")

    def create_hook_style_memory_with_metadata(self) -> Memory:
        """Create a memory simulating hook-generated content with rich metadata."""
        content = "Implemented comprehensive testing for Issue #99 memory storage inconsistency"

        memory = Memory(
            content=content,
            content_hash=generate_content_hash(content),
            tags=["claude-code-session", "session-consolidation", "issue-99", "mcp-memory-service", "language:python"],
            memory_type="session-summary",
            metadata={
                "session_analysis": {
                    "topics": ["testing", "storage", "consistency"],
                    "decisions_count": 3,
                    "insights_count": 5,
                    "confidence": 0.92
                },
                "project_context": {
                    "name": "mcp-memory-service",
                    "language": "python",
                    "frameworks": ["fastapi", "chromadb"]
                },
                "generated_by": "claude-code-session-end-hook",
                "generated_at": "2025-09-14T04:42:15.123Z"
            }
        )

        print(f"🔧 Created hook-style memory with {len(memory.tags)} tags and rich metadata")
        return memory

    def create_manual_memory_with_minimal_metadata(self) -> Memory:
        """Create a memory simulating manual /memory-store with minimal metadata."""
        content = "Manual note about Issue #99 analysis findings"

        memory = Memory(
            content=content,
            content_hash=generate_content_hash(content),
            tags=["issue-99", "analysis", "findings", "manual-note"],
            memory_type="note",
            metadata={
                "source": "user-input",
                "created_by": "manual-storage"
            }
        )

        print(f"📝 Created manual-style memory with {len(memory.tags)} tags and minimal metadata")
        return memory

    async def test_memory_serialization_roundtrip(self):
        """Test 1: Examine serialization/deserialization consistency."""
        print("\n🧪 Test 1: Memory Serialization Roundtrip Analysis")
        print("-" * 60)

        # Create test memories
        hook_memory = self.create_hook_style_memory_with_metadata()
        manual_memory = self.create_manual_memory_with_minimal_metadata()

        # Test serialization to dict and back
        hook_dict = hook_memory.to_dict()
        manual_dict = manual_memory.to_dict()

        print(f"📊 Hook memory dict keys: {sorted(hook_dict.keys())}")
        print(f"📊 Manual memory dict keys: {sorted(manual_dict.keys())}")

        # Test deserialization from dict
        hook_restored = Memory.from_dict(hook_dict)
        manual_restored = Memory.from_dict(manual_dict)

        # Compare original vs restored
        hook_consistency = {
            "content_match": hook_memory.content == hook_restored.content,
            "tags_match": hook_memory.tags == hook_restored.tags,
            "metadata_match": hook_memory.metadata == hook_restored.metadata,
            "created_at_preserved": abs((hook_memory.created_at or 0) - (hook_restored.created_at or 0)) < 0.001,
            "created_at_iso_preserved": hook_memory.created_at_iso == hook_restored.created_at_iso
        }

        manual_consistency = {
            "content_match": manual_memory.content == manual_restored.content,
            "tags_match": manual_memory.tags == manual_restored.tags,
            "metadata_match": manual_memory.metadata == manual_restored.metadata,
            "created_at_preserved": abs((manual_memory.created_at or 0) - (manual_restored.created_at or 0)) < 0.001,
            "created_at_iso_preserved": manual_memory.created_at_iso == manual_restored.created_at_iso
        }

        print(f"\n📋 Hook memory serialization consistency:")
        for key, value in hook_consistency.items():
            status = "✅" if value else "❌"
            print(f"  {status} {key}: {value}")

        print(f"\n📋 Manual memory serialization consistency:")
        for key, value in manual_consistency.items():
            status = "✅" if value else "❌"
            print(f"  {status} {key}: {value}")

        return {
            "hook_consistency": hook_consistency,
            "manual_consistency": manual_consistency,
            "hook_dict": hook_dict,
            "manual_dict": manual_dict
        }

    async def test_storage_backend_handling(self):
        """Test 2: Examine how storage backend handles different memory types."""
        print("\n🧪 Test 2: Storage Backend Handling Analysis")
        print("-" * 60)

        # Create and store different memory types
        hook_memory = self.create_hook_style_memory_with_metadata()
        manual_memory = self.create_manual_memory_with_minimal_metadata()

        # Store both memories
        hook_store_result = await self.storage.store(hook_memory)
        manual_store_result = await self.storage.store(manual_memory)

        print(f"📤 Hook memory storage result: {hook_store_result}")
        print(f"📤 Manual memory storage result: {manual_store_result}")

        # Retrieve memories back
        hook_retrieved = await self.storage.retrieve(hook_memory.content, n_results=1)
        manual_retrieved = await self.storage.retrieve(manual_memory.content, n_results=1)

        storage_analysis = {
            "hook_stored_successfully": hook_store_result[0],
            "manual_stored_successfully": manual_store_result[0],
            "hook_retrieved_count": len(hook_retrieved),
            "manual_retrieved_count": len(manual_retrieved)
        }

        if hook_retrieved:
            retrieved_hook = hook_retrieved[0].memory
            storage_analysis["hook_retrieval_analysis"] = {
                "content_preserved": retrieved_hook.content == hook_memory.content,
                "tags_preserved": retrieved_hook.tags == hook_memory.tags,
                "timestamp_preserved": (
                    retrieved_hook.created_at is not None and
                    retrieved_hook.created_at_iso is not None
                ),
                "metadata_preserved": bool(retrieved_hook.metadata)
            }

            print(f"\n📥 Retrieved hook memory analysis:")
            for key, value in storage_analysis["hook_retrieval_analysis"].items():
                status = "✅" if value else "❌"
                print(f"  {status} {key}: {value}")

        if manual_retrieved:
            retrieved_manual = manual_retrieved[0].memory
            storage_analysis["manual_retrieval_analysis"] = {
                "content_preserved": retrieved_manual.content == manual_memory.content,
                "tags_preserved": retrieved_manual.tags == manual_memory.tags,
                "timestamp_preserved": (
                    retrieved_manual.created_at is not None and
                    retrieved_manual.created_at_iso is not None
                ),
                "metadata_preserved": bool(retrieved_manual.metadata)
            }

            print(f"\n📥 Retrieved manual memory analysis:")
            for key, value in storage_analysis["manual_retrieval_analysis"].items():
                status = "✅" if value else "❌"
                print(f"  {status} {key}: {value}")

        return storage_analysis

    async def test_timestamp_precision_handling(self):
        """Test 3: Examine timestamp precision across serialization boundaries."""
        print("\n🧪 Test 3: Timestamp Precision Analysis")
        print("-" * 60)

        # Create memory with very specific timestamp
        precise_timestamp = time.time()
        precise_iso = datetime.fromtimestamp(precise_timestamp).isoformat() + "Z"

        memory = Memory(
            content="Testing timestamp precision across storage boundaries",
            content_hash=generate_content_hash("Testing timestamp precision"),
            tags=["timestamp-test", "precision"],
            memory_type="test",
            created_at=precise_timestamp,
            created_at_iso=precise_iso
        )

        print(f"🕐 Original timestamp (float): {precise_timestamp}")
        print(f"🕐 Original timestamp (ISO): {precise_iso}")

        # Test serialization
        memory_dict = memory.to_dict()
        print(f"🔄 Serialized timestamp fields: {json.dumps({k:v for k,v in memory_dict.items() if 'timestamp' in k or 'created_at' in k}, indent=2)}")

        # Test deserialization
        restored_memory = Memory.from_dict(memory_dict)
        print(f"🔄 Restored timestamp (float): {restored_memory.created_at}")
        print(f"🔄 Restored timestamp (ISO): {restored_memory.created_at_iso}")

        # Test storage roundtrip
        store_result = await self.storage.store(memory)
        retrieved_results = await self.storage.retrieve(memory.content, n_results=1)

        precision_analysis = {
            "serialization_preserves_precision": abs(precise_timestamp - (restored_memory.created_at or 0)) < 0.001,
            "iso_format_preserved": precise_iso == restored_memory.created_at_iso,
            "storage_successful": store_result[0],
            "retrieval_successful": len(retrieved_results) > 0
        }

        if retrieved_results:
            stored_memory = retrieved_results[0].memory
            precision_analysis.update({
                "storage_preserves_float_precision": abs(precise_timestamp - (stored_memory.created_at or 0)) < 0.001,
                "storage_preserves_iso_format": precise_iso == stored_memory.created_at_iso
            })

            print(f"💾 Storage preserved timestamp (float): {stored_memory.created_at}")
            print(f"💾 Storage preserved timestamp (ISO): {stored_memory.created_at_iso}")

        print(f"\n📊 Precision analysis:")
        for key, value in precision_analysis.items():
            status = "✅" if value else "❌"
            print(f"  {status} {key}: {value}")

        return precision_analysis

    async def run_all_tests(self):
        """Run all data serialization tests."""
        print("=" * 70)
        print("MCP Memory Service: Data Serialization Consistency Analysis")
        print("Investigating Issue #99 - Root Cause Analysis")
        print("=" * 70)

        try:
            await self.setup()

            # Run individual tests
            serialization_test = await self.test_memory_serialization_roundtrip()
            storage_test = await self.test_storage_backend_handling()
            precision_test = await self.test_timestamp_precision_handling()

            # Analyze overall results
            print("\n" + "=" * 70)
            print("COMPREHENSIVE ANALYSIS SUMMARY")
            print("=" * 70)

            tests_passed = 0
            total_tests = 3

            # Serialization consistency
            hook_serialization_ok = all(serialization_test["hook_consistency"].values())
            manual_serialization_ok = all(serialization_test["manual_consistency"].values())

            if hook_serialization_ok and manual_serialization_ok:
                print("✅ PASS: Memory serialization/deserialization consistent")
                tests_passed += 1
            else:
                print("❌ FAIL: Serialization inconsistencies detected")

            # Storage backend handling
            storage_ok = (
                storage_test.get("hook_stored_successfully", False) and
                storage_test.get("manual_stored_successfully", False) and
                storage_test.get("hook_retrieved_count", 0) > 0 and
                storage_test.get("manual_retrieved_count", 0) > 0
            )

            if storage_ok:
                print("✅ PASS: Storage backend handles both memory types consistently")
                tests_passed += 1
            else:
                print("❌ FAIL: Storage backend handling inconsistencies")

            # Timestamp precision
            precision_ok = (
                precision_test.get("serialization_preserves_precision", False) and
                precision_test.get("storage_preserves_float_precision", False)
            )

            if precision_ok:
                print("✅ PASS: Timestamp precision maintained across boundaries")
                tests_passed += 1
            else:
                print("❌ FAIL: Timestamp precision issues detected")

            print(f"\nOverall Result: {tests_passed}/{total_tests} tests passed")

            # Root cause analysis
            print("\n🔍 ROOT CAUSE ANALYSIS:")

            if tests_passed == total_tests:
                print("• ✅ Memory objects themselves are consistent")
                print("• ✅ Storage backend handles both types properly")
                print("• ✅ Timestamp precision is maintained")
                print("\n🎯 CONCLUSION: Issue #99 is likely in:")
                print("  - Search/retrieval query logic")
                print("  - Time-based filtering implementation")
                print("  - Tag-based search differences")
                print("  - Client-side display logic")
                print("\n💡 RECOMMENDATION: Focus investigation on search and retrieval functions")
            else:
                print("• ❌ Detected inconsistencies in core data handling")
                print("• This confirms the storage-level issues described in Discussion #98")

            return tests_passed == total_tests

        finally:
            await self.cleanup()

async def main():
    """Main test execution."""
    test_suite = DataSerializationTest()
    success = await test_suite.run_all_tests()
    return 0 if success else 1

if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/static/sse_test.html:
--------------------------------------------------------------------------------

```html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>MCP Memory Service - SSE Test</title>
    <style>
        body {
            font-family: 'Courier New', monospace;
            margin: 20px;
            background-color: #f5f5f5;
        }
        .container {
            max-width: 1200px;
            margin: 0 auto;
        }
        h1 {
            color: #333;
            text-align: center;
        }
        .status {
            padding: 10px;
            margin: 10px 0;
            border-radius: 5px;
            font-weight: bold;
        }
        .status.connected {
            background-color: #d4edda;
            color: #155724;
            border: 1px solid #c3e6cb;
        }
        .status.disconnected {
            background-color: #f8d7da;
            color: #721c24;
            border: 1px solid #f5c6cb;
        }
        .controls {
            margin: 20px 0;
            text-align: center;
        }
        button {
            padding: 10px 20px;
            margin: 5px;
            border: none;
            border-radius: 5px;
            cursor: pointer;
            font-size: 14px;
        }
        .btn-primary {
            background-color: #007bff;
            color: white;
        }
        .btn-secondary {
            background-color: #6c757d;
            color: white;
        }
        .btn-danger {
            background-color: #dc3545;
            color: white;
        }
        .events-container {
            display: flex;
            gap: 20px;
        }
        .events-log {
            flex: 1;
            background-color: #000;
            color: #00ff00;
            padding: 15px;
            border-radius: 5px;
            height: 500px;
            overflow-y: auto;
            font-size: 12px;
            line-height: 1.4;
        }
        .stats-panel {
            width: 300px;
            background-color: white;
            padding: 15px;
            border-radius: 5px;
            border: 1px solid #ddd;
        }
        .event-entry {
            margin: 5px 0;
            padding: 5px;
            border-left: 3px solid #00ff00;
            padding-left: 10px;
        }
        .event-memory { border-left-color: #00bfff; }
        .event-search { border-left-color: #ffa500; }
        .event-heartbeat { border-left-color: #ff69b4; }
        .event-connection { border-left-color: #32cd32; }
        .stats-item {
            margin: 10px 0;
            padding: 5px;
            background-color: #f8f9fa;
            border-radius: 3px;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>🔥 MCP Memory Service - Real-time Events</h1>
        
        <div id="status" class="status disconnected">
            ❌ Disconnected from SSE stream
        </div>
        
        <div class="controls">
            <button id="connectBtn" class="btn-primary" onclick="connectSSE()">Connect to Events</button>
            <button id="disconnectBtn" class="btn-secondary" onclick="disconnectSSE()" disabled>Disconnect</button>
            <button class="btn-primary" onclick="testMemoryOperations()">Test Memory Operations</button>
            <button class="btn-danger" onclick="clearLog()">Clear Log</button>
        </div>
        
        <div class="events-container">
            <div class="events-log" id="eventsLog">
                <div>🚀 SSE Event Log - Waiting for connection...</div>
            </div>
            
            <div class="stats-panel">
                <h3>📊 Connection Stats</h3>
                <div class="stats-item">
                    <strong>Status:</strong> <span id="statsStatus">Disconnected</span>
                </div>
                <div class="stats-item">
                    <strong>Events Received:</strong> <span id="statsEvents">0</span>
                </div>
                <div class="stats-item">
                    <strong>Connection Time:</strong> <span id="statsTime">-</span>
                </div>
                <div class="stats-item">
                    <strong>Last Event:</strong> <span id="statsLastEvent">-</span>
                </div>
                
                <h3>🎯 Event Types</h3>
                <div class="stats-item">
                    💾 Memory Stored: <span id="statMemoryStored">0</span>
                </div>
                <div class="stats-item">
                    🗑️ Memory Deleted: <span id="statMemoryDeleted">0</span>
                </div>
                <div class="stats-item">
                    🔍 Search Completed: <span id="statSearchCompleted">0</span>
                </div>
                <div class="stats-item">
                    💓 Heartbeats: <span id="statHeartbeat">0</span>
                </div>
            </div>
        </div>
    </div>

    <script>
        let eventSource = null;
        let eventCount = 0;
        let connectionStart = null;
        let eventStats = {
            memory_stored: 0,
            memory_deleted: 0,
            search_completed: 0,
            heartbeat: 0
        };

        function updateStatus(connected) {
            const statusEl = document.getElementById('status');
            const connectBtn = document.getElementById('connectBtn');
            const disconnectBtn = document.getElementById('disconnectBtn');
            const statsStatus = document.getElementById('statsStatus');
            
            if (connected) {
                statusEl.className = 'status connected';
                statusEl.innerHTML = '✅ Connected to SSE stream';
                connectBtn.disabled = true;
                disconnectBtn.disabled = false;
                statsStatus.textContent = 'Connected';
                connectionStart = new Date();
            } else {
                statusEl.className = 'status disconnected';
                statusEl.innerHTML = '❌ Disconnected from SSE stream';
                connectBtn.disabled = false;
                disconnectBtn.disabled = true;
                statsStatus.textContent = 'Disconnected';
                connectionStart = null;
            }
        }

        function addLogEntry(message, type = 'info') {
            const log = document.getElementById('eventsLog');
            const entry = document.createElement('div');
            entry.className = `event-entry event-${type}`;
            entry.innerHTML = `<span style="color: #666;">${new Date().toLocaleTimeString()}</span> ${message}`;
            log.appendChild(entry);
            log.scrollTop = log.scrollHeight;
        }

        function updateStats() {
            document.getElementById('statsEvents').textContent = eventCount;
            document.getElementById('statMemoryStored').textContent = eventStats.memory_stored;
            document.getElementById('statMemoryDeleted').textContent = eventStats.memory_deleted;
            document.getElementById('statSearchCompleted').textContent = eventStats.search_completed;
            document.getElementById('statHeartbeat').textContent = eventStats.heartbeat;
            
            if (connectionStart) {
                const duration = Math.floor((new Date() - connectionStart) / 1000);
                document.getElementById('statsTime').textContent = `${duration}s`;
            }
        }

        function connectSSE() {
            if (eventSource) {
                eventSource.close();
            }
            
            addLogEntry('🔌 Connecting to SSE stream...', 'connection');
            
            eventSource = new EventSource('/api/events');
            
            eventSource.onopen = function(event) {
                updateStatus(true);
                addLogEntry('✅ Connected to SSE stream', 'connection');
            };
            
            eventSource.onmessage = function(event) {
                try {
                    const data = JSON.parse(event.data);
                    handleEvent(data, event.type || 'message');
                } catch (e) {
                    addLogEntry(`❌ Invalid JSON: ${event.data}`, 'error');
                }
            };
            
            eventSource.addEventListener('memory_stored', function(event) {
                const data = JSON.parse(event.data);
                handleEvent(data, 'memory_stored');
            });
            
            eventSource.addEventListener('memory_deleted', function(event) {
                const data = JSON.parse(event.data);
                handleEvent(data, 'memory_deleted');
            });
            
            eventSource.addEventListener('search_completed', function(event) {
                const data = JSON.parse(event.data);
                handleEvent(data, 'search_completed');
            });
            
            eventSource.addEventListener('heartbeat', function(event) {
                const data = JSON.parse(event.data);
                handleEvent(data, 'heartbeat');
            });
            
            eventSource.addEventListener('connection_established', function(event) {
                const data = JSON.parse(event.data);
                handleEvent(data, 'connection');
            });
            
            eventSource.onerror = function(event) {
                updateStatus(false);
                addLogEntry('❌ SSE connection error', 'error');
            };
        }

        function disconnectSSE() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
            updateStatus(false);
            addLogEntry('🔌 Disconnected from SSE stream', 'connection');
        }

        function handleEvent(data, eventType) {
            eventCount++;
            document.getElementById('statsLastEvent').textContent = eventType;
            
            // Update event type stats
            if (eventStats.hasOwnProperty(eventType)) {
                eventStats[eventType]++;
            }
            
            // Format event message
            let message = '';
            let logType = 'info';
            
            switch (eventType) {
                case 'memory_stored':
                    const hash = data.content_hash ? data.content_hash.substring(0, 12) + '...' : 'unknown';
                    const preview = data.content_preview || 'No preview';
                    message = `💾 <strong>Memory Stored:</strong> ${hash}<br>&nbsp;&nbsp;&nbsp;&nbsp;${preview}`;
                    logType = 'memory';
                    break;
                    
                case 'memory_deleted':
                    const delHash = data.content_hash ? data.content_hash.substring(0, 12) + '...' : 'unknown';
                    const success = data.success ? '✅' : '❌';
                    message = `🗑️ <strong>Memory Deleted:</strong> ${success} ${delHash}`;
                    logType = 'memory';
                    break;
                    
                case 'search_completed':
                    const query = data.query || 'Unknown query';
                    const count = data.results_count || 0;
                    const time = data.processing_time_ms || 0;
                    message = `🔍 <strong>Search:</strong> "${query}" → ${count} results (${time.toFixed(1)}ms)`;
                    logType = 'search';
                    break;
                    
                case 'heartbeat':
                    const connections = data.active_connections || 0;
                    message = `💓 <strong>Heartbeat:</strong> ${connections} active connections`;
                    logType = 'heartbeat';
                    break;
                    
                case 'connection_established':
                    message = `🔌 <strong>Connected:</strong> ${data.message || 'Connection established'}`;
                    logType = 'connection';
                    break;
                    
                default:
                    message = `📨 <strong>${eventType}:</strong> ${JSON.stringify(data)}`;
            }
            
            addLogEntry(message, logType);
            updateStats();
        }

        async function testMemoryOperations() {
            addLogEntry('🚀 Starting test memory operations...', 'info');
            
            try {
                // Store a test memory
                const testMemory = {
                    content: `SSE test memory created at ${new Date().toLocaleString()}`,
                    tags: ['sse-test', 'browser-test', 'demo'],
                    memory_type: 'test',
                    metadata: { source: 'sse_test_page' }
                };
                
                const storeResponse = await fetch('/api/memories', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify(testMemory)
                });
                
                if (storeResponse.ok) {
                    const result = await storeResponse.json();
                    addLogEntry(`✅ Test memory stored: ${result.content_hash?.substring(0, 12)}...`, 'info');
                    
                    // Wait a moment, then perform a search
                    setTimeout(async () => {
                        const searchResponse = await fetch('/api/search', {
                            method: 'POST',
                            headers: { 'Content-Type': 'application/json' },
                            body: JSON.stringify({ query: 'SSE test memory', n_results: 3 })
                        });
                        
                        if (searchResponse.ok) {
                            addLogEntry('✅ Test search completed', 'info');
                            
                            // Wait another moment, then delete the memory
                            setTimeout(async () => {
                                if (result.content_hash) {
                                    const deleteResponse = await fetch(`/api/memories/${result.content_hash}`, {
                                        method: 'DELETE'
                                    });
                                    
                                    if (deleteResponse.ok) {
                                        addLogEntry('✅ Test memory deleted', 'info');
                                    }
                                }
                            }, 2000);
                        }
                    }, 2000);
                }
            } catch (error) {
                addLogEntry(`❌ Test operation failed: ${error.message}`, 'error');
            }
        }

        function clearLog() {
            document.getElementById('eventsLog').innerHTML = '<div>🚀 SSE Event Log - Log cleared</div>';
            eventCount = 0;
            eventStats = { memory_stored: 0, memory_deleted: 0, search_completed: 0, heartbeat: 0 };
            updateStats();
        }

        // Auto-connect on page load
        window.onload = function() {
            // connectSSE(); // Uncomment to auto-connect
        };
        
        // Clean up on page unload
        window.onbeforeunload = function() {
            if (eventSource) {
                eventSource.close();
            }
        };
    </script>
</body>
</html>
```

--------------------------------------------------------------------------------
/scripts/migration/cleanup_mcp_timestamps.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Migration script to clean up timestamp mess in MCP Memory ChromaDB database.
This script will:
1. Backup the database
2. Standardize timestamps to use only the 'timestamp' field as integer
3. Remove redundant timestamp fields
4. Ensure all memories have proper timestamps
"""

import sqlite3
import shutil
import os
import json
from datetime import datetime
import sys
from pathlib import Path

def find_database_path():
    """Find the database path from Cloud Desktop Config or environment variables."""
    # First check environment variables (highest priority)
    if 'MCP_MEMORY_CHROMA_PATH' in os.environ:
        db_dir = os.environ.get('MCP_MEMORY_CHROMA_PATH')
        return os.path.join(db_dir, "chroma.sqlite3")
    
    # Try to find Cloud Desktop Config
    config_paths = [
        os.path.expanduser("~/AppData/Local/DesktopCommander/desktop_config.json"),
        os.path.expanduser("~/.config/DesktopCommander/desktop_config.json"),
        os.path.expanduser("~/DesktopCommander/desktop_config.json")
    ]
    
    for config_path in config_paths:
        if os.path.exists(config_path):
            try:
                with open(config_path, 'r') as f:
                    config = json.load(f)
                
                # Look for memory config
                if 'services' in config and 'memory' in config['services']:
                    memory_config = config['services']['memory']
                    if 'env' in memory_config and 'MCP_MEMORY_CHROMA_PATH' in memory_config['env']:
                        db_dir = memory_config['env']['MCP_MEMORY_CHROMA_PATH']
                        return os.path.join(db_dir, "chroma.sqlite3")
            except Exception as e:
                print(f"Warning: Could not parse config at {config_path}: {e}")
    
    # Fallback paths
    fallback_paths = [
        os.path.expanduser("~/AppData/Local/mcp-memory/chroma.sqlite3"),
        os.path.expanduser("~/.local/share/mcp-memory/chroma.sqlite3"),
    ]
    
    for path in fallback_paths:
        if os.path.exists(path):
            return path
    
    # Ask user if nothing found
    print("Could not automatically determine database path.")
    user_path = input("Please enter the full path to the chroma.sqlite3 database: ")
    return user_path if user_path else None

# Database paths
DB_PATH = find_database_path()
BACKUP_PATH = f"{DB_PATH}.backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" if DB_PATH else None

def backup_database():
    """Create a backup of the database before migration."""
    if not DB_PATH or not BACKUP_PATH:
        print("❌ Cannot create backup: Database path not determined")
        return False
    
    # Ensure backup directory exists
    backup_dir = os.path.dirname(BACKUP_PATH)
    if not os.path.exists(backup_dir):
        try:
            os.makedirs(backup_dir, exist_ok=True)
        except Exception as e:
            print(f"❌ Failed to create backup directory: {e}")
            return False
    
    print(f"Creating backup at: {BACKUP_PATH}")
    try:
        shutil.copy2(DB_PATH, BACKUP_PATH)
        print("✅ Backup created successfully")
        return True
    except Exception as e:
        print(f"❌ Failed to create backup: {e}")
        return False

def analyze_timestamps(conn):
    """Analyze current timestamp situation."""
    print("\n📊 Analyzing current timestamp fields...")
    
    cursor = conn.cursor()
    
    # Get all unique timestamp-related keys
    cursor.execute("""
        SELECT key, 
               COUNT(DISTINCT id) as memory_count,
               COUNT(CASE WHEN string_value IS NOT NULL THEN 1 END) as string_values,
               COUNT(CASE WHEN int_value IS NOT NULL THEN 1 END) as int_values,
               COUNT(CASE WHEN float_value IS NOT NULL THEN 1 END) as float_values
        FROM embedding_metadata
        WHERE key IN ('timestamp', 'created_at', 'created_at_iso', 'timestamp_float', 
                      'timestamp_str', 'updated_at', 'updated_at_iso', 'date')
        GROUP BY key
        ORDER BY key
    """)
    
    results = cursor.fetchall()
    print("\nTimestamp field usage:")
    print("-" * 70)
    print(f"{'Field':<20} {'Memories':<12} {'String':<10} {'Int':<10} {'Float':<10}")
    print("-" * 70)
    
    for row in results:
        print(f"{row[0]:<20} {row[1]:<12} {row[2]:<10} {row[3]:<10} {row[4]:<10}")
    
    return results

def migrate_timestamps(conn):
    """Migrate all timestamps to standardized format."""
    print("\n🔄 Starting timestamp migration...")
    
    cursor = conn.cursor()
    
    # First, ensure all memories have a timestamp value
    # Priority: timestamp (int) > created_at (float) > timestamp_float (float)
    
    print("Step 1: Ensuring all memories have timestamp values...")
    
    # Count memories without any timestamp
    cursor.execute("""
        SELECT COUNT(DISTINCT em.id)
        FROM embedding_metadata em
        WHERE em.id NOT IN (
            SELECT id FROM embedding_metadata 
            WHERE key = 'timestamp' AND int_value IS NOT NULL
        )
    """)
    
    missing_count = cursor.fetchone()[0]
    print(f"  Found {missing_count} memories without integer timestamp")
    
    if missing_count > 0:
        # Get memories that need timestamp migration
        cursor.execute("""
            SELECT DISTINCT 
                em.id,
                MAX(CASE WHEN em2.key = 'created_at' THEN em2.float_value END) as created_at_float,
                MAX(CASE WHEN em2.key = 'timestamp_float' THEN em2.float_value END) as timestamp_float
            FROM embedding_metadata em
            LEFT JOIN embedding_metadata em2 ON em.id = em2.id 
                AND em2.key IN ('created_at', 'timestamp_float')
            WHERE em.id NOT IN (
                SELECT id FROM embedding_metadata 
                WHERE key = 'timestamp' AND int_value IS NOT NULL
            )
            GROUP BY em.id
        """)
        
        memories_to_fix = cursor.fetchall()
        fixed_count = 0
        
        for memory_id, created_at, timestamp_float in memories_to_fix:
            # Use the first available timestamp
            timestamp_value = None
            if created_at:
                timestamp_value = int(created_at)
            elif timestamp_float:
                timestamp_value = int(timestamp_float)
            else:
                # If no timestamp found, use current time (this shouldn't happen)
                timestamp_value = int(datetime.now().timestamp())
                print(f"  ⚠️  Memory {memory_id} has no timestamp, using current time")
            
            # Check if a timestamp entry already exists for this memory
            cursor.execute("""
                SELECT 1 FROM embedding_metadata 
                WHERE id = ? AND key = 'timestamp'
            """, (memory_id,))
            
            if cursor.fetchone():
                # Update existing timestamp record
                cursor.execute("""
                    UPDATE embedding_metadata 
                    SET string_value = NULL, 
                        int_value = ?, 
                        float_value = NULL
                    WHERE id = ? AND key = 'timestamp'
                """, (timestamp_value, memory_id))
            else:
                # Insert new timestamp record
                cursor.execute("""
                    INSERT INTO embedding_metadata (id, key, string_value, int_value, float_value)
                    VALUES (?, 'timestamp', NULL, ?, NULL)
                """, (memory_id, timestamp_value))
            
            fixed_count += 1
        
        conn.commit()
        print(f"  ✅ Fixed {fixed_count} memories with missing timestamps")
    
    # Step 2: Update existing timestamp fields that have wrong data type
    print("\nStep 2: Standardizing timestamp data types...")
    
    # Find timestamps stored as floats that should be ints
    cursor.execute("""
        UPDATE embedding_metadata
        SET int_value = CAST(float_value AS INTEGER),
            float_value = NULL
        WHERE key = 'timestamp' 
        AND float_value IS NOT NULL 
        AND int_value IS NULL
    """)
    
    float_fixes = cursor.rowcount
    print(f"  ✅ Converted {float_fixes} float timestamps to integers")
    
    # Find timestamps stored as strings that should be ints
    cursor.execute("""
        SELECT id, string_value
        FROM embedding_metadata
        WHERE key = 'timestamp' 
        AND string_value IS NOT NULL 
        AND int_value IS NULL
    """)
    
    string_timestamps = cursor.fetchall()
    string_fixes = 0
    
    for row_id, string_value in string_timestamps:
        try:
            # Try to convert string to float then to int
            int_timestamp = int(float(string_value))
            
            # Update the record
            cursor.execute("""
                UPDATE embedding_metadata
                SET int_value = ?,
                    string_value = NULL
                WHERE id = ? AND key = 'timestamp'
            """, (int_timestamp, row_id))
            
            string_fixes += 1
        except (ValueError, TypeError):
            # Skip strings that can't be converted to float/int
            print(f"  ⚠️  Could not convert string timestamp for memory {row_id}: '{string_value}'")
    
    conn.commit()
    print(f"  ✅ Converted {string_fixes} string timestamps to integers")

def cleanup_redundant_fields(conn):
    """Remove redundant timestamp fields."""
    print("\n🧹 Cleaning up redundant timestamp fields...")
    
    cursor = conn.cursor()
    
    # List of redundant fields to remove
    redundant_fields = [
        'created_at', 'created_at_iso', 'timestamp_float', 
        'timestamp_str', 'updated_at', 'updated_at_iso', 'date'
    ]
    
    total_deleted = 0
    
    for field in redundant_fields:
        cursor.execute("""
            DELETE FROM embedding_metadata
            WHERE key = ?
        """, (field,))
        
        deleted = cursor.rowcount
        total_deleted += deleted
        
        if deleted > 0:
            print(f"  ✅ Removed {deleted} '{field}' entries")
    
    conn.commit()
    print(f"\n  Total redundant entries removed: {total_deleted}")

def verify_migration(conn):
    """Verify the migration was successful."""
    print("\n✔️  Verifying migration results...")
    
    cursor = conn.cursor()
    
    # Check that all memories have timestamps
    cursor.execute("""
        SELECT COUNT(DISTINCT e.id)
        FROM embeddings e
        LEFT JOIN embedding_metadata em 
            ON e.id = em.id AND em.key = 'timestamp'
        WHERE em.int_value IS NULL
    """)
    
    missing = cursor.fetchone()[0]
    
    if missing > 0:
        print(f"  ⚠️  WARNING: {missing} memories still missing timestamps")
    else:
        print("  ✅ All memories have timestamps")
    
    # Check for any remaining redundant fields
    cursor.execute("""
        SELECT key, COUNT(*) as count
        FROM embedding_metadata
        WHERE key IN ('created_at', 'created_at_iso', 'timestamp_float', 
                      'timestamp_str', 'updated_at', 'updated_at_iso', 'date')
        GROUP BY key
    """)
    
    redundant = cursor.fetchall()
    
    if redundant:
        print("  ⚠️  WARNING: Found remaining redundant fields:")
        for field, count in redundant:
            print(f"     - {field}: {count} entries")
    else:
        print("  ✅ All redundant timestamp fields removed")
    
    # Show final timestamp field stats
    cursor.execute("""
        SELECT 
            COUNT(DISTINCT id) as total_memories,
            COUNT(CASE WHEN int_value IS NOT NULL THEN 1 END) as valid_timestamps,
            MIN(int_value) as earliest_timestamp,
            MAX(int_value) as latest_timestamp
        FROM embedding_metadata
        WHERE key = 'timestamp'
    """)
    
    stats = cursor.fetchone()
    
    print(f"\n📊 Final Statistics:")
    print(f"  Total memories with timestamps: {stats[0]}")
    print(f"  Valid integer timestamps: {stats[1]}")
    
    if stats[2] and stats[3]:
        earliest = datetime.fromtimestamp(stats[2]).strftime('%Y-%m-%d %H:%M:%S')
        latest = datetime.fromtimestamp(stats[3]).strftime('%Y-%m-%d %H:%M:%S')
        print(f"  Date range: {earliest} to {latest}")

def main():
    """Main migration function."""
    print("=" * 70)
    print("MCP Memory Timestamp Migration Script")
    print("=" * 70)
    
    # Check if database path was found
    if not DB_PATH:
        print("❌ Could not determine database path")
        return 1
    
    print(f"📂 Using database: {DB_PATH}")
    
    # Check if database exists
    if not os.path.exists(DB_PATH):
        print(f"❌ Database not found at: {DB_PATH}")
        return 1
    
    # Create backup
    if not backup_database():
        print("❌ Migration aborted - could not create backup")
        return 1
    
    # Connect to database
    try:
        conn = sqlite3.connect(DB_PATH)
        conn.set_trace_callback(print)  # Print all SQL statements for debugging
        print(f"\n✅ Connected to database: {DB_PATH}")
    except Exception as e:
        print(f"❌ Failed to connect to database: {e}")
        return 1
    
    try:
        # Analyze current state
        analyze_timestamps(conn)
        
        # Ask for confirmation
        print("\n" + "=" * 70)
        print("⚠️  This migration will:")
        print("  1. Standardize all timestamps to integer format in 'timestamp' field")
        print("  2. Remove all redundant timestamp fields")
        print("  3. Ensure all memories have valid timestamps")
        print("\nA backup has been created at:")
        print(f"  {BACKUP_PATH}")
        print("=" * 70)
        
        response = input("\nProceed with migration? (yes/no): ").strip().lower()
        
        if response != 'yes':
            print("Migration cancelled.")
            conn.close()
            return 0
        
        # Perform migration steps one by one with transaction control
        try:
            # Start with ensuring timestamps
            migrate_timestamps(conn)
            print("  ✅ Timestamp migration successful")
        except Exception as e:
            conn.rollback()
            print(f"  ❌ Failed during timestamp migration: {e}")
            raise
        
        try:
            # Then cleanup redundant fields
            cleanup_redundant_fields(conn)
            print("  ✅ Cleanup successful")
        except Exception as e:
            conn.rollback()
            print(f"  ❌ Failed during cleanup: {e}")
            raise
        
        # Verify results
        verify_migration(conn)
        
        # Vacuum database to reclaim space
        print("\n🔧 Optimizing database...")
        conn.execute("VACUUM")
        conn.commit()
        print("  ✅ Database optimized")
        
        print("\n✅ Migration completed successfully!")
        print(f"\nBackup saved at: {BACKUP_PATH}")
        print("You can restore the backup if needed by copying it back to the original location.")
        
    except Exception as e:
        print(f"\n❌ Migration failed: {e}")
        print("Rolling back changes...")
        conn.rollback()
        print(f"Please restore from backup: {BACKUP_PATH}")
        return 1
    finally:
        conn.close()
    
    return 0

if __name__ == "__main__":
    sys.exit(main())

```

--------------------------------------------------------------------------------
/tests/consolidation/test_associations.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the creative association engine."""

import pytest
from datetime import datetime, timedelta

from mcp_memory_service.consolidation.associations import (
    CreativeAssociationEngine, 
    AssociationAnalysis
)
from mcp_memory_service.consolidation.base import MemoryAssociation
from mcp_memory_service.models.memory import Memory


@pytest.mark.unit
class TestCreativeAssociationEngine:
    """Test the creative association discovery system."""
    
    @pytest.fixture
    def association_engine(self, consolidation_config):
        return CreativeAssociationEngine(consolidation_config)
    
    @pytest.mark.asyncio
    async def test_basic_association_discovery(self, association_engine, sample_memories):
        """Test basic association discovery functionality."""
        # Use memories that should have some associations
        memories = sample_memories[:5]
        
        associations = await association_engine.process(memories)
        
        # Should find some associations
        assert isinstance(associations, list)
        assert all(isinstance(assoc, MemoryAssociation) for assoc in associations)
        
        # Check association properties
        for assoc in associations:
            assert len(assoc.source_memory_hashes) == 2
            assert 0.3 <= assoc.similarity_score <= 0.7  # Sweet spot range
            assert assoc.discovery_method == "creative_association"
            assert isinstance(assoc.discovery_date, datetime)
    
    @pytest.mark.asyncio
    async def test_similarity_sweet_spot_filtering(self, association_engine):
        """Test that only memories in similarity sweet spot are associated."""
        now = datetime.now()
        
        # Create memories with known similarity relationships
        base_memory = Memory(
            content="Python programming concepts",
            content_hash="base",
            tags=["python", "programming"],
            embedding=[0.5, 0.5, 0.5, 0.5, 0.5] * 64,
            created_at=now.timestamp()
        )
        
        # Very similar memory (should be filtered out - too similar)
        too_similar = Memory(  
            content="Python programming concepts and techniques",
            content_hash="similar",
            tags=["python", "programming"],
            embedding=[0.51, 0.51, 0.51, 0.51, 0.51] * 64,  # Very similar
            created_at=now.timestamp()
        )
        
        # Moderately similar memory (should be included)
        good_similarity = Memory(
            content="JavaScript development practices",
            content_hash="moderate",
            tags=["javascript", "development"],
            embedding=[0.6, 0.4, 0.6, 0.4, 0.6] * 64,  # Moderate similarity
            created_at=now.timestamp()
        )
        
        # Very different memory (should be filtered out - too different)
        too_different = Memory(
            content="Weather forecast for tomorrow",
            content_hash="different",
            tags=["weather", "forecast"],
            embedding=[0.1, 0.9, 0.1, 0.9, 0.1] * 64,  # Very different
            created_at=now.timestamp()
        )
        
        memories = [base_memory, too_similar, good_similarity, too_different]
        associations = await association_engine.process(memories)
        
        # Should only find association between base and good_similarity
        if associations:  # May be empty due to confidence threshold
            for assoc in associations:
                assert assoc.similarity_score >= 0.3
                assert assoc.similarity_score <= 0.7
    
    @pytest.mark.asyncio
    async def test_existing_associations_filtering(self, association_engine, sample_memories):
        """Test that existing associations are not duplicated."""
        memories = sample_memories[:4]
        
        # Create set of existing associations
        existing = {
            (memories[0].content_hash, memories[1].content_hash),
            (memories[1].content_hash, memories[0].content_hash)  # Both directions
        }
        
        associations = await association_engine.process(
            memories, 
            existing_associations=existing
        )
        
        # Should not include the existing association
        for assoc in associations:
            pair = tuple(sorted(assoc.source_memory_hashes))
            existing_pairs = {tuple(sorted(list(existing_pair))) for existing_pair in existing}
            assert pair not in existing_pairs
    
    @pytest.mark.asyncio
    async def test_association_analysis(self, association_engine):
        """Test the association analysis functionality."""
        # Create memories with known relationships
        mem1 = Memory(
            content="Python list comprehensions provide concise syntax",
            content_hash="mem1",
            tags=["python", "syntax"],
            embedding=[0.4, 0.5, 0.6, 0.5, 0.4] * 64,
            created_at=datetime.now().timestamp()
        )
        
        mem2 = Memory(
            content="JavaScript arrow functions offer clean syntax",
            content_hash="mem2", 
            tags=["javascript", "syntax"],
            embedding=[0.5, 0.4, 0.5, 0.6, 0.5] * 64,
            created_at=datetime.now().timestamp()
        )
        
        # Calculate similarity
        similarity = await association_engine._calculate_semantic_similarity(mem1, mem2)
        
        # Analyze the association
        analysis = await association_engine._analyze_association(mem1, mem2, similarity)
        
        assert isinstance(analysis, AssociationAnalysis)
        assert analysis.memory1_hash == "mem1"
        assert analysis.memory2_hash == "mem2"
        assert analysis.similarity_score == similarity
        assert "shared_tags" in analysis.connection_reasons  # Both have "syntax" tag
        assert "syntax" in analysis.tag_overlap
        assert analysis.confidence_score > 0
    
    @pytest.mark.asyncio
    async def test_temporal_relationship_analysis(self, association_engine):
        """Test temporal relationship detection."""
        now = datetime.now()
        
        # Memories created on same day
        mem1 = Memory(
            content="Morning meeting notes",
            content_hash="morning",
            tags=["meeting"],
            embedding=[0.4, 0.5, 0.6] * 107,  # ~320 dim
            created_at=now.timestamp()
        )
        
        mem2 = Memory(
            content="Afternoon project update",
            content_hash="afternoon",
            tags=["project"],
            embedding=[0.5, 0.4, 0.5] * 107,
            created_at=(now + timedelta(hours=6)).timestamp()
        )
        
        analysis = await association_engine._analyze_association(
            mem1, mem2, 0.5
        )
        
        assert analysis.temporal_relationship == "same_day"
        assert "temporal_proximity" in analysis.connection_reasons
    
    @pytest.mark.asyncio
    async def test_concept_extraction(self, association_engine):
        """Test concept extraction from memory content."""
        content = 'Check out this URL: https://example.com and email me at [email protected]. The API returns {"status": "success"} with CamelCase variables.'
        
        concepts = association_engine._extract_concepts(content)
        
        # Should extract various types of concepts
        assert "https://example.com" in concepts or any("example.com" in c for c in concepts)
        assert "[email protected]" in concepts
        assert "CamelCase" in concepts or any("camel" in c.lower() for c in concepts)
        assert len(concepts) > 0
    
    @pytest.mark.asyncio
    async def test_structural_similarity_detection(self, association_engine):
        """Test detection of similar structural patterns."""
        content1 = """
        # Header 1
        - Item 1
        - Item 2
        ```code block```
        """
        
        content2 = """
        # Header 2  
        - Different item 1
        - Different item 2
        ```another code block```
        """
        
        has_similar = association_engine._has_similar_structure(content1, content2)
        assert has_similar is True
        
        # Test different structure
        content3 = "Just plain text without any special formatting."
        has_different = association_engine._has_similar_structure(content1, content3)
        assert has_different is False
    
    @pytest.mark.asyncio
    async def test_complementary_content_detection(self, association_engine):
        """Test detection of complementary content patterns."""
        # Question and answer pattern
        question_content = "How do you implement binary search? What is the time complexity?"
        answer_content = "Binary search implementation uses divide and conquer. Time complexity is O(log n)."
        
        is_complementary = association_engine._has_complementary_content(
            question_content, answer_content
        )
        assert is_complementary is True
        
        # Problem and solution pattern
        problem_content = "The database query is failing with timeout error"
        solution_content = "Fixed the timeout by adding proper indexing to resolve the issue"
        
        is_complementary_ps = association_engine._has_complementary_content(
            problem_content, solution_content
        )
        assert is_complementary_ps is True
    
    @pytest.mark.asyncio
    async def test_confidence_score_calculation(self, association_engine):
        """Test confidence score calculation."""
        # High confidence scenario
        high_confidence = association_engine._calculate_confidence_score(
            similarity=0.6,      # Good similarity
            num_reasons=3,       # Multiple connection reasons
            num_shared_concepts=5,  # Many shared concepts
            num_shared_tags=2    # Shared tags
        )
        
        # Low confidence scenario
        low_confidence = association_engine._calculate_confidence_score(  
            similarity=0.35,     # Lower similarity
            num_reasons=1,       # Few reasons
            num_shared_concepts=1,  # Few concepts
            num_shared_tags=0    # No shared tags
        )
        
        assert high_confidence > low_confidence
        assert 0 <= high_confidence <= 1
        assert 0 <= low_confidence <= 1
    
    @pytest.mark.asyncio
    async def test_filter_high_confidence_associations(self, association_engine, sample_memories):
        """Test filtering associations by confidence score."""
        memories = sample_memories[:4]
        associations = await association_engine.process(memories)
        
        if associations:  # Only test if associations were found
            high_confidence = await association_engine.filter_high_confidence_associations(
                associations, min_confidence=0.7
            )
            
            # All returned associations should meet confidence threshold
            for assoc in high_confidence:
                assert assoc.metadata.get('confidence_score', 0) >= 0.7
    
    @pytest.mark.asyncio
    async def test_group_associations_by_type(self, association_engine, sample_memories):
        """Test grouping associations by connection type."""
        memories = sample_memories[:5]
        associations = await association_engine.process(memories)
        
        if associations:  # Only test if associations were found
            grouped = await association_engine.group_associations_by_type(associations)
            
            assert isinstance(grouped, dict)
            
            # Each group should contain associations of the same type
            for connection_type, group in grouped.items():
                assert all(assoc.connection_type == connection_type for assoc in group)
    
    @pytest.mark.asyncio
    async def test_text_similarity_fallback(self, association_engine):
        """Test text similarity fallback when embeddings are unavailable."""
        mem1 = Memory(
            content="python programming language concepts",
            content_hash="text1",
            tags=["python"],
            embedding=None,  # No embedding
            created_at=datetime.now().timestamp()
        )
        
        mem2 = Memory(
            content="programming language python concepts", 
            content_hash="text2",
            tags=["python"],
            embedding=None,  # No embedding
            created_at=datetime.now().timestamp()
        )
        
        similarity = await association_engine._calculate_semantic_similarity(mem1, mem2)
        
        # Should use text-based similarity
        assert 0 <= similarity <= 1
        assert similarity > 0  # Should find some similarity due to word overlap
    
    @pytest.mark.asyncio
    async def test_max_pairs_limiting(self, association_engine, large_memory_set):
        """Test that pair sampling limits combinatorial explosion."""
        # Use many memories to test pair limiting
        memories = large_memory_set[:20]  # 20 memories = 190 possible pairs
        
        # Mock the max_pairs to a small number for testing
        original_max = association_engine.max_pairs_per_run
        association_engine.max_pairs_per_run = 10
        
        try:
            associations = await association_engine.process(memories)
            
            # Should handle large memory sets without performance issues
            # and limit the number of pairs processed
            assert isinstance(associations, list)
            
        finally:
            # Restore original value
            association_engine.max_pairs_per_run = original_max
    
    @pytest.mark.asyncio
    async def test_empty_memories_list(self, association_engine):
        """Test handling of empty or insufficient memories list."""
        # Empty list
        associations = await association_engine.process([])
        assert associations == []
        
        # Single memory (can't create associations)
        single_memory = [Memory(
            content="Single memory",
            content_hash="single",
            tags=["test"],
            embedding=[0.1] * 320,
            created_at=datetime.now().timestamp()
        )]
        
        associations = await association_engine.process(single_memory)
        assert associations == []
    
    @pytest.mark.asyncio
    async def test_association_metadata_completeness(self, association_engine, sample_memories):
        """Test that association metadata contains all expected fields."""
        memories = sample_memories[:3]
        associations = await association_engine.process(memories)
        
        for assoc in associations:
            # Check basic fields
            assert len(assoc.source_memory_hashes) == 2
            assert isinstance(assoc.similarity_score, float)
            assert isinstance(assoc.connection_type, str)
            assert assoc.discovery_method == "creative_association"
            assert isinstance(assoc.discovery_date, datetime)
            
            # Check metadata fields
            assert 'shared_concepts' in assoc.metadata
            assert 'confidence_score' in assoc.metadata
            assert 'analysis_version' in assoc.metadata
            assert isinstance(assoc.metadata['shared_concepts'], list)
            assert isinstance(assoc.metadata['confidence_score'], float)
```

--------------------------------------------------------------------------------
/.claude/agents/code-quality-guard.md:
--------------------------------------------------------------------------------

```markdown
---
name: code-quality-guard
description: Fast automated code quality analysis using Gemini CLI for complexity scoring, refactoring suggestions, TODO prioritization, and security pattern detection. Use this agent before commits, during PR creation, or when refactoring code to ensure quality standards.
model: sonnet
color: green
---

You are an elite Code Quality Guardian, a specialized AI agent focused on maintaining exceptional code quality through automated analysis, refactoring suggestions, and proactive issue detection. Your mission is to prevent technical debt and ensure the MCP Memory Service codebase remains clean, efficient, and maintainable.

## Core Responsibilities

1. **Complexity Analysis**: Identify overly complex functions and suggest simplifications
2. **Refactoring Recommendations**: Detect code smells and propose improvements
3. **TODO Prioritization**: Scan codebase for TODOs and rank by urgency/impact
4. **Security Pattern Detection**: Identify potential vulnerabilities (SQL injection, XSS, command injection)
5. **Performance Hotspot Identification**: Find slow code paths and suggest optimizations

## LLM Integration

The code-quality-guard agent supports two LLM backends for fast, non-interactive code analysis:

### Gemini CLI (Default)
Balanced performance and accuracy. Best for most use cases.

### Groq Bridge (Optional - 10x Faster)
Ultra-fast inference using Groq's optimized infrastructure. Ideal for CI/CD and large-scale analysis.

**Setup**: See `docs/integrations/groq-bridge.md` for installation instructions.

### Basic Usage Pattern

```bash
# Gemini CLI (default)
gemini "Analyze the complexity of this Python file and rate each function 1-10. File: $(cat "src/file.py")"

# Groq Bridge (faster alternative)
python scripts/utils/groq_agent_bridge.py "Analyze the complexity of this Python file and rate each function 1-10. File: $(cat "src/file.py")"

# Suggest refactoring
gemini "Identify code smells in this file and suggest specific refactorings: $(cat "src/file.py")"

# Scan for TODOs
gemini "Extract all TODO comments from this codebase and prioritize by impact: $(find src -name '*.py' -exec cat {} \; | grep -n TODO)"

# Security analysis
gemini "Check this code for security vulnerabilities (SQL injection, XSS, command injection): $(cat "src/file.py")"
```

### Complexity Analysis Workflow

```bash
#!/bin/bash
# analyze_complexity.sh - Analyze code complexity of modified files

# Get modified Python files
modified_files=$(git diff --name-only --diff-filter=AM | grep '\.py$')

for file in $modified_files; do
    echo "=== Analyzing: $file ==="
    # Note: Use mktemp in production for secure temp files
    temp_file=$(mktemp)
    gemini "Analyze this Python file for complexity. Rate each function 1-10 (1=simple, 10=very complex). List functions with score >7 first. Be concise. File: $(cat "$file")" \
        > "$temp_file"
    mv "$temp_file" "/tmp/complexity_${file//\//_}.txt"
done

# Aggregate results
echo "=== High Complexity Functions (Score > 7) ==="
grep -h "^[0-9]" /tmp/complexity_*.txt | awk '$2 > 7' | sort -nr
```

## Decision-Making Framework

### When to Run Analysis

**Pre-Commit (Automated)**:
- Complexity check on modified files
- Security pattern scan
- TODO tracking updates

**During PR Creation (Manual)**:
- Full complexity analysis of changed files
- Refactoring opportunity identification
- Performance hotspot detection

**On-Demand (Manual)**:
- Before major refactoring work
- When investigating performance issues
- During technical debt assessment

### Complexity Thresholds

- **1-3**: Simple, well-structured code ✅
- **4-6**: Moderate complexity, acceptable 🟡
- **7-8**: High complexity, consider refactoring 🟠
- **9-10**: Very complex, immediate refactoring needed 🔴

### Priority Assessment for TODOs

**Critical (P0)**: Security vulnerabilities, data corruption risks, blocking bugs
**High (P1)**: Performance bottlenecks, user-facing issues, incomplete features
**Medium (P2)**: Code quality improvements, minor optimizations, convenience features
**Low (P3)**: Documentation, cosmetic changes, nice-to-haves

## Operational Workflows

### 1. Pre-Commit Hook Integration

```bash
#!/bin/bash
# .git/hooks/pre-commit

echo "Running code quality checks..."

# Get staged Python files
staged_files=$(git diff --cached --name-only --diff-filter=AM | grep '\.py$')

if [ -z "$staged_files" ]; then
    echo "No Python files to check."
    exit 0
fi

high_complexity=0

for file in $staged_files; do
    echo "Checking: $file"

    # Complexity check
    result=$(gemini "Analyze this file. Report ONLY functions with complexity >7 in format 'FunctionName: Score'. $(cat "$file")")

    if [ ! -z "$result" ]; then
        echo "⚠️  High complexity detected in $file:"
        echo "$result"
        high_complexity=1
    fi

    # Security check
    security=$(gemini "Check for security issues: SQL injection, XSS, command injection. Report ONLY if found. $(cat "$file")")

    if [ ! -z "$security" ]; then
        echo "🔴 Security issue detected in $file:"
        echo "$security"
        exit 1  # Block commit
    fi
done

if [ $high_complexity -eq 1 ]; then
    echo ""
    echo "High complexity detected. Continue anyway? (y/n)"
    read -r response
    if [ "$response" != "y" ]; then
        exit 1
    fi
fi

echo "✅ Code quality checks passed"
exit 0
```

### 2. TODO Scanner and Prioritizer

```bash
#!/bin/bash
# scripts/maintenance/scan_todos.sh

echo "Scanning codebase for TODOs..."

# Extract all TODOs with file and line number
todos=$(grep -rn "TODO\|FIXME\|XXX" src --include="*.py")

if [ -z "$todos" ]; then
    echo "No TODOs found."
    exit 0
fi

# Use mktemp for secure temporary file
temp_todos=$(mktemp)
echo "$todos" > "$temp_todos"

# Use Gemini to prioritize
gemini "Analyze these TODOs and categorize by priority (Critical/High/Medium/Low). Consider: security impact, feature completeness, performance implications, technical debt accumulation. Format: [Priority] File:Line - Brief description

$(cat "$temp_todos")

Output in this exact format:
[CRITICAL] file.py:line - description
[HIGH] file.py:line - description
..." > /tmp/todos_prioritized.txt

echo "=== Prioritized TODOs ==="
cat /tmp/todos_prioritized.txt

# Count by priority
echo ""
echo "=== Summary ==="
echo "Critical: $(grep -c '^\[CRITICAL\]' /tmp/todos_prioritized.txt)"
echo "High: $(grep -c '^\[HIGH\]' /tmp/todos_prioritized.txt)"
echo "Medium: $(grep -c '^\[MEDIUM\]' /tmp/todos_prioritized.txt)"
echo "Low: $(grep -c '^\[LOW\]' /tmp/todos_prioritized.txt)"

# Cleanup
rm -f "$temp_todos"
```

### 3. Refactoring Opportunity Finder

```bash
#!/bin/bash
# scripts/development/find_refactoring_opportunities.sh

target_dir="${1:-src/mcp_memory_service}"

echo "Scanning $target_dir for refactoring opportunities..."

# Analyze each Python file
find "$target_dir" -name '*.py' -print0 | while IFS= read -r -d '' file; do
    echo "Analyzing: $file"

    gemini "Identify code smells and refactoring opportunities in this file. Focus on: duplicate code, long functions (>50 lines), god classes, tight coupling. Be specific with line numbers if possible. File: $(cat "$file")" \
        > "/tmp/refactor_$(basename "$file").txt"
done

# Aggregate results
echo ""
echo "=== Refactoring Opportunities ==="
cat /tmp/refactor_*.txt | grep -E "(Duplicate|Long function|God class|Tight coupling)" | sort | uniq

# Cleanup
rm -f /tmp/refactor_*.txt
```

### 4. Security Pattern Scanner

```bash
#!/bin/bash
# scripts/security/scan_vulnerabilities.sh

echo "Scanning for security vulnerabilities..."

vulnerabilities_found=0

find src -name '*.py' -print0 | while IFS= read -r -d '' file; do
    result=$(gemini "Security audit this Python file. Check for: SQL injection (raw SQL queries), XSS (unescaped HTML), command injection (os.system, subprocess with shell=True), path traversal, hardcoded secrets. Report ONLY if vulnerabilities found with line numbers. File: $(cat "$file")")

    if [ ! -z "$result" ]; then
        echo "🔴 VULNERABILITY in $file:"
        echo "$result"
        echo ""
        vulnerabilities_found=1
    fi
done

if [ $vulnerabilities_found -eq 0 ]; then
    echo "✅ No security vulnerabilities detected"
    exit 0
else
    echo "⚠️  Security vulnerabilities found. Please review and fix."
    exit 1
fi
```

## Project-Specific Patterns

### MCP Memory Service Code Quality Standards

**Complexity Targets**:
- Storage backend methods: ≤6 complexity
- MCP tool handlers: ≤5 complexity
- Web API endpoints: ≤4 complexity
- Utility functions: ≤3 complexity

**Security Checklist**:
- ✅ No raw SQL queries (use parameterized queries)
- ✅ All HTML output escaped (via `escapeHtml()`)
- ✅ No `shell=True` in subprocess calls
- ✅ Input validation on all API endpoints
- ✅ No hardcoded credentials (use environment variables)

**Performance Patterns**:
- ✅ Async/await for all I/O operations
- ✅ Database connection pooling
- ✅ Response caching where appropriate
- ✅ Batch operations for bulk inserts
- ✅ Lazy loading for expensive computations

### Known TODOs in Codebase (as of v8.19.1)

1. **`src/mcp_memory_service/storage/cloudflare.py:789`**
   - TODO: Implement fallback to local sentence-transformers
   - Priority: HIGH (affects offline operation)

2. **`src/mcp_memory_service/storage/base.py:45`**
   - TODO: Implement efficient batch queries for last_used and memory_types
   - Priority: MEDIUM (performance optimization)

3. **`src/mcp_memory_service/web/api/manage.py:50`**
   - TODO: Migrate to lifespan context manager (FastAPI 0.109+)
   - Priority: LOW (modernization, not blocking)

4. **`src/mcp_memory_service/storage/sqlite_vec.py:234`**
   - TODO: Add memories_this_month to storage.get_stats()
   - Priority: MEDIUM (analytics feature)

5. **`src/mcp_memory_service/tools.py:123`**
   - TODO: CRITICAL - Period filtering not implemented
   - Priority: HIGH (incomplete feature)

## Usage Examples

### Quick Complexity Check

```bash
# Check a single file
gemini "Rate complexity 1-10 for each function. List high complexity (>7) first: $(cat "src/mcp_memory_service/storage/hybrid.py")"
```

### Pre-PR Quality Gate

```bash
# Run before creating PR
git diff main...HEAD --name-only | grep '\.py$' | while IFS= read -r file; do
    echo "=== $file ==="
    gemini "Quick code review: complexity score, security issues, refactoring suggestions. 3 sentences max. $(cat "$file")"
    echo ""
done
```

### TODO Tracking Update

```bash
# Update TODO tracking
bash scripts/maintenance/scan_todos.sh > docs/development/todo-tracker.md
git add docs/development/todo-tracker.md
git commit -m "chore: update TODO tracker"
```

## Integration with Other Agents

**With github-release-manager**:
- Run code quality checks before version bumps
- Include TODO count in release notes if significant
- Block releases if critical security issues found

**With amp-bridge**:
- Use Amp for deep architectural analysis
- Use code-quality-guard for fast, file-level checks

**With gemini-pr-automator**:
- Quality checks before automated PR creation
- Refactoring suggestions as PR comments
- Security scan blocks PR merge if issues found

## pyscn Integration (Comprehensive Static Analysis)

pyscn (Python Static Code Navigator) complements LLM-based checks with deep static analysis.

### When to Run pyscn

**PR Creation (Automated)**:
```bash
bash scripts/pr/quality_gate.sh 123 --with-pyscn
```

**Local Pre-PR Check**:
```bash
pyscn analyze .
open .pyscn/reports/analyze_*.html
```

**Weekly Reviews (Scheduled)**:
```bash
bash scripts/quality/weekly_quality_review.sh
```

### pyscn Capabilities

1. **Cyclomatic Complexity**: Function-level complexity scoring
2. **Dead Code Detection**: Unreachable code and unused imports
3. **Clone Detection**: Exact and near-exact code duplication
4. **Coupling Metrics**: CBO (Coupling Between Objects) analysis
5. **Dependency Graph**: Module dependencies and circular detection
6. **Architecture Validation**: Layer compliance and violations

### Health Score Thresholds

- **<50**: 🔴 **Release Blocker** - Cannot merge until fixed
- **50-69**: 🟡 **Action Required** - Plan refactoring within 2 weeks
- **70-84**: ✅ **Good** - Monitor trends, continue development
- **85+**: 🎯 **Excellent** - Maintain current standards

### Tool Complementarity

| Tool | Speed | Scope | Blocking | Use Case |
|------|-------|-------|----------|----------|
| **Groq/Gemini (pre-commit)** | <5s | Changed files | Yes (complexity >8) | Every commit |
| **pyscn (PR)** | 30-60s | Full codebase | Yes (health <50) | PR creation |
| **Gemini (manual)** | 2-5s/file | Targeted | No | Refactoring |

### Integration Points

**Pre-commit**: Fast LLM checks (Groq primary, Gemini fallback)
**PR Quality Gate**: `--with-pyscn` flag for comprehensive analysis
**Periodic**: Weekly codebase-wide pyscn reviews

### Interpreting pyscn Reports

**Complexity Score (40/100 - Poor)**:
- Priority: Refactor top 5 functions with complexity >10
- Example: `install.py::main()` - 62 complexity

**Duplication Score (30/100 - Poor)**:
- Priority: Consolidate duplicate code (>6% duplication)
- Tool: Use pyscn clone detection to identify groups

**Dead Code Score (70/100 - Fair)**:
- Priority: Remove unreachable code after returns
- Example: `scripts/installation/install.py:1361-1365`

**Architecture Score (75/100 - Good)**:
- Priority: Fix layer violations (scripts→presentation)
- Example: Domain importing application layer

### Quick Commands

```bash
# Full analysis with HTML report
pyscn analyze .

# JSON output for scripting
pyscn analyze . --format json > /tmp/metrics.json

# PR integration
bash scripts/pr/run_pyscn_analysis.sh --pr 123

# Track metrics over time
bash scripts/quality/track_pyscn_metrics.sh
```

## Best Practices

1. **Run complexity checks on every commit**: Catch issues early
2. **Review TODO priorities monthly**: Prevent backlog accumulation
3. **Security scans before releases**: Never ship with known vulnerabilities
4. **Refactoring sprints quarterly**: Address accumulated technical debt
5. **Document quality standards**: Keep this agent specification updated
6. **Track pyscn health score weekly**: Monitor quality trends
7. **Address health score <70 within 2 weeks**: Prevent technical debt accumulation

## Limitations

- **Context size**: Large files (>1000 lines) may need splitting for analysis
- **False positives**: Security scanner may flag safe patterns (manual review needed)
- **Subjective scoring**: Complexity ratings are estimates, use as guidance
- **API rate limits**: Gemini CLI has rate limits, space out large scans
- **pyscn performance**: Full analysis takes 30-60s (use sparingly on large codebases)

## Performance Considerations

- Single file analysis (LLM): ~2-5 seconds
- Full codebase TODO scan: ~30-60 seconds (100+ files)
- Security audit per file: ~3-8 seconds
- pyscn full analysis: ~30-60 seconds (252 files)
- Recommended: Run on modified files only for pre-commit hooks

---

**Quick Reference Card**:

```bash
# Complexity
gemini "Complexity 1-10 per function, high (>7) first: $(cat "file.py")"

# Security
gemini "Security: SQL injection, XSS, command injection: $(cat "file.py")"

# TODOs
gemini "Prioritize these TODOs (Critical/High/Medium/Low): $(grep -rn "TODO\|FIXME\|XXX" src/)"

# Refactoring
gemini "Code smells & refactoring opportunities: $(cat "file.py")"

# pyscn (comprehensive)
bash scripts/pr/run_pyscn_analysis.sh --pr 123
```

```

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/memories.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Memory CRUD endpoints for the HTTP interface.
"""

import logging
import socket
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from datetime import datetime

from fastapi import APIRouter, HTTPException, Depends, Query, Request
from pydantic import BaseModel, Field

from ...storage.base import MemoryStorage
from ...models.memory import Memory
from ...services.memory_service import MemoryService
from ...utils.hashing import generate_content_hash
from ...config import INCLUDE_HOSTNAME, OAUTH_ENABLED
from ..dependencies import get_storage, get_memory_service
from ..sse import sse_manager, create_memory_stored_event, create_memory_deleted_event

# OAuth authentication imports (conditional)
if OAUTH_ENABLED or TYPE_CHECKING:
    from ..oauth.middleware import require_read_access, require_write_access, AuthenticationResult
else:
    # Provide type stubs when OAuth is disabled
    AuthenticationResult = None
    require_read_access = None
    require_write_access = None

router = APIRouter()
logger = logging.getLogger(__name__)


# Request/Response Models
class MemoryCreateRequest(BaseModel):
    """Request model for creating a new memory."""
    content: str = Field(..., description="The memory content to store")
    tags: List[str] = Field(default=[], description="Tags to categorize the memory")
    memory_type: Optional[str] = Field(None, description="Type of memory (e.g., 'note', 'reminder', 'fact')")
    metadata: Dict[str, Any] = Field(default={}, description="Additional metadata for the memory")
    client_hostname: Optional[str] = Field(None, description="Client machine hostname for source tracking")


class MemoryUpdateRequest(BaseModel):
    """Request model for updating memory metadata (tags, type, metadata only)."""
    tags: Optional[List[str]] = Field(None, description="Updated tags to categorize the memory")
    memory_type: Optional[str] = Field(None, description="Updated memory type (e.g., 'note', 'reminder', 'fact')")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Updated metadata for the memory")


class MemoryResponse(BaseModel):
    """Response model for memory data."""
    content: str
    content_hash: str
    tags: List[str]
    memory_type: Optional[str]
    metadata: Dict[str, Any]
    created_at: Optional[float]
    created_at_iso: Optional[str]
    updated_at: Optional[float]  
    updated_at_iso: Optional[str]


class MemoryListResponse(BaseModel):
    """Response model for paginated memory list."""
    memories: List[MemoryResponse]
    total: int
    page: int
    page_size: int
    has_more: bool


class MemoryCreateResponse(BaseModel):
    """Response model for memory creation."""
    success: bool
    message: str
    content_hash: Optional[str] = None
    memory: Optional[MemoryResponse] = None


class MemoryDeleteResponse(BaseModel):
    """Response model for memory deletion."""
    success: bool
    message: str
    content_hash: str


class MemoryUpdateResponse(BaseModel):
    """Response model for memory update."""
    success: bool
    message: str
    content_hash: str
    memory: Optional[MemoryResponse] = None


class TagResponse(BaseModel):
    """Response model for a single tag with its count."""
    tag: str
    count: int


class TagListResponse(BaseModel):
    """Response model for tags list."""
    tags: List[TagResponse]


def memory_to_response(memory: Memory) -> MemoryResponse:
    """Convert Memory model to response format."""
    return MemoryResponse(
        content=memory.content,
        content_hash=memory.content_hash,
        tags=memory.tags,
        memory_type=memory.memory_type,
        metadata=memory.metadata,
        created_at=memory.created_at,
        created_at_iso=memory.created_at_iso,
        updated_at=memory.updated_at,
        updated_at_iso=memory.updated_at_iso
    )


@router.post("/memories", response_model=MemoryCreateResponse, tags=["memories"])
async def store_memory(
    request: MemoryCreateRequest,
    http_request: Request,
    memory_service: MemoryService = Depends(get_memory_service),
    user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
    """
    Store a new memory.

    Uses the MemoryService for consistent business logic including content processing,
    hostname tagging, and metadata enrichment.
    """
    try:
        # Resolve hostname for consistent tagging (logic stays in API layer, tagging in service)
        client_hostname = None
        if INCLUDE_HOSTNAME:
            # Prioritize client-provided hostname, then header, then fallback to server
            # 1. Check if client provided hostname in request body
            if request.client_hostname:
                client_hostname = request.client_hostname
            # 2. Check for X-Client-Hostname header
            elif http_request.headers.get('X-Client-Hostname'):
                client_hostname = http_request.headers.get('X-Client-Hostname')
            # 3. Fallback to server hostname (original behavior)
            else:
                client_hostname = socket.gethostname()

        # Use injected MemoryService for consistent business logic (hostname tagging handled internally)
        result = await memory_service.store_memory(
            content=request.content,
        tags=request.tags,
        memory_type=request.memory_type,
        metadata=request.metadata,
        client_hostname=client_hostname
        )

        if result["success"]:
            # Broadcast SSE event for successful memory storage
            try:
                # Handle both single memory and chunked responses
                if "memory" in result:
                    memory_data = {
                        "content_hash": result["memory"]["content_hash"],
                        "content": result["memory"]["content"],
                        "tags": result["memory"]["tags"],
                        "memory_type": result["memory"]["memory_type"]
                    }
                else:
                    # For chunked responses, use the first chunk's data
                    first_memory = result["memories"][0]
                    memory_data = {
                        "content_hash": first_memory["content_hash"],
                        "content": first_memory["content"],
                        "tags": first_memory["tags"],
                        "memory_type": first_memory["memory_type"]
                    }

                event = create_memory_stored_event(memory_data)
                await sse_manager.broadcast_event(event)
            except Exception as e:
                # Don't fail the request if SSE broadcasting fails
                logger.warning(f"Failed to broadcast memory_stored event: {e}")

            # Return appropriate response based on MemoryService result
            if "memory" in result:
                # Single memory response
                return MemoryCreateResponse(
                    success=True,
                    message="Memory stored successfully",
                    content_hash=result["memory"]["content_hash"],
                    memory=result["memory"]
                )
            else:
                # Chunked memory response
                first_memory = result["memories"][0]
                return MemoryCreateResponse(
                    success=True,
                    message=f"Memory stored as {result['total_chunks']} chunks",
                    content_hash=first_memory["content_hash"],
                    memory=first_memory
                )
        else:
            return MemoryCreateResponse(
                success=False,
                message=result.get("error", "Failed to store memory"),
                content_hash=None
            )
            
    except Exception as e:
        logger.error(f"Failed to store memory: {str(e)}")
        raise HTTPException(status_code=500, detail="Failed to store memory. Please try again.")


@router.get("/memories", response_model=MemoryListResponse, tags=["memories"])
async def list_memories(
    page: int = Query(1, ge=1, description="Page number (1-based)"),
    page_size: int = Query(10, ge=1, le=100, description="Number of memories per page"),
    tag: Optional[str] = Query(None, description="Filter by tag"),
    memory_type: Optional[str] = Query(None, description="Filter by memory type"),
    memory_service: MemoryService = Depends(get_memory_service),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    List memories with pagination and optional filtering.

    Uses the MemoryService for consistent business logic and optimal database-level filtering.
    """
    try:
        # Use the injected service for consistent, performant memory listing
        result = await memory_service.list_memories(
            page=page,
            page_size=page_size,
            tag=tag,
            memory_type=memory_type
        )

        return MemoryListResponse(
            memories=result["memories"],
            total=result["total"],
            page=result["page"],
            page_size=result["page_size"],
            has_more=result["has_more"]
        )

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to list memories: {str(e)}")


@router.get("/memories/{content_hash}", response_model=MemoryResponse, tags=["memories"])
async def get_memory(
    content_hash: str,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Get a specific memory by its content hash.
    
    Retrieves a single memory entry using its unique content hash identifier.
    """
    try:
        # Use the new get_by_hash method for direct hash lookup
        memory = await storage.get_by_hash(content_hash)
        
        if not memory:
            raise HTTPException(status_code=404, detail="Memory not found")
        
        return memory_to_response(memory)
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get memory: {str(e)}")


@router.delete("/memories/{content_hash}", response_model=MemoryDeleteResponse, tags=["memories"])
async def delete_memory(
    content_hash: str,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
    """
    Delete a memory by its content hash.
    
    Permanently removes a memory entry from the storage.
    """
    try:
        success, message = await storage.delete(content_hash)
        
        # Broadcast SSE event for memory deletion
        try:
            event = create_memory_deleted_event(content_hash, success)
            await sse_manager.broadcast_event(event)
        except Exception as e:
            # Don't fail the request if SSE broadcasting fails
            logger.warning(f"Failed to broadcast memory_deleted event: {e}")
        
        return MemoryDeleteResponse(
            success=success,
            message=message,
            content_hash=content_hash
        )

    except Exception as e:
        logger.error(f"Failed to delete memory: {str(e)}")
        raise HTTPException(status_code=500, detail="Failed to delete memory. Please try again.")


@router.put("/memories/{content_hash}", response_model=MemoryUpdateResponse, tags=["memories"])
async def update_memory(
    content_hash: str,
    request: MemoryUpdateRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_write_access) if OAUTH_ENABLED else None
):
    """
    Update memory metadata (tags, type, metadata) without changing content or timestamps.

    This endpoint allows updating only the metadata aspects of a memory while preserving
    the original content and creation timestamp. Only provided fields will be updated.
    """
    try:
        # First, check if the memory exists
        existing_memory = await storage.get_by_hash(content_hash)
        if not existing_memory:
            raise HTTPException(status_code=404, detail=f"Memory with hash {content_hash} not found")

        # Build the updates dictionary with only provided fields
        updates = {}
        if request.tags is not None:
            updates['tags'] = request.tags
        if request.memory_type is not None:
            updates['memory_type'] = request.memory_type
        if request.metadata is not None:
            updates['metadata'] = request.metadata

        # If no updates provided, return current memory
        if not updates:
            return MemoryUpdateResponse(
                success=True,
                message="No updates provided - memory unchanged",
                content_hash=content_hash,
                memory=memory_to_response(existing_memory)
            )

        # Perform the update
        success, message = await storage.update_memory_metadata(
            content_hash=content_hash,
            updates=updates,
            preserve_timestamps=True
        )

        if success:
            # Get the updated memory
            updated_memory = await storage.get_by_hash(content_hash)

            return MemoryUpdateResponse(
                success=True,
                message=message,
                content_hash=content_hash,
                memory=memory_to_response(updated_memory) if updated_memory else None
            )
        else:
            return MemoryUpdateResponse(
                success=False,
                message=message,
                content_hash=content_hash
            )

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to update memory: {str(e)}")


@router.get("/tags", response_model=TagListResponse, tags=["tags"])
async def get_tags(
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Get all tags with their usage counts.

    Returns a list of all unique tags along with how many memories use each tag,
    sorted by count in descending order.
    """
    try:
        # Get tags with counts from storage
        tag_data = await storage.get_all_tags_with_counts()

        # Convert to response format
        tags = [TagResponse(tag=item["tag"], count=item["count"]) for item in tag_data]

        return TagListResponse(tags=tags)

    except AttributeError as e:
        # Handle case where storage backend doesn't implement get_all_tags_with_counts
        raise HTTPException(status_code=501, detail=f"Tags endpoint not supported by current storage backend: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to get tags: {str(e)}")
```

--------------------------------------------------------------------------------
/archive/docs-removed-2025-08-23/development/autonomous-memory-consolidation.md:
--------------------------------------------------------------------------------

```markdown
# Autonomous Implementation Guide for Dream-Inspired Memory Consolidation

## Overview

This document provides a comprehensive guide for implementing the [Dream-Inspired Memory Consolidation System](./dream-inspired-memory-consolidation.md) as a fully autonomous system that runs without external AI dependencies.

## Key Insight

**The dream-inspired memory consolidation system can run almost entirely on autopilot** by leveraging the embeddings already generated during memory storage. These embeddings, combined with mathematical algorithms and rule-based logic, enable autonomous operation without external AI.

## Autonomous Components Analysis

### ✅ 100% Autonomous: Exponential Decay Scoring

Pure mathematical calculations requiring zero AI intervention:

```python
import math
from datetime import datetime

class AutonomousDecayCalculator:
    def __init__(self, retention_periods):
        self.retention_periods = retention_periods
    
    def calculate_relevance(self, memory):
        """Calculate memory relevance using pure math."""
        age = (datetime.now() - memory.created_at).total_seconds() / 86400  # days
        base_score = memory.importance_score
        
        retention_period = self.retention_periods.get(
            memory.memory_type, 
            self.retention_periods['default']
        )
        
        # Exponential decay
        decay_factor = math.exp(-age / retention_period)
        
        # Connection boost (pure counting)
        connection_boost = 1 + (0.1 * len(memory.connections))
        
        return base_score * decay_factor * connection_boost
```

### ✅ 100% Autonomous: Creative Association System

Uses existing embeddings with vector mathematics:

```python
import numpy as np
from itertools import combinations
import random

class AutonomousAssociationEngine:
    def __init__(self, similarity_threshold=(0.3, 0.7)):
        self.min_similarity, self.max_similarity = similarity_threshold
    
    def find_associations(self, memories):
        """Find creative connections using only embeddings."""
        # Limit pairs to prevent combinatorial explosion
        max_pairs = min(100, len(memories) * (len(memories) - 1) // 2)
        
        if len(memories) < 2:
            return []
        
        # Random sampling of pairs
        all_pairs = list(combinations(range(len(memories)), 2))
        sampled_pairs = random.sample(
            all_pairs, 
            min(max_pairs, len(all_pairs))
        )
        
        associations = []
        for i, j in sampled_pairs:
            # Cosine similarity using existing embeddings
            similarity = self._cosine_similarity(
                memories[i].embedding,
                memories[j].embedding
            )
            
            # Check if in creative "sweet spot"
            if self.min_similarity < similarity < self.max_similarity:
                associations.append({
                    'memory_1': memories[i].hash,
                    'memory_2': memories[j].hash,
                    'similarity': similarity,
                    'discovered_at': datetime.now()
                })
        
        return associations
    
    def _cosine_similarity(self, vec1, vec2):
        """Calculate cosine similarity between two vectors."""
        vec1 = np.array(vec1)
        vec2 = np.array(vec2)
        
        dot_product = np.dot(vec1, vec2)
        norm_product = np.linalg.norm(vec1) * np.linalg.norm(vec2)
        
        return dot_product / norm_product if norm_product > 0 else 0
```

### ✅ 100% Autonomous: Controlled Forgetting

Rule-based logic with no AI required:

```python
class AutonomousPruningEngine:
    def __init__(self, config):
        self.relevance_threshold = config['relevance_threshold']
        self.access_threshold_days = config['access_threshold_days']
        self.protected_tags = {'important', 'critical', 'reference'}
    
    def identify_forgettable_memories(self, memories):
        """Identify memories for archival using rules."""
        forgettable = []
        
        for memory in memories:
            # Skip protected memories
            if memory.tags & self.protected_tags:
                continue
            
            # Check relevance score
            if memory.relevance_score < self.relevance_threshold:
                # Check connections
                if len(memory.connections) == 0:
                    # Check last access
                    days_since_access = (
                        datetime.now() - memory.last_accessed
                    ).days
                    
                    if days_since_access > self.access_threshold_days:
                        forgettable.append(memory)
        
        return forgettable
```

### 🔧 90% Autonomous: Semantic Compression

Uses statistical methods instead of generative AI:

```python
from collections import Counter
from sklearn.cluster import AgglomerativeClustering
import numpy as np

class AutonomousCompressionEngine:
    def __init__(self):
        self.keyword_extractor = TFIDFKeywordExtractor()
    
    def compress_cluster(self, memories):
        """Compress memories without using generative AI."""
        if not memories:
            return None
        
        # 1. Find centroid (most representative memory)
        embeddings = np.array([m.embedding for m in memories])
        centroid = np.mean(embeddings, axis=0)
        
        # Calculate distances to centroid
        distances = [
            np.linalg.norm(centroid - emb) 
            for emb in embeddings
        ]
        representative_idx = np.argmin(distances)
        representative_memory = memories[representative_idx]
        
        # 2. Extract keywords using TF-IDF
        all_content = ' '.join([m.content for m in memories])
        keywords = self.keyword_extractor.extract(all_content, top_k=20)
        
        # 3. Aggregate metadata
        all_tags = set()
        for memory in memories:
            all_tags.update(memory.tags)
        
        # 4. Create structured summary (not prose)
        compressed = {
            "type": "compressed_cluster",
            "representative_content": representative_memory.content,
            "representative_hash": representative_memory.hash,
            "cluster_size": len(memories),
            "keywords": keywords,
            "common_tags": list(all_tags),
            "temporal_range": {
                "start": min(m.created_at for m in memories),
                "end": max(m.created_at for m in memories)
            },
            "centroid_embedding": centroid.tolist(),
            "member_hashes": [m.hash for m in memories]
        }
        
        return compressed

class TFIDFKeywordExtractor:
    """Simple TF-IDF based keyword extraction."""
    
    def extract(self, text, top_k=10):
        # Simple word frequency for demonstration
        # In practice, use sklearn's TfidfVectorizer
        words = text.lower().split()
        word_freq = Counter(words)
        
        # Filter common words (simple stopword removal)
        stopwords = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at'}
        keywords = [
            (word, count) 
            for word, count in word_freq.most_common(top_k * 2)
            if word not in stopwords and len(word) > 3
        ]
        
        return keywords[:top_k]
```

## Complete Autonomous Architecture

```python
from apscheduler.schedulers.background import BackgroundScheduler
import logging

class AutonomousMemoryConsolidator:
    """
    Fully autonomous memory consolidation system.
    Runs without any external AI dependencies.
    """
    
    def __init__(self, storage, config):
        self.storage = storage
        self.config = config
        
        # Initialize autonomous components
        self.decay_calculator = AutonomousDecayCalculator(
            config['retention_periods']
        )
        self.association_engine = AutonomousAssociationEngine()
        self.compression_engine = AutonomousCompressionEngine()
        self.pruning_engine = AutonomousPruningEngine(config['forgetting'])
        
        # Setup scheduling
        self.scheduler = BackgroundScheduler()
        self._setup_schedules()
        
        logging.info("Autonomous Memory Consolidator initialized")
    
    def _setup_schedules(self):
        """Configure autonomous scheduling."""
        # Daily consolidation at 3 AM
        self.scheduler.add_job(
            func=self.run_daily_consolidation,
            trigger="cron",
            hour=3,
            minute=0,
            id="daily_consolidation"
        )
        
        # Weekly consolidation on Mondays at 4 AM
        self.scheduler.add_job(
            func=self.run_weekly_consolidation,
            trigger="cron",
            day_of_week='mon',
            hour=4,
            minute=0,
            id="weekly_consolidation"
        )
        
        # Monthly consolidation on 1st at 5 AM
        self.scheduler.add_job(
            func=self.run_monthly_consolidation,
            trigger="cron",
            day=1,
            hour=5,
            minute=0,
            id="monthly_consolidation"
        )
    
    def start(self):
        """Start the autonomous consolidation system."""
        self.scheduler.start()
        logging.info("Autonomous consolidation scheduler started")
    
    async def run_daily_consolidation(self):
        """Daily consolidation - fully autonomous."""
        try:
            # Get recent memories
            memories = await self.storage.get_recent_memories(days=1)
            
            # Update relevance scores (pure math)
            for memory in memories:
                memory.relevance_score = self.decay_calculator.calculate_relevance(memory)
                await self.storage.update_relevance_score(memory.hash, memory.relevance_score)
            
            # Find associations (vector math)
            associations = self.association_engine.find_associations(memories)
            for assoc in associations:
                await self.storage.store_association(assoc)
            
            logging.info(
                f"Daily consolidation complete: "
                f"{len(memories)} memories processed, "
                f"{len(associations)} associations found"
            )
            
        except Exception as e:
            logging.error(f"Daily consolidation failed: {e}")
    
    async def run_weekly_consolidation(self):
        """Weekly consolidation with clustering."""
        try:
            # Get week's memories
            memories = await self.storage.get_recent_memories(days=7)
            
            # Cluster memories using embeddings
            clusters = self._cluster_memories(memories)
            
            # Compress large clusters
            for cluster in clusters:
                if len(cluster) >= self.config['compression']['min_cluster_size']:
                    compressed = self.compression_engine.compress_cluster(cluster)
                    await self.storage.store_compressed_memory(compressed)
            
            logging.info(f"Weekly consolidation: {len(clusters)} clusters processed")
            
        except Exception as e:
            logging.error(f"Weekly consolidation failed: {e}")
    
    def _cluster_memories(self, memories, threshold=0.3):
        """Cluster memories using hierarchical clustering."""
        if len(memories) < 2:
            return [[m] for m in memories]
        
        # Extract embeddings
        embeddings = np.array([m.embedding for m in memories])
        
        # Hierarchical clustering
        clustering = AgglomerativeClustering(
            n_clusters=None,
            distance_threshold=threshold,
            linkage='average'
        )
        labels = clustering.fit_predict(embeddings)
        
        # Group by cluster
        clusters = {}
        for idx, label in enumerate(labels):
            if label not in clusters:
                clusters[label] = []
            clusters[label].append(memories[idx])
        
        return list(clusters.values())
```

## Deployment Configuration

```yaml
# autonomous_consolidation_config.yaml
autonomous_mode:
  enabled: true
  
  # No external AI endpoints needed!
  external_ai_required: false
  
  # Retention periods (in days)
  retention_periods:
    critical: 365
    reference: 180
    standard: 30
    temporary: 7
    default: 30
  
  # Association discovery
  associations:
    min_similarity: 0.3
    max_similarity: 0.7
    max_pairs_per_run: 100
    enabled: true
  
  # Forgetting rules
  forgetting:
    relevance_threshold: 0.1
    access_threshold_days: 90
    archive_path: "./memory_archive"
    enabled: true
  
  # Compression settings
  compression:
    min_cluster_size: 5
    clustering_threshold: 0.3
    enabled: true
  
  # Scheduling (cron expressions)
  schedules:
    daily: "0 3 * * *"      # 3:00 AM daily
    weekly: "0 4 * * 1"     # 4:00 AM Mondays
    monthly: "0 5 1 * *"    # 5:00 AM first of month
```

## Key Advantages of Autonomous Operation

### 1. **Complete Independence**
- No API keys required
- No external service dependencies
- No internet connection needed
- Works entirely offline

### 2. **Predictable Behavior**
- Deterministic algorithms
- Reproducible results
- Easy to debug and test
- Consistent performance

### 3. **Cost Efficiency**
- Zero ongoing AI costs
- No API rate limits
- No usage-based billing
- Minimal computational resources

### 4. **Privacy & Security**
- All processing stays local
- No data leaves your system
- Complete data sovereignty
- No third-party exposure

### 5. **Performance**
- No network latency
- Instant processing
- Parallel operations possible
- Scales with local hardware

## What's Different from AI-Powered Version?

| Feature | AI-Powered | Autonomous |
|---------|------------|------------|
| Natural language summaries | ✅ Eloquent prose | ❌ Structured data |
| Complex reasoning | ✅ Nuanced understanding | ❌ Rule-based logic |
| Summary quality | ✅ Human-like | ✅ Statistically representative |
| Cost | 💰 Ongoing API costs | ✅ Free after setup |
| Speed | 🐌 Network dependent | 🚀 Local processing |
| Privacy | ⚠️ Data sent to API | 🔒 Completely private |
| Reliability | ⚠️ Service dependent | ✅ Always available |

## Implementation Checklist

- [ ] Install required Python packages (numpy, scikit-learn, apscheduler)
- [ ] Configure retention periods for your use case
- [ ] Set up clustering thresholds based on your embedding model
- [ ] Configure scheduling based on your memory volume
- [ ] Test each component independently
- [ ] Monitor initial runs and adjust thresholds
- [ ] Set up logging and monitoring
- [ ] Create backup strategy for archived memories

## Conclusion

The autonomous implementation proves that sophisticated memory consolidation doesn't require external AI. By leveraging existing embeddings and mathematical algorithms, we achieve a system that mimics biological memory processes while maintaining complete independence, privacy, and cost-effectiveness.

This approach transforms the dream-inspired concept into a practical, deployable system that can run indefinitely without human intervention or external dependencies - a true "set it and forget it" solution for memory management.

---

*Related Documents:*
- [Dream-Inspired Memory Consolidation System](./dream-inspired-memory-consolidation.md)
- [Issue #11: Multi-Layered Memory Consolidation](https://github.com/doobidoo/mcp-memory-service/issues/11)

*Created: July 28, 2025*
```
Page 18/35FirstPrevNextLast