#
tokens: 49648/50000 14/625 files (page 17/35)
lines: off (toggle) GitHub
raw markdown copy
This is page 17 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── amp-bridge.md
│   │   ├── amp-pr-automator.md
│   │   ├── code-quality-guard.md
│   │   ├── gemini-pr-automator.md
│   │   └── github-release-manager.md
│   ├── settings.local.json.backup
│   └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── feature_request.yml
│   │   └── performance_issue.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── bridge-tests.yml
│       ├── CACHE_FIX.md
│       ├── claude-code-review.yml
│       ├── claude.yml
│       ├── cleanup-images.yml.disabled
│       ├── dev-setup-validation.yml
│       ├── docker-publish.yml
│       ├── LATEST_FIXES.md
│       ├── main-optimized.yml.disabled
│       ├── main.yml
│       ├── publish-and-test.yml
│       ├── README_OPTIMIZATION.md
│       ├── release-tag.yml.disabled
│       ├── release.yml
│       ├── roadmap-review-reminder.yml
│       ├── SECRET_CONDITIONAL_FIX.md
│       └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│   ├── .gitignore
│   └── reports
│       └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│   ├── deployment
│   │   ├── deploy_fastmcp_fixed.sh
│   │   ├── deploy_http_with_mcp.sh
│   │   └── deploy_mcp_v4.sh
│   ├── deployment-configs
│   │   ├── empty_config.yml
│   │   └── smithery.yaml
│   ├── development
│   │   └── test_fastmcp.py
│   ├── docs-removed-2025-08-23
│   │   ├── authentication.md
│   │   ├── claude_integration.md
│   │   ├── claude-code-compatibility.md
│   │   ├── claude-code-integration.md
│   │   ├── claude-code-quickstart.md
│   │   ├── claude-desktop-setup.md
│   │   ├── complete-setup-guide.md
│   │   ├── database-synchronization.md
│   │   ├── development
│   │   │   ├── autonomous-memory-consolidation.md
│   │   │   ├── CLEANUP_PLAN.md
│   │   │   ├── CLEANUP_README.md
│   │   │   ├── CLEANUP_SUMMARY.md
│   │   │   ├── dream-inspired-memory-consolidation.md
│   │   │   ├── hybrid-slm-memory-consolidation.md
│   │   │   ├── mcp-milestone.md
│   │   │   ├── multi-client-architecture.md
│   │   │   ├── test-results.md
│   │   │   └── TIMESTAMP_FIX_SUMMARY.md
│   │   ├── distributed-sync.md
│   │   ├── invocation_guide.md
│   │   ├── macos-intel.md
│   │   ├── master-guide.md
│   │   ├── mcp-client-configuration.md
│   │   ├── multi-client-server.md
│   │   ├── service-installation.md
│   │   ├── sessions
│   │   │   └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│   │   ├── UBUNTU_SETUP.md
│   │   ├── ubuntu.md
│   │   ├── windows-setup.md
│   │   └── windows.md
│   ├── docs-root-cleanup-2025-08-23
│   │   ├── AWESOME_LIST_SUBMISSION.md
│   │   ├── CLOUDFLARE_IMPLEMENTATION.md
│   │   ├── DOCUMENTATION_ANALYSIS.md
│   │   ├── DOCUMENTATION_CLEANUP_PLAN.md
│   │   ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│   │   ├── LITESTREAM_SETUP_GUIDE.md
│   │   ├── lm_studio_system_prompt.md
│   │   ├── PYTORCH_DOWNLOAD_FIX.md
│   │   └── README-ORIGINAL-BACKUP.md
│   ├── investigations
│   │   └── MACOS_HOOKS_INVESTIGATION.md
│   ├── litestream-configs-v6.3.0
│   │   ├── install_service.sh
│   │   ├── litestream_master_config_fixed.yml
│   │   ├── litestream_master_config.yml
│   │   ├── litestream_replica_config_fixed.yml
│   │   ├── litestream_replica_config.yml
│   │   ├── litestream_replica_simple.yml
│   │   ├── litestream-http.service
│   │   ├── litestream.service
│   │   └── requirements-cloudflare.txt
│   ├── release-notes
│   │   └── release-notes-v7.1.4.md
│   └── setup-development
│       ├── README.md
│       ├── setup_consolidation_mdns.sh
│       ├── STARTUP_SETUP_GUIDE.md
│       └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│   ├── memory-context.md
│   ├── memory-health.md
│   ├── memory-ingest-dir.md
│   ├── memory-ingest.md
│   ├── memory-recall.md
│   ├── memory-search.md
│   ├── memory-store.md
│   ├── README.md
│   └── session-start.md
├── claude-hooks
│   ├── config.json
│   ├── config.template.json
│   ├── CONFIGURATION.md
│   ├── core
│   │   ├── memory-retrieval.js
│   │   ├── mid-conversation.js
│   │   ├── session-end.js
│   │   ├── session-start.js
│   │   └── topic-change.js
│   ├── debug-pattern-test.js
│   ├── install_claude_hooks_windows.ps1
│   ├── install_hooks.py
│   ├── memory-mode-controller.js
│   ├── MIGRATION.md
│   ├── README-NATURAL-TRIGGERS.md
│   ├── README-phase2.md
│   ├── README.md
│   ├── simple-test.js
│   ├── statusline.sh
│   ├── test-adaptive-weights.js
│   ├── test-dual-protocol-hook.js
│   ├── test-mcp-hook.js
│   ├── test-natural-triggers.js
│   ├── test-recency-scoring.js
│   ├── tests
│   │   ├── integration-test.js
│   │   ├── phase2-integration-test.js
│   │   ├── test-code-execution.js
│   │   ├── test-cross-session.json
│   │   ├── test-session-tracking.json
│   │   └── test-threading.json
│   ├── utilities
│   │   ├── adaptive-pattern-detector.js
│   │   ├── context-formatter.js
│   │   ├── context-shift-detector.js
│   │   ├── conversation-analyzer.js
│   │   ├── dynamic-context-updater.js
│   │   ├── git-analyzer.js
│   │   ├── mcp-client.js
│   │   ├── memory-client.js
│   │   ├── memory-scorer.js
│   │   ├── performance-manager.js
│   │   ├── project-detector.js
│   │   ├── session-tracker.js
│   │   ├── tiered-conversation-monitor.js
│   │   └── version-checker.js
│   └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│   ├── amp-cli-bridge.md
│   ├── api
│   │   ├── code-execution-interface.md
│   │   ├── memory-metadata-api.md
│   │   ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│   │   ├── PHASE2_REPORT.md
│   │   └── tag-standardization.md
│   ├── architecture
│   │   ├── search-enhancement-spec.md
│   │   └── search-examples.md
│   ├── architecture.md
│   ├── archive
│   │   └── obsolete-workflows
│   │       ├── load_memory_context.md
│   │       └── README.md
│   ├── assets
│   │   └── images
│   │       ├── dashboard-v3.3.0-preview.png
│   │       ├── memory-awareness-hooks-example.png
│   │       ├── project-infographic.svg
│   │       └── README.md
│   ├── CLAUDE_CODE_QUICK_REFERENCE.md
│   ├── cloudflare-setup.md
│   ├── deployment
│   │   ├── docker.md
│   │   ├── dual-service.md
│   │   ├── production-guide.md
│   │   └── systemd-service.md
│   ├── development
│   │   ├── ai-agent-instructions.md
│   │   ├── code-quality
│   │   │   ├── phase-2a-completion.md
│   │   │   ├── phase-2a-handle-get-prompt.md
│   │   │   ├── phase-2a-index.md
│   │   │   ├── phase-2a-install-package.md
│   │   │   └── phase-2b-session-summary.md
│   │   ├── code-quality-workflow.md
│   │   ├── dashboard-workflow.md
│   │   ├── issue-management.md
│   │   ├── pr-review-guide.md
│   │   ├── refactoring-notes.md
│   │   ├── release-checklist.md
│   │   └── todo-tracker.md
│   ├── docker-optimized-build.md
│   ├── document-ingestion.md
│   ├── DOCUMENTATION_AUDIT.md
│   ├── enhancement-roadmap-issue-14.md
│   ├── examples
│   │   ├── analysis-scripts.js
│   │   ├── maintenance-session-example.md
│   │   ├── memory-distribution-chart.jsx
│   │   └── tag-schema.json
│   ├── first-time-setup.md
│   ├── glama-deployment.md
│   ├── guides
│   │   ├── advanced-command-examples.md
│   │   ├── chromadb-migration.md
│   │   ├── commands-vs-mcp-server.md
│   │   ├── mcp-enhancements.md
│   │   ├── mdns-service-discovery.md
│   │   ├── memory-consolidation-guide.md
│   │   ├── migration.md
│   │   ├── scripts.md
│   │   └── STORAGE_BACKENDS.md
│   ├── HOOK_IMPROVEMENTS.md
│   ├── hooks
│   │   └── phase2-code-execution-migration.md
│   ├── http-server-management.md
│   ├── ide-compatability.md
│   ├── IMAGE_RETENTION_POLICY.md
│   ├── images
│   │   └── dashboard-placeholder.md
│   ├── implementation
│   │   ├── health_checks.md
│   │   └── performance.md
│   ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│   ├── integration
│   │   ├── homebrew.md
│   │   └── multi-client.md
│   ├── integrations
│   │   ├── gemini.md
│   │   ├── groq-bridge.md
│   │   ├── groq-integration-summary.md
│   │   └── groq-model-comparison.md
│   ├── integrations.md
│   ├── legacy
│   │   └── dual-protocol-hooks.md
│   ├── LM_STUDIO_COMPATIBILITY.md
│   ├── maintenance
│   │   └── memory-maintenance.md
│   ├── mastery
│   │   ├── api-reference.md
│   │   ├── architecture-overview.md
│   │   ├── configuration-guide.md
│   │   ├── local-setup-and-run.md
│   │   ├── testing-guide.md
│   │   └── troubleshooting.md
│   ├── migration
│   │   └── code-execution-api-quick-start.md
│   ├── natural-memory-triggers
│   │   ├── cli-reference.md
│   │   ├── installation-guide.md
│   │   └── performance-optimization.md
│   ├── oauth-setup.md
│   ├── pr-graphql-integration.md
│   ├── quick-setup-cloudflare-dual-environment.md
│   ├── README.md
│   ├── remote-configuration-wiki-section.md
│   ├── research
│   │   ├── code-execution-interface-implementation.md
│   │   └── code-execution-interface-summary.md
│   ├── ROADMAP.md
│   ├── sqlite-vec-backend.md
│   ├── statistics
│   │   ├── charts
│   │   │   ├── activity_patterns.png
│   │   │   ├── contributors.png
│   │   │   ├── growth_trajectory.png
│   │   │   ├── monthly_activity.png
│   │   │   └── october_sprint.png
│   │   ├── data
│   │   │   ├── activity_by_day.csv
│   │   │   ├── activity_by_hour.csv
│   │   │   ├── contributors.csv
│   │   │   └── monthly_activity.csv
│   │   ├── generate_charts.py
│   │   └── REPOSITORY_STATISTICS.md
│   ├── technical
│   │   ├── development.md
│   │   ├── memory-migration.md
│   │   ├── migration-log.md
│   │   ├── sqlite-vec-embedding-fixes.md
│   │   └── tag-storage.md
│   ├── testing
│   │   └── regression-tests.md
│   ├── testing-cloudflare-backend.md
│   ├── troubleshooting
│   │   ├── cloudflare-api-token-setup.md
│   │   ├── cloudflare-authentication.md
│   │   ├── general.md
│   │   ├── hooks-quick-reference.md
│   │   ├── pr162-schema-caching-issue.md
│   │   ├── session-end-hooks.md
│   │   └── sync-issues.md
│   └── tutorials
│       ├── advanced-techniques.md
│       ├── data-analysis.md
│       └── demo-session-walkthrough.md
├── examples
│   ├── claude_desktop_config_template.json
│   ├── claude_desktop_config_windows.json
│   ├── claude-desktop-http-config.json
│   ├── config
│   │   └── claude_desktop_config.json
│   ├── http-mcp-bridge.js
│   ├── memory_export_template.json
│   ├── README.md
│   ├── setup
│   │   └── setup_multi_client_complete.py
│   └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│   ├── .claude
│   │   └── settings.local.json
│   ├── archive
│   │   └── check_missing_timestamps.py
│   ├── backup
│   │   ├── backup_memories.py
│   │   ├── backup_sqlite_vec.sh
│   │   ├── export_distributable_memories.sh
│   │   └── restore_memories.py
│   ├── benchmarks
│   │   ├── benchmark_code_execution_api.py
│   │   ├── benchmark_hybrid_sync.py
│   │   └── benchmark_server_caching.py
│   ├── database
│   │   ├── analyze_sqlite_vec_db.py
│   │   ├── check_sqlite_vec_status.py
│   │   ├── db_health_check.py
│   │   └── simple_timestamp_check.py
│   ├── development
│   │   ├── debug_server_initialization.py
│   │   ├── find_orphaned_files.py
│   │   ├── fix_mdns.sh
│   │   ├── fix_sitecustomize.py
│   │   ├── remote_ingest.sh
│   │   ├── setup-git-merge-drivers.sh
│   │   ├── uv-lock-merge.sh
│   │   └── verify_hybrid_sync.py
│   ├── hooks
│   │   └── pre-commit
│   ├── installation
│   │   ├── install_linux_service.py
│   │   ├── install_macos_service.py
│   │   ├── install_uv.py
│   │   ├── install_windows_service.py
│   │   ├── install.py
│   │   ├── setup_backup_cron.sh
│   │   ├── setup_claude_mcp.sh
│   │   └── setup_cloudflare_resources.py
│   ├── linux
│   │   ├── service_status.sh
│   │   ├── start_service.sh
│   │   ├── stop_service.sh
│   │   ├── uninstall_service.sh
│   │   └── view_logs.sh
│   ├── maintenance
│   │   ├── assign_memory_types.py
│   │   ├── check_memory_types.py
│   │   ├── cleanup_corrupted_encoding.py
│   │   ├── cleanup_memories.py
│   │   ├── cleanup_organize.py
│   │   ├── consolidate_memory_types.py
│   │   ├── consolidation_mappings.json
│   │   ├── delete_orphaned_vectors_fixed.py
│   │   ├── fast_cleanup_duplicates_with_tracking.sh
│   │   ├── find_all_duplicates.py
│   │   ├── find_cloudflare_duplicates.py
│   │   ├── find_duplicates.py
│   │   ├── memory-types.md
│   │   ├── README.md
│   │   ├── recover_timestamps_from_cloudflare.py
│   │   ├── regenerate_embeddings.py
│   │   ├── repair_malformed_tags.py
│   │   ├── repair_memories.py
│   │   ├── repair_sqlite_vec_embeddings.py
│   │   ├── repair_zero_embeddings.py
│   │   ├── restore_from_json_export.py
│   │   └── scan_todos.sh
│   ├── migration
│   │   ├── cleanup_mcp_timestamps.py
│   │   ├── legacy
│   │   │   └── migrate_chroma_to_sqlite.py
│   │   ├── mcp-migration.py
│   │   ├── migrate_sqlite_vec_embeddings.py
│   │   ├── migrate_storage.py
│   │   ├── migrate_tags.py
│   │   ├── migrate_timestamps.py
│   │   ├── migrate_to_cloudflare.py
│   │   ├── migrate_to_sqlite_vec.py
│   │   ├── migrate_v5_enhanced.py
│   │   ├── TIMESTAMP_CLEANUP_README.md
│   │   └── verify_mcp_timestamps.py
│   ├── pr
│   │   ├── amp_collect_results.sh
│   │   ├── amp_detect_breaking_changes.sh
│   │   ├── amp_generate_tests.sh
│   │   ├── amp_pr_review.sh
│   │   ├── amp_quality_gate.sh
│   │   ├── amp_suggest_fixes.sh
│   │   ├── auto_review.sh
│   │   ├── detect_breaking_changes.sh
│   │   ├── generate_tests.sh
│   │   ├── lib
│   │   │   └── graphql_helpers.sh
│   │   ├── quality_gate.sh
│   │   ├── resolve_threads.sh
│   │   ├── run_pyscn_analysis.sh
│   │   ├── run_quality_checks.sh
│   │   ├── thread_status.sh
│   │   └── watch_reviews.sh
│   ├── quality
│   │   ├── fix_dead_code_install.sh
│   │   ├── phase1_dead_code_analysis.md
│   │   ├── phase2_complexity_analysis.md
│   │   ├── README_PHASE1.md
│   │   ├── README_PHASE2.md
│   │   ├── track_pyscn_metrics.sh
│   │   └── weekly_quality_review.sh
│   ├── README.md
│   ├── run
│   │   ├── run_mcp_memory.sh
│   │   ├── run-with-uv.sh
│   │   └── start_sqlite_vec.sh
│   ├── run_memory_server.py
│   ├── server
│   │   ├── check_http_server.py
│   │   ├── check_server_health.py
│   │   ├── memory_offline.py
│   │   ├── preload_models.py
│   │   ├── run_http_server.py
│   │   ├── run_memory_server.py
│   │   ├── start_http_server.bat
│   │   └── start_http_server.sh
│   ├── service
│   │   ├── deploy_dual_services.sh
│   │   ├── install_http_service.sh
│   │   ├── mcp-memory-http.service
│   │   ├── mcp-memory.service
│   │   ├── memory_service_manager.sh
│   │   ├── service_control.sh
│   │   ├── service_utils.py
│   │   └── update_service.sh
│   ├── sync
│   │   ├── check_drift.py
│   │   ├── claude_sync_commands.py
│   │   ├── export_memories.py
│   │   ├── import_memories.py
│   │   ├── litestream
│   │   │   ├── apply_local_changes.sh
│   │   │   ├── enhanced_memory_store.sh
│   │   │   ├── init_staging_db.sh
│   │   │   ├── io.litestream.replication.plist
│   │   │   ├── manual_sync.sh
│   │   │   ├── memory_sync.sh
│   │   │   ├── pull_remote_changes.sh
│   │   │   ├── push_to_remote.sh
│   │   │   ├── README.md
│   │   │   ├── resolve_conflicts.sh
│   │   │   ├── setup_local_litestream.sh
│   │   │   ├── setup_remote_litestream.sh
│   │   │   ├── staging_db_init.sql
│   │   │   ├── stash_local_changes.sh
│   │   │   ├── sync_from_remote_noconfig.sh
│   │   │   └── sync_from_remote.sh
│   │   ├── README.md
│   │   ├── safe_cloudflare_update.sh
│   │   ├── sync_memory_backends.py
│   │   └── sync_now.py
│   ├── testing
│   │   ├── run_complete_test.py
│   │   ├── run_memory_test.sh
│   │   ├── simple_test.py
│   │   ├── test_cleanup_logic.py
│   │   ├── test_cloudflare_backend.py
│   │   ├── test_docker_functionality.py
│   │   ├── test_installation.py
│   │   ├── test_mdns.py
│   │   ├── test_memory_api.py
│   │   ├── test_memory_simple.py
│   │   ├── test_migration.py
│   │   ├── test_search_api.py
│   │   ├── test_sqlite_vec_embeddings.py
│   │   ├── test_sse_events.py
│   │   ├── test-connection.py
│   │   └── test-hook.js
│   ├── utils
│   │   ├── claude_commands_utils.py
│   │   ├── generate_personalized_claude_md.sh
│   │   ├── groq
│   │   ├── groq_agent_bridge.py
│   │   ├── list-collections.py
│   │   ├── memory_wrapper_uv.py
│   │   ├── query_memories.py
│   │   ├── smithery_wrapper.py
│   │   ├── test_groq_bridge.sh
│   │   └── uv_wrapper.py
│   └── validation
│       ├── check_dev_setup.py
│       ├── check_documentation_links.py
│       ├── diagnose_backend_config.py
│       ├── validate_configuration_complete.py
│       ├── validate_memories.py
│       ├── validate_migration.py
│       ├── validate_timestamp_integrity.py
│       ├── verify_environment.py
│       ├── verify_pytorch_windows.py
│       └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│   └── mcp_memory_service
│       ├── __init__.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── operations.py
│       │   ├── sync_wrapper.py
│       │   └── types.py
│       ├── backup
│       │   ├── __init__.py
│       │   └── scheduler.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── ingestion.py
│       │   ├── main.py
│       │   └── utils.py
│       ├── config.py
│       ├── consolidation
│       │   ├── __init__.py
│       │   ├── associations.py
│       │   ├── base.py
│       │   ├── clustering.py
│       │   ├── compression.py
│       │   ├── consolidator.py
│       │   ├── decay.py
│       │   ├── forgetting.py
│       │   ├── health.py
│       │   └── scheduler.py
│       ├── dependency_check.py
│       ├── discovery
│       │   ├── __init__.py
│       │   ├── client.py
│       │   └── mdns_service.py
│       ├── embeddings
│       │   ├── __init__.py
│       │   └── onnx_embeddings.py
│       ├── ingestion
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chunker.py
│       │   ├── csv_loader.py
│       │   ├── json_loader.py
│       │   ├── pdf_loader.py
│       │   ├── registry.py
│       │   ├── semtools_loader.py
│       │   └── text_loader.py
│       ├── lm_studio_compat.py
│       ├── mcp_server.py
│       ├── models
│       │   ├── __init__.py
│       │   └── memory.py
│       ├── server.py
│       ├── services
│       │   ├── __init__.py
│       │   └── memory_service.py
│       ├── storage
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloudflare.py
│       │   ├── factory.py
│       │   ├── http_client.py
│       │   ├── hybrid.py
│       │   └── sqlite_vec.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── exporter.py
│       │   ├── importer.py
│       │   └── litestream_config.py
│       ├── utils
│       │   ├── __init__.py
│       │   ├── cache_manager.py
│       │   ├── content_splitter.py
│       │   ├── db_utils.py
│       │   ├── debug.py
│       │   ├── document_processing.py
│       │   ├── gpu_detection.py
│       │   ├── hashing.py
│       │   ├── http_server_manager.py
│       │   ├── port_detection.py
│       │   ├── system_detection.py
│       │   └── time_parser.py
│       └── web
│           ├── __init__.py
│           ├── api
│           │   ├── __init__.py
│           │   ├── analytics.py
│           │   ├── backup.py
│           │   ├── consolidation.py
│           │   ├── documents.py
│           │   ├── events.py
│           │   ├── health.py
│           │   ├── manage.py
│           │   ├── mcp.py
│           │   ├── memories.py
│           │   ├── search.py
│           │   └── sync.py
│           ├── app.py
│           ├── dependencies.py
│           ├── oauth
│           │   ├── __init__.py
│           │   ├── authorization.py
│           │   ├── discovery.py
│           │   ├── middleware.py
│           │   ├── models.py
│           │   ├── registration.py
│           │   └── storage.py
│           ├── sse.py
│           └── static
│               ├── app.js
│               ├── index.html
│               ├── README.md
│               ├── sse_test.html
│               └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── test_compact_types.py
│   │   └── test_operations.py
│   ├── bridge
│   │   ├── mock_responses.js
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   └── test_http_mcp_bridge.js
│   ├── conftest.py
│   ├── consolidation
│   │   ├── __init__.py
│   │   ├── conftest.py
│   │   ├── test_associations.py
│   │   ├── test_clustering.py
│   │   ├── test_compression.py
│   │   ├── test_consolidator.py
│   │   ├── test_decay.py
│   │   └── test_forgetting.py
│   ├── contracts
│   │   └── api-specification.yml
│   ├── integration
│   │   ├── package-lock.json
│   │   ├── package.json
│   │   ├── test_api_key_fallback.py
│   │   ├── test_api_memories_chronological.py
│   │   ├── test_api_tag_time_search.py
│   │   ├── test_api_with_memory_service.py
│   │   ├── test_bridge_integration.js
│   │   ├── test_cli_interfaces.py
│   │   ├── test_cloudflare_connection.py
│   │   ├── test_concurrent_clients.py
│   │   ├── test_data_serialization_consistency.py
│   │   ├── test_http_server_startup.py
│   │   ├── test_mcp_memory.py
│   │   ├── test_mdns_integration.py
│   │   ├── test_oauth_basic_auth.py
│   │   ├── test_oauth_flow.py
│   │   ├── test_server_handlers.py
│   │   └── test_store_memory.py
│   ├── performance
│   │   ├── test_background_sync.py
│   │   └── test_hybrid_live.py
│   ├── README.md
│   ├── smithery
│   │   └── test_smithery.py
│   ├── sqlite
│   │   └── simple_sqlite_vec_test.py
│   ├── test_client.py
│   ├── test_content_splitting.py
│   ├── test_database.py
│   ├── test_hybrid_cloudflare_limits.py
│   ├── test_hybrid_storage.py
│   ├── test_memory_ops.py
│   ├── test_semantic_search.py
│   ├── test_sqlite_vec_storage.py
│   ├── test_time_parser.py
│   ├── test_timestamp_preservation.py
│   ├── timestamp
│   │   ├── test_hook_vs_manual_storage.py
│   │   ├── test_issue99_final_validation.py
│   │   ├── test_search_retrieval_inconsistency.py
│   │   ├── test_timestamp_issue.py
│   │   └── test_timestamp_simple.py
│   └── unit
│       ├── conftest.py
│       ├── test_cloudflare_storage.py
│       ├── test_csv_loader.py
│       ├── test_fastapi_dependencies.py
│       ├── test_import.py
│       ├── test_json_loader.py
│       ├── test_mdns_simple.py
│       ├── test_mdns.py
│       ├── test_memory_service.py
│       ├── test_memory.py
│       ├── test_semtools_loader.py
│       ├── test_storage_interface_compatibility.py
│       └── test_tag_time_filtering.py
├── tools
│   ├── docker
│   │   ├── DEPRECATED.md
│   │   ├── docker-compose.http.yml
│   │   ├── docker-compose.pythonpath.yml
│   │   ├── docker-compose.standalone.yml
│   │   ├── docker-compose.uv.yml
│   │   ├── docker-compose.yml
│   │   ├── docker-entrypoint-persistent.sh
│   │   ├── docker-entrypoint-unified.sh
│   │   ├── docker-entrypoint.sh
│   │   ├── Dockerfile
│   │   ├── Dockerfile.glama
│   │   ├── Dockerfile.slim
│   │   ├── README.md
│   │   └── test-docker-modes.sh
│   └── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/oauth/authorization.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OAuth 2.1 Authorization Server implementation for MCP Memory Service.

Implements OAuth 2.1 authorization code flow and token endpoints.
"""

import time
import logging
import base64
from typing import Optional, Tuple
from urllib.parse import urlencode
from fastapi import APIRouter, HTTPException, status, Form, Query, Request
from fastapi.responses import RedirectResponse
from jose import jwt

from ...config import (
    OAUTH_ISSUER,
    OAUTH_ACCESS_TOKEN_EXPIRE_MINUTES,
    OAUTH_AUTHORIZATION_CODE_EXPIRE_MINUTES,
    get_jwt_algorithm,
    get_jwt_signing_key
)
from .models import TokenResponse
from .storage import oauth_storage

logger = logging.getLogger(__name__)

router = APIRouter()


def parse_basic_auth(authorization_header: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """
    Parse HTTP Basic authentication header.

    Returns:
        Tuple of (client_id, client_secret) or (None, None) if not valid
    """
    if not authorization_header:
        return None, None

    try:
        # Check if it's Basic authentication
        if not authorization_header.startswith('Basic '):
            return None, None

        # Extract and decode the credentials
        encoded_credentials = authorization_header[6:]  # Remove 'Basic ' prefix
        decoded_credentials = base64.b64decode(encoded_credentials).decode('utf-8')

        # Split username:password
        if ':' not in decoded_credentials:
            return None, None

        client_id, client_secret = decoded_credentials.split(':', 1)
        return client_id, client_secret

    except Exception as e:
        logger.debug(f"Failed to parse Basic auth header: {e}")
        return None, None


def create_access_token(client_id: str, scope: Optional[str] = None) -> tuple[str, int]:
    """
    Create a JWT access token for the given client.

    Uses RS256 with RSA key pair if available, otherwise falls back to HS256.

    Returns:
        Tuple of (token, expires_in_seconds)
    """
    expires_in = OAUTH_ACCESS_TOKEN_EXPIRE_MINUTES * 60
    expire_time = time.time() + expires_in

    payload = {
        "iss": OAUTH_ISSUER,
        "sub": client_id,
        "aud": "mcp-memory-service",
        "exp": expire_time,
        "iat": time.time(),
        "scope": scope or "read write"
    }

    algorithm = get_jwt_algorithm()
    signing_key = get_jwt_signing_key()

    logger.debug(f"Creating JWT token with algorithm: {algorithm}")
    token = jwt.encode(payload, signing_key, algorithm=algorithm)
    return token, expires_in


async def validate_redirect_uri(client_id: str, redirect_uri: Optional[str]) -> str:
    """Validate redirect URI against registered client."""
    client = await oauth_storage.get_client(client_id)
    if not client:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_client",
                "error_description": "Invalid client_id"
            }
        )

    # If no redirect_uri provided, use the first registered one
    if not redirect_uri:
        if not client.redirect_uris:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "error": "invalid_request",
                    "error_description": "redirect_uri is required when client has no registered redirect URIs"
                }
            )
        return client.redirect_uris[0]

    # Validate that the redirect_uri is registered
    if redirect_uri not in client.redirect_uris:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_redirect_uri",
                "error_description": "redirect_uri not registered for this client"
            }
        )

    return redirect_uri


@router.get("/authorize")
async def authorize(
    response_type: str = Query(..., description="OAuth response type"),
    client_id: str = Query(..., description="OAuth client identifier"),
    redirect_uri: Optional[str] = Query(None, description="Redirection URI"),
    scope: Optional[str] = Query(None, description="Requested scope"),
    state: Optional[str] = Query(None, description="Opaque value for CSRF protection")
):
    """
    OAuth 2.1 Authorization endpoint.

    Implements the authorization code flow. For MVP, this auto-approves
    all requests without user interaction.
    """
    logger.info(f"Authorization request: client_id={client_id}, response_type={response_type}")

    try:
        # Validate response_type
        if response_type != "code":
            error_params = {
                "error": "unsupported_response_type",
                "error_description": "Only 'code' response type is supported"
            }
            if state:
                error_params["state"] = state

            # If we have a redirect_uri, redirect with error
            if redirect_uri:
                error_url = f"{redirect_uri}?{urlencode(error_params)}"
                return RedirectResponse(url=error_url)
            else:
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=error_params)

        # Validate client and redirect_uri
        validated_redirect_uri = await validate_redirect_uri(client_id, redirect_uri)

        # Generate authorization code
        auth_code = oauth_storage.generate_authorization_code()

        # Store authorization code
        await oauth_storage.store_authorization_code(
            code=auth_code,
            client_id=client_id,
            redirect_uri=validated_redirect_uri,
            scope=scope,
            expires_in=OAUTH_AUTHORIZATION_CODE_EXPIRE_MINUTES * 60
        )

        # Build redirect URL with authorization code
        redirect_params = {"code": auth_code}
        if state:
            redirect_params["state"] = state

        redirect_url = f"{validated_redirect_uri}?{urlencode(redirect_params)}"

        logger.info(f"Authorization granted for client_id={client_id}")
        return RedirectResponse(url=redirect_url)

    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        logger.error(f"Authorization error: {e}")

        error_params = {
            "error": "server_error",
            "error_description": "Internal server error"
        }
        if state:
            error_params["state"] = state

        if redirect_uri:
            error_url = f"{redirect_uri}?{urlencode(error_params)}"
            return RedirectResponse(url=error_url)
        else:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=error_params)


@router.post("/token", response_model=TokenResponse)
async def _handle_authorization_code_grant(
    final_client_id: str,
    final_client_secret: str,
    code: Optional[str],
    redirect_uri: Optional[str]
) -> TokenResponse:
    """Handle OAuth authorization_code grant type."""
    if not code:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameter: code"
            }
        )

    if not final_client_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameter: client_id"
            }
        )

    # Authenticate client
    if not await oauth_storage.authenticate_client(final_client_id, final_client_secret or ""):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_client",
                "error_description": "Client authentication failed"
            }
        )

    # Get and consume authorization code
    code_data = await oauth_storage.get_authorization_code(code)
    if not code_data:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "Invalid or expired authorization code"
            }
        )

    # Validate client_id matches
    if code_data["client_id"] != final_client_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "Authorization code was issued to a different client"
            }
        )

    # Validate redirect_uri if provided
    if redirect_uri and code_data["redirect_uri"] != redirect_uri:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_grant",
                "error_description": "redirect_uri does not match the one used in authorization request"
            }
        )

    # Create access token
    access_token, expires_in = create_access_token(final_client_id, code_data["scope"])

    # Store access token for validation
    await oauth_storage.store_access_token(
        token=access_token,
        client_id=final_client_id,
        scope=code_data["scope"],
        expires_in=expires_in
    )

    logger.info(f"Access token issued for client_id={final_client_id}")
    return TokenResponse(
        access_token=access_token,
        token_type="Bearer",
        expires_in=expires_in,
        scope=code_data["scope"]
    )

async def _handle_client_credentials_grant(
    final_client_id: str,
    final_client_secret: str
) -> TokenResponse:
    """Handle OAuth client_credentials grant type."""
    if not final_client_id or not final_client_secret:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={
                "error": "invalid_request",
                "error_description": "Missing required parameters: client_id and client_secret"
            }
        )

    # Authenticate client
    if not await oauth_storage.authenticate_client(final_client_id, final_client_secret):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_client",
                "error_description": "Client authentication failed"
            }
        )

    # Create access token
    access_token, expires_in = create_access_token(final_client_id, "read write")

    # Store access token
    await oauth_storage.store_access_token(
        token=access_token,
        client_id=final_client_id,
        scope="read write",
        expires_in=expires_in
    )

    logger.info(f"Client credentials token issued for client_id={final_client_id}")
    return TokenResponse(
        access_token=access_token,
        token_type="Bearer",
        expires_in=expires_in,
        scope="read write"
    )

async def token(
    request: Request,
    grant_type: str = Form(..., description="OAuth grant type"),
    code: Optional[str] = Form(None, description="Authorization code"),
    redirect_uri: Optional[str] = Form(None, description="Redirection URI"),
    client_id: Optional[str] = Form(None, description="OAuth client identifier"),
    client_secret: Optional[str] = Form(None, description="OAuth client secret")
):
    """
    OAuth 2.1 Token endpoint.

    Exchanges authorization codes for access tokens.
    Supports both authorization_code and client_credentials grant types.
    Supports both client_secret_post (form data) and client_secret_basic (HTTP Basic auth).
    """
    # Extract client credentials from either HTTP Basic auth or form data
    auth_header = request.headers.get('authorization')
    basic_client_id, basic_client_secret = parse_basic_auth(auth_header)

    # Use Basic auth credentials if available, otherwise fall back to form data
    final_client_id = basic_client_id or client_id
    final_client_secret = basic_client_secret or client_secret

    auth_method = "client_secret_basic" if basic_client_id else "client_secret_post"
    logger.info(f"Token request: grant_type={grant_type}, client_id={final_client_id}, auth_method={auth_method}")

    try:
        if grant_type == "authorization_code":
            return await _handle_authorization_code_grant(
                final_client_id, final_client_secret, code, redirect_uri
            )
        elif grant_type == "client_credentials":
            return await _handle_client_credentials_grant(
                final_client_id, final_client_secret
            )

        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "error": "unsupported_grant_type",
                    "error_description": f"Grant type '{grant_type}' is not supported"
                }
            )

    except HTTPException:
        # Re-raise HTTP exceptions
        raise
    except Exception as e:
        logger.error(f"Token endpoint error: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={
                "error": "server_error",
                "error_description": "Internal server error"
            }
        )
```

--------------------------------------------------------------------------------
/scripts/maintenance/repair_zero_embeddings.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Enhanced repair script to fix zero vector embeddings in SQLite-vec databases.

This script detects and repairs embeddings that are all zeros (invalid) and
regenerates them with proper sentence transformer embeddings.
"""

import asyncio
import os
import sys
import sqlite3
import logging
import numpy as np
from typing import List, Tuple

# Add parent directory to path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    import sqlite_vec
    from sqlite_vec import serialize_float32
    SQLITE_VEC_AVAILABLE = True
except ImportError:
    SQLITE_VEC_AVAILABLE = False

try:
    from sentence_transformers import SentenceTransformer
    SENTENCE_TRANSFORMERS_AVAILABLE = True
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ZeroEmbeddingRepair:
    """Repair SQLite-vec database with zero vector embeddings."""
    
    def __init__(self, db_path: str, model_name: str = "all-MiniLM-L6-v2"):
        self.db_path = db_path
        self.model_name = model_name
        self.conn = None
        self.model = None
        self.embedding_dimension = 384  # Default for all-MiniLM-L6-v2
        
    def check_dependencies(self):
        """Check required dependencies."""
        print("Checking dependencies...")
        
        if not SQLITE_VEC_AVAILABLE:
            print("❌ sqlite-vec not installed. Run: pip install sqlite-vec")
            return False
            
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            print("❌ sentence-transformers not installed. Run: pip install sentence-transformers torch")
            return False
            
        print("✅ All dependencies available")
        return True
        
    def connect_database(self):
        """Connect to the database."""
        print(f"\nConnecting to database: {self.db_path}")
        
        if not os.path.exists(self.db_path):
            raise FileNotFoundError(f"Database not found: {self.db_path}")
            
        self.conn = sqlite3.connect(self.db_path)
        self.conn.enable_load_extension(True)
        sqlite_vec.load(self.conn)
        self.conn.enable_load_extension(False)
        
        print("✅ Connected to database")
        
    def analyze_database(self) -> dict:
        """Analyze the current state of the database including zero embeddings."""
        print("\nAnalyzing database...")
        
        analysis = {
            "memory_count": 0,
            "embedding_count": 0,
            "missing_embeddings": 0,
            "zero_embeddings": 0,
            "valid_embeddings": 0,
            "embedding_dimension": None,
            "issues": []
        }
        
        # Count memories
        cursor = self.conn.execute("SELECT COUNT(*) FROM memories")
        analysis["memory_count"] = cursor.fetchone()[0]
        
        # Count embeddings
        cursor = self.conn.execute("SELECT COUNT(*) FROM memory_embeddings")
        analysis["embedding_count"] = cursor.fetchone()[0]
        
        # Check embedding dimension
        cursor = self.conn.execute("""
            SELECT sql FROM sqlite_master 
            WHERE type='table' AND name='memory_embeddings'
        """)
        schema = cursor.fetchone()
        if schema:
            import re
            match = re.search(r'FLOAT\\[(\\d+)\\]', schema[0])
            if match:
                analysis["embedding_dimension"] = int(match.group(1))
                
        # Find memories without embeddings
        cursor = self.conn.execute("""
            SELECT COUNT(*) FROM memories m
            WHERE NOT EXISTS (
                SELECT 1 FROM memory_embeddings e WHERE e.rowid = m.id
            )
        """)
        analysis["missing_embeddings"] = cursor.fetchone()[0]
        
        # Check for zero embeddings
        print("  Checking for zero vector embeddings...")
        cursor = self.conn.execute("""
            SELECT e.rowid, e.content_embedding, m.content
            FROM memory_embeddings e
            INNER JOIN memories m ON m.id = e.rowid
        """)
        
        zero_count = 0
        valid_count = 0
        
        for row in cursor.fetchall():
            rowid, embedding_blob, content = row
            
            if embedding_blob and len(embedding_blob) > 0:
                try:
                    # Convert to numpy array
                    embedding_array = np.frombuffer(embedding_blob, dtype=np.float32)
                    
                    # Check if all zeros
                    if np.allclose(embedding_array, 0):
                        zero_count += 1
                        logger.debug(f"Zero embedding found for memory {rowid}: {content[:50]}...")
                    else:
                        valid_count += 1
                        
                except Exception as e:
                    logger.warning(f"Failed to parse embedding for rowid {rowid}: {e}")
                    zero_count += 1  # Treat unparseable as invalid
            else:
                zero_count += 1
        
        analysis["zero_embeddings"] = zero_count
        analysis["valid_embeddings"] = valid_count
        
        # Identify issues
        if analysis["memory_count"] != analysis["embedding_count"]:
            analysis["issues"].append(
                f"Mismatch: {analysis['memory_count']} memories vs {analysis['embedding_count']} embeddings"
            )
            
        if analysis["missing_embeddings"] > 0:
            analysis["issues"].append(
                f"Missing embeddings: {analysis['missing_embeddings']} memories have no embeddings"
            )
            
        if analysis["zero_embeddings"] > 0:
            analysis["issues"].append(
                f"Zero vector embeddings: {analysis['zero_embeddings']} embeddings are all zeros (invalid)"
            )
            
        print(f"  Memories: {analysis['memory_count']}")
        print(f"  Embeddings: {analysis['embedding_count']}")
        print(f"  Missing embeddings: {analysis['missing_embeddings']}")
        print(f"  Zero embeddings: {analysis['zero_embeddings']}")
        print(f"  Valid embeddings: {analysis['valid_embeddings']}")
        print(f"  Embedding dimension: {analysis['embedding_dimension']}")
        
        if analysis["issues"]:
            print("\n⚠️  Issues found:")
            for issue in analysis["issues"]:
                print(f"  - {issue}")
        else:
            print("\n✅ No issues found")
            
        return analysis
        
    def load_model(self):
        """Load the embedding model."""
        print(f"\nLoading embedding model: {self.model_name}")
        
        self.model = SentenceTransformer(self.model_name)
        
        # Get actual dimension
        test_embedding = self.model.encode(["test"], convert_to_numpy=True)
        self.embedding_dimension = test_embedding.shape[1]
        
        print(f"✅ Model loaded (dimension: {self.embedding_dimension})")
        
    def regenerate_zero_embeddings(self, analysis: dict) -> int:
        """Regenerate embeddings that are zero vectors."""
        if analysis["zero_embeddings"] == 0:
            return 0
            
        print(f"\nRegenerating {analysis['zero_embeddings']} zero vector embeddings...")
        
        # Get all memories with zero embeddings
        cursor = self.conn.execute("""
            SELECT e.rowid, e.content_embedding, m.content
            FROM memory_embeddings e
            INNER JOIN memories m ON m.id = e.rowid
        """)
        
        zero_embeddings = []
        for row in cursor.fetchall():
            rowid, embedding_blob, content = row
            
            if embedding_blob and len(embedding_blob) > 0:
                try:
                    embedding_array = np.frombuffer(embedding_blob, dtype=np.float32)
                    if np.allclose(embedding_array, 0):
                        zero_embeddings.append((rowid, content))
                except:
                    zero_embeddings.append((rowid, content))
            else:
                zero_embeddings.append((rowid, content))
        
        fixed_count = 0
        
        for rowid, content in zero_embeddings:
            try:
                # Generate new embedding
                embedding = self.model.encode([content], convert_to_numpy=True)[0]
                
                # Validate the new embedding
                if not np.allclose(embedding, 0) and np.isfinite(embedding).all():
                    # Update the embedding in database
                    self.conn.execute(
                        "UPDATE memory_embeddings SET content_embedding = ? WHERE rowid = ?",
                        (serialize_float32(embedding), rowid)
                    )
                    
                    fixed_count += 1
                    
                    # Show progress
                    if fixed_count % 10 == 0:
                        print(f"  ... {fixed_count}/{len(zero_embeddings)} embeddings regenerated")
                        
                else:
                    logger.error(f"Generated invalid embedding for memory {rowid}")
                    
            except Exception as e:
                logger.error(f"Failed to regenerate embedding for memory {rowid}: {e}")
                
        self.conn.commit()
        print(f"✅ Regenerated {fixed_count} embeddings")
        
        return fixed_count
        
    def verify_search(self) -> bool:
        """Test if semantic search works with proper similarity scores."""
        print("\nTesting semantic search with similarity scores...")
        
        try:
            # Generate a test query embedding
            test_query = "test embedding verification"
            query_embedding = self.model.encode([test_query], convert_to_numpy=True)[0]
            
            # Try to search and get distances
            cursor = self.conn.execute("""
                SELECT m.content, e.distance
                FROM memories m
                INNER JOIN (
                    SELECT rowid, distance 
                    FROM memory_embeddings 
                    WHERE content_embedding MATCH ?
                    ORDER BY distance
                    LIMIT 3
                ) e ON m.id = e.rowid
                ORDER BY e.distance
            """, (serialize_float32(query_embedding),))
            
            results = cursor.fetchall()
            
            if results:
                print("✅ Semantic search working with results:")
                for i, (content, distance) in enumerate(results, 1):
                    similarity = max(0.0, 1.0 - distance)
                    print(f"  {i}. Distance: {distance:.6f}, Similarity: {similarity:.6f}")
                    print(f"     Content: {content[:60]}...")
                
                # Check if we have reasonable similarity scores
                distances = [result[1] for result in results]
                if all(d >= 1.0 for d in distances):
                    print("⚠️  All distances are >= 1.0, similarities will be 0.0")
                    return False
                else:
                    print("✅ Found reasonable similarity scores")
                    return True
            else:
                print("❌ No results returned")
                return False
            
        except Exception as e:
            print(f"❌ Semantic search failed: {e}")
            return False
            
    def run_repair(self):
        """Run the repair process."""
        print("\\n" + "="*60)
        print("SQLite-vec Zero Embedding Repair Tool")
        print("="*60)
        
        try:
            # Check dependencies
            if not self.check_dependencies():
                return
                
            # Connect to database
            self.connect_database()
            
            # Analyze current state
            analysis = self.analyze_database()
            
            if not analysis["issues"]:
                print("\\n✅ Database appears healthy, no repair needed")
                return
                
            # Load model
            self.load_model()
            
            # Fix zero embeddings
            fixed = self.regenerate_zero_embeddings(analysis)
            
            # Verify search works
            search_working = self.verify_search()
            
            # Re-analyze
            print("\\nRe-analyzing database after repair...")
            new_analysis = self.analyze_database()
            
            print("\\n" + "="*60)
            print("Repair Summary")
            print("="*60)
            print(f"Fixed {fixed} zero vector embeddings")
            print(f"Search working: {'✅ Yes' if search_working else '❌ No'}")
            
            if new_analysis["issues"]:
                print("\\n⚠️  Some issues remain:")
                for issue in new_analysis["issues"]:
                    print(f"  - {issue}")
            else:
                print("\\n✅ All issues resolved!")
                
        except Exception as e:
            print(f"\\n❌ Repair failed: {e}")
            logger.exception("Repair failed")
            
        finally:
            if self.conn:
                self.conn.close()


def main():
    """Run the repair tool."""
    if len(sys.argv) < 2:
        print("Usage: python repair_zero_embeddings.py <database_path>")
        print("\\nExample:")
        print("  python repair_zero_embeddings.py ~/.local/share/mcp-memory/sqlite_vec.db")
        print("\\nThis tool will:")
        print("  - Check for zero vector embeddings (invalid)")
        print("  - Regenerate proper embeddings using sentence-transformers")
        print("  - Verify semantic search functionality with similarity scores")
        sys.exit(1)
        
    db_path = sys.argv[1]
    
    repair = ZeroEmbeddingRepair(db_path)
    repair.run_repair()


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/scripts/validation/diagnose_backend_config.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Diagnostic script to troubleshoot backend configuration issues.
This helps identify why Cloudflare backend might not be working.
"""

import os
import sys
from pathlib import Path

# Add src to path for imports
src_path = Path(__file__).parent.parent.parent / "src"
sys.path.insert(0, str(src_path))

def print_separator(title):
    print("\n" + "=" * 60)
    print(f" {title}")
    print("=" * 60)

def print_status(status, message):
    """Print status with simple text indicators."""
    if status == "success":
        print(f"[OK] {message}")
    elif status == "warning":
        print(f"[WARN] {message}")
    elif status == "error":
        print(f"[ERROR] {message}")
    else:
        print(f"[INFO] {message}")

def check_env_file():
    """Check if .env file exists and what it contains."""
    print_separator("ENVIRONMENT FILE CHECK")

    project_root = Path(__file__).parent.parent.parent
    env_file = project_root / ".env"

    if env_file.exists():
        print_status("success", f".env file found at: {env_file}")
        print("\n.env file contents:")
        with open(env_file, 'r') as f:
            lines = f.readlines()
            for i, line in enumerate(lines, 1):
                # Mask sensitive values
                if 'TOKEN' in line or 'PASSWORD' in line or 'SECRET' in line:
                    if '=' in line:
                        key, _ = line.split('=', 1)
                        print(f"  {i:2d}: {key}=***MASKED***")
                    else:
                        print(f"  {i:2d}: {line.rstrip()}")
                else:
                    print(f"  {i:2d}: {line.rstrip()}")
    else:
        print_status("error", f"No .env file found at: {env_file}")
        return False
    return True

def check_environment_variables():
    """Check current environment variables."""
    print_separator("ENVIRONMENT VARIABLES CHECK")

    # Check if dotenv is available and load .env file
    try:
        from dotenv import load_dotenv
        project_root = Path(__file__).parent.parent.parent
        env_file = project_root / ".env"
        if env_file.exists():
            load_dotenv(env_file)
            print_status("success", f"Loaded .env file from: {env_file}")
        else:
            print_status("info", "No .env file to load")
    except ImportError:
        print_status("warning", "dotenv not available, skipping .env file loading")

    # Core configuration
    storage_backend = os.getenv('MCP_MEMORY_STORAGE_BACKEND', 'NOT SET')
    print(f"\nCore Configuration:")
    print(f"  MCP_MEMORY_STORAGE_BACKEND: {storage_backend}")

    # Cloudflare variables
    cloudflare_vars = {
        'CLOUDFLARE_API_TOKEN': 'REQUIRED',
        'CLOUDFLARE_ACCOUNT_ID': 'REQUIRED',
        'CLOUDFLARE_VECTORIZE_INDEX': 'REQUIRED',
        'CLOUDFLARE_D1_DATABASE_ID': 'REQUIRED',
        'CLOUDFLARE_R2_BUCKET': 'OPTIONAL',
        'CLOUDFLARE_EMBEDDING_MODEL': 'OPTIONAL',
        'CLOUDFLARE_LARGE_CONTENT_THRESHOLD': 'OPTIONAL',
        'CLOUDFLARE_MAX_RETRIES': 'OPTIONAL',
        'CLOUDFLARE_BASE_DELAY': 'OPTIONAL'
    }

    print(f"\nCloudflare Configuration:")
    missing_required = []
    for var, requirement in cloudflare_vars.items():
        value = os.getenv(var)
        if value:
            if 'TOKEN' in var:
                display_value = f"{value[:8]}***MASKED***"
            else:
                display_value = value
            print_status("success", f"{var}: {display_value} ({requirement})")
        else:
            display_value = "NOT SET"
            if requirement == 'REQUIRED':
                print_status("error", f"{var}: {display_value} ({requirement})")
                missing_required.append(var)
            else:
                print_status("warning", f"{var}: {display_value} ({requirement})")

    if missing_required:
        print_status("error", f"Missing required Cloudflare variables: {', '.join(missing_required)}")
        return False
    else:
        print_status("success", "All required Cloudflare variables are set")
        return True

def test_config_import():
    """Test importing the configuration module."""
    print_separator("CONFIGURATION MODULE TEST")

    try:
        print("Attempting to import config module...")
        from mcp_memory_service.config import (
            STORAGE_BACKEND,
            CLOUDFLARE_API_TOKEN,
            CLOUDFLARE_ACCOUNT_ID,
            CLOUDFLARE_VECTORIZE_INDEX,
            CLOUDFLARE_D1_DATABASE_ID
        )

        print_status("success", "Config import successful")
        print(f"  Configured Backend: {STORAGE_BACKEND}")
        print(f"  API Token Set: {'YES' if CLOUDFLARE_API_TOKEN else 'NO'}")
        print(f"  Account ID: {CLOUDFLARE_ACCOUNT_ID}")
        print(f"  Vectorize Index: {CLOUDFLARE_VECTORIZE_INDEX}")
        print(f"  D1 Database ID: {CLOUDFLARE_D1_DATABASE_ID}")

        return STORAGE_BACKEND

    except SystemExit as e:
        print_status("error", f"Config import failed with SystemExit: {e}")
        print("   This means required Cloudflare variables are missing")
        return None
    except Exception as e:
        print_status("error", f"Config import failed with error: {e}")
        return None

def test_storage_creation():
    """Test creating the storage backend."""
    print_separator("STORAGE BACKEND CREATION TEST")

    try:
        from mcp_memory_service.config import STORAGE_BACKEND
        print(f"Attempting to create {STORAGE_BACKEND} storage...")

        if STORAGE_BACKEND == 'cloudflare':
            from mcp_memory_service.storage.cloudflare import CloudflareStorage
            from mcp_memory_service.config import (
                CLOUDFLARE_API_TOKEN,
                CLOUDFLARE_ACCOUNT_ID,
                CLOUDFLARE_VECTORIZE_INDEX,
                CLOUDFLARE_D1_DATABASE_ID,
                CLOUDFLARE_R2_BUCKET,
                CLOUDFLARE_EMBEDDING_MODEL,
                CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
                CLOUDFLARE_MAX_RETRIES,
                CLOUDFLARE_BASE_DELAY
            )

            storage = CloudflareStorage(
                api_token=CLOUDFLARE_API_TOKEN,
                account_id=CLOUDFLARE_ACCOUNT_ID,
                vectorize_index=CLOUDFLARE_VECTORIZE_INDEX,
                d1_database_id=CLOUDFLARE_D1_DATABASE_ID,
                r2_bucket=CLOUDFLARE_R2_BUCKET,
                embedding_model=CLOUDFLARE_EMBEDDING_MODEL,
                large_content_threshold=CLOUDFLARE_LARGE_CONTENT_THRESHOLD,
                max_retries=CLOUDFLARE_MAX_RETRIES,
                base_delay=CLOUDFLARE_BASE_DELAY
            )
            print_status("success", "CloudflareStorage instance created successfully")
            print(f"  Storage class: {storage.__class__.__name__}")
            return storage

        elif STORAGE_BACKEND == 'sqlite_vec':
            from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
            from mcp_memory_service.config import SQLITE_VEC_PATH
            storage = SqliteVecMemoryStorage(SQLITE_VEC_PATH)
            print_status("success", "SqliteVecMemoryStorage instance created successfully")
            print(f"  Storage class: {storage.__class__.__name__}")
            print(f"  Database path: {SQLITE_VEC_PATH}")
            return storage

        else:
            print_status("error", f"Unknown storage backend: {STORAGE_BACKEND}")
            return None

    except Exception as e:
        print_status("error", f"Storage creation failed: {e}")
        import traceback
        print(f"Full traceback:")
        traceback.print_exc()
        return None

def _verify_token_endpoint(endpoint_url, endpoint_type, api_token, requests):
    """
    Helper function to verify a Cloudflare API token against a specific endpoint.

    Args:
        endpoint_url: The URL to test the token against
        endpoint_type: Description of the endpoint type (e.g., "Account-scoped", "Generic user")
        api_token: The Cloudflare API token to verify
        requests: The requests module

    Returns:
        tuple: (success: bool, result: dict or None)
    """
    print(f"\nTesting {endpoint_type} token verification...")
    try:
        response = requests.get(
            endpoint_url,
            headers={"Authorization": f"Bearer {api_token}"},
            timeout=10
        )

        if response.status_code == 200:
            data = response.json()
            if data.get("success"):
                result = data.get("result", {})
                print_status("success", f"{endpoint_type} verification successful")
                print(f"   Token ID: {result.get('id', 'N/A')}")
                print(f"   Status: {result.get('status', 'N/A')}")
                print(f"   Expires: {result.get('expires_on', 'N/A')}")
                return True, result
            else:
                errors = data.get("errors", [])
                print_status("error", f"{endpoint_type} verification failed")
                for error in errors:
                    print(f"   Error {error.get('code')}: {error.get('message')}")
                return False, None
        else:
            print_status("error", f"{endpoint_type} verification failed: HTTP {response.status_code}")
            print(f"   Response: {response.text}")
            return False, None

    except Exception as e:
        print_status("error", f"{endpoint_type} verification error: {e}")
        return False, None


def test_cloudflare_token():
    """Test Cloudflare API token with both endpoints to help identify token type."""
    print_separator("CLOUDFLARE TOKEN VERIFICATION")

    api_token = os.getenv('CLOUDFLARE_API_TOKEN')
    account_id = os.getenv('CLOUDFLARE_ACCOUNT_ID')

    if not api_token:
        print_status("error", "CLOUDFLARE_API_TOKEN not set, skipping token verification")
        return False

    if not account_id:
        print_status("warning", "CLOUDFLARE_ACCOUNT_ID not set, cannot test account-scoped endpoint")

    try:
        import requests
    except ImportError:
        print_status("warning", "requests not available, skipping token verification")
        return False

    token_verified = False

    # Test 1: Account-scoped endpoint (recommended for scoped tokens)
    if account_id:
        endpoint_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/tokens/verify"
        success, _ = _verify_token_endpoint(endpoint_url, "account-scoped", api_token, requests)
        if success:
            token_verified = True

    # Test 2: Generic user endpoint (works for global tokens)
    endpoint_url = "https://api.cloudflare.com/client/v4/user/tokens/verify"
    success, _ = _verify_token_endpoint(endpoint_url, "generic user", api_token, requests)
    if success:
        token_verified = True

    # Provide guidance
    print("\nTOKEN VERIFICATION GUIDANCE:")
    if account_id:
        print("✅ For account-scoped tokens (recommended), use:")
        print(f"   curl \"https://api.cloudflare.com/client/v4/accounts/{account_id}/tokens/verify\" \\")
        print(f"        -H \"Authorization: Bearer YOUR_TOKEN\"")
    print("✅ For global tokens (legacy), use:")
    print("   curl \"https://api.cloudflare.com/client/v4/user/tokens/verify\" \\")
    print("        -H \"Authorization: Bearer YOUR_TOKEN\"")
    print("❌ Common mistake: Using wrong endpoint for token type")
    print("📖 See docs/troubleshooting/cloudflare-authentication.md for details")

    return token_verified

def main():
    """Run all diagnostic tests."""
    print("MCP Memory Service Backend Configuration Diagnostics")
    print("=" * 60)

    # Step 1: Check .env file
    check_env_file()

    # Step 2: Check environment variables
    cloudflare_ready = check_environment_variables()

    # Step 3: Test Cloudflare token verification
    token_valid = test_cloudflare_token()

    # Step 4: Test config import
    configured_backend = test_config_import()

    # Step 5: Test storage creation if config loaded successfully
    if configured_backend:
        storage = test_storage_creation()
    else:
        storage = None

    # Final summary
    print_separator("DIAGNOSTIC SUMMARY")

    if configured_backend == 'cloudflare' and cloudflare_ready and token_valid and storage:
        print_status("success", "Cloudflare backend should be working correctly")
        print(f"   Configuration loaded: {configured_backend}")
        print(f"   Required variables set: {cloudflare_ready}")
        print(f"   Token verification: {'PASSED' if token_valid else 'NOT TESTED'}")
        print(f"   Storage instance created: {storage.__class__.__name__}")
    elif configured_backend == 'sqlite_vec' and storage:
        print_status("success", "SQLite-vec backend is working")
        print(f"   Configuration loaded: {configured_backend}")
        print(f"   Storage instance created: {storage.__class__.__name__}")
        if cloudflare_ready:
            print_status("warning", "Cloudflare variables are set but backend is sqlite_vec")
            print("         Check MCP_MEMORY_STORAGE_BACKEND environment variable")
    else:
        print_status("error", "Backend configuration has issues")
        print(f"   Configured backend: {configured_backend or 'FAILED TO LOAD'}")
        print(f"   Cloudflare variables ready: {cloudflare_ready}")
        print(f"   Storage created: {'YES' if storage else 'NO'}")

        print("\nTROUBLESHOOTING STEPS:")
        if not cloudflare_ready:
            print("   1. Set missing Cloudflare environment variables")
            print("   2. Create .env file with Cloudflare credentials")
        if not token_valid:
            print("   3. Verify Cloudflare API token is valid and has correct permissions")
            print("   4. Use account-scoped verification endpoint (see docs/troubleshooting/cloudflare-authentication.md)")
        if not configured_backend:
            print("   5. Fix environment variable loading issues")
        if configured_backend and not storage:
            print("   6. Check Cloudflare credentials and connectivity")

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/consolidation/associations.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Creative association discovery engine for memory connections."""

import random
import numpy as np
from typing import List, Dict, Any, Optional, Tuple, Set
from itertools import combinations
from datetime import datetime
from dataclasses import dataclass
import re

from .base import ConsolidationBase, ConsolidationConfig, MemoryAssociation
from ..models.memory import Memory

@dataclass
class AssociationAnalysis:
    """Analysis results for a potential memory association."""
    memory1_hash: str
    memory2_hash: str
    similarity_score: float
    connection_reasons: List[str]
    shared_concepts: List[str]
    temporal_relationship: Optional[str]
    tag_overlap: List[str]
    confidence_score: float

class CreativeAssociationEngine(ConsolidationBase):
    """
    Discovers creative connections between seemingly unrelated memories.
    
    Similar to how dreams create unexpected associations, this engine randomly
    pairs memories to discover non-obvious connections in the "sweet spot"
    of moderate similarity (0.3-0.7 range).
    """
    
    def __init__(self, config: ConsolidationConfig):
        super().__init__(config)
        self.min_similarity = config.min_similarity
        self.max_similarity = config.max_similarity
        self.max_pairs_per_run = config.max_pairs_per_run
        
        # Compile regex patterns for concept extraction
        self._concept_patterns = {
            'urls': re.compile(r'https?://[^\s]+'),
            'emails': re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
            'dates': re.compile(r'\b\d{1,2}[/-]\d{1,2}[/-]\d{2,4}\b|\b\d{4}[/-]\d{1,2}[/-]\d{1,2}\b'),
            'numbers': re.compile(r'\b\d+\.?\d*\b'),
            'camelCase': re.compile(r'\b[a-z]+[A-Z][a-zA-Z]*\b'),
            'PascalCase': re.compile(r'\b[A-Z][a-z]*[A-Z][a-zA-Z]*\b'),
            'acronyms': re.compile(r'\b[A-Z]{2,}\b')
        }
    
    async def process(self, memories: List[Memory], **kwargs) -> List[MemoryAssociation]:
        """Discover creative associations between memories."""
        if not self._validate_memories(memories) or len(memories) < 2:
            return []
        
        # Get existing associations to avoid duplicates
        existing_associations = kwargs.get('existing_associations', set())
        
        # Sample memory pairs for analysis
        pairs = self._sample_memory_pairs(memories)
        
        associations = []
        for mem1, mem2 in pairs:
            # Skip if association already exists
            pair_key = tuple(sorted([mem1.content_hash, mem2.content_hash]))
            if pair_key in existing_associations:
                continue
            
            # Calculate semantic similarity
            similarity = await self._calculate_semantic_similarity(mem1, mem2)
            
            # Check if similarity is in the "sweet spot" for creative connections
            if self.min_similarity <= similarity <= self.max_similarity:
                analysis = await self._analyze_association(mem1, mem2, similarity)
                
                if analysis.confidence_score > 0.3:  # Minimum confidence threshold
                    association = await self._create_association_memory(analysis)
                    associations.append(association)
        
        self.logger.info(f"Discovered {len(associations)} creative associations from {len(pairs)} pairs")
        return associations
    
    def _sample_memory_pairs(self, memories: List[Memory]) -> List[Tuple[Memory, Memory]]:
        """Sample random pairs of memories for association discovery."""
        # Calculate maximum possible pairs
        total_possible = len(memories) * (len(memories) - 1) // 2
        max_pairs = min(self.max_pairs_per_run, total_possible)
        
        if total_possible <= max_pairs:
            # Return all possible pairs if total is manageable
            return list(combinations(memories, 2))
        else:
            # Randomly sample pairs to prevent combinatorial explosion
            all_pairs = list(combinations(memories, 2))
            return random.sample(all_pairs, max_pairs)
    
    async def _calculate_semantic_similarity(self, mem1: Memory, mem2: Memory) -> float:
        """Calculate semantic similarity between two memories using embeddings."""
        if not mem1.embedding or not mem2.embedding:
            # Fallback to text-based similarity if embeddings unavailable
            return self._calculate_text_similarity(mem1.content, mem2.content)
        
        # Use cosine similarity for embeddings
        embedding1 = np.array(mem1.embedding)
        embedding2 = np.array(mem2.embedding)
        
        # Normalize embeddings
        norm1 = np.linalg.norm(embedding1)
        norm2 = np.linalg.norm(embedding2)
        
        if norm1 == 0 or norm2 == 0:
            return 0.0
        
        # Calculate cosine similarity
        similarity = np.dot(embedding1, embedding2) / (norm1 * norm2)
        
        # Convert to 0-1 range (cosine similarity can be -1 to 1)
        return (similarity + 1) / 2
    
    def _calculate_text_similarity(self, text1: str, text2: str) -> float:
        """Fallback text similarity using word overlap."""
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        
        if not words1 or not words2:
            return 0.0
        
        intersection = len(words1.intersection(words2))
        union = len(words1.union(words2))
        
        return intersection / union if union > 0 else 0.0
    
    async def _analyze_association(
        self, 
        mem1: Memory, 
        mem2: Memory, 
        similarity: float
    ) -> AssociationAnalysis:
        """Analyze why two memories might be associated."""
        connection_reasons = []
        shared_concepts = []
        tag_overlap = []
        temporal_relationship = None
        
        # Analyze tag overlap
        tags1 = set(mem1.tags)
        tags2 = set(mem2.tags)
        tag_overlap = list(tags1.intersection(tags2))
        if tag_overlap:
            connection_reasons.append("shared_tags")
        
        # Analyze temporal relationship
        temporal_relationship = self._analyze_temporal_relationship(mem1, mem2)
        if temporal_relationship:
            connection_reasons.append("temporal_proximity")
        
        # Extract and compare concepts
        concepts1 = self._extract_concepts(mem1.content)
        concepts2 = self._extract_concepts(mem2.content)
        shared_concepts = list(concepts1.intersection(concepts2))
        if shared_concepts:
            connection_reasons.append("shared_concepts")
        
        # Analyze content patterns
        if self._has_similar_structure(mem1.content, mem2.content):
            connection_reasons.append("similar_structure")
        
        if self._has_complementary_content(mem1.content, mem2.content):
            connection_reasons.append("complementary_content")
        
        # Calculate confidence score based on multiple factors
        confidence_score = self._calculate_confidence_score(
            similarity, len(connection_reasons), len(shared_concepts), len(tag_overlap)
        )
        
        return AssociationAnalysis(
            memory1_hash=mem1.content_hash,
            memory2_hash=mem2.content_hash,
            similarity_score=similarity,
            connection_reasons=connection_reasons,
            shared_concepts=shared_concepts,
            temporal_relationship=temporal_relationship,
            tag_overlap=tag_overlap,
            confidence_score=confidence_score
        )
    
    def _extract_concepts(self, text: str) -> Set[str]:
        """Extract key concepts from text using various patterns."""
        concepts = set()
        
        # Extract different types of concepts
        for concept_type, pattern in self._concept_patterns.items():
            matches = pattern.findall(text)
            concepts.update(matches)
        
        # Extract capitalized words (potential proper nouns)
        capitalized_words = re.findall(r'\b[A-Z][a-z]+\b', text)
        concepts.update(capitalized_words)
        
        # Extract quoted phrases
        quoted_phrases = re.findall(r'"([^"]*)"', text)
        concepts.update(quoted_phrases)
        
        # Extract common important words (filter out common stop words)
        stop_words = {'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        words = re.findall(r'\b\w{4,}\b', text.lower())  # Words with 4+ characters
        important_words = [word for word in words if word not in stop_words]
        concepts.update(important_words[:10])  # Limit to top 10 words
        
        return concepts
    
    def _analyze_temporal_relationship(self, mem1: Memory, mem2: Memory) -> Optional[str]:
        """Analyze temporal relationship between memories."""
        if not (mem1.created_at and mem2.created_at):
            return None
        
        time_diff = abs(mem1.created_at - mem2.created_at)
        days_diff = time_diff / (24 * 3600)  # Convert to days
        
        if days_diff < 1:
            return "same_day"
        elif days_diff < 7:
            return "same_week"
        elif days_diff < 30:
            return "same_month"
        elif days_diff < 365:
            return "same_year"
        else:
            return "different_years"
    
    def _has_similar_structure(self, text1: str, text2: str) -> bool:
        """Check if texts have similar structural patterns."""
        # Check for similar formatting patterns
        patterns = [
            r'\n\s*[-*+]\s+',  # List items
            r'\n\s*\d+\.\s+',  # Numbered lists
            r'\n#{1,6}\s+',    # Headers
            r'```[\s\S]*?```', # Code blocks
            r'\[.*?\]\(.*?\)', # Links
        ]
        
        for pattern in patterns:
            matches1 = len(re.findall(pattern, text1))
            matches2 = len(re.findall(pattern, text2))
            
            if matches1 > 0 and matches2 > 0:
                return True
        
        return False
    
    def _has_complementary_content(self, text1: str, text2: str) -> bool:
        """Check if texts contain complementary information."""
        # Look for question-answer patterns
        has_question1 = bool(re.search(r'\?', text1))
        has_question2 = bool(re.search(r'\?', text2))
        
        # If one has questions and the other doesn't, they might be complementary
        if has_question1 != has_question2:
            return True
        
        # Look for problem-solution patterns
        problem_words = ['problem', 'issue', 'error', 'bug', 'fail', 'wrong']
        solution_words = ['solution', 'fix', 'resolve', 'answer', 'correct', 'solve']
        
        has_problem1 = any(word in text1.lower() for word in problem_words)
        has_solution1 = any(word in text1.lower() for word in solution_words)
        has_problem2 = any(word in text2.lower() for word in problem_words)
        has_solution2 = any(word in text2.lower() for word in solution_words)
        
        # Complementary if one focuses on problems, other on solutions
        if (has_problem1 and has_solution2) or (has_solution1 and has_problem2):
            return True
        
        return False
    
    def _calculate_confidence_score(
        self,
        similarity: float,
        num_reasons: int,
        num_shared_concepts: int,
        num_shared_tags: int
    ) -> float:
        """Calculate confidence score for the association."""
        base_score = similarity
        
        # Boost for multiple connection reasons
        reason_boost = min(0.3, num_reasons * 0.1)
        
        # Boost for shared concepts
        concept_boost = min(0.2, num_shared_concepts * 0.05)
        
        # Boost for shared tags
        tag_boost = min(0.2, num_shared_tags * 0.1)
        
        total_score = base_score + reason_boost + concept_boost + tag_boost
        
        return min(1.0, total_score)
    
    async def _create_association_memory(self, analysis: AssociationAnalysis) -> MemoryAssociation:
        """Create a memory association from analysis results."""
        return MemoryAssociation(
            source_memory_hashes=[analysis.memory1_hash, analysis.memory2_hash],
            similarity_score=analysis.similarity_score,
            connection_type=', '.join(analysis.connection_reasons),
            discovery_method="creative_association",
            discovery_date=datetime.now(),
            metadata={
                "shared_concepts": analysis.shared_concepts,
                "temporal_relationship": analysis.temporal_relationship,
                "tag_overlap": analysis.tag_overlap,
                "confidence_score": analysis.confidence_score,
                "analysis_version": "1.0"
            }
        )
    
    async def filter_high_confidence_associations(
        self,
        associations: List[MemoryAssociation],
        min_confidence: float = 0.5
    ) -> List[MemoryAssociation]:
        """Filter associations by confidence score."""
        return [
            assoc for assoc in associations
            if assoc.metadata.get('confidence_score', 0) >= min_confidence
        ]
    
    async def group_associations_by_type(
        self,
        associations: List[MemoryAssociation]
    ) -> Dict[str, List[MemoryAssociation]]:
        """Group associations by their connection type."""
        groups = {}
        for assoc in associations:
            conn_type = assoc.connection_type
            if conn_type not in groups:
                groups[conn_type] = []
            groups[conn_type].append(assoc)
        
        return groups
```

--------------------------------------------------------------------------------
/scripts/archive/check_missing_timestamps.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Check for memories without timestamps in the MCP memory service database.
This script analyzes the storage backend for entries missing timestamp data.
"""

import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))

import asyncio
import json
from typing import List, Dict, Any
from datetime import datetime

from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.models.memory import Memory

class TimestampAnalyzer:
    """Analyze memory database for missing timestamp entries."""

    def __init__(self, storage_backend: str = "sqlite_vec", db_path: str = None):
        self.storage_backend = storage_backend
        self.db_path = db_path or os.path.expanduser("~/.mcp_memory_service/storage.db")
        self.storage = None

    async def setup(self):
        """Initialize storage backend."""
        print(f"=== Analyzing {self.storage_backend} database for timestamp issues ===")
        print(f"Database path: {self.db_path}")

        if not os.path.exists(self.db_path):
            print(f"❌ Database file not found: {self.db_path}")
            print("Possible locations:")
            common_paths = [
                os.path.expanduser("~/.mcp_memory_service/storage.db"),
                os.path.expanduser("~/.mcp_memory_service/memory.db"),
                "storage.db",
                "memory.db"
            ]
            for path in common_paths:
                if os.path.exists(path):
                    print(f"  ✅ Found: {path}")
                else:
                    print(f"  ❌ Not found: {path}")
            return False

        try:
            self.storage = SqliteVecMemoryStorage(
                db_path=self.db_path,
                embedding_model="all-MiniLM-L6-v2"
            )
            await self.storage.initialize()
            print("✅ Storage initialized successfully")
            return True
        except Exception as e:
            print(f"❌ Failed to initialize storage: {e}")
            return False

    async def get_all_memories(self) -> List[Memory]:
        """Retrieve all memories from the database."""
        try:
            # SQLite-Vec has a limit of 4096 for k in knn queries, so use direct database access
            return await self.get_memories_direct_query()
        except Exception as e:
            print(f"❌ Error with direct query, trying search approach: {e}")
            return await self.get_memories_via_search()

    async def get_memories_direct_query(self) -> List[Memory]:
        """Get all memories using direct database queries."""
        import sqlite3
        memories = []

        try:
            # Connect directly to SQLite database
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row

            # Get all memory records
            cursor = conn.execute("""
                SELECT id, content, content_hash, tags, memory_type,
                       created_at, created_at_iso, updated_at, updated_at_iso, metadata
                FROM memories
                ORDER BY created_at DESC
            """)

            rows = cursor.fetchall()
            print(f"📊 Found {len(rows)} memory records in database")

            for i, row in enumerate(rows):
                try:
                    # Safely parse JSON fields
                    tags = []
                    if row['tags']:
                        try:
                            tags = json.loads(row['tags'])
                        except (json.JSONDecodeError, TypeError):
                            tags = []

                    metadata = {}
                    if row['metadata']:
                        try:
                            metadata = json.loads(row['metadata'])
                        except (json.JSONDecodeError, TypeError):
                            metadata = {}

                    # Reconstruct Memory object from database row
                    memory_dict = {
                        'content': row['content'] or '',
                        'content_hash': row['content_hash'] or '',
                        'tags': tags,
                        'memory_type': row['memory_type'] or 'unknown',
                        'created_at': row['created_at'],
                        'created_at_iso': row['created_at_iso'],
                        'updated_at': row['updated_at'],
                        'updated_at_iso': row['updated_at_iso'],
                        'metadata': metadata
                    }

                    memory = Memory.from_dict(memory_dict)
                    memories.append(memory)

                except Exception as e:
                    print(f"⚠️  Error processing memory {i+1}: {e}")
                    # Continue processing other memories
                    continue

            conn.close()
            return memories

        except Exception as e:
            print(f"❌ Direct query failed: {e}")
            if 'conn' in locals():
                conn.close()
            return []

    async def get_memories_via_search(self) -> List[Memory]:
        """Get memories using search with smaller batches."""
        memories = []

        try:
            # Try different search approaches with smaller limits
            search_queries = ["", "memory", "note", "session"]

            for query in search_queries:
                try:
                    results = await self.storage.retrieve(query, n_results=1000)  # Well under 4096 limit
                    batch_memories = [result.memory for result in results]

                    # Deduplicate based on content_hash
                    existing_hashes = {m.content_hash for m in memories}
                    new_memories = [m for m in batch_memories if m.content_hash not in existing_hashes]
                    memories.extend(new_memories)

                    print(f"📊 Query '{query}': {len(batch_memories)} results, {len(new_memories)} new")

                except Exception as e:
                    print(f"⚠️  Query '{query}' failed: {e}")
                    continue

            print(f"📊 Total unique memories retrieved: {len(memories)}")
            return memories

        except Exception as e:
            print(f"❌ All search approaches failed: {e}")
            return []

    def analyze_timestamp_fields(self, memories: List[Memory]) -> Dict[str, Any]:
        """Analyze timestamp fields across all memories."""
        analysis = {
            "total_memories": len(memories),
            "missing_created_at": 0,
            "missing_created_at_iso": 0,
            "missing_both_timestamps": 0,
            "invalid_timestamps": 0,
            "problematic_memories": [],
            "timestamp_formats": set(),
            "timestamp_range": {"earliest": None, "latest": None}
        }

        for memory in memories:
            has_created_at = memory.created_at is not None
            has_created_at_iso = memory.created_at_iso is not None

            # Track missing timestamp fields
            if not has_created_at:
                analysis["missing_created_at"] += 1

            if not has_created_at_iso:
                analysis["missing_created_at_iso"] += 1

            if not has_created_at and not has_created_at_iso:
                analysis["missing_both_timestamps"] += 1
                analysis["problematic_memories"].append({
                    "content_hash": memory.content_hash,
                    "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                    "tags": memory.tags,
                    "memory_type": memory.memory_type,
                    "issue": "missing_both_timestamps"
                })

            # Track timestamp formats and ranges
            if has_created_at_iso:
                analysis["timestamp_formats"].add(type(memory.created_at_iso).__name__)

            if has_created_at:
                try:
                    if analysis["timestamp_range"]["earliest"] is None or memory.created_at < analysis["timestamp_range"]["earliest"]:
                        analysis["timestamp_range"]["earliest"] = memory.created_at
                    if analysis["timestamp_range"]["latest"] is None or memory.created_at > analysis["timestamp_range"]["latest"]:
                        analysis["timestamp_range"]["latest"] = memory.created_at
                except:
                    analysis["invalid_timestamps"] += 1
                    analysis["problematic_memories"].append({
                        "content_hash": memory.content_hash,
                        "content_preview": memory.content[:100] + "..." if len(memory.content) > 100 else memory.content,
                        "created_at": str(memory.created_at),
                        "issue": "invalid_timestamp"
                    })

        # Convert set to list for JSON serialization
        analysis["timestamp_formats"] = list(analysis["timestamp_formats"])

        return analysis

    def print_analysis_report(self, analysis: Dict[str, Any]):
        """Print a detailed analysis report."""
        print("\n" + "="*70)
        print("TIMESTAMP ANALYSIS REPORT")
        print("="*70)

        total = analysis["total_memories"]

        print(f"\n📊 OVERVIEW:")
        print(f"  Total memories analyzed: {total}")
        print(f"  Missing created_at (float): {analysis['missing_created_at']}")
        print(f"  Missing created_at_iso (ISO string): {analysis['missing_created_at_iso']}")
        print(f"  Missing both timestamps: {analysis['missing_both_timestamps']}")
        print(f"  Invalid timestamp values: {analysis['invalid_timestamps']}")

        if total > 0:
            print(f"\n📈 PERCENTAGES:")
            print(f"  Missing created_at: {analysis['missing_created_at']/total*100:.1f}%")
            print(f"  Missing created_at_iso: {analysis['missing_created_at_iso']/total*100:.1f}%")
            print(f"  Missing both: {analysis['missing_both_timestamps']/total*100:.1f}%")
            print(f"  Invalid timestamps: {analysis['invalid_timestamps']/total*100:.1f}%")

        print(f"\n🕐 TIMESTAMP RANGE:")
        if analysis["timestamp_range"]["earliest"] and analysis["timestamp_range"]["latest"]:
            earliest = datetime.fromtimestamp(analysis["timestamp_range"]["earliest"])
            latest = datetime.fromtimestamp(analysis["timestamp_range"]["latest"])
            print(f"  Earliest: {earliest} ({analysis['timestamp_range']['earliest']})")
            print(f"  Latest: {latest} ({analysis['timestamp_range']['latest']})")
        else:
            print("  No valid timestamps found")

        print(f"\n📝 TIMESTAMP FORMATS DETECTED:")
        for fmt in analysis["timestamp_formats"]:
            print(f"  - {fmt}")

        if analysis["problematic_memories"]:
            print(f"\n⚠️  PROBLEMATIC MEMORIES ({len(analysis['problematic_memories'])}):")
            for i, memory in enumerate(analysis["problematic_memories"][:10]):  # Show first 10
                print(f"  {i+1}. Issue: {memory['issue']}")
                print(f"     Content: {memory['content_preview']}")
                print(f"     Hash: {memory['content_hash']}")
                if 'tags' in memory:
                    print(f"     Tags: {memory.get('tags', [])}")
                print()

            if len(analysis["problematic_memories"]) > 10:
                print(f"  ... and {len(analysis['problematic_memories']) - 10} more")

        # Health assessment
        print(f"\n🏥 DATABASE HEALTH ASSESSMENT:")
        if analysis["missing_both_timestamps"] == 0:
            print("  ✅ EXCELLENT: All memories have at least one timestamp field")
        elif analysis["missing_both_timestamps"] < total * 0.1:
            print(f"  ⚠️  GOOD: Only {analysis['missing_both_timestamps']} memories missing all timestamps")
        elif analysis["missing_both_timestamps"] < total * 0.5:
            print(f"  ⚠️  CONCERNING: {analysis['missing_both_timestamps']} memories missing all timestamps")
        else:
            print(f"  ❌ CRITICAL: {analysis['missing_both_timestamps']} memories missing all timestamps")

        if analysis["missing_created_at"] > 0 or analysis["missing_created_at_iso"] > 0:
            print("  💡 RECOMMENDATION: Run timestamp migration script to fix missing fields")

    async def run_analysis(self):
        """Run the complete timestamp analysis."""
        if not await self.setup():
            return False

        memories = await self.get_all_memories()
        if not memories:
            print("⚠️  No memories found in database")
            return False

        analysis = self.analyze_timestamp_fields(memories)
        self.print_analysis_report(analysis)

        # Save detailed report to file
        report_file = "timestamp_analysis_report.json"
        with open(report_file, 'w') as f:
            # Convert any datetime objects to strings for JSON serialization
            json_analysis = analysis.copy()
            if json_analysis["timestamp_range"]["earliest"]:
                json_analysis["timestamp_range"]["earliest_iso"] = datetime.fromtimestamp(json_analysis["timestamp_range"]["earliest"]).isoformat()
            if json_analysis["timestamp_range"]["latest"]:
                json_analysis["timestamp_range"]["latest_iso"] = datetime.fromtimestamp(json_analysis["timestamp_range"]["latest"]).isoformat()

            json.dump(json_analysis, f, indent=2, default=str)

        print(f"\n📄 Detailed report saved to: {report_file}")

        return analysis["missing_both_timestamps"] == 0

async def main():
    """Main analysis execution."""
    import argparse

    parser = argparse.ArgumentParser(description="Check for memories without timestamps")
    parser.add_argument("--db-path", help="Path to database file")
    parser.add_argument("--storage", default="sqlite_vec", choices=["sqlite_vec"],
                       help="Storage backend to analyze")

    args = parser.parse_args()

    analyzer = TimestampAnalyzer(
        storage_backend=args.storage,
        db_path=args.db_path
    )

    success = await analyzer.run_analysis()
    return 0 if success else 1

if __name__ == "__main__":
    exit_code = asyncio.run(main())
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/scripts/installation/install_windows_service.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Windows Service installer for MCP Memory Service.
Installs the service using Python's Windows service capabilities.
"""
import os
import sys
import json
import argparse
import subprocess
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent))

try:
    from scripts.service_utils import (
        get_project_root, get_service_paths, get_service_environment,
        generate_api_key, save_service_config, load_service_config,
        check_dependencies, get_service_command, print_service_info,
        require_admin
    )
except ImportError as e:
    print(f"Error importing service utilities: {e}")
    print("Please ensure you're running this from the project directory")
    sys.exit(1)


SERVICE_NAME = "MCPMemoryService"
SERVICE_DISPLAY_NAME = "MCP Memory Service"
SERVICE_DESCRIPTION = "Semantic memory and persistent storage service for Claude Desktop"


def create_windows_service_script():
    """Create the Windows service wrapper script."""
    paths = get_service_paths()
    service_script = paths['scripts_dir'] / 'mcp_memory_windows_service.py'
    
    script_content = '''#!/usr/bin/env python3
"""
Windows Service wrapper for MCP Memory Service.
This script runs as a Windows service and manages the MCP Memory server process.
"""
import os
import sys
import time
import subprocess
import win32serviceutil
import win32service
import win32event
import servicemanager
import socket
import json
from pathlib import Path


class MCPMemoryService(win32serviceutil.ServiceFramework):
    _svc_name_ = "MCPMemoryService"
    _svc_display_name_ = "MCP Memory Service"
    _svc_description_ = "Semantic memory and persistent storage service for Claude Desktop"
    
    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        socket.setdefaulttimeout(60)
        self.is_running = True
        self.process = None
        
    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        self.is_running = False
        
        # Stop the subprocess
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                self.process.kill()
                
    def SvcDoRun(self):
        servicemanager.LogMsg(
            servicemanager.EVENTLOG_INFORMATION_TYPE,
            servicemanager.PYS_SERVICE_STARTED,
            (self._svc_name_, '')
        )
        self.main()
        
    def main(self):
        # Load service configuration
        config_dir = Path.home() / '.mcp_memory_service'
        config_file = config_dir / 'service_config.json'
        
        if not config_file.exists():
            servicemanager.LogErrorMsg("Service configuration not found")
            return
            
        with open(config_file, 'r') as f:
            config = json.load(f)
        
        # Set up environment
        env = os.environ.copy()
        env.update(config['environment'])
        
        # Start the service process
        try:
            self.process = subprocess.Popen(
                config['command'],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            
            servicemanager.LogMsg(
                servicemanager.EVENTLOG_INFORMATION_TYPE,
                0,
                "MCP Memory Service process started"
            )
            
            # Monitor the process
            while self.is_running:
                if self.process.poll() is not None:
                    # Process died, log error and restart
                    stdout, stderr = self.process.communicate()
                    servicemanager.LogErrorMsg(
                        f"Service process died unexpectedly. stderr: {stderr}"
                    )
                    
                    # Wait a bit before restarting
                    time.sleep(5)
                    
                    # Restart the process
                    self.process = subprocess.Popen(
                        config['command'],
                        env=env,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True
                    )
                    
                # Check if we should stop
                if win32event.WaitForSingleObject(self.hWaitStop, 1000) == win32event.WAIT_OBJECT_0:
                    break
                    
        except Exception as e:
            servicemanager.LogErrorMsg(f"Error in service: {str(e)}")


if __name__ == '__main__':
    if len(sys.argv) == 1:
        servicemanager.Initialize()
        servicemanager.PrepareToHostSingle(MCPMemoryService)
        servicemanager.StartServiceCtrlDispatcher()
    else:
        win32serviceutil.HandleCommandLine(MCPMemoryService)
'''
    
    with open(service_script, 'w') as f:
        f.write(script_content)
    
    return service_script


def create_batch_scripts():
    """Create convenient batch scripts for service management."""
    paths = get_service_paths()
    scripts_dir = paths['scripts_dir'] / 'windows'
    scripts_dir.mkdir(exist_ok=True)
    
    # Start service batch file
    start_script = scripts_dir / 'start_service.bat'
    with open(start_script, 'w') as f:
        f.write(f'''@echo off
echo Starting {SERVICE_DISPLAY_NAME}...
net start {SERVICE_NAME}
if %ERRORLEVEL% == 0 (
    echo Service started successfully!
) else (
    echo Failed to start service. Run as Administrator if needed.
)
pause
''')
    
    # Stop service batch file
    stop_script = scripts_dir / 'stop_service.bat'
    with open(stop_script, 'w') as f:
        f.write(f'''@echo off
echo Stopping {SERVICE_DISPLAY_NAME}...
net stop {SERVICE_NAME}
if %ERRORLEVEL% == 0 (
    echo Service stopped successfully!
) else (
    echo Failed to stop service. Run as Administrator if needed.
)
pause
''')
    
    # Status batch file
    status_script = scripts_dir / 'service_status.bat'
    with open(status_script, 'w') as f:
        f.write(f'''@echo off
echo Checking {SERVICE_DISPLAY_NAME} status...
sc query {SERVICE_NAME}
pause
''')
    
    # Uninstall batch file
    uninstall_script = scripts_dir / 'uninstall_service.bat'
    with open(uninstall_script, 'w') as f:
        f.write(f'''@echo off
echo This will uninstall {SERVICE_DISPLAY_NAME}.
echo.
set /p confirm="Are you sure? (Y/N): "
if /i "%confirm%" neq "Y" exit /b

echo Stopping service...
net stop {SERVICE_NAME} 2>nul

echo Uninstalling service...
python "{paths['scripts_dir'] / 'install_windows_service.py'}" --uninstall

pause
''')
    
    return scripts_dir


def install_service():
    """Install the Windows service."""
    # Check if pywin32 is installed
    try:
        import win32serviceutil
        import win32service
    except ImportError:
        print("\n❌ ERROR: pywin32 is required for Windows service installation")
        print("Please install it with: pip install pywin32")
        sys.exit(1)
    
    # Require administrator privileges
    require_admin("Administrator privileges are required to install Windows services")
    
    print("\n🔍 Checking dependencies...")
    deps_ok, deps_msg = check_dependencies()
    if not deps_ok:
        print(f"❌ {deps_msg}")
        sys.exit(1)
    print(f"✅ {deps_msg}")
    
    # Generate API key
    api_key = generate_api_key()
    print(f"\n🔑 Generated API key: {api_key}")
    
    # Create service configuration
    config = {
        'service_name': SERVICE_NAME,
        'api_key': api_key,
        'command': get_service_command(),
        'environment': get_service_environment()
    }
    config['environment']['MCP_API_KEY'] = api_key
    
    # Save configuration
    config_file = save_service_config(config)
    print(f"💾 Saved configuration to: {config_file}")
    
    # Create service wrapper script
    print("\n📝 Creating service wrapper...")
    service_script = create_windows_service_script()
    
    # Install the service using the wrapper
    print(f"\n🚀 Installing {SERVICE_DISPLAY_NAME}...")
    
    try:
        # First, try to stop and remove existing service
        subprocess.run([
            sys.executable, str(service_script), 'stop'
        ], capture_output=True)
        subprocess.run([
            sys.executable, str(service_script), 'remove'
        ], capture_output=True)
        
        # Install the service
        result = subprocess.run([
            sys.executable, str(service_script), 'install'
        ], capture_output=True, text=True)
        
        if result.returncode != 0:
            print(f"❌ Failed to install service: {result.stderr}")
            sys.exit(1)
            
        # Configure service for automatic startup
        subprocess.run([
            'sc', 'config', SERVICE_NAME, 'start=', 'auto'
        ], capture_output=True)
        
        # Set service description
        subprocess.run([
            'sc', 'description', SERVICE_NAME, SERVICE_DESCRIPTION
        ], capture_output=True)
        
        print(f"✅ Service installed successfully!")
        
    except Exception as e:
        print(f"❌ Error installing service: {e}")
        sys.exit(1)
    
    # Create batch scripts
    scripts_dir = create_batch_scripts()
    print(f"\n📁 Created management scripts in: {scripts_dir}")
    
    # Print service information
    platform_info = {
        'Start Service': f'net start {SERVICE_NAME}',
        'Stop Service': f'net stop {SERVICE_NAME}',
        'Service Status': f'sc query {SERVICE_NAME}',
        'Uninstall': f'python "{Path(__file__)}" --uninstall'
    }
    
    print_service_info(api_key, platform_info)
    
    return True


def uninstall_service():
    """Uninstall the Windows service."""
    require_admin("Administrator privileges are required to uninstall Windows services")
    
    print(f"\n🗑️  Uninstalling {SERVICE_DISPLAY_NAME}...")
    
    paths = get_service_paths()
    service_script = paths['scripts_dir'] / 'mcp_memory_windows_service.py'
    
    if not service_script.exists():
        # Try using sc command directly
        result = subprocess.run([
            'sc', 'delete', SERVICE_NAME
        ], capture_output=True, text=True)
        
        if result.returncode == 0:
            print("✅ Service uninstalled successfully!")
        else:
            print(f"❌ Failed to uninstall service: {result.stderr}")
    else:
        # Stop the service first
        subprocess.run([
            sys.executable, str(service_script), 'stop'
        ], capture_output=True)
        
        # Remove the service
        result = subprocess.run([
            sys.executable, str(service_script), 'remove'
        ], capture_output=True, text=True)
        
        if result.returncode == 0:
            print("✅ Service uninstalled successfully!")
        else:
            print(f"❌ Failed to uninstall service: {result.stderr}")


def start_service():
    """Start the Windows service."""
    print(f"\n▶️  Starting {SERVICE_DISPLAY_NAME}...")
    
    result = subprocess.run([
        'net', 'start', SERVICE_NAME
    ], capture_output=True, text=True)
    
    if result.returncode == 0:
        print("✅ Service started successfully!")
    else:
        if "already been started" in result.stderr:
            print("ℹ️  Service is already running")
        else:
            print(f"❌ Failed to start service: {result.stderr}")
            print("\n💡 Try running as Administrator if you see access denied errors")


def stop_service():
    """Stop the Windows service."""
    print(f"\n⏹️  Stopping {SERVICE_DISPLAY_NAME}...")
    
    result = subprocess.run([
        'net', 'stop', SERVICE_NAME
    ], capture_output=True, text=True)
    
    if result.returncode == 0:
        print("✅ Service stopped successfully!")
    else:
        if "is not started" in result.stderr:
            print("ℹ️  Service is not running")
        else:
            print(f"❌ Failed to stop service: {result.stderr}")


def service_status():
    """Check the Windows service status."""
    print(f"\n📊 {SERVICE_DISPLAY_NAME} Status:")
    print("-" * 40)
    
    result = subprocess.run([
        'sc', 'query', SERVICE_NAME
    ], capture_output=True, text=True)
    
    if result.returncode == 0:
        # Parse the output
        for line in result.stdout.splitlines():
            if "STATE" in line:
                if "RUNNING" in line:
                    print("✅ Service is RUNNING")
                elif "STOPPED" in line:
                    print("⏹️  Service is STOPPED")
                else:
                    print(f"ℹ️  {line.strip()}")
            elif "SERVICE_NAME:" in line:
                print(f"Service Name: {SERVICE_NAME}")
    else:
        print("❌ Service is not installed")
    
    # Show configuration if available
    config = load_service_config()
    if config:
        print(f"\n📋 Configuration:")
        print(f"  API Key: {config.get('api_key', 'Not set')}")
        print(f"  Config File: {get_service_paths()['config_dir'] / 'service_config.json'}")


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Windows Service installer for MCP Memory Service"
    )
    
    parser.add_argument('--uninstall', action='store_true', help='Uninstall the service')
    parser.add_argument('--start', action='store_true', help='Start the service')
    parser.add_argument('--stop', action='store_true', help='Stop the service')
    parser.add_argument('--status', action='store_true', help='Check service status')
    parser.add_argument('--restart', action='store_true', help='Restart the service')
    
    args = parser.parse_args()
    
    if args.uninstall:
        uninstall_service()
    elif args.start:
        start_service()
    elif args.stop:
        stop_service()
    elif args.status:
        service_status()
    elif args.restart:
        stop_service()
        start_service()
    else:
        # Default action is to install
        install_service()


if __name__ == '__main__':
    main()
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/mcp.py:
--------------------------------------------------------------------------------

```python
"""
MCP (Model Context Protocol) endpoints for Claude Code integration.

This module provides MCP protocol endpoints that allow Claude Code clients
to directly access memory operations using the MCP standard.
"""

import asyncio
import json
import logging
from typing import Dict, List, Any, Optional, Union, TYPE_CHECKING
from fastapi import APIRouter, HTTPException, Request, Depends
from fastapi.responses import JSONResponse
from pydantic import BaseModel, ConfigDict

from ..dependencies import get_storage
from ...utils.hashing import generate_content_hash
from ...config import OAUTH_ENABLED

# Import OAuth dependencies only when needed
if OAUTH_ENABLED or TYPE_CHECKING:
    from ..oauth.middleware import require_read_access, require_write_access, AuthenticationResult
else:
    # Provide type stubs when OAuth is disabled
    AuthenticationResult = None
    require_read_access = None
    require_write_access = None

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/mcp", tags=["mcp"])


class MCPRequest(BaseModel):
    """MCP protocol request structure."""
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    method: str
    params: Optional[Dict[str, Any]] = None


class MCPResponse(BaseModel):
    """MCP protocol response structure.

    Note: JSON-RPC 2.0 spec requires that successful responses EXCLUDE the 'error'
    field entirely (not include it as null), and error responses EXCLUDE 'result'.
    The exclude_none config ensures proper compliance.
    """
    model_config = ConfigDict(exclude_none=True)

    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[Dict[str, Any]] = None


class MCPTool(BaseModel):
    """MCP tool definition."""
    name: str
    description: str
    inputSchema: Dict[str, Any]


# Define MCP tools available
MCP_TOOLS = [
    MCPTool(
        name="store_memory",
        description="Store a new memory with optional tags, metadata, and client information",
        inputSchema={
            "type": "object",
            "properties": {
                "content": {"type": "string", "description": "The memory content to store"},
                "tags": {"type": "array", "items": {"type": "string"}, "description": "Optional tags for the memory"},
                "memory_type": {"type": "string", "description": "Optional memory type (e.g., 'note', 'reminder', 'fact')"},
                "metadata": {"type": "object", "description": "Additional metadata for the memory"},
                "client_hostname": {"type": "string", "description": "Client machine hostname for source tracking"}
            },
            "required": ["content"]
        }
    ),
    MCPTool(
        name="retrieve_memory", 
        description="Search and retrieve memories using semantic similarity",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search query for finding relevant memories"},
                "limit": {"type": "integer", "description": "Maximum number of memories to return", "default": 10},
                "similarity_threshold": {"type": "number", "description": "Minimum similarity score threshold (0.0-1.0)", "default": 0.7, "minimum": 0.0, "maximum": 1.0}
            },
            "required": ["query"]
        }
    ),
    MCPTool(
        name="recall_memory",
        description="Retrieve memories using natural language time expressions and optional semantic search",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Natural language query specifying the time frame or content to recall"},
                "n_results": {"type": "integer", "description": "Maximum number of results to return", "default": 5}
            },
            "required": ["query"]
        }
    ),
    MCPTool(
        name="search_by_tag",
        description="Search memories by specific tags",
        inputSchema={
            "type": "object", 
            "properties": {
                "tags": {"type": "array", "items": {"type": "string"}, "description": "Tags to search for"},
                "operation": {"type": "string", "enum": ["AND", "OR"], "description": "Tag search operation", "default": "AND"}
            },
            "required": ["tags"]
        }
    ),
    MCPTool(
        name="delete_memory",
        description="Delete a specific memory by content hash",
        inputSchema={
            "type": "object",
            "properties": {
                "content_hash": {"type": "string", "description": "Hash of the memory to delete"}
            },
            "required": ["content_hash"]
        }
    ),
    MCPTool(
        name="check_database_health",
        description="Check the health and status of the memory database",
        inputSchema={
            "type": "object",
            "properties": {}
        }
    ),
    MCPTool(
        name="list_memories",
        description="List memories with pagination and optional filtering",
        inputSchema={
            "type": "object",
            "properties": {
                "page": {"type": "integer", "description": "Page number (1-based)", "default": 1, "minimum": 1},
                "page_size": {"type": "integer", "description": "Number of memories per page", "default": 10, "minimum": 1, "maximum": 100},
                "tag": {"type": "string", "description": "Filter by specific tag"},
                "memory_type": {"type": "string", "description": "Filter by memory type"}
            }
        }
    ),
]


@router.post("/")
@router.post("")
async def mcp_endpoint(
    request: MCPRequest,
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """Main MCP protocol endpoint for processing MCP requests."""
    try:
        storage = get_storage()

        if request.method == "initialize":
            response = MCPResponse(
                id=request.id,
                result={
                    "protocolVersion": "2024-11-05",
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": "mcp-memory-service",
                        "version": "4.1.1"
                    }
                }
            )
            return JSONResponse(content=response.model_dump(exclude_none=True))

        elif request.method == "tools/list":
            response = MCPResponse(
                id=request.id,
                result={
                    "tools": [tool.model_dump() for tool in MCP_TOOLS]
                }
            )
            return JSONResponse(content=response.model_dump(exclude_none=True))

        elif request.method == "tools/call":
            tool_name = request.params.get("name") if request.params else None
            arguments = request.params.get("arguments", {}) if request.params else {}

            result = await handle_tool_call(storage, tool_name, arguments)

            response = MCPResponse(
                id=request.id,
                result={
                    "content": [
                        {
                            "type": "text",
                            "text": json.dumps(result)
                        }
                    ]
                }
            )
            return JSONResponse(content=response.model_dump(exclude_none=True))

        else:
            response = MCPResponse(
                id=request.id,
                error={
                    "code": -32601,
                    "message": f"Method not found: {request.method}"
                }
            )
            return JSONResponse(content=response.model_dump(exclude_none=True))

    except Exception as e:
        logger.error(f"MCP endpoint error: {e}")
        response = MCPResponse(
            id=request.id,
            error={
                "code": -32603,
                "message": f"Internal error: {str(e)}"
            }
        )
        return JSONResponse(content=response.model_dump(exclude_none=True))


async def handle_tool_call(storage, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Handle MCP tool calls and route to appropriate memory operations."""
    
    if tool_name == "store_memory":
        from mcp_memory_service.models.memory import Memory
        
        content = arguments.get("content")
        tags = arguments.get("tags", [])
        memory_type = arguments.get("memory_type")
        metadata = arguments.get("metadata", {})
        client_hostname = arguments.get("client_hostname")
        
        # Ensure metadata is a dict
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except:
                metadata = {}
        elif not isinstance(metadata, dict):
            metadata = {}
        
        # Add client_hostname to metadata if provided
        if client_hostname:
            metadata["client_hostname"] = client_hostname
        
        content_hash = generate_content_hash(content, metadata)
        
        memory = Memory(
            content=content,
            content_hash=content_hash,
            tags=tags,
            memory_type=memory_type,
            metadata=metadata
        )
        
        success, message = await storage.store(memory)
        
        return {
            "success": success,
            "message": message,
            "content_hash": memory.content_hash if success else None
        }
    
    elif tool_name == "retrieve_memory":
        query = arguments.get("query")
        limit = arguments.get("limit", 10)
        similarity_threshold = arguments.get("similarity_threshold", 0.0)
        
        # Get results from storage (no similarity filtering at storage level)
        results = await storage.retrieve(query=query, n_results=limit)
        
        # Apply similarity threshold filtering (same as API implementation)
        if similarity_threshold is not None:
            results = [
                result for result in results
                if result.relevance_score and result.relevance_score >= similarity_threshold
            ]
        
        return {
            "results": [
                {
                    "content": r.memory.content,
                    "content_hash": r.memory.content_hash,
                    "tags": r.memory.tags,
                    "similarity_score": r.relevance_score,
                    "created_at": r.memory.created_at_iso
                }
                for r in results
            ],
            "total_found": len(results)
        }

    elif tool_name == "recall_memory":
        query = arguments.get("query")
        n_results = arguments.get("n_results", 5)

        # Use storage recall_memory method which handles time expressions
        memories = await storage.recall_memory(query=query, n_results=n_results)

        return {
            "results": [
                {
                    "content": m.content,
                    "content_hash": m.content_hash,
                    "tags": m.tags,
                    "created_at": m.created_at_iso
                }
                for m in memories
            ],
            "total_found": len(memories)
        }

    elif tool_name == "search_by_tag":
        tags = arguments.get("tags")
        operation = arguments.get("operation", "AND")
        
        results = await storage.search_by_tags(tags=tags, operation=operation)
        
        return {
            "results": [
                {
                    "content": memory.content,
                    "content_hash": memory.content_hash,
                    "tags": memory.tags,
                    "created_at": memory.created_at_iso
                }
                for memory in results
            ],
            "total_found": len(results)
        }
    
    elif tool_name == "delete_memory":
        content_hash = arguments.get("content_hash")
        
        success, message = await storage.delete(content_hash)
        
        return {
            "success": success,
            "message": message
        }
    
    elif tool_name == "check_database_health":
        stats = await storage.get_stats()

        return {
            "status": "healthy",
            "statistics": stats
        }
    
    elif tool_name == "list_memories":
        page = arguments.get("page", 1)
        page_size = arguments.get("page_size", 10)
        tag = arguments.get("tag")
        memory_type = arguments.get("memory_type")
        
        # Calculate offset
        offset = (page - 1) * page_size

        # Use database-level filtering for better performance
        tags_list = [tag] if tag else None
        memories = await storage.get_all_memories(
            limit=page_size,
            offset=offset,
            memory_type=memory_type,
            tags=tags_list
        )
        
        return {
            "memories": [
                {
                    "content": memory.content,
                    "content_hash": memory.content_hash,
                    "tags": memory.tags,
                    "memory_type": memory.memory_type,
                    "metadata": memory.metadata,
                    "created_at": memory.created_at_iso,
                    "updated_at": memory.updated_at_iso
                }
                for memory in memories
            ],
            "page": page,
            "page_size": page_size,
            "total_found": len(memories)
        }
    
    
    else:
        raise ValueError(f"Unknown tool: {tool_name}")


@router.get("/tools")
async def list_mcp_tools(
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """List available MCP tools for discovery."""
    return {
        "tools": [tool.dict() for tool in MCP_TOOLS],
        "protocol": "mcp",
        "version": "1.0"
    }


@router.get("/health")
async def mcp_health():
    """MCP-specific health check."""
    storage = get_storage()
    stats = await storage.get_stats()

    return {
        "status": "healthy",
        "protocol": "mcp",
        "tools_available": len(MCP_TOOLS),
        "storage_backend": "sqlite-vec",
        "statistics": stats
    }
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/search.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Search endpoints for the HTTP interface.

Provides semantic search, tag-based search, and time-based recall functionality.
"""

import logging
from typing import List, Optional, Dict, Any, TYPE_CHECKING
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, HTTPException, Depends, Query
from pydantic import BaseModel, Field

from ...storage.base import MemoryStorage
from ...models.memory import Memory, MemoryQueryResult
from ...config import OAUTH_ENABLED
from ...utils.time_parser import parse_time_expression
from ..dependencies import get_storage
from .memories import MemoryResponse, memory_to_response
from ..sse import sse_manager, create_search_completed_event

# Constants
_TIME_SEARCH_CANDIDATE_POOL_SIZE = 100  # Number of candidates to retrieve for time filtering (reduced for performance)

# OAuth authentication imports (conditional)
if OAUTH_ENABLED or TYPE_CHECKING:
    from ..oauth.middleware import require_read_access, AuthenticationResult
else:
    # Provide type stubs when OAuth is disabled
    AuthenticationResult = None
    require_read_access = None

router = APIRouter()
logger = logging.getLogger(__name__)


# Request Models
class SemanticSearchRequest(BaseModel):
    """Request model for semantic similarity search."""
    query: str = Field(..., description="The search query for semantic similarity")
    n_results: int = Field(default=10, ge=1, le=100, description="Maximum number of results to return")
    similarity_threshold: Optional[float] = Field(None, ge=0.0, le=1.0, description="Minimum similarity score")


class TagSearchRequest(BaseModel):
    """Request model for tag-based search."""
    tags: List[str] = Field(..., description="List of tags to search for (ANY match)")
    match_all: bool = Field(default=False, description="If true, memory must have ALL tags; if false, ANY tag")
    time_filter: Optional[str] = Field(None, description="Optional natural language time filter (e.g., 'last week', 'yesterday')")


class TimeSearchRequest(BaseModel):
    """Request model for time-based search."""
    query: str = Field(..., description="Natural language time query (e.g., 'last week', 'yesterday')")
    n_results: int = Field(default=10, ge=1, le=100, description="Maximum number of results to return")
    semantic_query: Optional[str] = Field(None, description="Optional semantic query for relevance filtering within time range")


# Response Models
class SearchResult(BaseModel):
    """Individual search result with similarity score."""
    memory: MemoryResponse
    similarity_score: Optional[float] = Field(None, description="Similarity score (0-1, higher is more similar)")
    relevance_reason: Optional[str] = Field(None, description="Why this result was included")


class SearchResponse(BaseModel):
    """Response model for search operations."""
    results: List[SearchResult]
    total_found: int
    query: str
    search_type: str
    processing_time_ms: Optional[float] = None


def memory_query_result_to_search_result(query_result: MemoryQueryResult) -> SearchResult:
    """Convert MemoryQueryResult to SearchResult format."""
    return SearchResult(
        memory=memory_to_response(query_result.memory),
        similarity_score=query_result.relevance_score,
        relevance_reason=f"Semantic similarity: {query_result.relevance_score:.3f}" if query_result.relevance_score else None
    )


def memory_to_search_result(memory: Memory, reason: str = None) -> SearchResult:
    """Convert Memory to SearchResult format."""
    return SearchResult(
        memory=memory_to_response(memory),
        similarity_score=None,
        relevance_reason=reason
    )


@router.post("/search", response_model=SearchResponse, tags=["search"])
async def semantic_search(
    request: SemanticSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Perform semantic similarity search on memory content.
    
    Uses vector embeddings to find memories with similar meaning to the query,
    even if they don't share exact keywords.
    """
    import time
    start_time = time.time()
    
    try:
        # Perform semantic search using the storage layer
        query_results = await storage.retrieve(
            query=request.query,
            n_results=request.n_results
        )
        
        # Filter by similarity threshold if specified
        if request.similarity_threshold is not None:
            query_results = [
                result for result in query_results
                if result.relevance_score and result.relevance_score >= request.similarity_threshold
            ]
        
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in query_results
        ]
        
        processing_time = (time.time() - start_time) * 1000
        
        # Broadcast SSE event for search completion
        try:
            event = create_search_completed_event(
                query=request.query,
                search_type="semantic",
                results_count=len(search_results),
                processing_time_ms=processing_time
            )
            await sse_manager.broadcast_event(event)
        except Exception as e:
            logger.warning(f"Failed to broadcast search_completed event: {e}")
        
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=request.query,
            search_type="semantic",
            processing_time_ms=processing_time
        )
        
    except Exception as e:
        logger.error(f"Semantic search failed: {str(e)}")
        raise HTTPException(status_code=500, detail="Search operation failed. Please try again.")


@router.post("/search/by-tag", response_model=SearchResponse, tags=["search"])
async def tag_search(
    request: TagSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Search memories by tags with optional time filtering.

    Finds memories that contain any of the specified tags (OR search) or
    all of the specified tags (AND search) based on the match_all parameter.

    Optionally filters by time range using natural language expressions like
    'last week', 'yesterday', 'this month', etc.
    """
    import time
    start_time = time.time()

    try:
        if not request.tags:
            raise HTTPException(status_code=400, detail="At least one tag must be specified")

        # Parse time filter if provided
        time_start = None
        if request.time_filter:
            start_ts, _ = parse_time_expression(request.time_filter)
            time_start = start_ts if start_ts else None

        # Use the storage layer's tag search with optional time filtering
        memories = await storage.search_by_tag(request.tags, time_start=time_start)

        # If match_all is True, filter to only memories that have ALL tags
        if request.match_all and len(request.tags) > 1:
            tag_set = set(request.tags)
            memories = [
                memory for memory in memories
                if tag_set.issubset(set(memory.tags))
            ]

        # Convert to search results
        match_type = "ALL" if request.match_all else "ANY"
        search_results = [
            memory_to_search_result(
                memory,
                reason=f"Tags match ({match_type}): {', '.join(set(memory.tags) & set(request.tags))}"
            )
            for memory in memories
        ]

        processing_time = (time.time() - start_time) * 1000

        # Build query string with time filter info if present
        query_string = f"Tags: {', '.join(request.tags)} ({match_type})"
        if request.time_filter:
            query_string += f" | Time: {request.time_filter}"

        # Broadcast SSE event for search completion
        try:
            event = create_search_completed_event(
                query=query_string,
                search_type="tag",
                results_count=len(search_results),
                processing_time_ms=processing_time
            )
            await sse_manager.broadcast_event(event)
        except Exception as e:
            logger.warning(f"Failed to broadcast search_completed event: {e}")

        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=query_string,
            search_type="tag",
            processing_time_ms=processing_time
        )

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Tag search failed: {str(e)}")


@router.post("/search/by-time", response_model=SearchResponse, tags=["search"])
async def time_search(
    request: TimeSearchRequest,
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Search memories by time-based queries.
    
    Supports natural language time expressions like 'yesterday', 'last week',
    'this month', etc. Currently implements basic time filtering - full natural
    language parsing can be enhanced later.
    """
    import time
    start_time = time.time()
    
    try:
        # Parse time query using robust time_parser
        start_ts, end_ts = parse_time_expression(request.query)

        if start_ts is None and end_ts is None:
            raise HTTPException(
                status_code=400,
                detail=f"Could not parse time query: '{request.query}'. Try 'yesterday', 'last week', 'this month', etc."
            )

        # Retrieve memories within time range (with larger candidate pool if semantic query provided)
        candidate_pool_size = _TIME_SEARCH_CANDIDATE_POOL_SIZE if request.semantic_query else request.n_results
        query_results = await storage.recall(
            query=request.semantic_query.strip() if request.semantic_query and request.semantic_query.strip() else None,
            n_results=candidate_pool_size,
            start_timestamp=start_ts,
            end_timestamp=end_ts
        )

        # If semantic query was provided, results are already ranked by relevance
        # Otherwise, sort by recency (newest first)
        if not (request.semantic_query and request.semantic_query.strip()):
            query_results.sort(key=lambda r: r.memory.created_at or 0.0, reverse=True)

        # Limit results
        filtered_memories = query_results[:request.n_results]
        
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in filtered_memories
        ]
        
        # Update relevance reason for time-based results
        for result in search_results:
            result.relevance_reason = f"Time match: {request.query}"
        
        processing_time = (time.time() - start_time) * 1000
        
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=request.query,
            search_type="time",
            processing_time_ms=processing_time
        )
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Time search failed: {str(e)}")


@router.get("/search/similar/{content_hash}", response_model=SearchResponse, tags=["search"])
async def find_similar(
    content_hash: str,
    n_results: int = Query(default=10, ge=1, le=100, description="Number of similar memories to find"),
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """
    Find memories similar to a specific memory identified by its content hash.
    
    Uses the content of the specified memory as a search query to find
    semantically similar memories.
    """
    import time
    start_time = time.time()
    
    try:
        # First, get the target memory by searching with its hash
        # This is inefficient but works with current storage interface
        target_results = await storage.retrieve(content_hash, n_results=1)
        
        if not target_results or target_results[0].memory.content_hash != content_hash:
            raise HTTPException(status_code=404, detail="Memory not found")
        
        target_memory = target_results[0].memory
        
        # Use the target memory's content to find similar memories
        similar_results = await storage.retrieve(
            query=target_memory.content,
            n_results=n_results + 1  # +1 because the original will be included
        )
        
        # Filter out the original memory
        filtered_results = [
            result for result in similar_results
            if result.memory.content_hash != content_hash
        ][:n_results]
        
        # Convert to search results
        search_results = [
            memory_query_result_to_search_result(result)
            for result in filtered_results
        ]
        
        processing_time = (time.time() - start_time) * 1000
        
        return SearchResponse(
            results=search_results,
            total_found=len(search_results),
            query=f"Similar to: {target_memory.content[:50]}...",
            search_type="similar",
            processing_time_ms=processing_time
        )
        
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Similar search failed: {str(e)}")

```

--------------------------------------------------------------------------------
/docs/natural-memory-triggers/cli-reference.md:
--------------------------------------------------------------------------------

```markdown
# Natural Memory Triggers v7.1.3 - CLI Reference

Complete reference for the CLI management system that provides real-time configuration and monitoring of Natural Memory Triggers without requiring file edits or Claude Code restarts.

## Overview

The CLI controller (`memory-mode-controller.js`) is the primary interface for managing Natural Memory Triggers. It provides:

- ✅ **Real-time configuration** changes without restart
- ✅ **Performance monitoring** and metrics
- ✅ **Profile management** for different workflows
- ✅ **Sensitivity tuning** for trigger frequency
- ✅ **System diagnostics** and health checks

## Command Syntax

```bash
node ~/.claude/hooks/memory-mode-controller.js <command> [options] [arguments]
```

## Core Commands

### `status` - System Status and Information

Display current system status, configuration, and performance metrics.

```bash
node memory-mode-controller.js status
```

**Output Example:**
```
📊 Memory Hook Status
Current Profile: balanced
Description: Moderate latency, smart memory triggers
Natural Triggers: enabled
Sensitivity: 0.6
Cooldown Period: 30000ms
Max Memories per Trigger: 5
Performance: 145ms avg latency, 2 degradation events
Cache Size: 12 entries
Conversation History: 8 messages
```

**Options:**
- `--verbose` - Show detailed performance metrics
- `--json` - Output in JSON format for scripting

```bash
node memory-mode-controller.js status --verbose
node memory-mode-controller.js status --json
```

### `profiles` - List Available Performance Profiles

Display all available performance profiles with descriptions and configurations.

```bash
node memory-mode-controller.js profiles
```

**Output:**
```
📋 Available Performance Profiles

🏃 speed_focused
  Max Latency: 100ms
  Enabled Tiers: instant
  Description: Fastest response, minimal memory awareness

⚖️ balanced (current)
  Max Latency: 200ms
  Enabled Tiers: instant, fast
  Description: Moderate latency, smart memory triggers

🧠 memory_aware
  Max Latency: 500ms
  Enabled Tiers: instant, fast, intensive
  Description: Full memory awareness, accept higher latency

🤖 adaptive
  Max Latency: auto-adjusting
  Enabled Tiers: dynamic
  Description: Auto-adjust based on performance and user preferences
```

### `profile` - Switch Performance Profile

Change the active performance profile for different workflow requirements.

```bash
node memory-mode-controller.js profile <profile_name>
```

**Available Profiles:**

#### `speed_focused` - Maximum Speed
```bash
node memory-mode-controller.js profile speed_focused
```
- **Latency**: < 100ms
- **Tiers**: Instant only (pattern matching, cache checks)
- **Use Case**: Quick coding sessions, pair programming
- **Trade-off**: Minimal memory awareness for maximum speed

#### `balanced` - Recommended Default
```bash
node memory-mode-controller.js profile balanced
```
- **Latency**: < 200ms
- **Tiers**: Instant + Fast (semantic analysis)
- **Use Case**: General development work, most productive for daily use
- **Trade-off**: Good balance of speed and context awareness

#### `memory_aware` - Maximum Context
```bash
node memory-mode-controller.js profile memory_aware
```
- **Latency**: < 500ms
- **Tiers**: All tiers (deep semantic understanding)
- **Use Case**: Complex projects, architectural decisions, research
- **Trade-off**: Maximum context awareness, higher latency acceptable

#### `adaptive` - Machine Learning
```bash
node memory-mode-controller.js profile adaptive
```
- **Latency**: Auto-adjusting based on usage patterns
- **Tiers**: Dynamic selection based on user feedback
- **Use Case**: Users who want the system to learn automatically
- **Trade-off**: Requires learning period but becomes highly personalized

### `sensitivity` - Adjust Trigger Sensitivity

Control how often Natural Memory Triggers activate by adjusting the confidence threshold.

```bash
node memory-mode-controller.js sensitivity <value>
```

**Sensitivity Values:**
- `0.0` - Maximum triggers (activates on any potential memory-seeking pattern)
- `0.4` - High sensitivity (more triggers, useful for research/architecture work)
- `0.6` - Balanced (recommended default)
- `0.8` - Low sensitivity (fewer triggers, high-confidence only)
- `1.0` - Minimum triggers (only explicit memory requests)

**Examples:**
```bash
# More triggers for architecture work
node memory-mode-controller.js sensitivity 0.4

# Balanced triggers (recommended)
node memory-mode-controller.js sensitivity 0.6

# Fewer triggers for focused coding
node memory-mode-controller.js sensitivity 0.8
```

## System Management Commands

### `enable` - Enable Natural Memory Triggers

Activate the Natural Memory Triggers system.

```bash
node memory-mode-controller.js enable
```

**Output:**
```
✅ Natural Memory Triggers enabled
Current sensitivity: 0.6
Active profile: balanced
Ready to detect memory-seeking patterns
```

### `disable` - Disable Natural Memory Triggers

Temporarily disable the Natural Memory Triggers system without uninstalling.

```bash
node memory-mode-controller.js disable
```

**Output:**
```
⏸️ Natural Memory Triggers disabled
Manual memory commands still available
Use 'enable' to reactivate triggers
```

### `reset` - Reset to Default Settings

Reset all configuration to default values.

```bash
node memory-mode-controller.js reset
```

**What gets reset:**
- Performance profile → `balanced`
- Sensitivity → `0.6`
- Natural triggers → `enabled`
- Cooldown period → `30000ms`
- Max memories per trigger → `5`

**Confirmation prompt:**
```
⚠️ This will reset all Natural Memory Triggers settings to defaults.
Are you sure? (y/N): y
✅ Settings reset to defaults
```

**Options:**
- `--force` - Skip confirmation prompt

```bash
node memory-mode-controller.js reset --force
```

## Testing and Diagnostics

### `test` - Test Trigger Detection

Test the trigger detection system with a specific query to see how it would be processed.

```bash
node memory-mode-controller.js test "your test query"
```

**Example:**
```bash
node memory-mode-controller.js test "What did we decide about authentication?"
```

**Output:**
```
🧪 Testing Natural Memory Triggers

Query: "What did we decide about authentication?"
Processing tiers: instant → fast → intensive

Tier 1 (Instant): 42ms
  - Pattern match: ✅ "what...decide" detected
  - Cache check: ❌ No cached result
  - Confidence: 0.85

Tier 2 (Fast): 127ms
  - Key phrases: ["decide", "authentication"]
  - Topic shift: 0.2 (moderate)
  - Question pattern: ✅ Detected
  - Confidence: 0.78

Memory Query Generated:
  - Type: recent-development
  - Query: "authentication decision approach implementation"
  - Weight: 1.0

Result: Would trigger memory retrieval (confidence 0.85 > threshold 0.6)
```

### `metrics` - Performance Metrics

Display detailed performance metrics and system health information.

```bash
node memory-mode-controller.js metrics
```

**Output:**
```
📊 Natural Memory Triggers Performance Metrics

System Performance:
  - Active Profile: balanced
  - Average Latency: 145ms
  - Degradation Events: 2
  - User Tolerance: 0.7

Tier Performance:
  - Instant Tier: 47ms avg (120 calls)
  - Fast Tier: 142ms avg (89 calls)
  - Intensive Tier: 387ms avg (23 calls)

Trigger Statistics:
  - Total Triggers: 45
  - Success Rate: 89%
  - False Positives: 5%
  - User Satisfaction: 87%

Cache Performance:
  - Cache Size: 15 entries
  - Hit Rate: 34%
  - Average Hit Time: 3ms

Memory Service:
  - Connection Status: ✅ Connected
  - Average Response: 89ms
  - Error Rate: 0%
```

### `health` - System Health Check

Perform comprehensive health check of all system components.

```bash
node memory-mode-controller.js health
```

**Output:**
```
🏥 Natural Memory Triggers Health Check

Core Components:
  ✅ TieredConversationMonitor loaded
  ✅ PerformanceManager initialized
  ✅ GitAnalyzer functional
  ✅ MCP Client connected

Configuration:
  ✅ config.json syntax valid
  ✅ naturalTriggers section present
  ✅ performance profiles configured
  ✅ memory service endpoint accessible

Dependencies:
  ✅ Node.js version compatible (v18.17.0)
  ✅ Required packages available
  ✅ File permissions correct

Memory Service Integration:
  ✅ Connection established
  ✅ Authentication valid
  ✅ API responses normal
  ⚠️ High response latency (245ms)

Git Integration:
  ✅ Repository detected
  ✅ Recent commits available
  ✅ Changelog found
  ❌ Branch name unavailable

Recommendations:
  - Consider optimizing memory service for faster responses
  - Check git configuration for branch detection
```

## Advanced Commands

### `config` - Configuration Management

View and modify configuration settings directly through CLI.

```bash
# View current configuration
node memory-mode-controller.js config show

# Get specific setting
node memory-mode-controller.js config get naturalTriggers.triggerThreshold

# Set specific setting
node memory-mode-controller.js config set naturalTriggers.cooldownPeriod 45000
```

### `cache` - Cache Management

Manage the semantic analysis cache.

```bash
# View cache statistics
node memory-mode-controller.js cache stats

# Clear cache
node memory-mode-controller.js cache clear

# Show cache contents (debug)
node memory-mode-controller.js cache show
```

**Cache Stats Output:**
```
💾 Semantic Cache Statistics

Size: 18/50 entries
Memory Usage: 2.4KB
Hit Rate: 34% (89/260 requests)
Average Hit Time: 2.8ms
Last Cleanup: 15 minutes ago

Most Accessed Patterns:
  1. "what did we decide" (12 hits)
  2. "how did we implement" (8 hits)
  3. "similar to what we" (6 hits)
```

### `export` - Export Configuration and Metrics

Export system configuration and performance data for backup or analysis.

```bash
# Export configuration
node memory-mode-controller.js export config > my-config-backup.json

# Export metrics
node memory-mode-controller.js export metrics > performance-report.json

# Export full system state
node memory-mode-controller.js export all > system-state.json
```

### `import` - Import Configuration

Import previously exported configuration.

```bash
node memory-mode-controller.js import config my-config-backup.json
```

## Scripting and Automation

### JSON Output Mode

Most commands support `--json` flag for machine-readable output:

```bash
# Get status in JSON format
node memory-mode-controller.js status --json

# Example output:
{
  "profile": "balanced",
  "enabled": true,
  "sensitivity": 0.6,
  "performance": {
    "avgLatency": 145,
    "degradationEvents": 2
  },
  "cache": {
    "size": 12,
    "hitRate": 0.34
  }
}
```

### Batch Operations

Run multiple commands in sequence:

```bash
# Setup for architecture work
node memory-mode-controller.js profile memory_aware
node memory-mode-controller.js sensitivity 0.4

# Daily development setup
node memory-mode-controller.js profile balanced
node memory-mode-controller.js sensitivity 0.6

# Quick coding setup
node memory-mode-controller.js profile speed_focused
node memory-mode-controller.js sensitivity 0.8
```

### Environment Variables

Control CLI behavior with environment variables:

```bash
# Enable debug output
export CLAUDE_HOOKS_DEBUG=true
node memory-mode-controller.js status

# Disable colored output
export NO_COLOR=1
node memory-mode-controller.js status

# Set alternative config path
export CLAUDE_HOOKS_CONFIG=/path/to/config.json
node memory-mode-controller.js status
```

## Error Handling and Debugging

### Common Error Messages

#### `Configuration Error: Cannot read config file`
**Cause**: Missing or corrupted configuration file
**Solution**:
```bash
# Check if config exists
ls ~/.claude/hooks/config.json

# Validate JSON syntax
cat ~/.claude/hooks/config.json | node -e "console.log(JSON.parse(require('fs').readFileSync(0, 'utf8')))"

# Reset to defaults if corrupted
node memory-mode-controller.js reset --force
```

#### `Memory Service Connection Failed`
**Cause**: MCP Memory Service not running or unreachable
**Solution**:
```bash
# Check memory service status
curl -k https://localhost:8443/api/health

# Start memory service
uv run memory server

# Check configuration
node memory-mode-controller.js config get memoryService.endpoint
```

#### `Permission Denied`
**Cause**: Incorrect file permissions
**Solution**:
```bash
# Fix permissions
chmod +x ~/.claude/hooks/memory-mode-controller.js
chmod 644 ~/.claude/hooks/config.json
```

### Debug Mode

Enable verbose debugging:

```bash
export CLAUDE_HOOKS_DEBUG=true
node memory-mode-controller.js status
```

**Debug Output Example:**
```
[DEBUG] Loading configuration from ~/.claude/hooks/config.json
[DEBUG] Configuration loaded successfully
[DEBUG] Initializing TieredConversationMonitor
[DEBUG] PerformanceManager initialized with profile: balanced
[DEBUG] GitAnalyzer detecting repository context
[DEBUG] MCP Client connecting to https://localhost:8443
[DEBUG] Status command executed successfully
```

## Integration Examples

### Shell Aliases

Add to your `.bashrc` or `.zshrc`:

```bash
# Quick aliases for common operations
alias nmt-status='node ~/.claude/hooks/memory-mode-controller.js status'
alias nmt-balanced='node ~/.claude/hooks/memory-mode-controller.js profile balanced'
alias nmt-speed='node ~/.claude/hooks/memory-mode-controller.js profile speed_focused'
alias nmt-memory='node ~/.claude/hooks/memory-mode-controller.js profile memory_aware'
alias nmt-metrics='node ~/.claude/hooks/memory-mode-controller.js metrics'
```

### VS Code Integration

Create VS Code tasks (`.vscode/tasks.json`):

```json
{
  "version": "2.0.0",
  "tasks": [
    {
      "label": "NMT: Check Status",
      "type": "shell",
      "command": "node ~/.claude/hooks/memory-mode-controller.js status",
      "group": "build",
      "presentation": {
        "echo": true,
        "reveal": "always",
        "focus": false,
        "panel": "shared"
      }
    },
    {
      "label": "NMT: Switch to Memory Aware",
      "type": "shell",
      "command": "node ~/.claude/hooks/memory-mode-controller.js profile memory_aware",
      "group": "build"
    }
  ]
}
```

### Automated Performance Monitoring

Monitor system performance with cron job:

```bash
# Add to crontab (crontab -e)
# Check metrics every hour and log to file
0 * * * * node ~/.claude/hooks/memory-mode-controller.js metrics --json >> ~/nmt-metrics.log 2>&1
```

---

The CLI controller provides complete control over Natural Memory Triggers v7.1.3, enabling real-time optimization of your intelligent memory awareness system! 🚀
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/storage/base.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
MCP Memory Service
Copyright (c) 2024 Heinrich Krupp
Licensed under the MIT License. See LICENSE file in the project root for full license text.
"""
import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any, Tuple
from datetime import datetime, timezone, timedelta
from ..models.memory import Memory, MemoryQueryResult

class MemoryStorage(ABC):
    """Abstract base class for memory storage implementations."""

    @property
    @abstractmethod
    def max_content_length(self) -> Optional[int]:
        """
        Maximum content length supported by this storage backend.

        Returns:
            Maximum number of characters allowed in memory content, or None for unlimited.
            This limit is based on the underlying embedding model's token limits.
        """
        pass

    @property
    @abstractmethod
    def supports_chunking(self) -> bool:
        """
        Whether this backend supports automatic content chunking.

        Returns:
            True if the backend can store chunked memories with linking metadata.
        """
        pass

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the storage backend."""
        pass
    
    @abstractmethod
    async def store(self, memory: Memory) -> Tuple[bool, str]:
        """Store a memory. Returns (success, message)."""
        pass

    async def store_batch(self, memories: List[Memory]) -> List[Tuple[bool, str]]:
        """
        Store multiple memories in a single operation.

        Default implementation calls store() for each memory concurrently using asyncio.gather.
        Override this method in concrete storage backends to provide true batch operations
        for improved performance (e.g., single database transaction, bulk network request).

        Args:
            memories: List of Memory objects to store

        Returns:
            A list of (success, message) tuples, one for each memory in the batch.
        """
        if not memories:
            return []

        results = await asyncio.gather(
            *(self.store(memory) for memory in memories),
            return_exceptions=True
        )

        # Process results to handle potential exceptions from gather
        final_results = []
        for res in results:
            if isinstance(res, Exception):
                # If a store operation failed with an exception, record it as a failure
                final_results.append((False, f"Failed to store memory: {res}"))
            else:
                final_results.append(res)
        return final_results
    
    @abstractmethod
    async def retrieve(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
        """Retrieve memories by semantic search."""
        pass
    
    @abstractmethod
    async def search_by_tag(self, tags: List[str], time_start: Optional[float] = None) -> List[Memory]:
        """Search memories by tags with optional time filtering.

        Args:
            tags: List of tags to search for
            time_start: Optional Unix timestamp (in seconds) to filter memories created after this time

        Returns:
            List of Memory objects matching the tag criteria and time filter
        """
        pass

    @abstractmethod
    async def search_by_tags(
        self,
        tags: List[str],
        operation: str = "AND",
        time_start: Optional[float] = None,
        time_end: Optional[float] = None
    ) -> List[Memory]:
        """Search memories by tags with AND/OR semantics and time range filtering.

        Args:
            tags: List of tag names to search for
            operation: "AND" (all tags must match) or "OR" (any tag matches)
            time_start: Optional Unix timestamp for inclusive range start
            time_end: Optional Unix timestamp for inclusive range end

        Returns:
            List of Memory objects matching the criteria
        """
        pass

    async def search_by_tag_chronological(self, tags: List[str], limit: int = None, offset: int = 0) -> List[Memory]:
        """
        Search memories by tags with chronological ordering (newest first).

        Args:
            tags: List of tags to search for
            limit: Maximum number of memories to return (None for all)
            offset: Number of memories to skip (for pagination)

        Returns:
            List of Memory objects ordered by created_at DESC
        """
        # Default implementation: use search_by_tag then sort
        memories = await self.search_by_tag(tags)
        memories.sort(key=lambda m: m.created_at or 0, reverse=True)

        # Apply pagination
        if offset > 0:
            memories = memories[offset:]
        if limit is not None:
            memories = memories[:limit]

        return memories
    
    @abstractmethod
    async def delete(self, content_hash: str) -> Tuple[bool, str]:
        """Delete a memory by its hash."""
        pass

    @abstractmethod
    async def get_by_hash(self, content_hash: str) -> Optional[Memory]:
        """
        Get a memory by its content hash using direct O(1) lookup.

        Args:
            content_hash: The content hash of the memory to retrieve

        Returns:
            Memory object if found, None otherwise
        """
        pass

    @abstractmethod
    async def delete_by_tag(self, tag: str) -> Tuple[int, str]:
        """Delete memories by tag. Returns (count_deleted, message)."""
        pass

    async def delete_by_tags(self, tags: List[str]) -> Tuple[int, str]:
        """
        Delete memories matching ANY of the given tags.

        Default implementation calls delete_by_tag for each tag sequentially.
        Override in concrete implementations for better performance (e.g., single query with OR).

        Args:
            tags: List of tags - memories matching ANY tag will be deleted

        Returns:
            Tuple of (total_count_deleted, message)
        """
        if not tags:
            return 0, "No tags provided"

        total_count = 0
        errors = []

        for tag in tags:
            try:
                count, message = await self.delete_by_tag(tag)
                total_count += count
                if "error" in message.lower() or "failed" in message.lower():
                    errors.append(f"{tag}: {message}")
            except Exception as e:
                errors.append(f"{tag}: {str(e)}")

        if errors:
            error_summary = "; ".join(errors[:3])  # Limit error details
            if len(errors) > 3:
                error_summary += f" (+{len(errors) - 3} more errors)"
            return total_count, f"Deleted {total_count} memories with partial failures: {error_summary}"

        return total_count, f"Deleted {total_count} memories across {len(tags)} tag(s)"

    @abstractmethod
    async def cleanup_duplicates(self) -> Tuple[int, str]:
        """Remove duplicate memories. Returns (count_removed, message)."""
        pass
    
    @abstractmethod
    async def update_memory_metadata(self, content_hash: str, updates: Dict[str, Any], preserve_timestamps: bool = True) -> Tuple[bool, str]:
        """
        Update memory metadata without recreating the entire memory entry.

        Args:
            content_hash: Hash of the memory to update
            updates: Dictionary of metadata fields to update
            preserve_timestamps: Whether to preserve original created_at timestamp

        Returns:
            Tuple of (success, message)

        Note:
            - Only metadata, tags, and memory_type can be updated
            - Content and content_hash cannot be modified
            - updated_at timestamp is always refreshed
            - created_at is preserved unless preserve_timestamps=False
        """
        pass

    async def update_memory(self, memory: Memory) -> bool:
        """
        Update an existing memory with new metadata, tags, and memory_type.

        Args:
            memory: Memory object with updated fields

        Returns:
            True if update was successful, False otherwise
        """
        updates = {
            'tags': memory.tags,
            'metadata': memory.metadata,
            'memory_type': memory.memory_type
        }
        success, _ = await self.update_memory_metadata(
            memory.content_hash,
            updates,
            preserve_timestamps=True
        )
        return success

    async def update_memories_batch(self, memories: List[Memory]) -> List[bool]:
        """
        Update multiple memories in a batch operation.

        Default implementation calls update_memory() for each memory concurrently using asyncio.gather.
        Override this method in concrete storage backends to provide true batch operations
        for improved performance (e.g., single database transaction with multiple UPDATEs).

        Args:
            memories: List of Memory objects with updated fields

        Returns:
            List of success booleans, one for each memory in the batch
        """
        if not memories:
            return []

        results = await asyncio.gather(
            *(self.update_memory(memory) for memory in memories),
            return_exceptions=True
        )

        # Process results to handle potential exceptions from gather
        final_results = []
        for res in results:
            if isinstance(res, Exception):
                final_results.append(False)
            else:
                final_results.append(res)
        return final_results
    
    async def get_stats(self) -> Dict[str, Any]:
        """Get storage statistics. Override for specific implementations."""
        return {
            "total_memories": 0,
            "storage_backend": self.__class__.__name__,
            "status": "operational"
        }
    
    async def get_all_tags(self) -> List[str]:
        """Get all unique tags in the storage. Override for specific implementations."""
        return []
    
    async def get_recent_memories(self, n: int = 10) -> List[Memory]:
        """Get n most recent memories. Override for specific implementations."""
        return []
    
    async def recall_memory(self, query: str, n_results: int = 5) -> List[Memory]:
        """Recall memories based on natural language time expression. Override for specific implementations."""
        # Default implementation just uses regular search
        results = await self.retrieve(query, n_results)
        return [r.memory for r in results]
    
    async def search(self, query: str, n_results: int = 5) -> List[MemoryQueryResult]:
        """Search memories. Default implementation uses retrieve."""
        return await self.retrieve(query, n_results)
    
    async def get_all_memories(self, limit: int = None, offset: int = 0, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> List[Memory]:
        """
        Get all memories in storage ordered by creation time (newest first).

        Args:
            limit: Maximum number of memories to return (None for all)
            offset: Number of memories to skip (for pagination)
            memory_type: Optional filter by memory type
            tags: Optional filter by tags (matches ANY of the provided tags)

        Returns:
            List of Memory objects ordered by created_at DESC, optionally filtered by type and tags
        """
        return []
    
    async def count_all_memories(self, memory_type: Optional[str] = None, tags: Optional[List[str]] = None) -> int:
        """
        Get total count of memories in storage.

        Args:
            memory_type: Optional filter by memory type
            tags: Optional filter by tags (memories matching ANY of the tags)

        Returns:
            Total number of memories, optionally filtered by type and/or tags
        """
        return 0

    async def count_memories_by_tag(self, tags: List[str]) -> int:
        """
        Count memories that match any of the given tags.

        Args:
            tags: List of tags to search for

        Returns:
            Number of memories matching any tag
        """
        # Default implementation: search then count
        memories = await self.search_by_tag(tags)
        return len(memories)

    async def get_memories_by_time_range(self, start_time: float, end_time: float) -> List[Memory]:
        """Get memories within a time range. Override for specific implementations."""
        return []
    
    async def get_memory_connections(self) -> Dict[str, int]:
        """Get memory connection statistics. Override for specific implementations."""
        return {}

    async def get_access_patterns(self) -> Dict[str, datetime]:
        """Get memory access pattern statistics. Override for specific implementations."""
        return {}

    async def get_memory_timestamps(self, days: Optional[int] = None) -> List[float]:
        """
        Get memory creation timestamps only, without loading full memory objects.

        This is an optimized method for analytics that only needs timestamps,
        avoiding the overhead of loading full memory content and embeddings.

        Args:
            days: Optional filter to only get memories from last N days

        Returns:
            List of Unix timestamps (float) in descending order (newest first)
        """
        # Default implementation falls back to get_recent_memories
        # Concrete backends should override with optimized SQL queries
        n = 5000 if days is None else days * 100  # Rough estimate
        memories = await self.get_recent_memories(n=n)
        timestamps = [m.created_at for m in memories if m.created_at]

        # Filter by days if specified
        if days is not None:
            cutoff = datetime.now(timezone.utc) - timedelta(days=days)
            cutoff_timestamp = cutoff.timestamp()
            timestamps = [ts for ts in timestamps if ts >= cutoff_timestamp]

        return sorted(timestamps, reverse=True)

```

--------------------------------------------------------------------------------
/docs/architecture.md:
--------------------------------------------------------------------------------

```markdown
# MCP Memory Service Architecture

## Overview

MCP Memory Service is a Model Context Protocol server that provides semantic memory and persistent storage capabilities for AI assistants. It enables long-term memory storage with semantic search, time-based recall, and tag-based organization across conversations.

## System Architecture

```mermaid
graph TB
    subgraph "Client Layer"
        CC[Claude Desktop]
        LMS[LM Studio]
        VSC[VS Code MCP]
        GEN[Generic MCP Client]
    end

    subgraph "Protocol Layer"
        MCP[MCP Server Protocol]
        HTTP[HTTP API Server]
        WEB[Web Dashboard]
    end

    subgraph "Core Services"
        SRV[Memory Service Core]
        AUTH[Authentication]
        CACHE[Model Cache]
        EMB[Embedding Service]
    end

    subgraph "Storage Abstraction"
        ABS[Storage Interface]
        HYBRID[Hybrid Backend ⭐]
        CLOUDFLARE[Cloudflare Backend]
        SQLITE[SQLite-vec Backend]
        REMOTE[HTTP Client Backend]
        CHROMA[ChromaDB ⚠️ DEPRECATED]
    end

    subgraph "Infrastructure"
        DB[(Vector Database)]
        FS[(File System)]
        MDNS[mDNS Discovery]
    end

    CC --> MCP
    LMS --> MCP
    VSC --> MCP
    GEN --> MCP
    
    MCP --> SRV
    HTTP --> SRV
    WEB --> HTTP
    
    SRV --> AUTH
    SRV --> CACHE
    SRV --> EMB
    SRV --> ABS
    
    ABS --> HYBRID
    ABS --> CLOUDFLARE
    ABS --> SQLITE
    ABS --> REMOTE
    ABS --> CHROMA

    HYBRID --> SQLITE
    HYBRID --> CLOUDFLARE
    CLOUDFLARE --> DB
    SQLITE --> DB
    REMOTE --> HTTP
    CHROMA --> DB
    
    DB --> FS
    SRV --> MDNS
```

## Core Components

### 1. Server Layer (`src/mcp_memory_service/server.py`)

The main server implementation that handles MCP protocol communication:

- **Protocol Handler**: Implements the MCP protocol specification
- **Request Router**: Routes incoming requests to appropriate handlers
- **Response Builder**: Constructs protocol-compliant responses
- **Client Detection**: Identifies and adapts to different MCP clients (Claude Desktop, LM Studio, etc.)
- **Logging System**: Client-aware logging with JSON compliance for Claude Desktop

Key responsibilities:
- Async request handling with proper error boundaries
- Global model and embedding cache management
- Lazy initialization of storage backends
- Tool registration and invocation

### 2. Storage Abstraction Layer (`src/mcp_memory_service/storage/`)

Abstract interface that allows multiple storage backend implementations:

#### Base Interface (`storage/base.py`)
```python
class MemoryStorage(ABC):
    async def initialize(self) -> None:
        """Initialize the storage backend."""
        pass

    async def store(self, memory: Memory) -> Tuple[bool, str]:
        """Store a memory object."""
        pass

    async def retrieve(self, query: str, n_results: int) -> List[MemoryQueryResult]:
        """Retrieve memories based on semantic similarity."""
        pass

    async def search_by_tag(self, tags: List[str]) -> List[Memory]:
        """Search memories by tags."""
        pass

    async def delete(self, content_hash: str) -> Tuple[bool, str]:
        """Delete a memory by content hash."""
        pass

    async def recall_memory(self, query: str, n_results: int) -> List[Memory]:
        """Recall memories using natural language time queries."""
        pass
```

#### Hybrid Backend (`storage/hybrid.py`) ⭐ **RECOMMENDED**
- **Production default** - Best performance with cloud synchronization
- **Primary storage**: SQLite-vec for ultra-fast local reads (~5ms)
- **Secondary storage**: Cloudflare for multi-device persistence and cloud backup
- **Background sync**: Zero user-facing latency with async operation queue
- **Graceful degradation**: Works offline, automatically syncs when cloud available
- **Capacity monitoring**: Tracks Cloudflare limits and provides warnings
- **Use cases**: Production deployments, multi-device users, cloud-backed local performance

#### Cloudflare Backend (`storage/cloudflare.py`)
- Cloud-native storage using Cloudflare D1 (SQL) + Vectorize (vectors)
- Global edge distribution for low-latency access worldwide
- Serverless architecture with no infrastructure management
- Automatic scaling and high availability
- **Limits**: 10GB D1 database, 5M vectors in Vectorize
- **Use cases**: Cloud-only deployments, serverless environments, no local storage

#### SQLite-vec Backend (`storage/sqlite_vec.py`)
- Lightweight, fast local storage (5ms read latency)
- Native SQLite with vec0 extension for vector similarity
- ONNX Runtime embeddings (no PyTorch dependency)
- Minimal memory footprint and dependencies
- **Use cases**: Development, single-device deployments, or as primary in Hybrid backend

#### HTTP Client Backend (`storage/http_client.py`)
- Remote storage via HTTP API for distributed architectures
- Enables client-server deployments with centralized memory
- Bearer token authentication with API key support
- Automatic retry logic with exponential backoff
- **Use cases**: Multi-client shared memory, remote MCP servers, load balancing

#### ChromaDB Backend (`storage/chroma.py`) ⚠️ **DEPRECATED**
- **Status**: Deprecated since v5.x, removal planned for v6.0.0
- **Migration path**: Switch to Hybrid backend for production
- Original vector database backend with sentence transformer embeddings
- Heavy dependencies (PyTorch, sentence-transformers, ~2GB download)
- Slower performance (15ms vs 5ms for SQLite-vec)
- Higher memory footprint and complexity
- **Why deprecated**: Hybrid backend provides better performance with cloud sync
- **Historical only**: Not recommended for new deployments

### 3. Models Layer (`src/mcp_memory_service/models/`)

Data structures and validation:

```python
@dataclass
class Memory:
    id: str
    content: str
    content_hash: str
    memory_type: str
    tags: List[str]
    metadata: MemoryMetadata
    created_at: datetime
    updated_at: datetime

@dataclass
class MemoryMetadata:
    source: Optional[str]
    client_id: Optional[str]
    session_id: Optional[str]
    parent_memory_id: Optional[str]
    child_memory_ids: List[str]
```

### 4. Web Interface (`src/mcp_memory_service/web/`)

Modern web dashboard for memory management:

- **Frontend**: Responsive React-based UI
- **API Routes**: RESTful endpoints for memory operations
- **WebSocket Support**: Real-time updates
- **Authentication**: API key-based authentication
- **Health Monitoring**: System status and metrics

### 5. Configuration Management (`src/mcp_memory_service/config.py`)

Environment-based configuration with sensible defaults:

- Storage backend selection
- Model selection and caching
- Platform-specific optimizations
- Hardware acceleration detection (CUDA, MPS, DirectML, ROCm)
- Network configuration (HTTP, HTTPS, mDNS)

## Key Design Patterns

### Async/Await Pattern
All I/O operations use Python's async/await for non-blocking execution:
```python
async def store_memory(self, content: str) -> Memory:
    embedding = await self._generate_embedding(content)
    memory = await self.storage.store(content, embedding)
    return memory
```

### Lazy Initialization
Resources are initialized only when first needed:
```python
async def _ensure_storage_initialized(self):
    if self.storage is None:
        self.storage = await create_storage_backend()
    return self.storage
```

### Global Caching Strategy
Model and embedding caches are shared globally to reduce memory usage:
```python
_MODEL_CACHE = {}
_EMBEDDING_CACHE = LRUCache(maxsize=1000)
```

### Platform Detection and Optimization
Automatic detection and optimization for different platforms:
- **macOS**: MPS acceleration for Apple Silicon
- **Windows**: CUDA or DirectML
- **Linux**: CUDA, ROCm, or CPU
- **Fallback**: ONNX Runtime for compatibility

## MCP Protocol Operations

### Core Memory Operations

| Operation | Description | Parameters |
|-----------|-------------|------------|
| `store_memory` | Store new memory with tags | content, tags, metadata |
| `retrieve_memory` | Semantic search | query, n_results |
| `recall_memory` | Time-based retrieval | time_expression, n_results |
| `search_by_tag` | Tag-based search | tags[] |
| `delete_memory` | Delete by hash | content_hash |
| `delete_by_tags` | Bulk deletion | tags[] |

### Utility Operations

| Operation | Description | Parameters |
|-----------|-------------|------------|
| `check_database_health` | Health status | - |
| `optimize_db` | Database optimization | - |
| `export_memories` | Export to JSON | output_path |
| `import_memories` | Import from JSON | input_path |
| `get_memory_stats` | Usage statistics | - |

### Debug Operations

| Operation | Description | Parameters |
|-----------|-------------|------------|
| `debug_retrieve` | Detailed similarity scores | query, n_results |
| `exact_match_retrieve` | Exact content matching | query |

## Data Flow

### Memory Storage Flow
```
1. Client sends store_memory request
2. Server validates and enriches metadata
3. Content is hashed for deduplication
4. Text is embedded using sentence transformers
5. Memory is stored in vector database
6. Confirmation returned to client
```

### Memory Retrieval Flow
```
1. Client sends retrieve_memory request
2. Query is embedded to vector representation
3. Vector similarity search performed
4. Results ranked by similarity score
5. Metadata enriched results returned
```

### Time-Based Recall Flow
```
1. Client sends recall_memory with time expression
2. Time parser extracts temporal boundaries
3. Semantic query combined with time filter
4. Filtered results returned chronologically
```

## Performance Optimizations

### Model Caching
- Sentence transformer models cached globally
- Single model instance shared across requests
- Lazy loading on first use

### Embedding Cache
- LRU cache for frequently used embeddings
- Configurable cache size
- Cache hit tracking for optimization

### Query Optimization
- Batch processing for multiple operations
- Connection pooling for database access
- Async I/O for non-blocking operations

### Platform-Specific Optimizations
- Hardware acceleration auto-detection
- Optimized tensor operations per platform
- Fallback strategies for compatibility

## Security Considerations

### Authentication
- API key-based authentication for HTTP endpoints
- Bearer token support
- Per-client authentication in multi-client mode

### Data Privacy
- Content hashing for deduplication
- Optional encryption at rest
- Client isolation in shared deployments

### Network Security
- HTTPS support with SSL/TLS
- CORS configuration for web access
- Rate limiting for API endpoints

## Deployment Architectures

### Production (Hybrid Backend) ⭐ **RECOMMENDED**
- **Local performance**: SQLite-vec for 5ms read latency
- **Cloud persistence**: Cloudflare for multi-device sync and backup
- **Background sync**: Zero user-facing latency, async operation queue
- **Offline capability**: Full functionality without internet, syncs when available
- **Multi-device**: Access same memories across desktop, laptop, mobile
- **Use cases**: Individual users, teams with personal instances, production deployments
- **Setup**: `install.py --storage-backend hybrid` or set `MCP_MEMORY_STORAGE_BACKEND=hybrid`

### Cloud-Only (Cloudflare Backend)
- **Serverless deployment**: No local storage, pure cloud architecture
- **Global edge**: Cloudflare's worldwide network for low latency
- **Automatic scaling**: Handles traffic spikes without configuration
- **Use cases**: Serverless environments, ephemeral containers, CI/CD systems
- **Limits**: 10GB D1 database, 5M vectors in Vectorize
- **Setup**: `install.py --storage-backend cloudflare` or set `MCP_MEMORY_STORAGE_BACKEND=cloudflare`

### Development (SQLite-vec Backend)
- **Lightweight**: Minimal dependencies, fast startup
- **Local-only**: No cloud connectivity required
- **Fast iteration**: 5ms read latency, no sync overhead
- **Use cases**: Development, testing, single-device prototypes
- **Setup**: `install.py --storage-backend sqlite_vec` or set `MCP_MEMORY_STORAGE_BACKEND=sqlite_vec`

### Multi-Client Shared (HTTP Server)
- **Centralized HTTP server** with shared memory pool
- **Multiple clients** connect via API (Claude Desktop, VS Code, custom apps)
- **Authentication**: API key-based access control
- **Use cases**: Team collaboration, shared organizational memory
- **Setup**: Enable HTTP server with `MCP_HTTP_ENABLED=true`, clients use HTTP Client backend

### Legacy (ChromaDB Backend) ⚠️ **NOT RECOMMENDED**
- **Deprecated**: Removal planned for v6.0.0
- **Migration required**: Switch to Hybrid backend
- Heavy dependencies, slower performance (15ms vs 5ms)
- Only for existing deployments with migration path to Hybrid

## Extension Points

### Custom Storage Backends
Implement the `MemoryStorage` abstract base class:
```python
class CustomStorage(MemoryStorage):
    async def store(self, memory: Memory) -> Tuple[bool, str]:
        # Custom implementation
```

### Custom Embedding Models
Replace the default sentence transformer:
```python
EMBEDDING_MODEL = "your-model/name"
```

### Protocol Extensions
Add new operations via tool registration:
```python
types.Tool(
    name="custom_operation",
    description="Custom memory operation",
    inputSchema={
        "type": "object",
        "properties": {
            "param1": {
                "type": "string",
                "description": "First parameter"
            },
            "param2": {
                "type": "integer",
                "description": "Second parameter",
                "default": 0
            }
        },
        "required": ["param1"],
        "additionalProperties": false
    }
)
```

## Future Enhancements

### Planned Features (See Issue #91)
- **WFGY Semantic Firewall** - Enhanced memory reliability with 16 failure mode detection/recovery
- **Ontology Foundation Layer** (Phase 0) - Controlled vocabulary, taxonomy, knowledge graph
- Automatic memory consolidation
- Semantic clustering
- Memory importance scoring
- Cross-conversation threading

### Under Consideration
- **Agentic RAG** for intelligent retrieval (see Discussion #86)
- **Graph-based memory relationships** (ontology pipeline integration)
- Memory compression strategies
- Federated learning from memories
- Real-time collaboration features
- Advanced visualization tools

## References

- [MCP Protocol Specification](https://modelcontextprotocol.io/docs)
- [ChromaDB Documentation](https://docs.trychroma.com/)
- [SQLite Vec Extension](https://github.com/asg017/sqlite-vec)
- [Sentence Transformers](https://www.sbert.net/)
```

--------------------------------------------------------------------------------
/scripts/maintenance/find_duplicates.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Find and remove duplicate memories from the database.
Duplicates can occur when:
1. Same content was ingested multiple times
2. Re-ingestion after encoding fixes created duplicates
3. Manual storage of similar content
"""

import sqlite3
import json
import sys
import hashlib
import urllib.request
import urllib.parse
import ssl
from pathlib import Path
from collections import defaultdict
from datetime import datetime

def load_config():
    """Load configuration from Claude hooks config file."""
    config_path = Path.home() / '.claude' / 'hooks' / 'config.json'
    if config_path.exists():
        with open(config_path) as f:
            return json.load(f)
    return None

def get_memories_from_api(endpoint, api_key):
    """Retrieve all memories from the API endpoint using pagination."""
    try:
        # Create SSL context that allows self-signed certificates
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        
        all_memories = []
        page = 1
        page_size = 100  # Use reasonable page size
        
        while True:
            # Create request for current page
            url = f"{endpoint}/api/memories?page={page}&page_size={page_size}"
            req = urllib.request.Request(url)
            req.add_header('Authorization', f'Bearer {api_key}')
            
            # Make request
            with urllib.request.urlopen(req, context=ssl_context, timeout=30) as response:
                if response.status != 200:
                    print(f"❌ API request failed: {response.status}")
                    return []
                
                data = response.read().decode('utf-8')
                api_response = json.loads(data)
            
            # Extract memories from this page
            page_memories = api_response.get('memories', [])
            total = api_response.get('total', 0)
            has_more = api_response.get('has_more', False)
            
            all_memories.extend(page_memories)
            print(f"Retrieved page {page}: {len(page_memories)} memories (total so far: {len(all_memories)}/{total})")
            
            if not has_more:
                break
                
            page += 1
        
        print(f"✅ Retrieved all {len(all_memories)} memories from API")
        
        # Convert API format to internal format
        converted_memories = []
        for mem in all_memories:
            converted_memories.append((
                mem.get('content_hash', ''),
                mem.get('content', ''),
                json.dumps(mem.get('tags', [])),
                mem.get('created_at', ''),
                json.dumps(mem.get('metadata', {}))
            ))
        
        return converted_memories
        
    except Exception as e:
        print(f"❌ Error retrieving memories from API: {e}")
        return []

def content_similarity_hash(content):
    """Create a hash for content similarity detection."""
    # Normalize content for comparison
    normalized = content.strip().lower()
    # Remove extra whitespace
    normalized = ' '.join(normalized.split())
    return hashlib.sha256(normalized.encode('utf-8')).hexdigest()[:16]

def find_duplicates(memories_source, similarity_threshold=0.95):
    """
    Find duplicate memories from either database or API.
    
    Args:
        memories_source: Either a database path (str) or list of memories from API
        similarity_threshold: Threshold for considering memories duplicates (0.0-1.0)
    
    Returns:
        Dict of duplicate groups
    """
    if isinstance(memories_source, str):
        # Database path provided
        conn = sqlite3.connect(memories_source)
        cursor = conn.cursor()
        
        print("Scanning for duplicate memories...")
        
        # Get all memories
        cursor.execute("""
            SELECT content_hash, content, tags, created_at, metadata
            FROM memories 
            ORDER BY created_at DESC
        """)
        
        all_memories = cursor.fetchall()
        conn.close()
    else:
        # API memories provided
        print("Analyzing memories from API...")
        all_memories = memories_source
    
    print(f"Found {len(all_memories)} total memories")
    
    # Group by content similarity
    content_groups = defaultdict(list)
    exact_content_groups = defaultdict(list)
    
    for memory in all_memories:
        content_hash, content, tags_json, created_at, metadata_json = memory
        
        # Parse tags and metadata
        try:
            tags = json.loads(tags_json) if tags_json else []
        except:
            tags = []
            
        try:
            metadata = json.loads(metadata_json) if metadata_json else {}
        except:
            metadata = {}
        
        # Exact content match
        exact_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
        exact_content_groups[exact_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
        
        # Similar content match (normalized)
        similarity_hash = content_similarity_hash(content)
        content_groups[similarity_hash].append({
            'hash': content_hash,
            'content': content,
            'tags': tags,
            'created_at': created_at,
            'metadata': metadata,
            'content_length': len(content)
        })
    
    # Find actual duplicates (groups with > 1 memory)
    exact_duplicates = {k: v for k, v in exact_content_groups.items() if len(v) > 1}
    similar_duplicates = {k: v for k, v in content_groups.items() if len(v) > 1}
    
    return {
        'exact': exact_duplicates,
        'similar': similar_duplicates,
        'total_memories': len(all_memories)
    }

def analyze_duplicate_group(group):
    """Analyze a group of duplicate memories to determine which to keep."""
    if len(group) <= 1:
        return None
        
    # Sort by creation date (newest first)
    sorted_group = sorted(group, key=lambda x: x['created_at'], reverse=True)
    
    analysis = {
        'group_size': len(group),
        'recommended_keep': None,
        'recommended_delete': [],
        'reasons': []
    }
    
    # Prefer memories with utf8-fixed tag (these are the corrected versions)
    utf8_fixed = [m for m in sorted_group if 'utf8-fixed' in m['tags']]
    if utf8_fixed:
        analysis['recommended_keep'] = utf8_fixed[0]
        analysis['recommended_delete'] = [m for m in sorted_group if m != utf8_fixed[0]]
        analysis['reasons'].append('Keeping UTF8-fixed version')
        return analysis
    
    # Prefer newer memories
    analysis['recommended_keep'] = sorted_group[0]  # Newest
    analysis['recommended_delete'] = sorted_group[1:]  # Older ones
    analysis['reasons'].append('Keeping newest version')
    
    # Check for different tag sets
    keep_tags = set(analysis['recommended_keep']['tags'])
    for delete_mem in analysis['recommended_delete']:
        delete_tags = set(delete_mem['tags'])
        if delete_tags != keep_tags:
            analysis['reasons'].append(f'Tag differences: {delete_tags - keep_tags}')
    
    return analysis

def remove_duplicates(db_path, duplicate_groups, dry_run=True):
    """
    Remove duplicate memories from the database.
    
    Args:
        db_path: Path to the SQLite database
        duplicate_groups: Dict of duplicate groups from find_duplicates()
        dry_run: If True, only show what would be deleted
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    total_to_delete = 0
    deletion_plan = []
    
    print(f"\n{'DRY RUN - ' if dry_run else ''}Analyzing duplicate groups...")
    
    # Process exact duplicates first
    print(f"\n=== EXACT DUPLICATES ===")
    for content_hash, group in duplicate_groups['exact'].items():
        analysis = analyze_duplicate_group(group)
        if analysis:
            total_to_delete += len(analysis['recommended_delete'])
            deletion_plan.extend(analysis['recommended_delete'])
            
            print(f"\nDuplicate group: {len(group)} memories")
            print(f"  Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
            print(f"  Tags: {', '.join(analysis['recommended_keep']['tags'][:3])}")
            print(f"  Delete: {len(analysis['recommended_delete'])} older versions")
            for reason in analysis['reasons']:
                print(f"  Reason: {reason}")
    
    # Process similar duplicates (but not exact)
    print(f"\n=== SIMILAR DUPLICATES ===")
    processed_exact_hashes = set()
    for group in duplicate_groups['exact'].values():
        for mem in group:
            processed_exact_hashes.add(mem['hash'])
    
    for similarity_hash, group in duplicate_groups['similar'].items():
        # Skip if these are exact duplicates we already processed
        group_hashes = {mem['hash'] for mem in group}
        if group_hashes.issubset(processed_exact_hashes):
            continue
            
        analysis = analyze_duplicate_group(group)
        if analysis:
            print(f"\nSimilar group: {len(group)} memories")
            print(f"  Keep: {analysis['recommended_keep']['hash'][:20]}... ({analysis['recommended_keep']['created_at']})")
            print(f"  Content preview: {analysis['recommended_keep']['content'][:100]}...")
            print(f"  Would delete: {len(analysis['recommended_delete'])} similar versions")
            # Don't auto-delete similar (only exact) in this version
    
    print(f"\n{'DRY RUN SUMMARY' if dry_run else 'DELETION SUMMARY'}:")
    print(f"Total exact duplicates to delete: {total_to_delete}")
    print(f"Current total memories: {duplicate_groups['total_memories']}")
    print(f"After cleanup: {duplicate_groups['total_memories'] - total_to_delete}")
    
    if not dry_run and total_to_delete > 0:
        print(f"\n{'='*50}")
        print("DELETING DUPLICATE MEMORIES...")
        
        deleted_count = 0
        for mem_to_delete in deletion_plan:
            try:
                # Delete from memories table
                cursor.execute("DELETE FROM memories WHERE content_hash = ?", (mem_to_delete['hash'],))
                
                # Also try to delete from embeddings if it exists
                try:
                    cursor.execute("DELETE FROM memory_embeddings WHERE rowid = ?", (mem_to_delete['hash'],))
                except:
                    pass  # Embeddings table might use different structure
                    
                deleted_count += 1
                if deleted_count % 10 == 0:
                    print(f"  Deleted {deleted_count}/{total_to_delete}...")
                    
            except Exception as e:
                print(f"  Error deleting {mem_to_delete['hash'][:20]}: {e}")
        
        conn.commit()
        print(f"✅ Successfully deleted {deleted_count} duplicate memories")
        
        # Verify final count
        cursor.execute("SELECT COUNT(*) FROM memories")
        final_count = cursor.fetchone()[0]
        print(f"📊 Final memory count: {final_count}")
    
    elif dry_run and total_to_delete > 0:
        print(f"\nTo actually delete these {total_to_delete} duplicates, run with --execute flag")
    
    conn.close()
    return total_to_delete

def main():
    """Main entry point."""
    import argparse
    
    parser = argparse.ArgumentParser(description='Find and remove duplicate memories')
    parser.add_argument('--db-path', type=str,
                        help='Path to SQLite database (if not using API)')
    parser.add_argument('--use-api', action='store_true',
                        help='Use API endpoint from config instead of database')
    parser.add_argument('--execute', action='store_true',
                        help='Actually delete the duplicates (default is dry run)')
    parser.add_argument('--similarity-threshold', type=float, default=0.95,
                        help='Similarity threshold for duplicate detection (0.0-1.0)')
    
    args = parser.parse_args()
    
    # Try to load config first
    config = load_config()
    
    if args.use_api or (not args.db_path and config):
        if not config:
            print("❌ No configuration found. Use --db-path for local database or ensure config exists.")
            sys.exit(1)
        
        endpoint = config.get('memoryService', {}).get('endpoint')
        api_key = config.get('memoryService', {}).get('apiKey')
        
        if not endpoint or not api_key:
            print("❌ API endpoint or key not found in configuration")
            sys.exit(1)
        
        print(f"🌐 Using API endpoint: {endpoint}")
        
        # Get memories from API
        memories = get_memories_from_api(endpoint, api_key)
        if not memories:
            print("❌ Failed to retrieve memories from API")
            sys.exit(1)
        
        # Find duplicates
        duplicates = find_duplicates(memories, args.similarity_threshold)
        
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        
        print(f"\nFound:")
        print(f"  - {len(duplicates['exact'])} exact duplicate groups")
        print(f"  - {len(duplicates['similar'])} similar content groups")
        
        if args.execute:
            print("⚠️  API-based deletion not yet implemented. Use database path for deletion.")
        else:
            # Show analysis only
            remove_duplicates(None, duplicates, dry_run=True)
            
    else:
        # Use database path
        db_path = args.db_path or '/home/hkr/.local/share/mcp-memory/sqlite_vec.db'
        
        if not Path(db_path).exists():
            print(f"❌ Database not found: {db_path}")
            print("💡 Try --use-api to use the API endpoint from config instead")
            sys.exit(1)
        
        # Find duplicates
        duplicates = find_duplicates(db_path, args.similarity_threshold)
        
        if not duplicates['exact'] and not duplicates['similar']:
            print("✅ No duplicates found!")
            return
        
        print(f"\nFound:")
        print(f"  - {len(duplicates['exact'])} exact duplicate groups")
        print(f"  - {len(duplicates['similar'])} similar content groups")
        
        # Remove duplicates
        remove_duplicates(db_path, duplicates, dry_run=not args.execute)

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/utils/system_detection.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
System detection utilities for hardware compatibility.
Provides functions to detect hardware architecture, available accelerators,
and determine optimal configurations for different environments.
"""
import os
import sys
import platform
import logging
import subprocess
from typing import Dict, Any, Tuple, Optional, List
import json

logger = logging.getLogger(__name__)

# Hardware acceleration types
class AcceleratorType:
    NONE = "none"
    CUDA = "cuda"
    MPS = "mps"  # Apple Metal Performance Shaders
    CPU = "cpu"
    DIRECTML = "directml"  # DirectML for Windows
    ROCm = "rocm"  # AMD ROCm

class Architecture:
    X86_64 = "x86_64"
    ARM64 = "arm64"
    UNKNOWN = "unknown"

class SystemInfo:
    """Class to store and provide system information."""
    
    def __init__(self):
        self.os_name = platform.system().lower()
        self.os_version = platform.version()
        self.architecture = self._detect_architecture()
        self.python_version = platform.python_version()
        self.cpu_count = os.cpu_count() or 1
        self.memory_gb = self._get_system_memory()
        self.accelerator = self._detect_accelerator()
        self.is_rosetta = self._detect_rosetta()
        self.is_virtual_env = sys.prefix != sys.base_prefix
        
    def _detect_architecture(self) -> str:
        """Detect the system architecture."""
        arch = platform.machine().lower()
        
        if arch in ("x86_64", "amd64", "x64"):
            return Architecture.X86_64
        elif arch in ("arm64", "aarch64"):
            return Architecture.ARM64
        else:
            return Architecture.UNKNOWN
            
    def _get_system_memory(self) -> float:
        """Get the total system memory in GB."""
        try:
            if self.os_name == "linux":
                with open('/proc/meminfo', 'r') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            # Extract the memory value (in kB)
                            memory_kb = int(line.split()[1])
                            return round(memory_kb / (1024 * 1024), 2)  # Convert to GB
                            
            elif self.os_name == "darwin":  # macOS
                output = subprocess.check_output(['sysctl', '-n', 'hw.memsize']).decode('utf-8').strip()
                memory_bytes = int(output)
                return round(memory_bytes / (1024**3), 2)  # Convert to GB
                
            elif self.os_name == "windows":
                import ctypes
                kernel32 = ctypes.windll.kernel32
                c_ulonglong = ctypes.c_ulonglong
                class MEMORYSTATUSEX(ctypes.Structure):
                    _fields_ = [
                        ('dwLength', ctypes.c_ulong),
                        ('dwMemoryLoad', ctypes.c_ulong),
                        ('ullTotalPhys', c_ulonglong),
                        ('ullAvailPhys', c_ulonglong),
                        ('ullTotalPageFile', c_ulonglong),
                        ('ullAvailPageFile', c_ulonglong),
                        ('ullTotalVirtual', c_ulonglong),
                        ('ullAvailVirtual', c_ulonglong),
                        ('ullAvailExtendedVirtual', c_ulonglong),
                    ]
                    
                memoryStatus = MEMORYSTATUSEX()
                memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUSEX)
                kernel32.GlobalMemoryStatusEx(ctypes.byref(memoryStatus))
                return round(memoryStatus.ullTotalPhys / (1024**3), 2)  # Convert to GB
                
        except Exception as e:
            logger.warning(f"Failed to get system memory: {e}")
            
        # Default fallback
        return 4.0  # Assume 4GB as a conservative default
        
    def _detect_accelerator(self) -> str:
        """Detect available hardware acceleration."""
        # Try to detect CUDA
        if self._check_cuda_available():
            return AcceleratorType.CUDA
            
        # Check for Apple MPS (Metal Performance Shaders)
        if self.os_name == "darwin" and self.architecture == Architecture.ARM64:
            if self._check_mps_available():
                return AcceleratorType.MPS
                
        # Check for ROCm on Linux
        if self.os_name == "linux" and self._check_rocm_available():
            return AcceleratorType.ROCm
            
        # Check for DirectML on Windows
        if self.os_name == "windows" and self._check_directml_available():
            return AcceleratorType.DIRECTML
            
        # Default to CPU
        return AcceleratorType.CPU
        
    def _check_cuda_available(self) -> bool:
        """Check if CUDA is available."""
        try:
            # Try to import torch and check for CUDA
            import torch
            # Check if torch is properly installed with CUDA support
            if hasattr(torch, 'cuda'):
                return torch.cuda.is_available()
            else:
                logger.warning("PyTorch installed but appears broken (no cuda attribute)")
                return False
        except (ImportError, AttributeError) as e:
            logger.debug(f"CUDA check failed: {e}")
            # If torch is not installed or broken, try to check for CUDA using environment
            return 'CUDA_HOME' in os.environ or 'CUDA_PATH' in os.environ
            
    def _check_mps_available(self) -> bool:
        """Check if Apple MPS is available."""
        try:
            import torch
            if hasattr(torch, 'backends') and hasattr(torch.backends, 'mps'):
                return torch.backends.mps.is_available()
            else:
                logger.warning("PyTorch installed but appears broken (no backends attribute)")
                return False
        except (ImportError, AttributeError) as e:
            logger.debug(f"MPS check failed: {e}")
            # Check for Metal support using system profiler
            try:
                output = subprocess.check_output(
                    ['system_profiler', 'SPDisplaysDataType'], 
                    stderr=subprocess.DEVNULL
                ).decode('utf-8')
                return 'Metal' in output
            except (subprocess.SubprocessError, FileNotFoundError):
                return False
                
    def _check_rocm_available(self) -> bool:
        """Check if AMD ROCm is available."""
        try:
            # Check for ROCm environment
            if 'ROCM_HOME' in os.environ or 'ROCM_PATH' in os.environ:
                return True
                
            # Check if ROCm libraries are installed
            try:
                output = subprocess.check_output(
                    ['rocminfo'], 
                    stderr=subprocess.DEVNULL
                ).decode('utf-8')
                return 'GPU Agent' in output
            except (subprocess.SubprocessError, FileNotFoundError):
                return False
                
        except Exception:
            return False
            
    def _check_directml_available(self) -> bool:
        """Check if DirectML is available on Windows."""
        try:
            # Check if DirectML package is installed
            import pkg_resources
            pkg_resources.get_distribution('torch-directml')
            return True
        except ImportError:
            # pkg_resources not available
            return False
        except Exception:
            # Any other error (including DistributionNotFound)
            return False
            
    def _detect_rosetta(self) -> bool:
        """Detect if running under Rosetta 2 on Apple Silicon."""
        if self.os_name != "darwin" or self.architecture != Architecture.ARM64:
            return False
            
        try:
            # Check for Rosetta by examining the process
            output = subprocess.check_output(
                ['sysctl', '-n', 'sysctl.proc_translated'], 
                stderr=subprocess.DEVNULL
            ).decode('utf-8').strip()
            return output == '1'
        except (subprocess.SubprocessError, FileNotFoundError):
            return False
            
    def get_optimal_batch_size(self) -> int:
        """Determine optimal batch size based on hardware."""
        # Start with a base batch size
        if self.accelerator == AcceleratorType.CUDA:
            # Scale based on available GPU memory (rough estimate)
            try:
                import torch
                gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3)  # GB
                if gpu_memory > 10:
                    return 32
                elif gpu_memory > 6:
                    return 16
                else:
                    return 8
            except:
                return 8  # Default for CUDA
        elif self.accelerator == AcceleratorType.MPS:
            return 8  # Conservative for Apple Silicon
        elif self.memory_gb > 16:
            return 8  # Larger batch for systems with more RAM
        elif self.memory_gb > 8:
            return 4
        else:
            return 2  # Conservative for low-memory systems
            
    def get_optimal_model(self) -> str:
        """Determine the optimal embedding model based on hardware capabilities."""
        # Default model
        default_model = 'all-MiniLM-L6-v2'
        
        # For very constrained environments, use an even smaller model
        if self.memory_gb < 4:
            return 'paraphrase-MiniLM-L3-v2'
            
        # For high-performance environments, consider a larger model
        if (self.accelerator in [AcceleratorType.CUDA, AcceleratorType.MPS] and 
                self.memory_gb > 8):
            return 'all-mpnet-base-v2'  # Better quality but more resource intensive
            
        return default_model
        
    def get_optimal_thread_count(self) -> int:
        """Determine optimal thread count for parallel operations."""
        # Use 75% of available cores, but at least 1
        return max(1, int(self.cpu_count * 0.75))
        
    def to_dict(self) -> Dict[str, Any]:
        """Convert system info to dictionary."""
        return {
            "os": self.os_name,
            "os_version": self.os_version,
            "architecture": self.architecture,
            "python_version": self.python_version,
            "cpu_count": self.cpu_count,
            "memory_gb": self.memory_gb,
            "accelerator": self.accelerator,
            "is_rosetta": self.is_rosetta,
            "is_virtual_env": self.is_virtual_env,
            "optimal_batch_size": self.get_optimal_batch_size(),
            "optimal_model": self.get_optimal_model(),
            "optimal_thread_count": self.get_optimal_thread_count()
        }
        
    def __str__(self) -> str:
        """String representation of system info."""
        return json.dumps(self.to_dict(), indent=2)


def get_system_info() -> SystemInfo:
    """Get system information singleton."""
    if not hasattr(get_system_info, 'instance'):
        get_system_info.instance = SystemInfo()
    return get_system_info.instance


def get_torch_device() -> str:
    """Get the optimal PyTorch device based on system capabilities."""
    system_info = get_system_info()
    
    try:
        import torch
        
        if system_info.accelerator == AcceleratorType.CUDA and torch.cuda.is_available():
            return "cuda"
        elif (system_info.accelerator == AcceleratorType.MPS and 
              hasattr(torch.backends, 'mps') and 
              torch.backends.mps.is_available()):
            return "mps"
        else:
            return "cpu"
    except ImportError:
        return "cpu"


def get_optimal_embedding_settings() -> Dict[str, Any]:
    """Get optimal settings for embedding operations."""
    system_info = get_system_info()
    
    return {
        "model_name": system_info.get_optimal_model(),
        "batch_size": system_info.get_optimal_batch_size(),
        "device": get_torch_device(),
        "threads": system_info.get_optimal_thread_count()
    }


def print_system_diagnostics(client_type: str = 'lm_studio'):
    """Print detailed system diagnostics for troubleshooting, conditionally based on client."""
    # Only print for LM Studio to avoid JSON parsing errors in Claude Desktop
    if client_type != 'lm_studio':
        return
        
    system_info = get_system_info()
    
    print("\n=== System Diagnostics ===")
    print(f"OS: {system_info.os_name} {system_info.os_version}")
    print(f"Architecture: {system_info.architecture}")
    print(f"Python: {system_info.python_version}")
    print(f"CPU Cores: {system_info.cpu_count}")
    print(f"Memory: {system_info.memory_gb:.2f} GB")
    print(f"Accelerator: {system_info.accelerator}")
    
    if system_info.is_rosetta:
        print("⚠️ Running under Rosetta 2 translation")
        
    print("\n=== Optimal Settings ===")
    print(f"Embedding Model: {system_info.get_optimal_model()}")
    print(f"Batch Size: {system_info.get_optimal_batch_size()}")
    print(f"Thread Count: {system_info.get_optimal_thread_count()}")
    print(f"PyTorch Device: {get_torch_device()}")
    
    # Additional PyTorch diagnostics if available
    try:
        import torch
        print("\n=== PyTorch Diagnostics ===")
        print(f"PyTorch Version: {torch.__version__}")
        print(f"CUDA Available: {torch.cuda.is_available()}")
        
        if torch.cuda.is_available():
            print(f"CUDA Version: {torch.version.cuda}")
            print(f"GPU Device: {torch.cuda.get_device_name(0)}")
            print(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.2f} GB")
            
        if hasattr(torch.backends, 'mps'):
            print(f"MPS Available: {torch.backends.mps.is_available()}")
            
    except ImportError:
        print("\nPyTorch not installed, skipping PyTorch diagnostics")
        
    print("\n=== Environment Variables ===")
    for var in ['CUDA_HOME', 'CUDA_PATH', 'ROCM_HOME', 'PYTORCH_ENABLE_MPS_FALLBACK']:
        if var in os.environ:
            print(f"{var}: {os.environ[var]}")
```

--------------------------------------------------------------------------------
/src/mcp_memory_service/web/oauth/middleware.py:
--------------------------------------------------------------------------------

```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OAuth 2.1 authentication middleware for MCP Memory Service.

Provides Bearer token validation with fallback to API key authentication.
"""

import logging
from typing import Optional, Dict, Any
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt, ExpiredSignatureError
from jose.jwt import JWTClaimsError

from ...config import (
    OAUTH_ISSUER,
    API_KEY,
    ALLOW_ANONYMOUS_ACCESS,
    OAUTH_ENABLED,
    get_jwt_algorithm,
    get_jwt_verification_key
)
from .storage import oauth_storage

logger = logging.getLogger(__name__)

# Optional Bearer token security scheme
bearer_scheme = HTTPBearer(auto_error=False)


class AuthenticationResult:
    """Result of authentication attempt."""

    def __init__(
        self,
        authenticated: bool,
        client_id: Optional[str] = None,
        scope: Optional[str] = None,
        auth_method: Optional[str] = None,
        error: Optional[str] = None
    ):
        self.authenticated = authenticated
        self.client_id = client_id
        self.scope = scope
        self.auth_method = auth_method  # "oauth", "api_key", or "none"
        self.error = error

    def has_scope(self, required_scope: str) -> bool:
        """Check if the authenticated user has the required scope."""
        if not self.authenticated or not self.scope:
            return False

        # Split scopes and check if required scope is present
        scopes = self.scope.split()
        return required_scope in scopes

    def require_scope(self, required_scope: str) -> None:
        """Raise an exception if the required scope is not present."""
        if not self.has_scope(required_scope):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail={
                    "error": "insufficient_scope",
                    "error_description": f"Required scope '{required_scope}' not granted"
                }
            )


def validate_jwt_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Validate a JWT access token with comprehensive error handling.

    Supports both RS256 and HS256 algorithms based on available keys.
    Provides detailed error logging for debugging purposes.

    Returns:
        JWT payload if valid, None if invalid
    """
    # Input validation
    if not token or not isinstance(token, str):
        logger.debug("Invalid token: empty or non-string token provided")
        return None

    # Basic token format validation
    token = token.strip()
    if not token:
        logger.debug("Invalid token: empty token after stripping")
        return None

    # JWT tokens should have 3 parts separated by dots
    parts = token.split('.')
    if len(parts) != 3:
        logger.debug(f"Invalid token format: expected 3 parts, got {len(parts)}")
        return None

    try:
        algorithm = get_jwt_algorithm()
        verification_key = get_jwt_verification_key()

        logger.debug(f"Validating JWT token with algorithm: {algorithm}")
        payload = jwt.decode(
            token,
            verification_key,
            algorithms=[algorithm],
            issuer=OAUTH_ISSUER,
            audience="mcp-memory-service"
        )

        # Additional payload validation
        required_claims = ['sub', 'iss', 'aud', 'exp', 'iat']
        missing_claims = [claim for claim in required_claims if claim not in payload]
        if missing_claims:
            logger.warning(f"JWT token missing required claims: {missing_claims}")
            return None

        logger.debug(f"JWT validation successful for subject: {payload.get('sub')}")
        return payload

    except ExpiredSignatureError:
        logger.debug("JWT validation failed: token has expired")
        return None
    except JWTClaimsError as e:
        logger.debug(f"JWT validation failed: invalid claims - {e}")
        return None
    except ValueError as e:
        logger.debug(f"JWT validation failed: configuration error - {e}")
        return None
    except JWTError as e:
        # Catch-all for other JWT-related errors
        error_type = type(e).__name__
        logger.debug(f"JWT validation failed: {error_type} - {e}")
        return None
    except Exception as e:
        # Unexpected errors should be logged but not crash the system
        error_type = type(e).__name__
        logger.error(f"Unexpected error during JWT validation: {error_type} - {e}")
        return None


async def authenticate_bearer_token(token: str) -> AuthenticationResult:
    """
    Authenticate using OAuth Bearer token with comprehensive error handling.

    Returns:
        AuthenticationResult with authentication status and details
    """
    # Input validation
    if not token or not isinstance(token, str):
        logger.debug("Bearer token authentication failed: invalid token input")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="invalid_token"
        )

    token = token.strip()
    if not token:
        logger.debug("Bearer token authentication failed: empty token")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="invalid_token"
        )

    try:
        # First, try JWT validation
        jwt_payload = validate_jwt_token(token)
        if jwt_payload:
            client_id = jwt_payload.get("sub")
            scope = jwt_payload.get("scope", "")

            # Validate client_id is present
            if not client_id:
                logger.warning("JWT authentication failed: missing client_id in token payload")
                return AuthenticationResult(
                    authenticated=False,
                    auth_method="oauth",
                    error="invalid_token"
                )

            logger.debug(f"JWT authentication successful: client_id={client_id}, scope={scope}")
            return AuthenticationResult(
                authenticated=True,
                client_id=client_id,
                scope=scope,
                auth_method="oauth"
            )

        # Fallback: check if token is stored in OAuth storage
        token_data = await oauth_storage.get_access_token(token)
        if token_data:
            client_id = token_data.get("client_id")
            if not client_id:
                logger.warning("OAuth storage authentication failed: missing client_id in stored token")
                return AuthenticationResult(
                    authenticated=False,
                    auth_method="oauth",
                    error="invalid_token"
                )

            logger.debug(f"OAuth storage authentication successful: client_id={client_id}")
            return AuthenticationResult(
                authenticated=True,
                client_id=client_id,
                scope=token_data.get("scope", ""),
                auth_method="oauth"
            )

    except Exception as e:
        # Catch any unexpected errors during authentication
        error_type = type(e).__name__
        logger.error(f"Unexpected error during bearer token authentication: {error_type} - {e}")
        return AuthenticationResult(
            authenticated=False,
            auth_method="oauth",
            error="server_error"
        )

    logger.debug("Bearer token authentication failed: token not found or invalid")
    return AuthenticationResult(
        authenticated=False,
        auth_method="oauth",
        error="invalid_token"
    )


def authenticate_api_key(api_key: str) -> AuthenticationResult:
    """
    Authenticate using legacy API key with enhanced validation.

    Returns:
        AuthenticationResult with authentication status
    """
    # Input validation
    if not api_key or not isinstance(api_key, str):
        logger.debug("API key authentication failed: invalid input")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="invalid_api_key"
        )

    api_key = api_key.strip()
    if not api_key:
        logger.debug("API key authentication failed: empty key")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="invalid_api_key"
        )

    # Check if API key is configured
    if not API_KEY:
        logger.debug("API key authentication failed: no API key configured")
        return AuthenticationResult(
            authenticated=False,
            auth_method="api_key",
            error="api_key_not_configured"
        )

    # Validate API key
    if api_key == API_KEY:
        logger.debug("API key authentication successful")
        return AuthenticationResult(
            authenticated=True,
            client_id="api_key_client",
            scope="read write admin",  # API key gets full access
            auth_method="api_key"
        )

    logger.debug("API key authentication failed: key mismatch")
    return AuthenticationResult(
        authenticated=False,
        auth_method="api_key",
        error="invalid_api_key"
    )


async def get_current_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme)
) -> AuthenticationResult:
    """
    Get current authenticated user with fallback authentication methods.

    Tries in order:
    1. OAuth Bearer token (JWT or stored token) - only if OAuth is enabled
    2. Legacy API key authentication
    3. Anonymous access (if explicitly enabled)

    Returns:
        AuthenticationResult with authentication details
    """
    # Try OAuth Bearer token authentication first (only if OAuth is enabled)
    if credentials and credentials.scheme.lower() == "bearer":
        # OAuth Bearer token validation only if OAuth is enabled
        if OAUTH_ENABLED:
            auth_result = await authenticate_bearer_token(credentials.credentials)
            if auth_result.authenticated:
                return auth_result

            # OAuth token provided but invalid - log the attempt
            logger.debug(f"OAuth Bearer token validation failed for enabled OAuth system")

        # Try API key authentication as fallback (works regardless of OAuth state)
        if API_KEY:
            # Some clients might send API key as Bearer token
            api_key_result = authenticate_api_key(credentials.credentials)
            if api_key_result.authenticated:
                return api_key_result

        # Determine appropriate error message based on OAuth state
        if OAUTH_ENABLED:
            error_msg = "The access token provided is expired, revoked, malformed, or invalid"
            logger.warning("Invalid Bearer token provided and API key fallback failed")
        else:
            error_msg = "OAuth is disabled. Use API key authentication or enable anonymous access."
            logger.debug("Bearer token provided but OAuth is disabled, API key fallback failed")

        # All Bearer token authentication methods failed
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail={
                "error": "invalid_token",
                "error_description": error_msg
            },
            headers={"WWW-Authenticate": "Bearer"}
        )

    # Allow anonymous access only if explicitly enabled
    if ALLOW_ANONYMOUS_ACCESS:
        logger.debug("Anonymous access explicitly enabled, granting read-only access")
        return AuthenticationResult(
            authenticated=True,
            client_id="anonymous",
            scope="read",  # Anonymous users get read-only access for security
            auth_method="none"
        )

    # No credentials provided and anonymous access not allowed
    if API_KEY or OAUTH_ENABLED:
        logger.debug("No valid authentication provided")
        if OAUTH_ENABLED and API_KEY:
            error_msg = "Authorization required. Provide valid OAuth Bearer token or API key."
        elif OAUTH_ENABLED:
            error_msg = "Authorization required. Provide valid OAuth Bearer token."
        else:
            error_msg = "Authorization required. Provide valid API key."
    else:
        logger.debug("No authentication configured and anonymous access disabled")
        error_msg = "Authentication is required. Set MCP_ALLOW_ANONYMOUS_ACCESS=true to enable anonymous access."

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail={
            "error": "authorization_required",
            "error_description": error_msg
        },
        headers={"WWW-Authenticate": "Bearer"}
    )


# Convenience dependency for requiring specific scopes
def require_scope(scope: str):
    """
    Create a dependency that requires a specific OAuth scope.

    Usage:
        @app.get("/admin", dependencies=[Depends(require_scope("admin"))])
    """
    async def scope_dependency(user: AuthenticationResult = Depends(get_current_user)):
        user.require_scope(scope)
        return user

    return scope_dependency


# Convenience dependencies for common access patterns
async def require_read_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Require read access to the resource."""
    user.require_scope("read")
    return user


async def require_write_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Require write access to the resource."""
    user.require_scope("write")
    return user


async def require_admin_access(user: AuthenticationResult = Depends(get_current_user)) -> AuthenticationResult:
    """Require admin access to the resource."""
    user.require_scope("admin")
    return user


# Optional authentication (for endpoints that work with or without auth)
async def get_optional_user(
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(bearer_scheme)
) -> Optional[AuthenticationResult]:
    """
    Get current user but don't require authentication.

    Returns:
        AuthenticationResult if authenticated, None if not
    """
    try:
        return await get_current_user(credentials)
    except HTTPException:
        return None
```
Page 17/35FirstPrevNextLast