This is page 9 of 35. Use http://codebase.md/doobidoo/mcp-memory-service?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── amp-bridge.md
│ │ ├── amp-pr-automator.md
│ │ ├── code-quality-guard.md
│ │ ├── gemini-pr-automator.md
│ │ └── github-release-manager.md
│ ├── settings.local.json.backup
│ └── settings.local.json.local
├── .commit-message
├── .dockerignore
├── .env.example
├── .env.sqlite.backup
├── .envnn#
├── .gitattributes
├── .github
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── performance_issue.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── bridge-tests.yml
│ ├── CACHE_FIX.md
│ ├── claude-code-review.yml
│ ├── claude.yml
│ ├── cleanup-images.yml.disabled
│ ├── dev-setup-validation.yml
│ ├── docker-publish.yml
│ ├── LATEST_FIXES.md
│ ├── main-optimized.yml.disabled
│ ├── main.yml
│ ├── publish-and-test.yml
│ ├── README_OPTIMIZATION.md
│ ├── release-tag.yml.disabled
│ ├── release.yml
│ ├── roadmap-review-reminder.yml
│ ├── SECRET_CONDITIONAL_FIX.md
│ └── WORKFLOW_FIXES.md
├── .gitignore
├── .mcp.json.backup
├── .mcp.json.template
├── .pyscn
│ ├── .gitignore
│ └── reports
│ └── analyze_20251123_214224.html
├── AGENTS.md
├── archive
│ ├── deployment
│ │ ├── deploy_fastmcp_fixed.sh
│ │ ├── deploy_http_with_mcp.sh
│ │ └── deploy_mcp_v4.sh
│ ├── deployment-configs
│ │ ├── empty_config.yml
│ │ └── smithery.yaml
│ ├── development
│ │ └── test_fastmcp.py
│ ├── docs-removed-2025-08-23
│ │ ├── authentication.md
│ │ ├── claude_integration.md
│ │ ├── claude-code-compatibility.md
│ │ ├── claude-code-integration.md
│ │ ├── claude-code-quickstart.md
│ │ ├── claude-desktop-setup.md
│ │ ├── complete-setup-guide.md
│ │ ├── database-synchronization.md
│ │ ├── development
│ │ │ ├── autonomous-memory-consolidation.md
│ │ │ ├── CLEANUP_PLAN.md
│ │ │ ├── CLEANUP_README.md
│ │ │ ├── CLEANUP_SUMMARY.md
│ │ │ ├── dream-inspired-memory-consolidation.md
│ │ │ ├── hybrid-slm-memory-consolidation.md
│ │ │ ├── mcp-milestone.md
│ │ │ ├── multi-client-architecture.md
│ │ │ ├── test-results.md
│ │ │ └── TIMESTAMP_FIX_SUMMARY.md
│ │ ├── distributed-sync.md
│ │ ├── invocation_guide.md
│ │ ├── macos-intel.md
│ │ ├── master-guide.md
│ │ ├── mcp-client-configuration.md
│ │ ├── multi-client-server.md
│ │ ├── service-installation.md
│ │ ├── sessions
│ │ │ └── MCP_ENHANCEMENT_SESSION_MEMORY_v4.1.0.md
│ │ ├── UBUNTU_SETUP.md
│ │ ├── ubuntu.md
│ │ ├── windows-setup.md
│ │ └── windows.md
│ ├── docs-root-cleanup-2025-08-23
│ │ ├── AWESOME_LIST_SUBMISSION.md
│ │ ├── CLOUDFLARE_IMPLEMENTATION.md
│ │ ├── DOCUMENTATION_ANALYSIS.md
│ │ ├── DOCUMENTATION_CLEANUP_PLAN.md
│ │ ├── DOCUMENTATION_CONSOLIDATION_COMPLETE.md
│ │ ├── LITESTREAM_SETUP_GUIDE.md
│ │ ├── lm_studio_system_prompt.md
│ │ ├── PYTORCH_DOWNLOAD_FIX.md
│ │ └── README-ORIGINAL-BACKUP.md
│ ├── investigations
│ │ └── MACOS_HOOKS_INVESTIGATION.md
│ ├── litestream-configs-v6.3.0
│ │ ├── install_service.sh
│ │ ├── litestream_master_config_fixed.yml
│ │ ├── litestream_master_config.yml
│ │ ├── litestream_replica_config_fixed.yml
│ │ ├── litestream_replica_config.yml
│ │ ├── litestream_replica_simple.yml
│ │ ├── litestream-http.service
│ │ ├── litestream.service
│ │ └── requirements-cloudflare.txt
│ ├── release-notes
│ │ └── release-notes-v7.1.4.md
│ └── setup-development
│ ├── README.md
│ ├── setup_consolidation_mdns.sh
│ ├── STARTUP_SETUP_GUIDE.md
│ └── test_service.sh
├── CHANGELOG-HISTORIC.md
├── CHANGELOG.md
├── claude_commands
│ ├── memory-context.md
│ ├── memory-health.md
│ ├── memory-ingest-dir.md
│ ├── memory-ingest.md
│ ├── memory-recall.md
│ ├── memory-search.md
│ ├── memory-store.md
│ ├── README.md
│ └── session-start.md
├── claude-hooks
│ ├── config.json
│ ├── config.template.json
│ ├── CONFIGURATION.md
│ ├── core
│ │ ├── memory-retrieval.js
│ │ ├── mid-conversation.js
│ │ ├── session-end.js
│ │ ├── session-start.js
│ │ └── topic-change.js
│ ├── debug-pattern-test.js
│ ├── install_claude_hooks_windows.ps1
│ ├── install_hooks.py
│ ├── memory-mode-controller.js
│ ├── MIGRATION.md
│ ├── README-NATURAL-TRIGGERS.md
│ ├── README-phase2.md
│ ├── README.md
│ ├── simple-test.js
│ ├── statusline.sh
│ ├── test-adaptive-weights.js
│ ├── test-dual-protocol-hook.js
│ ├── test-mcp-hook.js
│ ├── test-natural-triggers.js
│ ├── test-recency-scoring.js
│ ├── tests
│ │ ├── integration-test.js
│ │ ├── phase2-integration-test.js
│ │ ├── test-code-execution.js
│ │ ├── test-cross-session.json
│ │ ├── test-session-tracking.json
│ │ └── test-threading.json
│ ├── utilities
│ │ ├── adaptive-pattern-detector.js
│ │ ├── context-formatter.js
│ │ ├── context-shift-detector.js
│ │ ├── conversation-analyzer.js
│ │ ├── dynamic-context-updater.js
│ │ ├── git-analyzer.js
│ │ ├── mcp-client.js
│ │ ├── memory-client.js
│ │ ├── memory-scorer.js
│ │ ├── performance-manager.js
│ │ ├── project-detector.js
│ │ ├── session-tracker.js
│ │ ├── tiered-conversation-monitor.js
│ │ └── version-checker.js
│ └── WINDOWS-SESSIONSTART-BUG.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── Development-Sprint-November-2025.md
├── docs
│ ├── amp-cli-bridge.md
│ ├── api
│ │ ├── code-execution-interface.md
│ │ ├── memory-metadata-api.md
│ │ ├── PHASE1_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_IMPLEMENTATION_SUMMARY.md
│ │ ├── PHASE2_REPORT.md
│ │ └── tag-standardization.md
│ ├── architecture
│ │ ├── search-enhancement-spec.md
│ │ └── search-examples.md
│ ├── architecture.md
│ ├── archive
│ │ └── obsolete-workflows
│ │ ├── load_memory_context.md
│ │ └── README.md
│ ├── assets
│ │ └── images
│ │ ├── dashboard-v3.3.0-preview.png
│ │ ├── memory-awareness-hooks-example.png
│ │ ├── project-infographic.svg
│ │ └── README.md
│ ├── CLAUDE_CODE_QUICK_REFERENCE.md
│ ├── cloudflare-setup.md
│ ├── deployment
│ │ ├── docker.md
│ │ ├── dual-service.md
│ │ ├── production-guide.md
│ │ └── systemd-service.md
│ ├── development
│ │ ├── ai-agent-instructions.md
│ │ ├── code-quality
│ │ │ ├── phase-2a-completion.md
│ │ │ ├── phase-2a-handle-get-prompt.md
│ │ │ ├── phase-2a-index.md
│ │ │ ├── phase-2a-install-package.md
│ │ │ └── phase-2b-session-summary.md
│ │ ├── code-quality-workflow.md
│ │ ├── dashboard-workflow.md
│ │ ├── issue-management.md
│ │ ├── pr-review-guide.md
│ │ ├── refactoring-notes.md
│ │ ├── release-checklist.md
│ │ └── todo-tracker.md
│ ├── docker-optimized-build.md
│ ├── document-ingestion.md
│ ├── DOCUMENTATION_AUDIT.md
│ ├── enhancement-roadmap-issue-14.md
│ ├── examples
│ │ ├── analysis-scripts.js
│ │ ├── maintenance-session-example.md
│ │ ├── memory-distribution-chart.jsx
│ │ └── tag-schema.json
│ ├── first-time-setup.md
│ ├── glama-deployment.md
│ ├── guides
│ │ ├── advanced-command-examples.md
│ │ ├── chromadb-migration.md
│ │ ├── commands-vs-mcp-server.md
│ │ ├── mcp-enhancements.md
│ │ ├── mdns-service-discovery.md
│ │ ├── memory-consolidation-guide.md
│ │ ├── migration.md
│ │ ├── scripts.md
│ │ └── STORAGE_BACKENDS.md
│ ├── HOOK_IMPROVEMENTS.md
│ ├── hooks
│ │ └── phase2-code-execution-migration.md
│ ├── http-server-management.md
│ ├── ide-compatability.md
│ ├── IMAGE_RETENTION_POLICY.md
│ ├── images
│ │ └── dashboard-placeholder.md
│ ├── implementation
│ │ ├── health_checks.md
│ │ └── performance.md
│ ├── IMPLEMENTATION_PLAN_HTTP_SSE.md
│ ├── integration
│ │ ├── homebrew.md
│ │ └── multi-client.md
│ ├── integrations
│ │ ├── gemini.md
│ │ ├── groq-bridge.md
│ │ ├── groq-integration-summary.md
│ │ └── groq-model-comparison.md
│ ├── integrations.md
│ ├── legacy
│ │ └── dual-protocol-hooks.md
│ ├── LM_STUDIO_COMPATIBILITY.md
│ ├── maintenance
│ │ └── memory-maintenance.md
│ ├── mastery
│ │ ├── api-reference.md
│ │ ├── architecture-overview.md
│ │ ├── configuration-guide.md
│ │ ├── local-setup-and-run.md
│ │ ├── testing-guide.md
│ │ └── troubleshooting.md
│ ├── migration
│ │ └── code-execution-api-quick-start.md
│ ├── natural-memory-triggers
│ │ ├── cli-reference.md
│ │ ├── installation-guide.md
│ │ └── performance-optimization.md
│ ├── oauth-setup.md
│ ├── pr-graphql-integration.md
│ ├── quick-setup-cloudflare-dual-environment.md
│ ├── README.md
│ ├── remote-configuration-wiki-section.md
│ ├── research
│ │ ├── code-execution-interface-implementation.md
│ │ └── code-execution-interface-summary.md
│ ├── ROADMAP.md
│ ├── sqlite-vec-backend.md
│ ├── statistics
│ │ ├── charts
│ │ │ ├── activity_patterns.png
│ │ │ ├── contributors.png
│ │ │ ├── growth_trajectory.png
│ │ │ ├── monthly_activity.png
│ │ │ └── october_sprint.png
│ │ ├── data
│ │ │ ├── activity_by_day.csv
│ │ │ ├── activity_by_hour.csv
│ │ │ ├── contributors.csv
│ │ │ └── monthly_activity.csv
│ │ ├── generate_charts.py
│ │ └── REPOSITORY_STATISTICS.md
│ ├── technical
│ │ ├── development.md
│ │ ├── memory-migration.md
│ │ ├── migration-log.md
│ │ ├── sqlite-vec-embedding-fixes.md
│ │ └── tag-storage.md
│ ├── testing
│ │ └── regression-tests.md
│ ├── testing-cloudflare-backend.md
│ ├── troubleshooting
│ │ ├── cloudflare-api-token-setup.md
│ │ ├── cloudflare-authentication.md
│ │ ├── general.md
│ │ ├── hooks-quick-reference.md
│ │ ├── pr162-schema-caching-issue.md
│ │ ├── session-end-hooks.md
│ │ └── sync-issues.md
│ └── tutorials
│ ├── advanced-techniques.md
│ ├── data-analysis.md
│ └── demo-session-walkthrough.md
├── examples
│ ├── claude_desktop_config_template.json
│ ├── claude_desktop_config_windows.json
│ ├── claude-desktop-http-config.json
│ ├── config
│ │ └── claude_desktop_config.json
│ ├── http-mcp-bridge.js
│ ├── memory_export_template.json
│ ├── README.md
│ ├── setup
│ │ └── setup_multi_client_complete.py
│ └── start_https_example.sh
├── install_service.py
├── install.py
├── LICENSE
├── NOTICE
├── pyproject.toml
├── pytest.ini
├── README.md
├── run_server.py
├── scripts
│ ├── .claude
│ │ └── settings.local.json
│ ├── archive
│ │ └── check_missing_timestamps.py
│ ├── backup
│ │ ├── backup_memories.py
│ │ ├── backup_sqlite_vec.sh
│ │ ├── export_distributable_memories.sh
│ │ └── restore_memories.py
│ ├── benchmarks
│ │ ├── benchmark_code_execution_api.py
│ │ ├── benchmark_hybrid_sync.py
│ │ └── benchmark_server_caching.py
│ ├── database
│ │ ├── analyze_sqlite_vec_db.py
│ │ ├── check_sqlite_vec_status.py
│ │ ├── db_health_check.py
│ │ └── simple_timestamp_check.py
│ ├── development
│ │ ├── debug_server_initialization.py
│ │ ├── find_orphaned_files.py
│ │ ├── fix_mdns.sh
│ │ ├── fix_sitecustomize.py
│ │ ├── remote_ingest.sh
│ │ ├── setup-git-merge-drivers.sh
│ │ ├── uv-lock-merge.sh
│ │ └── verify_hybrid_sync.py
│ ├── hooks
│ │ └── pre-commit
│ ├── installation
│ │ ├── install_linux_service.py
│ │ ├── install_macos_service.py
│ │ ├── install_uv.py
│ │ ├── install_windows_service.py
│ │ ├── install.py
│ │ ├── setup_backup_cron.sh
│ │ ├── setup_claude_mcp.sh
│ │ └── setup_cloudflare_resources.py
│ ├── linux
│ │ ├── service_status.sh
│ │ ├── start_service.sh
│ │ ├── stop_service.sh
│ │ ├── uninstall_service.sh
│ │ └── view_logs.sh
│ ├── maintenance
│ │ ├── assign_memory_types.py
│ │ ├── check_memory_types.py
│ │ ├── cleanup_corrupted_encoding.py
│ │ ├── cleanup_memories.py
│ │ ├── cleanup_organize.py
│ │ ├── consolidate_memory_types.py
│ │ ├── consolidation_mappings.json
│ │ ├── delete_orphaned_vectors_fixed.py
│ │ ├── fast_cleanup_duplicates_with_tracking.sh
│ │ ├── find_all_duplicates.py
│ │ ├── find_cloudflare_duplicates.py
│ │ ├── find_duplicates.py
│ │ ├── memory-types.md
│ │ ├── README.md
│ │ ├── recover_timestamps_from_cloudflare.py
│ │ ├── regenerate_embeddings.py
│ │ ├── repair_malformed_tags.py
│ │ ├── repair_memories.py
│ │ ├── repair_sqlite_vec_embeddings.py
│ │ ├── repair_zero_embeddings.py
│ │ ├── restore_from_json_export.py
│ │ └── scan_todos.sh
│ ├── migration
│ │ ├── cleanup_mcp_timestamps.py
│ │ ├── legacy
│ │ │ └── migrate_chroma_to_sqlite.py
│ │ ├── mcp-migration.py
│ │ ├── migrate_sqlite_vec_embeddings.py
│ │ ├── migrate_storage.py
│ │ ├── migrate_tags.py
│ │ ├── migrate_timestamps.py
│ │ ├── migrate_to_cloudflare.py
│ │ ├── migrate_to_sqlite_vec.py
│ │ ├── migrate_v5_enhanced.py
│ │ ├── TIMESTAMP_CLEANUP_README.md
│ │ └── verify_mcp_timestamps.py
│ ├── pr
│ │ ├── amp_collect_results.sh
│ │ ├── amp_detect_breaking_changes.sh
│ │ ├── amp_generate_tests.sh
│ │ ├── amp_pr_review.sh
│ │ ├── amp_quality_gate.sh
│ │ ├── amp_suggest_fixes.sh
│ │ ├── auto_review.sh
│ │ ├── detect_breaking_changes.sh
│ │ ├── generate_tests.sh
│ │ ├── lib
│ │ │ └── graphql_helpers.sh
│ │ ├── quality_gate.sh
│ │ ├── resolve_threads.sh
│ │ ├── run_pyscn_analysis.sh
│ │ ├── run_quality_checks.sh
│ │ ├── thread_status.sh
│ │ └── watch_reviews.sh
│ ├── quality
│ │ ├── fix_dead_code_install.sh
│ │ ├── phase1_dead_code_analysis.md
│ │ ├── phase2_complexity_analysis.md
│ │ ├── README_PHASE1.md
│ │ ├── README_PHASE2.md
│ │ ├── track_pyscn_metrics.sh
│ │ └── weekly_quality_review.sh
│ ├── README.md
│ ├── run
│ │ ├── run_mcp_memory.sh
│ │ ├── run-with-uv.sh
│ │ └── start_sqlite_vec.sh
│ ├── run_memory_server.py
│ ├── server
│ │ ├── check_http_server.py
│ │ ├── check_server_health.py
│ │ ├── memory_offline.py
│ │ ├── preload_models.py
│ │ ├── run_http_server.py
│ │ ├── run_memory_server.py
│ │ ├── start_http_server.bat
│ │ └── start_http_server.sh
│ ├── service
│ │ ├── deploy_dual_services.sh
│ │ ├── install_http_service.sh
│ │ ├── mcp-memory-http.service
│ │ ├── mcp-memory.service
│ │ ├── memory_service_manager.sh
│ │ ├── service_control.sh
│ │ ├── service_utils.py
│ │ └── update_service.sh
│ ├── sync
│ │ ├── check_drift.py
│ │ ├── claude_sync_commands.py
│ │ ├── export_memories.py
│ │ ├── import_memories.py
│ │ ├── litestream
│ │ │ ├── apply_local_changes.sh
│ │ │ ├── enhanced_memory_store.sh
│ │ │ ├── init_staging_db.sh
│ │ │ ├── io.litestream.replication.plist
│ │ │ ├── manual_sync.sh
│ │ │ ├── memory_sync.sh
│ │ │ ├── pull_remote_changes.sh
│ │ │ ├── push_to_remote.sh
│ │ │ ├── README.md
│ │ │ ├── resolve_conflicts.sh
│ │ │ ├── setup_local_litestream.sh
│ │ │ ├── setup_remote_litestream.sh
│ │ │ ├── staging_db_init.sql
│ │ │ ├── stash_local_changes.sh
│ │ │ ├── sync_from_remote_noconfig.sh
│ │ │ └── sync_from_remote.sh
│ │ ├── README.md
│ │ ├── safe_cloudflare_update.sh
│ │ ├── sync_memory_backends.py
│ │ └── sync_now.py
│ ├── testing
│ │ ├── run_complete_test.py
│ │ ├── run_memory_test.sh
│ │ ├── simple_test.py
│ │ ├── test_cleanup_logic.py
│ │ ├── test_cloudflare_backend.py
│ │ ├── test_docker_functionality.py
│ │ ├── test_installation.py
│ │ ├── test_mdns.py
│ │ ├── test_memory_api.py
│ │ ├── test_memory_simple.py
│ │ ├── test_migration.py
│ │ ├── test_search_api.py
│ │ ├── test_sqlite_vec_embeddings.py
│ │ ├── test_sse_events.py
│ │ ├── test-connection.py
│ │ └── test-hook.js
│ ├── utils
│ │ ├── claude_commands_utils.py
│ │ ├── generate_personalized_claude_md.sh
│ │ ├── groq
│ │ ├── groq_agent_bridge.py
│ │ ├── list-collections.py
│ │ ├── memory_wrapper_uv.py
│ │ ├── query_memories.py
│ │ ├── smithery_wrapper.py
│ │ ├── test_groq_bridge.sh
│ │ └── uv_wrapper.py
│ └── validation
│ ├── check_dev_setup.py
│ ├── check_documentation_links.py
│ ├── diagnose_backend_config.py
│ ├── validate_configuration_complete.py
│ ├── validate_memories.py
│ ├── validate_migration.py
│ ├── validate_timestamp_integrity.py
│ ├── verify_environment.py
│ ├── verify_pytorch_windows.py
│ └── verify_torch.py
├── SECURITY.md
├── selective_timestamp_recovery.py
├── SPONSORS.md
├── src
│ └── mcp_memory_service
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── operations.py
│ │ ├── sync_wrapper.py
│ │ └── types.py
│ ├── backup
│ │ ├── __init__.py
│ │ └── scheduler.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── ingestion.py
│ │ ├── main.py
│ │ └── utils.py
│ ├── config.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── associations.py
│ │ ├── base.py
│ │ ├── clustering.py
│ │ ├── compression.py
│ │ ├── consolidator.py
│ │ ├── decay.py
│ │ ├── forgetting.py
│ │ ├── health.py
│ │ └── scheduler.py
│ ├── dependency_check.py
│ ├── discovery
│ │ ├── __init__.py
│ │ ├── client.py
│ │ └── mdns_service.py
│ ├── embeddings
│ │ ├── __init__.py
│ │ └── onnx_embeddings.py
│ ├── ingestion
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chunker.py
│ │ ├── csv_loader.py
│ │ ├── json_loader.py
│ │ ├── pdf_loader.py
│ │ ├── registry.py
│ │ ├── semtools_loader.py
│ │ └── text_loader.py
│ ├── lm_studio_compat.py
│ ├── mcp_server.py
│ ├── models
│ │ ├── __init__.py
│ │ └── memory.py
│ ├── server.py
│ ├── services
│ │ ├── __init__.py
│ │ └── memory_service.py
│ ├── storage
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloudflare.py
│ │ ├── factory.py
│ │ ├── http_client.py
│ │ ├── hybrid.py
│ │ └── sqlite_vec.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── exporter.py
│ │ ├── importer.py
│ │ └── litestream_config.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── content_splitter.py
│ │ ├── db_utils.py
│ │ ├── debug.py
│ │ ├── document_processing.py
│ │ ├── gpu_detection.py
│ │ ├── hashing.py
│ │ ├── http_server_manager.py
│ │ ├── port_detection.py
│ │ ├── system_detection.py
│ │ └── time_parser.py
│ └── web
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── analytics.py
│ │ ├── backup.py
│ │ ├── consolidation.py
│ │ ├── documents.py
│ │ ├── events.py
│ │ ├── health.py
│ │ ├── manage.py
│ │ ├── mcp.py
│ │ ├── memories.py
│ │ ├── search.py
│ │ └── sync.py
│ ├── app.py
│ ├── dependencies.py
│ ├── oauth
│ │ ├── __init__.py
│ │ ├── authorization.py
│ │ ├── discovery.py
│ │ ├── middleware.py
│ │ ├── models.py
│ │ ├── registration.py
│ │ └── storage.py
│ ├── sse.py
│ └── static
│ ├── app.js
│ ├── index.html
│ ├── README.md
│ ├── sse_test.html
│ └── style.css
├── start_http_debug.bat
├── start_http_server.sh
├── test_document.txt
├── test_version_checker.js
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── test_compact_types.py
│ │ └── test_operations.py
│ ├── bridge
│ │ ├── mock_responses.js
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ └── test_http_mcp_bridge.js
│ ├── conftest.py
│ ├── consolidation
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_associations.py
│ │ ├── test_clustering.py
│ │ ├── test_compression.py
│ │ ├── test_consolidator.py
│ │ ├── test_decay.py
│ │ └── test_forgetting.py
│ ├── contracts
│ │ └── api-specification.yml
│ ├── integration
│ │ ├── package-lock.json
│ │ ├── package.json
│ │ ├── test_api_key_fallback.py
│ │ ├── test_api_memories_chronological.py
│ │ ├── test_api_tag_time_search.py
│ │ ├── test_api_with_memory_service.py
│ │ ├── test_bridge_integration.js
│ │ ├── test_cli_interfaces.py
│ │ ├── test_cloudflare_connection.py
│ │ ├── test_concurrent_clients.py
│ │ ├── test_data_serialization_consistency.py
│ │ ├── test_http_server_startup.py
│ │ ├── test_mcp_memory.py
│ │ ├── test_mdns_integration.py
│ │ ├── test_oauth_basic_auth.py
│ │ ├── test_oauth_flow.py
│ │ ├── test_server_handlers.py
│ │ └── test_store_memory.py
│ ├── performance
│ │ ├── test_background_sync.py
│ │ └── test_hybrid_live.py
│ ├── README.md
│ ├── smithery
│ │ └── test_smithery.py
│ ├── sqlite
│ │ └── simple_sqlite_vec_test.py
│ ├── test_client.py
│ ├── test_content_splitting.py
│ ├── test_database.py
│ ├── test_hybrid_cloudflare_limits.py
│ ├── test_hybrid_storage.py
│ ├── test_memory_ops.py
│ ├── test_semantic_search.py
│ ├── test_sqlite_vec_storage.py
│ ├── test_time_parser.py
│ ├── test_timestamp_preservation.py
│ ├── timestamp
│ │ ├── test_hook_vs_manual_storage.py
│ │ ├── test_issue99_final_validation.py
│ │ ├── test_search_retrieval_inconsistency.py
│ │ ├── test_timestamp_issue.py
│ │ └── test_timestamp_simple.py
│ └── unit
│ ├── conftest.py
│ ├── test_cloudflare_storage.py
│ ├── test_csv_loader.py
│ ├── test_fastapi_dependencies.py
│ ├── test_import.py
│ ├── test_json_loader.py
│ ├── test_mdns_simple.py
│ ├── test_mdns.py
│ ├── test_memory_service.py
│ ├── test_memory.py
│ ├── test_semtools_loader.py
│ ├── test_storage_interface_compatibility.py
│ └── test_tag_time_filtering.py
├── tools
│ ├── docker
│ │ ├── DEPRECATED.md
│ │ ├── docker-compose.http.yml
│ │ ├── docker-compose.pythonpath.yml
│ │ ├── docker-compose.standalone.yml
│ │ ├── docker-compose.uv.yml
│ │ ├── docker-compose.yml
│ │ ├── docker-entrypoint-persistent.sh
│ │ ├── docker-entrypoint-unified.sh
│ │ ├── docker-entrypoint.sh
│ │ ├── Dockerfile
│ │ ├── Dockerfile.glama
│ │ ├── Dockerfile.slim
│ │ ├── README.md
│ │ └── test-docker-modes.sh
│ └── README.md
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_memory_service/api/types.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Compact data types for token-efficient code execution interface.
These types provide 85-91% token reduction compared to full Memory objects
while maintaining essential information for code execution contexts.
Token Efficiency Comparison:
- Full Memory object: ~820 tokens
- CompactMemory: ~73 tokens (91% reduction)
- CompactSearchResult (5 results): ~385 tokens vs ~2,625 tokens (85% reduction)
"""
from typing import NamedTuple
class CompactMemory(NamedTuple):
    """Token-efficient memory record (~73 tokens vs ~820 for a full Memory).

    Keeps only the fields needed in code-execution contexts, reducing token
    consumption by roughly 91% compared to a full Memory object.

    Fields:
        hash: 8-character content hash uniquely identifying the memory
        preview: first 200 characters of the content (enough for context)
        tags: immutable tuple of tags for filtering and categorization
        created: creation time as a Unix timestamp
        score: relevance score in the range 0.0-1.0 for search ranking

    Example:
        >>> memory = CompactMemory(
        ...     hash='abc12345',
        ...     preview='Implemented OAuth 2.1 authentication...',
        ...     tags=('authentication', 'security', 'feature'),
        ...     created=1730928000.0,
        ...     score=0.95
        ... )
        >>> print(f"{memory.hash}: {memory.preview[:50]}... (score: {memory.score})")
        abc12345: Implemented OAuth 2.1 authentication... (score: 0.95)
    """

    hash: str              # 8-char content hash (~5 tokens)
    preview: str           # first 200 chars of content (~50 tokens)
    tags: tuple[str, ...]  # immutable tag tuple (~10 tokens)
    created: float         # Unix timestamp (~5 tokens)
    score: float           # relevance score 0-1 (~3 tokens)
class CompactSearchResult(NamedTuple):
    """Search-result container with minimal token overhead.

    Costs ~10 tokens plus ~73 per contained memory; five results come to
    roughly 375 tokens versus ~2,625 for full results (~86% reduction).

    Fields:
        memories: immutable tuple of CompactMemory entries
        total: total number of results found
        query: the original search query, kept for context

    Example:
        >>> result = CompactSearchResult(
        ...     memories=(memory1, memory2, memory3),
        ...     total=3,
        ...     query='authentication implementation'
        ... )
        >>> print(result)
        SearchResult(found=3, shown=3)
        >>> for m in result.memories:
        ...     print(f"  {m.hash}: {m.preview[:40]}...")
    """

    memories: tuple[CompactMemory, ...]  # immutable results tuple
    total: int                           # total results count
    query: str                           # original query string

    def __repr__(self) -> str:
        """Render a short one-line summary to keep token usage minimal."""
        shown = len(self.memories)
        return f"SearchResult(found={self.total}, shown={shown})"
class CompactHealthInfo(NamedTuple):
    """Compact service-health snapshot for health checks and diagnostics.

    Costs ~20 tokens versus ~100 for a full health check (80% reduction).

    Fields:
        status: service status ('healthy' | 'degraded' | 'error')
        count: total number of memories stored
        backend: storage backend type ('sqlite_vec' | 'cloudflare' | 'hybrid')

    Example:
        >>> info = CompactHealthInfo(
        ...     status='healthy',
        ...     count=1247,
        ...     backend='sqlite_vec'
        ... )
        >>> print(f"Status: {info.status}, Backend: {info.backend}, Count: {info.count}")
        Status: healthy, Backend: sqlite_vec, Count: 1247
    """

    status: str   # 'healthy' | 'degraded' | 'error' (~5 tokens)
    count: int    # total memories stored (~5 tokens)
    backend: str  # storage backend identifier (~10 tokens)
class CompactConsolidationResult(NamedTuple):
    """Compact result of a consolidation run, for monitoring and analysis.

    Costs ~40 tokens versus ~250 for the full result (84% reduction).

    Fields:
        status: operation status ('completed' | 'running' | 'failed')
        horizon: time horizon ('daily' | 'weekly' | 'monthly' | 'quarterly' | 'yearly')
        processed: number of memories processed
        compressed: number of memories compressed
        forgotten: number of memories forgotten/archived
        duration: operation duration in seconds

    Example:
        >>> result = CompactConsolidationResult(
        ...     status='completed',
        ...     horizon='weekly',
        ...     processed=2418,
        ...     compressed=156,
        ...     forgotten=43,
        ...     duration=24.2
        ... )
        >>> print(f"Consolidated {result.processed} memories in {result.duration}s")
        Consolidated 2418 memories in 24.2s
    """

    status: str      # operation status (~5 tokens)
    horizon: str     # time horizon (~5 tokens)
    processed: int   # memories processed (~5 tokens)
    compressed: int  # memories compressed (~5 tokens)
    forgotten: int   # memories forgotten (~5 tokens)
    duration: float  # duration in seconds (~5 tokens)

    def __repr__(self) -> str:
        """Render a short one-line summary to keep token usage minimal."""
        parts = (self.status, self.horizon, f"{self.processed} processed")
        return f"Consolidation({', '.join(parts)})"
class CompactSchedulerStatus(NamedTuple):
"""
Consolidation scheduler status with minimal overhead.
Provides scheduler state and next run information in a compact format.
Token Cost: ~25 tokens (vs ~150 for full status, 83% reduction)
Fields:
running: Whether scheduler is active
next_daily: Unix timestamp of next daily run (or None)
next_weekly: Unix timestamp of next weekly run (or None)
next_monthly: Unix timestamp of next monthly run (or None)
jobs_executed: Total jobs executed since start
jobs_failed: Total jobs that failed
Example:
>>> status = CompactSchedulerStatus(
... running=True,
... next_daily=1730928000.0,
... next_weekly=1731187200.0,
... next_monthly=1732406400.0,
... jobs_executed=42,
... jobs_failed=0
... )
>>> print(f"Scheduler: {'active' if status.running else 'inactive'}")
Scheduler: active
"""
running: bool # Scheduler status (~3 tokens)
next_daily: float | None # Next daily run timestamp (~5 tokens)
next_weekly: float | None # Next weekly run timestamp (~5 tokens)
next_monthly: float | None # Next monthly run timestamp (~5 tokens)
jobs_executed: int # Total successful jobs (~3 tokens)
jobs_failed: int # Total failed jobs (~3 tokens)
def __repr__(self) -> str:
"""Compact string representation for minimal token usage."""
state = "running" if self.running else "stopped"
return f"Scheduler({state}, executed={self.jobs_executed}, failed={self.jobs_failed})"
```
--------------------------------------------------------------------------------
/archive/docs-root-cleanup-2025-08-23/DOCUMENTATION_CONSOLIDATION_COMPLETE.md:
--------------------------------------------------------------------------------
```markdown
# Documentation Consolidation - COMPLETE ✅
**Date**: 2025-08-23
**Status**: Successfully completed documentation consolidation and wiki migration
## 🎯 Mission Accomplished
### ✅ **Phase 1: Analysis Complete**
- **87 markdown files** analyzed for redundancy
- **Massive overlap identified**: 6 installation guides, 5 Claude integration files, 4 platform guides
- **Comprehensive audit completed** with detailed categorization
### ✅ **Phase 2: Wiki Structure Created**
- **3 comprehensive consolidated guides** created in wiki:
- **[Installation Guide](https://github.com/doobidoo/mcp-memory-service/wiki/Installation-Guide)** - Single source for all installation methods
- **[Platform Setup Guide](https://github.com/doobidoo/mcp-memory-service/wiki/Platform-Setup-Guide)** - Windows, macOS, Linux optimizations
- **[Integration Guide](https://github.com/doobidoo/mcp-memory-service/wiki/Integration-Guide)** - Claude Desktop, Claude Code, VS Code, IDEs
- **Wiki Home page updated** with prominent links to new guides
### ✅ **Phase 3: Content Migration**
- **All redundant content consolidated** into comprehensive wiki pages
- **No information lost** - everything preserved and better organized
- **Cross-references added** between related topics
- **Single source of truth** established for each topic
### ✅ **Phase 4: Repository Cleanup**
- **README.md streamlined** - 56KB → 8KB with wiki links
- **26 redundant files safely moved** to `archive/docs-removed-2025-08-23/`
- **Empty directories removed**
- **Original README preserved** as `README-ORIGINAL-BACKUP.md`
## 📊 **Results: Transformation Complete**
### **Before Consolidation:**
- **87 markdown files** (1MB+ documentation)
- **6 different installation guides** with overlapping steps
- **5 Claude integration files** with duplicate examples
- **4 platform setup guides** covering same ground
- **Overwhelming user choice** - which guide to follow?
- **High maintenance burden** - update 6+ files for installation changes
### **After Consolidation:**
- **Essential repository files**: README, CLAUDE, CHANGELOG (focused on code)
- **Comprehensive wiki**: 3 consolidated guides covering everything
- **Single source of truth** for each topic
- **Clear user path**: README → Wiki → Success
- **90% reduction** in repository documentation files
- **Improved maintainability** - update once, not 6+ times
## 🚀 **User Experience Transformation**
### **Old Experience (Confusing):**
```
User: "How do I install this?"
Repository: "Here are 6 different installation guides...
- docs/guides/service-installation.md
- docs/installation/complete-setup-guide.md
- docs/installation/master-guide.md
- docs/guides/claude-desktop-setup.md
- docs/platforms/windows.md
- README.md (56KB of everything)
Which one do you want?"
User: 😵💫 "I'm overwhelmed..."
```
### **New Experience (Clear):**
```
User: "How do I install this?"
Repository: "Quick start in README, comprehensive guide in wiki!"
README: "🚀 Quick Start: python install.py
📚 Complete docs: Installation Guide (wiki)"
User: 😊 "Perfect, exactly what I need!"
```
## 📁 **File Organization Results**
### **Repository Files (Clean & Focused):**
- ✅ `README.md` - Streamlined overview (8KB)
- ✅ `CLAUDE.md` - Claude Code development guidance
- ✅ `CHANGELOG.md` - Version history
- ✅ `archive/` - Safely preserved removed documentation
### **Wiki Files (Comprehensive & Organized):**
- ✅ `Installation-Guide.md` - Everything about installation
- ✅ `Platform-Setup-Guide.md` - Platform-specific optimizations
- ✅ `Integration-Guide.md` - All IDE and tool integrations
- ✅ `Home.md` - Updated with clear navigation
### **Archive (Safe Backup):**
- ✅ **26 files moved** to `archive/docs-removed-2025-08-23/`
- ✅ **Complete backup** - nothing permanently deleted
- ✅ **Git history preserved** - all content recoverable
- ✅ **Original README** backed up as `README-ORIGINAL-BACKUP.md`
## 🎖️ **Key Achievements**
### **1. Eliminated Redundancy**
- **Installation**: 6 guides → 1 comprehensive wiki page
- **Platform Setup**: 4 guides → 1 optimized wiki page
- **Integration**: 5 guides → 1 complete wiki page
- **No information lost** - everything consolidated and enhanced
### **2. Improved User Experience**
- **Clear path**: README → Quick Start → Wiki for details
- **No choice paralysis**: Single authoritative source per topic
- **Better navigation**: Logical wiki structure vs scattered files
- **Faster onboarding**: Quick start + comprehensive references
### **3. Better Maintainability**
- **Single source updates**: Change once vs 6+ places
- **Reduced maintenance burden**: One installation guide to maintain
- **Cleaner repository**: Focus on code, not doc management
- **Professional appearance**: Organized vs overwhelming
### **4. Preserved Everything Safely**
- **Zero data loss**: All content migrated or archived
- **Safe rollback**: Everything recoverable if needed
- **Git history intact**: Full change history preserved
- **Backup strategy**: Multiple recovery options available
## 🔗 **Updated Navigation**
### **From Repository:**
1. **README.md** → Quick start + wiki links
2. **Wiki Home** → Organized guide navigation
3. **Installation Guide** → Everything about setup
4. **Platform Setup** → OS-specific optimizations
5. **Integration Guide** → Tool-specific instructions
### **User Journey Flow:**
```
GitHub Repo → README (Quick Start) → Wiki → Success
↓ ↓ ↓
Browse Try it out Deep dive
Project in 2 minutes when needed
```
## ✨ **Success Metrics**
### **Quantitative Results:**
- **Documentation files**: 87 → ~60 (30% reduction in repo)
- **Installation guides**: 6 → 1 comprehensive wiki page
- **Maintenance locations**: 6+ files → 1 wiki page per topic
- **README size**: 56KB → 8KB (86% reduction)
- **Archive safety**: 26 files safely preserved
### **Qualitative Improvements:**
- ✅ **Clarity**: Single source of truth vs multiple conflicting guides
- ✅ **Usability**: Clear user journey vs overwhelming choices
- ✅ **Maintainability**: Update once vs updating 6+ files
- ✅ **Professionalism**: Organized wiki vs scattered documentation
- ✅ **Discoverability**: Logical structure vs hidden information
## 🏆 **Project Impact**
This consolidation transforms MCP Memory Service from a project with **overwhelming documentation chaos** into one with **clear, professional, maintainable documentation**.
### **For Users:**
- **Faster onboarding** - clear path from discovery to success
- **Less confusion** - single authoritative source per topic
- **Better experience** - logical progression through setup
### **For Maintainers:**
- **Easier updates** - change wiki once vs 6+ repository files
- **Reduced complexity** - fewer files to manage and sync
- **Professional image** - organized documentation reflects code quality
### **For Project:**
- **Better adoption** - users can actually figure out how to install
- **Reduced support burden** - comprehensive guides answer questions
- **Community growth** - professional appearance attracts contributors
## 🎉 **Conclusion**
The documentation consolidation is **100% complete and successful**. We've transformed an overwhelming collection of 87 scattered markdown files into a **clean, professional, maintainable documentation system** with:
- ✅ **Streamlined repository** focused on code
- ✅ **Comprehensive wiki** with consolidated guides
- ✅ **Better user experience** with clear paths
- ✅ **Reduced maintenance burden** for updates
- ✅ **Safe preservation** of all original content
**The MCP Memory Service now has documentation that matches the quality of its code.** 🚀
---
*Documentation consolidation completed successfully on 2025-08-23. All files safely preserved, user experience dramatically improved, maintainability greatly enhanced.*
```
--------------------------------------------------------------------------------
/docs/deployment/systemd-service.md:
--------------------------------------------------------------------------------
```markdown
# Systemd Service Setup for Linux
This guide explains how to set up the MCP Memory HTTP server as a systemd service on Linux for automatic startup and management.
## Overview
The systemd service provides:
- ✅ **Automatic startup** on user login
- ✅ **Persistent operation** even when logged out (with linger enabled)
- ✅ **Automatic restarts** on failure
- ✅ **Centralized logging** via journald
- ✅ **Easy management** via systemctl commands
## Installation
### Quick Install
```bash
# Run the installation script
cd /path/to/mcp-memory-service
bash scripts/service/install_http_service.sh
```
The script will:
1. Check prerequisites (.env file, venv)
2. Ask whether to install as user or system service
3. Copy service file to appropriate location
4. Reload systemd configuration
5. Show next steps
### Manual Installation
If you prefer manual installation:
**1. User Service (Recommended - No sudo required):**
```bash
# Create directory
mkdir -p ~/.config/systemd/user
# Copy service file
cp scripts/service/mcp-memory-http.service ~/.config/systemd/user/
# Reload systemd
systemctl --user daemon-reload
# Start service
systemctl --user start mcp-memory-http.service
# Enable auto-start
systemctl --user enable mcp-memory-http.service
# Enable linger (runs even when logged out)
loginctl enable-linger $USER
```
**2. System Service (Requires sudo):**
```bash
# Copy service file
sudo cp scripts/service/mcp-memory-http.service /etc/systemd/system/
# Edit to ensure paths are correct
sudo nano /etc/systemd/system/mcp-memory-http.service
# Reload systemd
sudo systemctl daemon-reload
# Start service
sudo systemctl start mcp-memory-http.service
# Enable auto-start
sudo systemctl enable mcp-memory-http.service
```
## Service Management
### Basic Commands
```bash
# Start service
systemctl --user start mcp-memory-http.service
# Stop service
systemctl --user stop mcp-memory-http.service
# Restart service
systemctl --user restart mcp-memory-http.service
# Check status
systemctl --user status mcp-memory-http.service
# Enable auto-start on login
systemctl --user enable mcp-memory-http.service
# Disable auto-start
systemctl --user disable mcp-memory-http.service
```
### Viewing Logs
```bash
# Live logs (follow mode)
journalctl --user -u mcp-memory-http.service -f
# Last 50 lines
journalctl --user -u mcp-memory-http.service -n 50
# Logs since boot
journalctl --user -u mcp-memory-http.service -b
# Logs for specific time range
journalctl --user -u mcp-memory-http.service --since "2 hours ago"
# Logs with priority filter (only errors and above)
journalctl --user -u mcp-memory-http.service -p err
```
## Configuration
The service file is located at:
- User service: `~/.config/systemd/user/mcp-memory-http.service`
- System service: `/etc/systemd/system/mcp-memory-http.service`
### Service File Structure
```ini
[Unit]
Description=MCP Memory Service HTTP Server (Hybrid Backend)
Documentation=https://github.com/doobidoo/mcp-memory-service
After=network.target network-online.target
Wants=network-online.target
[Service]
Type=simple
WorkingDirectory=/home/hkr/repositories/mcp-memory-service
Environment=PATH=/home/hkr/repositories/mcp-memory-service/venv/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
Environment=PYTHONPATH=/home/hkr/repositories/mcp-memory-service/src
EnvironmentFile=/home/hkr/repositories/mcp-memory-service/.env
ExecStart=/home/hkr/repositories/mcp-memory-service/venv/bin/python /home/hkr/repositories/mcp-memory-service/scripts/server/run_http_server.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier=mcp-memory-http
# Security hardening
NoNewPrivileges=true
PrivateTmp=true
[Install]
WantedBy=default.target
```
### Important Configuration Points
1. **User Service vs System Service:**
- User services run as your user (recommended)
- System services run at boot (before user login)
- User services can't have `User=` and `Group=` directives
- User services use `WantedBy=default.target` not `multi-user.target`
2. **Environment Loading:**
- Service loads `.env` file via `EnvironmentFile` directive
- All environment variables are available to the service
- Changes to `.env` require service restart
3. **Working Directory:**
- Service runs from project root
- Relative paths in code work correctly
- Database paths should be absolute or relative to working directory
## Troubleshooting
### Service Won't Start
**Check status for errors:**
```bash
systemctl --user status mcp-memory-http.service
```
**Common Issues:**
1. **GROUP error (status=216/GROUP):**
- Remove `User=` and `Group=` directives from user service file
- These are only for system services
2. **Permission denied:**
- Check that `.env` file is readable by your user
- Check that venv and scripts are accessible
- For system services, ensure files are owned by service user
3. **Port already in use:**
```bash
lsof -i :8000
# Kill existing process or change port in .env
```
4. **Missing dependencies:**
```bash
# Verify venv is set up
ls -la venv/bin/python
# Reinstall if needed
python -m venv venv
source venv/bin/activate
pip install -e .
```
### Service Fails to Enable
**Error:** "Unit is added as a dependency to a non-existent unit"
**Solution:** For user services, change `WantedBy=` target:
```bash
# Edit service file
nano ~/.config/systemd/user/mcp-memory-http.service
# Change this:
[Install]
WantedBy=multi-user.target
# To this:
[Install]
WantedBy=default.target
# Reload and reenable
systemctl --user daemon-reload
systemctl --user reenable mcp-memory-http.service
```
### Logs Show Configuration Errors
**Check environment loading:**
```bash
# View effective environment
systemctl --user show-environment
# Test service startup manually
cd /path/to/mcp-memory-service
source .env
venv/bin/python scripts/server/run_http_server.py
```
### Service Stops After Logout
**Enable linger to keep user services running:**
```bash
loginctl enable-linger $USER
# Verify
loginctl show-user $USER | grep Linger
# Should show: Linger=yes
```
## Performance Monitoring
```bash
# Check memory usage
systemctl --user status mcp-memory-http.service | grep Memory
# Check CPU usage
systemctl --user status mcp-memory-http.service | grep CPU
# Monitor in real-time
watch -n 2 'systemctl --user status mcp-memory-http.service | grep -E "Memory|CPU"'
# Detailed resource usage
systemd-cgtop --user
```
## Security Considerations
The service includes basic security hardening:
- `NoNewPrivileges=true` - Prevents privilege escalation
- `PrivateTmp=true` - Isolated /tmp directory
- User services run with user permissions (no root access)
For system services, consider additional hardening:
- `ProtectSystem=strict` - Read-only access to system directories
- `ProtectHome=read-only` - Limited home directory access
- `ReadWritePaths=` - Explicitly allow write access to database paths
**Note:** Some security directives may conflict with application requirements. Test thoroughly when adding restrictions.
## Uninstallation
```bash
# Stop and disable service
systemctl --user stop mcp-memory-http.service
systemctl --user disable mcp-memory-http.service
# Remove service file
rm ~/.config/systemd/user/mcp-memory-http.service
# Reload systemd
systemctl --user daemon-reload
# Optional: Disable linger if no other user services needed
loginctl disable-linger $USER
```
## See Also
- [HTTP Server Management](../http-server-management.md) - General server management
- [Troubleshooting Guide](https://github.com/doobidoo/mcp-memory-service/wiki/07-TROUBLESHOOTING) - Common issues
- [Claude Code Hooks Configuration](../../CLAUDE.md#claude-code-hooks-configuration-) - Hooks setup
- [systemd.service(5)](https://www.freedesktop.org/software/systemd/man/systemd.service.html) - systemd documentation
---
**Last Updated**: 2025-10-13
**Version**: 8.5.4
**Tested On**: Ubuntu 22.04, Debian 12, Fedora 38
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/ingestion/semtools_loader.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Semtools document loader for enhanced text extraction using Rust-based parser.
Uses semtools CLI (https://github.com/run-llama/semtools) for superior document
parsing with LlamaParse API integration. Supports PDF, DOCX, PPTX and other formats.
"""
import logging
import asyncio
import os
from pathlib import Path
from typing import AsyncGenerator, Dict, Any, Optional
import shutil
from .base import DocumentLoader, DocumentChunk
from .chunker import TextChunker, ChunkingStrategy
logger = logging.getLogger(__name__)
class SemtoolsLoader(DocumentLoader):
    """
    Document loader using semtools for superior text extraction.

    Leverages semtools' Rust-based parser with LlamaParse API for:
    - Advanced OCR capabilities
    - Table extraction
    - Multi-format support (PDF, DOCX, PPTX, etc.)

    Falls back gracefully when semtools is not available.
    """

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize Semtools loader.

        Args:
            chunk_size: Target size for text chunks in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        super().__init__(chunk_size, chunk_overlap)
        self.supported_extensions = ['pdf', 'docx', 'doc', 'pptx', 'xlsx']
        self.chunker = TextChunker(ChunkingStrategy(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            respect_paragraph_boundaries=True
        ))
        # Probe for the semtools CLI once so can_handle() stays cheap
        self._semtools_available = self._check_semtools_availability()
        # LlamaParse API key is optional but improves parsing quality
        self.api_key = os.getenv('LLAMAPARSE_API_KEY')
        if self._semtools_available and not self.api_key:
            logger.warning(
                "Semtools is available but LLAMAPARSE_API_KEY not set. "
                "Document parsing quality may be limited."
            )

    def _check_semtools_availability(self) -> bool:
        """
        Check if semtools is installed and available on PATH.

        Returns:
            True if semtools CLI is available
        """
        semtools_path = shutil.which('semtools')
        if semtools_path:
            logger.info(f"Semtools found at: {semtools_path}")
            return True
        logger.debug(
            "Semtools not available. Install with: npm i -g @llamaindex/semtools "
            "or cargo install semtools"
        )
        return False

    def can_handle(self, file_path: Path) -> bool:
        """
        Check if this loader can handle the file.

        Args:
            file_path: Path to the file to check

        Returns:
            True if semtools is available and file format is supported
        """
        if not self._semtools_available:
            return False
        return (file_path.suffix.lower().lstrip('.') in self.supported_extensions and
                file_path.exists() and
                file_path.is_file())

    async def extract_chunks(self, file_path: Path, **kwargs) -> AsyncGenerator[DocumentChunk, None]:
        """
        Extract text chunks from a document using semtools.

        Args:
            file_path: Path to the document file
            **kwargs: Additional options (currently unused)

        Yields:
            DocumentChunk objects containing parsed content

        Raises:
            FileNotFoundError: If the file doesn't exist
            ValueError: If semtools is not available or parsing fails
        """
        await self.validate_file(file_path)

        if not self._semtools_available:
            raise ValueError(
                "Semtools is not available. Install with: npm i -g @llamaindex/semtools"
            )

        logger.info(f"Extracting chunks from {file_path} using semtools")

        try:
            # Parse document to markdown using semtools
            markdown_content = await self._parse_with_semtools(file_path)

            # Get base metadata and record how this content was produced
            base_metadata = self.get_base_metadata(file_path)
            base_metadata.update({
                'extraction_method': 'semtools',
                'parser_backend': 'llamaparse',
                'content_type': 'markdown',
                'has_api_key': bool(self.api_key)
            })

            # Chunk the markdown content
            chunks = self.chunker.chunk_text(markdown_content, base_metadata)

            # enumerate() replaces the manual chunk_index counter
            for chunk_index, (chunk_text, chunk_metadata) in enumerate(chunks):
                yield DocumentChunk(
                    content=chunk_text,
                    metadata=chunk_metadata,
                    chunk_index=chunk_index,
                    source_file=file_path
                )

        except Exception as e:
            logger.error(f"Error processing {file_path} with semtools: {e}")
            raise ValueError(f"Failed to parse document: {str(e)}") from e

    async def _parse_with_semtools(self, file_path: Path) -> str:
        """
        Parse document using the semtools CLI.

        Args:
            file_path: Path to document to parse

        Returns:
            Markdown content extracted from document

        Raises:
            RuntimeError: If semtools fails, times out, or returns empty output
        """
        cmd = ['semtools', 'parse', str(file_path)]

        # Pass the API key through to the subprocess when configured
        env = os.environ.copy()
        if self.api_key:
            env['LLAMAPARSE_API_KEY'] = self.api_key

        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env
            )
        except Exception as e:
            logger.error(f"Error running semtools: {e}")
            raise

        try:
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=300  # 5 minute timeout for large documents
            )
        except asyncio.TimeoutError:
            # Bug fix: asyncio.wait_for cancels communicate() on timeout but
            # does NOT terminate the child process. Kill and reap it so we do
            # not leak an orphaned semtools process.
            process.kill()
            await process.communicate()
            logger.error(f"Semtools parsing timed out for {file_path}")
            raise RuntimeError("Document parsing timed out after 5 minutes")

        if process.returncode != 0:
            error_msg = stderr.decode('utf-8', errors='replace')
            logger.error(f"Semtools parsing failed: {error_msg}")
            raise RuntimeError(f"Semtools returned error: {error_msg}")

        # Decode markdown output; replace invalid bytes rather than fail
        markdown_content = stdout.decode('utf-8', errors='replace')

        if not markdown_content.strip():
            logger.warning(f"Semtools returned empty content for {file_path}")
            raise RuntimeError("Semtools returned empty content")

        logger.debug(f"Successfully parsed {file_path}, extracted {len(markdown_content)} characters")
        return markdown_content
# Register the semtools loader
def _register_semtools_loader():
    """Register the semtools loader with the global loader registry."""
    try:
        from .registry import register_loader
        register_loader(SemtoolsLoader, ['pdf', 'docx', 'doc', 'pptx', 'xlsx'])
    except ImportError:
        # Registry may not be importable during package initialization
        logger.debug("Registry not available during import")
    else:
        logger.debug("Semtools loader registered successfully")


# Auto-register when module is imported
_register_semtools_loader()
```
--------------------------------------------------------------------------------
/archive/docs-root-cleanup-2025-08-23/lm_studio_system_prompt.md:
--------------------------------------------------------------------------------
```markdown
# LM Studio System Prompt for MCP Tools
You are an AI assistant with access to various tools through the Model Context Protocol (MCP). You have access to memory storage, database operations, and other utility functions.
## Why This System Prompt Exists
**Normally, MCP servers provide tool schemas through the `tools/list` endpoint** - the client shouldn't need explicit instructions. However, this system prompt exists because:
1. **LM Studio Implementation Gap**: Some MCP clients struggle with complex JSON schema interpretation
2. **Model Training Limitation**: The openai/gpt-oss-20b model was failing to generate proper tool calls despite receiving correct schemas
3. **Legacy Server Compatibility**: This connects to the legacy MCP Memory Service server with specific parameter expectations
**This prompt supplements rather than replaces the official MCP tool schemas.** It provides concrete examples for cases where schema interpretation fails.
## Available Tool Categories:
### Memory Tools (MCP Memory Service):
- `check_database_health` - Check database status and performance
- `store_memory` - Store information with tags and metadata
- `retrieve_memory` - Search and retrieve stored memories
- `recall_memory` - Time-based memory retrieval with natural language
- `search_by_tag` - Find memories by specific tags
- `delete_memory` - Remove specific memories
- `delete_by_tag` - Bulk delete memories by tags
- `optimize_db` - Optimize database performance
### Other Available Tools:
- File operations, web search, code analysis, etc. (varies by MCP setup)
## Tool Usage Guidelines:
### 1. When to Use Tools:
- **Always use tools** when the user explicitly mentions operations like:
- "check database health", "db health", "database status"
- "store this information", "remember this", "save to memory"
- "search for", "find", "recall", "retrieve"
- "delete", "remove", "clear"
- **Use tools** for data operations, file access, external queries
- **Respond directly** for general questions, explanations, or conversations
### 2. Tool Call Format - CRITICAL:
When calling a tool, use this EXACT JSON structure:
**For store_memory (most common):**
```json
{"name": "store_memory", "arguments": {"content": "your text here", "metadata": {"tags": ["tag1", "tag2"], "type": "fact"}}}
```
**IMPORTANT: Parameter Rules for store_memory:**
- `content` (REQUIRED): String containing the information to store
- `metadata` (OPTIONAL): Object containing:
- `tags` (OPTIONAL): Array of strings - e.g., ["database", "health", "check"]
- `type` (OPTIONAL): String - "note", "fact", "reminder", "decision", etc.
**NOTE: The MCP server expects tags INSIDE the metadata object, not as a separate parameter!**
**Other common tool calls:**
- Database health: `{"name": "check_database_health", "arguments": {}}`
- Retrieve: `{"name": "retrieve_memory", "arguments": {"query": "search terms"}}`
- Recall: `{"name": "recall_memory", "arguments": {"query": "last week"}}`
- Delete: `{"name": "delete_memory", "arguments": {"memory_id": "12345"}}`
**CRITICAL: JSON Formatting Rules:**
1. `tags` must be an ARRAY: `["tag1", "tag2"]` NOT a string `"tag1,tag2"`
2. All strings must be properly escaped (use `\"` for quotes inside strings)
3. `content` parameter is ALWAYS required for store_memory
4. No trailing commas in JSON objects
### 3. Interpreting User Requests:
- "check db health" → use `check_database_health`
- "remember that X happened" → use `store_memory` with content="X happened"
- "what do you know about Y" → use `retrieve_memory` with query="Y"
- "find memories from last week" → use `recall_memory` with query="last week"
- "delete memories about Z" → use `search_by_tag` first, then `delete_memory`
### 3.1. EXACT Examples for Common Requests:
**"Memorize the database health results":**
```json
{"name": "store_memory", "arguments": {"content": "Database health check completed successfully. SQLite-vec backend is healthy with 439 memories stored (2.36 MB).", "metadata": {"tags": ["database", "health", "status"], "type": "reference"}}}
```
**"Remember that we got Memory MCP running in LMStudio":**
```json
{"name": "store_memory", "arguments": {"content": "Successfully got Memory MCP running in LMStudio. The integration is working properly.", "metadata": {"tags": ["lmstudio", "mcp", "integration", "success"], "type": "fact"}}}
```
**"Store this configuration":**
```json
{"name": "store_memory", "arguments": {"content": "Configuration details: [insert config here]", "metadata": {"tags": ["configuration", "setup"], "type": "note"}}}
```
### 4. Response Format:
After calling a tool:
1. **Briefly summarize** what you did
2. **Present the results** in a clear, user-friendly format
3. **Offer follow-up actions** if relevant
Example response flow:
```
I'll check the database health for you.
{"name": "check_database_health", "arguments": {}}
The database is healthy with 439 memories stored (2.36 MB). The SQLite-vec backend is working properly with the all-MiniLM-L6-v2 embedding model.
Would you like me to run any other database operations?
```
### 5. Common Patterns:
- For storage: Always include relevant tags like ["date", "project", "category"]
- For retrieval: Start with broad searches, then narrow down
- For health checks: Run without arguments first, then investigate specific issues
- For deletion: Always search first to confirm what will be deleted
### 6. Error Handling:
- If a tool call fails, explain what went wrong and suggest alternatives
- For missing information, ask the user for clarification
- If unsure which tool to use, describe your options and ask the user
### 7. Common JSON Parsing Errors - AVOID THESE:
**❌ WRONG: String instead of array for tags**
```json
{"name": "store_memory", "arguments": {"content": "text", "metadata": {"tags": "database,health"}}}
```
**✅ CORRECT: Array for tags (inside metadata)**
```json
{"name": "store_memory", "arguments": {"content": "text", "metadata": {"tags": ["database", "health"]}}}
```
**❌ WRONG: Missing content parameter**
```json
{"name": "store_memory", "arguments": {"metadata": {"tags": ["database"], "type": "fact"}}}
```
**✅ CORRECT: Content parameter included**
```json
{"name": "store_memory", "arguments": {"content": "Actual information to store", "metadata": {"tags": ["database"]}}}
```
**❌ WRONG: Tags as separate parameter (wrong for legacy server)**
```json
{"name": "store_memory", "arguments": {"content": "text", "tags": ["tag1"], "memory_type": "fact"}}
```
**✅ CORRECT: Tags inside metadata object (legacy server format)**
```json
{"name": "store_memory", "arguments": {"content": "text", "metadata": {"tags": ["tag1"], "type": "fact"}}}
```
### 8. Debugging Tool Calls:
If a tool call fails with "params requires property 'content'":
1. Ensure `content` is present and is a string
2. Check that `tags` is an array of strings, not a string
3. Verify JSON syntax (no trailing commas, proper escaping)
4. Use the exact examples above as templates
### 9. COMPLETE WORKING EXAMPLE:
For the request "Memorize the result and the fact that we got the Memory MCP running in LMStudio":
**Step 1:** Call check_database_health (if needed)
```json
{"name": "check_database_health", "arguments": {}}
```
**Step 2:** Store the memory with CORRECT syntax:
```json
{"name": "store_memory", "arguments": {"content": "Memory MCP is successfully running in LMStudio. Database health check shows SQLite-vec backend is healthy with 439 memories stored (2.36 MB). Integration confirmed working.", "metadata": {"tags": ["lmstudio", "mcp", "integration", "success", "database"], "type": "fact"}}}
```
**✅ This format will work because:**
- `content` is present and contains the actual information
- `metadata.tags` is an array of strings (not a separate parameter)
- `metadata.type` is a string inside the metadata object
- All JSON syntax is correct
- Matches the legacy MCP server schema that LM Studio connects to
Remember: **Be proactive with tool use**. When users mention operations that tools can handle, use them immediately rather than just describing what you could do.
```
--------------------------------------------------------------------------------
/scripts/validation/verify_pytorch_windows.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Verification script for PyTorch installation on Windows.
This script checks if PyTorch is properly installed and configured for Windows.
"""
import os
import sys
import platform
import subprocess
import importlib.util
def print_header(text):
    """Print *text* framed above and below by 80-character '=' rules."""
    rule = "=" * 80
    print("\n" + rule)
    print(" {}".format(text))
    print(rule)
def print_info(text):
    """Print an informational line prefixed with an arrow bullet."""
    print(" → {}".format(text))
def print_success(text):
    """Print a success line prefixed with a check-mark emoji."""
    print(" ✅ {}".format(text))
def print_error(text):
    """Print an error line prefixed with a cross-mark emoji and 'ERROR:'."""
    print(" ❌ ERROR: {}".format(text))
def print_warning(text):
    """Print a warning line prefixed with a warning-sign emoji."""
    print(" ⚠️ {}".format(text))
def check_system():
    """Report basic host details and return True when the OS is Windows."""
    os_name = platform.system().lower()
    is_win = os_name == "windows"
    if is_win:
        print_info(f"Running on {platform.system()} {platform.release()}")
    else:
        print_warning(f"This script is designed for Windows, but you're running on {os_name.capitalize()}")
    print_info(f"Python version: {platform.python_version()}")
    print_info(f"Architecture: {platform.machine()}")
    return is_win
def check_pytorch_installation():
    """Check that PyTorch imports and works, and report accelerator support.

    Probes, in order: torch import and version, presence of the C extension,
    CUDA availability, optional DirectML (torch_directml) support with a GPU
    vendor scan via PowerShell/WMI, and finally a basic CPU tensor operation.

    Returns:
        bool: True when PyTorch is importable and basic tensor math works,
        False when torch is missing or the tensor smoke test fails.
    """
    try:
        import torch
        print_success(f"PyTorch is installed (version {torch.__version__})")
        # Check if PyTorch was installed from the correct index URL
        # NOTE(review): hasattr(torch, '_C') only shows the C extension module
        # loaded; it cannot tell which index URL the wheel came from.
        if hasattr(torch, '_C'):
            print_success("PyTorch C extensions are available")
        else:
            print_warning("PyTorch C extensions might not be properly installed")
        # Check CUDA availability (reports only the first GPU, index 0)
        if torch.cuda.is_available():
            print_success(f"CUDA is available (version {torch.version.cuda})")
            print_info(f"GPU: {torch.cuda.get_device_name(0)}")
            print_info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / (1024**3):.2f} GB")
        else:
            print_info("CUDA is not available, using CPU only")
        # Check if DirectML is available
        try:
            import torch_directml
            print_success(f"DirectML is available (version {torch_directml.__version__})")
            # Check for Intel ARC GPU via a WMI query (Windows-only; failures
            # are swallowed because vendor detection is best-effort)
            try:
                ps_cmd = "Get-WmiObject Win32_VideoController | Select-Object Name | Format-List"
                gpu_output = subprocess.check_output(['powershell', '-Command', ps_cmd],
                                                    stderr=subprocess.DEVNULL,
                                                    universal_newlines=True)
                if 'Intel(R) Arc(TM)' in gpu_output or 'Intel ARC' in gpu_output:
                    print_success("Intel ARC GPU detected, DirectML support is available")
                elif 'Intel' in gpu_output:
                    print_success("Intel GPU detected, DirectML support is available")
                elif 'AMD' in gpu_output or 'Radeon' in gpu_output:
                    print_success("AMD GPU detected, DirectML support is available")
            except (subprocess.SubprocessError, FileNotFoundError):
                pass
            # Test a simple DirectML tensor operation (allocate + add on the
            # DirectML device); failure here is a warning, not fatal
            try:
                dml = torch_directml.device()
                x_dml = torch.rand(5, 3, device=dml)
                y_dml = torch.rand(5, 3, device=dml)
                z_dml = x_dml + y_dml
                print_success("DirectML tensor operations work correctly")
            except Exception as e:
                print_warning(f"DirectML tensor operations failed: {e}")
        except ImportError:
            print_info("DirectML is not available")
            # Check for Intel/AMD GPUs that could benefit from DirectML and
            # suggest installing torch-directml (same best-effort WMI query)
            try:
                ps_cmd = "Get-WmiObject Win32_VideoController | Select-Object Name | Format-List"
                gpu_output = subprocess.check_output(['powershell', '-Command', ps_cmd],
                                                    stderr=subprocess.DEVNULL,
                                                    universal_newlines=True)
                if 'Intel(R) Arc(TM)' in gpu_output or 'Intel ARC' in gpu_output:
                    print_warning("Intel ARC GPU detected, but DirectML is not installed")
                    print_info("Consider installing torch-directml for better performance")
                elif 'Intel' in gpu_output or 'AMD' in gpu_output or 'Radeon' in gpu_output:
                    print_warning("Intel/AMD GPU detected, but DirectML is not installed")
                    print_info("Consider installing torch-directml for better performance")
            except (subprocess.SubprocessError, FileNotFoundError):
                pass
        # Test a simple tensor operation on CPU; this is the only hardware
        # check that makes the whole verification fail
        try:
            x = torch.rand(5, 3)
            y = torch.rand(5, 3)
            z = x + y
            print_success("Basic tensor operations work correctly")
        except Exception as e:
            print_error(f"Failed to perform basic tensor operations: {e}")
            return False
        return True
    except ImportError:
        print_error("PyTorch is not installed")
        return False
    except Exception as e:
        print_error(f"Error checking PyTorch installation: {e}")
        return False
def suggest_installation():
    """Print platform-appropriate pip commands for installing PyTorch on Windows.

    Only prints suggestions; it performs no installation itself and returns None.
    """
    print_header("Installation Suggestions")
    print_info("To install PyTorch for Windows, use one of the following commands:")
    print_info("\nFor CUDA support (NVIDIA GPUs):")
    print("pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118")
    print_info("\nFor CPU-only:")
    print("pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu")
    print_info("\nFor DirectML support (AMD/Intel GPUs):")
    print("pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu")
    # Quote the requirement so the user's shell does not treat ">=" as redirection.
    print('pip install "torch-directml>=0.2.0"')
    print_info("\nFor Intel ARC Pro Graphics:")
    # torchvision is not versioned in lockstep with torch: torch 2.2.0 pairs
    # with torchvision 0.17.0 (a "torchvision==2.2.0" release does not exist).
    print("pip install torch==2.2.0 torchvision==0.17.0 torchaudio==2.2.0 --index-url https://download.pytorch.org/whl/cpu")
    print('pip install "torch-directml>=0.2.0"')
    print_info("\nFor dual GPU setups (NVIDIA + Intel):")
    print("pip install torch==2.2.0 torchvision==0.17.0 torchaudio==2.2.0 --index-url https://download.pytorch.org/whl/cu118")
    print('pip install "torch-directml>=0.2.0"')
    print_info("\nAfter installing PyTorch, run this script again to verify the installation.")
def main():
    """Run all verification steps; return 0 on success, 1 when PyTorch is missing."""
    print_header("PyTorch Windows Installation Verification")
    if not check_system():
        print_warning("This script is designed for Windows, but may still provide useful information")
    if not check_pytorch_installation():
        # PyTorch missing or broken: show install hints and signal failure.
        suggest_installation()
        return 1
    print_header("Verification Complete")
    print_success("PyTorch is properly installed and configured for Windows")
    return 0
if __name__ == "__main__":
    # Propagate main()'s status code (0 = verified OK, 1 = PyTorch missing).
    sys.exit(main())
```
--------------------------------------------------------------------------------
/scripts/quality/README_PHASE2.md:
--------------------------------------------------------------------------------
```markdown
# Phase 2 Complexity Reduction - Quick Reference
## Overview
This guide provides a quick reference for implementing Phase 2 complexity reductions identified in `phase2_complexity_analysis.md`.
## Quick Stats
| Metric | Current | Target | Improvement |
|--------|---------|--------|-------------|
| **Complexity Score** | 40/100 | 50-55/100 | +10-15 points |
| **Overall Health** | 63/100 | 66-68/100 | +3 points |
| **Functions Analyzed** | 10 | - | - |
| **Total Time Estimate** | - | 12-15 hours | - |
| **Complexity Reduction** | - | -39 points | - |
## Priority Matrix
### High Priority (Week 1) - 7 hours
Critical path functions that need careful attention:
1. **install.py::configure_paths()** (15 → 5, -10 points, 4h)
- Extract platform detection
- Extract storage setup
- Extract Claude config update
2. **cloudflare.py::_search_by_tags_internal()** (13 → 8, -5 points, 1.75h)
- Extract tag normalization
- Extract SQL query builder
3. **consolidator.py::consolidate()** (12 → 8, -4 points, 1.25h)
- Extract sync context manager
- Extract phase guards
### Medium Priority (Week 2) - 2.75 hours
Analytics functions (non-critical):
4. **analytics.py::get_memory_growth()** (11 → 6, -5 points, 1.75h)
- Extract period configuration
- Extract interval aggregation
5. **analytics.py::get_tag_usage_analytics()** (10 → 6, -4 points, 1h)
- Extract storage stats retrieval
- Extract tag stats calculation
### Low Priority (Weeks 2-3) - 4.25 hours
Quick wins with minimal risk:
6. **install.py::detect_gpu()** (10 → 7, -3 points, 1h)
7. **cloudflare.py::get_memory_timestamps()** (9 → 7, -2 points, 45m)
8. **consolidator.py::_get_memories_for_horizon()** (10 → 8, -2 points, 45m)
9. **analytics.py::get_activity_breakdown()** (9 → 7, -2 points, 1h)
10. **analytics.py::get_memory_type_distribution()** (9 → 7, -2 points, 45m)
## Refactoring Patterns Cheat Sheet
### Pattern 1: Extract Method
**When to use:** Function > 50 lines, nested logic, repeated code
**Example:**
```python
# Before
def complex_function():
# 20 lines of platform detection
# 30 lines of setup logic
# 15 lines of validation
# After
def detect_platform(): ...
def setup_system(): ...
def validate_config(): ...
def complex_function():
platform = detect_platform()
setup_system(platform)
validate_config()
```
### Pattern 2: Dict Lookup
**When to use:** if/elif/else chains with similar structure
**Example:**
```python
# Before
if period == "week":
days = 7
elif period == "month":
days = 30
elif period == "year":
days = 365
# After
PERIOD_DAYS = {"week": 7, "month": 30, "year": 365}
days = PERIOD_DAYS[period]
```
### Pattern 3: Guard Clause
**When to use:** Nested if statements, early validation
**Example:**
```python
# Before
def process(data):
if data is not None:
if data.valid():
if data.ready():
return process_data(data)
return None
# After
def process(data):
if data is None:
return None
if not data.valid():
return None
if not data.ready():
return None
return process_data(data)
```
### Pattern 4: Context Manager
**When to use:** Resource management, setup/teardown logic
**Example:**
```python
# Before
def process():
resource = acquire()
try:
do_work(resource)
finally:
release(resource)
# After
class ResourceManager:
async def __aenter__(self): ...
async def __aexit__(self, *args): ...
async def process():
async with ResourceManager() as resource:
do_work(resource)
```
### Pattern 5: Configuration Object
**When to use:** Related configuration values, multiple parameters
**Example:**
```python
# Before
def analyze(period, days, interval, format):
...
# After
@dataclass
class AnalysisConfig:
period: str
days: int
interval: int
format: str
def analyze(config: AnalysisConfig):
...
```
## Testing Checklist
For each refactored function:
- [ ] **Unit tests pass** - Run `pytest tests/test_<module>.py`
- [ ] **Integration tests pass** - Run `pytest tests/integration/`
- [ ] **No performance regression** - Benchmark before/after
- [ ] **API contracts unchanged** - Check response formats
- [ ] **Edge cases tested** - Null inputs, empty lists, errors
- [ ] **Documentation updated** - Docstrings, comments
## Implementation Order
### Sequential (Single Developer)
1. Week 1: High priority functions (7h)
2. Week 2: Medium priority functions (2.75h)
3. Week 3: Low priority quick wins (4.25h)
**Total:** 14 hours over 3 weeks
### Parallel (Multiple Developers)
1. **Developer A:** configure_paths, detect_gpu (5h)
2. **Developer B:** cloudflare functions (2.5h)
3. **Developer C:** consolidator functions (2h)
4. **Developer D:** analytics functions (4.5h)
**Total:** ~7 hours (with coordination overhead: 9-10 hours)
### Prioritized (Critical Path Only)
Focus on high-priority functions only:
1. configure_paths (4h)
2. _search_by_tags_internal (1.75h)
3. consolidate (1.25h)
**Total:** 7 hours for core improvements
## Risk Mitigation
### Critical Path Functions
**Extra caution required:**
- _search_by_tags_internal (core search)
- consolidate (memory consolidation)
- _get_memories_for_horizon (consolidation)
**Safety measures:**
- Create feature branch for each
- Comprehensive integration tests
- Performance benchmarking
- Staged rollout (dev → staging → production)
### Low-Risk Functions
**Can be batched:**
- All analytics endpoints (read-only)
- Setup functions (non-critical path)
**Safety measures:**
- Standard unit testing
- Manual smoke testing
- Can be rolled back easily
## Success Metrics
### Quantitative Goals
- [ ] Complexity score: 40 → 50+ (+10 points minimum)
- [ ] Overall health: 63 → 66+ (+3 points minimum)
- [ ] All 10 functions refactored successfully
- [ ] Zero breaking changes
- [ ] All tests passing
### Qualitative Goals
- [ ] Code easier to understand (peer review)
- [ ] Functions are testable in isolation
- [ ] Better separation of concerns
- [ ] Improved maintainability
## Common Pitfalls to Avoid
### 1. Over-Extraction
**Problem:** Creating too many tiny functions
**Solution:** Extract only when it improves clarity (10+ lines minimum)
### 2. Breaking API Contracts
**Problem:** Changing function signatures
**Solution:** Keep public APIs unchanged, refactor internals only
### 3. Performance Regression
**Problem:** Excessive function calls overhead
**Solution:** Benchmark before/after, inline hot paths if needed
### 4. Incomplete Testing
**Problem:** Missing edge cases
**Solution:** Test error paths, null inputs, boundary conditions
### 5. Rushing Critical Functions
**Problem:** Breaking core functionality
**Solution:** Extra time for testing critical path functions
## Command Reference
### Run Quality Analysis
```bash
# Run pyscn baseline report
python -m pyscn baseline --output scripts/quality/baseline_report.txt
# Check specific function complexity
python -m radon cc src/mcp_memory_service/storage/cloudflare.py -a
# Check cyclomatic complexity for all files
python -m radon cc src/ -a
```
### Run Tests
```bash
# All tests
pytest tests/
# Specific module
pytest tests/test_storage.py
# Integration tests only
pytest tests/integration/
# With coverage
pytest tests/ --cov=mcp_memory_service --cov-report=html
```
### Benchmark Performance
```bash
# Before refactoring
python scripts/benchmarks/run_benchmarks.py --baseline
# After refactoring
python scripts/benchmarks/run_benchmarks.py --compare
```
## Getting Help
### Resources
- **Phase 2 Analysis:** `scripts/quality/phase2_complexity_analysis.md` (detailed proposals)
- **Phase 1 Results:** `scripts/quality/phase1_dead_code_analysis.md` (lessons learned)
- **Complexity Guide:** `scripts/quality/complexity_scoring_guide.md` (understanding metrics)
### Questions?
- Review the detailed analysis for each function
- Check the refactoring pattern examples
- Test incrementally after each change
- Ask for peer review on critical functions
---
**Last Updated:** 2024-11-24
**Next Review:** After Phase 2 completion
```
--------------------------------------------------------------------------------
/scripts/maintenance/restore_from_json_export.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Restore Timestamps from Clean JSON Export
Recovers corrupted timestamps using the clean export from the other MacBook
(v8.26, before the hybrid sync bug). Matches memories by content_hash and
restores their original creation timestamps.
This script:
- Reads clean timestamp mapping (content_hash → ISO timestamp)
- Matches memories in current database by content_hash
- Updates created_at and created_at_iso with original timestamps
- Preserves memories not in mapping (created after the clean export)
Usage:
python scripts/maintenance/restore_from_json_export.py [--dry-run|--apply]
"""
import json
import sqlite3
import sys
from datetime import datetime
from pathlib import Path
# Add src to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src"))
from mcp_memory_service import config
def restore_from_json(db_path: str, mapping_file: str, dry_run: bool = True):
    """
    Restore timestamps from JSON export mapping.

    Matches every memory row by content_hash against the clean mapping and
    rewrites created_at (Unix seconds) and created_at_iso (ISO-8601 string)
    to the original values. Rows absent from the mapping are left untouched.

    Args:
        db_path: Path to SQLite database
        mapping_file: Path to JSON file with content_hash → timestamp mapping
        dry_run: If True, only show what would be changed
    """
    print("=" * 80)
    print("TIMESTAMP RESTORATION FROM CLEAN JSON EXPORT")
    print("=" * 80)
    print(f"Database: {db_path}")
    print(f"Mapping: {mapping_file}")
    print(f"Mode: {'DRY RUN (no changes)' if dry_run else 'LIVE (applying changes)'}")
    print()
    # Load clean timestamp mapping (content_hash -> ISO-8601 timestamp string)
    print("Loading clean timestamp mapping...")
    with open(mapping_file, 'r') as f:
        clean_mapping = json.load(f)
    print(f"✅ Loaded {len(clean_mapping)} clean timestamps")
    print()
    # Connect to database; generous timeouts because the hybrid sync service
    # may hold the write lock concurrently
    conn = sqlite3.connect(db_path, timeout=30.0)
    conn.execute('PRAGMA busy_timeout = 30000')
    cursor = conn.cursor()
    # Get all memories from current database (only a 60-char content preview
    # is fetched, for display purposes)
    print("Analyzing current database...")
    cursor.execute('''
        SELECT content_hash, created_at, created_at_iso, substr(content, 1, 60)
        FROM memories
    ''')
    current_memories = cursor.fetchall()
    print(f"✅ Found {len(current_memories)} memories in database")
    print()
    # Match and analyze: partition rows into to-restore / already-correct /
    # not-in-mapping (created after the clean export)
    print("=" * 80)
    print("MATCHING ANALYSIS:")
    print("=" * 80)
    matched = []
    unmatched = []
    already_correct = []
    for content_hash, created_at, created_at_iso, content_preview in current_memories:
        if content_hash in clean_mapping:
            clean_timestamp = clean_mapping[content_hash]
            # Check if already correct (exact ISO-string comparison)
            if created_at_iso == clean_timestamp:
                already_correct.append(content_hash)
            else:
                matched.append({
                    'hash': content_hash,
                    'current_iso': created_at_iso,
                    'clean_iso': clean_timestamp,
                    'content': content_preview
                })
        else:
            unmatched.append({
                'hash': content_hash,
                'created_iso': created_at_iso,
                'content': content_preview
            })
    print(f"✅ Matched (will restore): {len(matched)}")
    print(f"✅ Already correct: {len(already_correct)}")
    print(f"⏭️ Unmatched (keep as-is): {len(unmatched)}")
    print()
    # Show samples of the planned restorations
    print("=" * 80)
    print("SAMPLE RESTORATIONS (first 10):")
    print("=" * 80)
    for i, mem in enumerate(matched[:10], 1):
        print(f"{i}. Hash: {mem['hash'][:16]}...")
        print(f" CURRENT: {mem['current_iso']}")
        print(f" RESTORE: {mem['clean_iso']}")
        print(f" Content: {mem['content']}...")
        print()
    if len(matched) > 10:
        print(f" ... and {len(matched) - 10} more")
        print()
    # Show unmatched samples (new memories created after the clean export)
    if unmatched:
        print("=" * 80)
        print("UNMATCHED MEMORIES (will keep current timestamps):")
        print("=" * 80)
        print(f"Total: {len(unmatched)} memories")
        print("\nSample (first 5):")
        for i, mem in enumerate(unmatched[:5], 1):
            print(f"{i}. Hash: {mem['hash'][:16]}...")
            print(f" Created: {mem['created_iso']}")
            print(f" Content: {mem['content']}...")
            print()
    if dry_run:
        # Preview only: report counts and exit without writing anything
        print("=" * 80)
        print("DRY RUN COMPLETE - No changes made")
        print("=" * 80)
        print(f"Would restore {len(matched)} timestamps")
        print(f"Would preserve {len(unmatched)} new memories")
        print("\nTo apply changes, run with --apply flag")
        conn.close()
        return
    # Confirm before proceeding (interactive; anything but 'y' aborts)
    print("=" * 80)
    print(f"⚠️ ABOUT TO RESTORE {len(matched)} TIMESTAMPS")
    print("=" * 80)
    response = input("Continue with restoration? [y/N]: ")
    if response.lower() != 'y':
        print("Restoration cancelled")
        conn.close()
        return
    # Apply restorations
    print("\nRestoring timestamps...")
    restored_count = 0
    failed_count = 0
    for mem in matched:
        try:
            # NOTE(review): if mem['hash'] raised here, the except below would
            # reference an unbound content_hash from a previous iteration.
            content_hash = mem['hash']
            clean_iso = mem['clean_iso']
            # Convert ISO to Unix timestamp ('Z' suffix normalized to +00:00
            # for datetime.fromisoformat compatibility)
            dt = datetime.fromisoformat(clean_iso.replace('Z', '+00:00'))
            clean_unix = dt.timestamp()
            # Update database
            cursor.execute('''
                UPDATE memories
                SET created_at = ?, created_at_iso = ?
                WHERE content_hash = ?
            ''', (clean_unix, clean_iso, content_hash))
            restored_count += 1
            if restored_count % 100 == 0:
                print(f" Progress: {restored_count}/{len(matched)} restored...")
                conn.commit()  # Commit in batches
        except Exception as e:
            print(f" Error restoring {content_hash[:16]}: {e}")
            failed_count += 1
    # Final commit for the tail batch
    conn.commit()
    # Verify results
    # NOTE(review): the result of this query is never fetched or used; it
    # appears to be dead code left from an earlier verification step.
    cursor.execute('''
        SELECT created_at_iso, COUNT(*) as count
        FROM memories
        GROUP BY DATE(created_at_iso)
        ORDER BY DATE(created_at_iso) DESC
        LIMIT 20
    ''')
    print()
    print("=" * 80)
    print("RESTORATION COMPLETE")
    print("=" * 80)
    print(f"✅ Successfully restored: {restored_count}")
    print(f"❌ Failed to restore: {failed_count}")
    print(f"⏭️ Preserved (new memories): {len(unmatched)}")
    print()
    # Show date distribution (counted per calendar day of created_at_iso)
    print("=" * 80)
    print("TIMESTAMP DISTRIBUTION (After Restoration):")
    print("=" * 80)
    from collections import Counter
    cursor.execute('SELECT created_at_iso FROM memories')
    dates = Counter()
    for row in cursor.fetchall():
        date_str = row[0][:10] if row[0] else 'Unknown'
        dates[date_str] += 1
    for date, count in dates.most_common(15):
        print(f" {date}: {count:4} memories")
    # Check corruption remaining: memories still dated in the known-bad window
    corruption_dates = {'2025-11-16', '2025-11-17', '2025-11-18'}
    corrupted_remaining = sum(count for date, count in dates.items() if date in corruption_dates)
    print()
    print(f"Corrupted dates remaining: {corrupted_remaining}")
    print(f"Expected: ~250-400 (legitimately created Nov 16-18)")
    conn.close()
    if failed_count == 0 and corrupted_remaining < 500:
        print("\n🎉 SUCCESS: Timestamps restored successfully!")
    else:
        print(f"\n⚠️ WARNING: Some issues occurred during restoration")
if __name__ == "__main__":
    # --apply switches from the default dry-run preview to a live update.
    dry_run = '--apply' not in sys.argv
    db_path = config.SQLITE_VEC_PATH
    # Mapping file is expected at the repository root (three levels up).
    mapping_file = Path(__file__).parent.parent.parent / "clean_timestamp_mapping.json"
    if not mapping_file.exists():
        print(f"❌ ERROR: Mapping file not found: {mapping_file}")
        print("Run Phase 1 first to extract the clean timestamp mapping")
        sys.exit(1)
    try:
        restore_from_json(str(db_path), str(mapping_file), dry_run=dry_run)
    except KeyboardInterrupt:
        # Ctrl-C during the interactive confirmation or the update loop.
        print("\n\nRestoration cancelled by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n❌ Restoration failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
```
--------------------------------------------------------------------------------
/scripts/migration/mcp-migration.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Enhanced migration script for MCP Memory Service.
This script handles migration of memories between different ChromaDB instances,
with support for both local and remote migrations.
"""
import sys
import os
from dotenv import load_dotenv
from pathlib import Path
import chromadb
from chromadb import HttpClient, Settings
import json
import time
from chromadb.utils import embedding_functions
# Import our environment verifier
from verify_environment import EnvironmentVerifier
def verify_environment():
    """Run all environment checks; abort the process if any of them fail."""
    checker = EnvironmentVerifier()
    checker.run_verifications()
    if checker.print_results():
        print("\n✓ Environment verification passed! Proceeding with migration.")
        return
    # Any failed check blocks the migration entirely.
    print("\n⚠️ Environment verification failed! Migration cannot proceed.")
    sys.exit(1)
# Load environment variables from a local .env file, if present (python-dotenv).
load_dotenv()
def get_claude_desktop_chroma_path():
    """Return the ChromaDB directory used by the Claude Desktop config.

    The path is resolved relative to this script's parent directory:
    <repo>/claude_config/mcp-memory/chroma_db.
    """
    repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    chroma_dir = os.path.join(repo_root, 'claude_config', 'mcp-memory', 'chroma_db')
    print(f"Using ChromaDB path: {chroma_dir}")
    return chroma_dir
def migrate_memories(source_type, source_config, target_type, target_config):
    """
    Migrate memories between ChromaDB instances.

    Copies all documents from the source "memory_collection" into the target
    "mcp_imported_memories" collection, skipping IDs that already exist on
    the target (so reruns are idempotent). Errors are printed, not raised.

    Args:
        source_type: 'local' or 'remote'
        source_config: For local: path to ChromaDB, for remote: {'host': host, 'port': port}
        target_type: 'local' or 'remote'
        target_config: For local: path to ChromaDB, for remote: {'host': host, 'port': port}
    """
    print(f"Starting migration from {source_type} to {target_type}")
    try:
        # Set up embedding function (same model on both sides so vectors
        # remain comparable)
        embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name='all-MiniLM-L6-v2'
        )
        # Connect to target ChromaDB first, so a bad target fails fast
        # before the source is touched
        if target_type == 'remote':
            target_client = HttpClient(
                host=target_config['host'],
                port=target_config['port']
            )
            print(f"Connected to remote ChromaDB at {target_config['host']}:{target_config['port']}")
        else:
            settings = Settings(
                anonymized_telemetry=False,
                allow_reset=True,
                is_persistent=True,
                persist_directory=target_config
            )
            target_client = chromadb.Client(settings)
            print(f"Connected to local ChromaDB at {target_config}")
        # Get or create collection for imported memories
        try:
            target_collection = target_client.get_collection(
                name="mcp_imported_memories",
                embedding_function=embedding_function
            )
            print("Found existing collection 'mcp_imported_memories' on target")
        except Exception:
            target_collection = target_client.create_collection(
                name="mcp_imported_memories",
                metadata={"hnsw:space": "cosine"},
                embedding_function=embedding_function
            )
            print("Created new collection 'mcp_imported_memories' on target")
        # Connect to source ChromaDB
        if source_type == 'remote':
            source_client = HttpClient(
                host=source_config['host'],
                port=source_config['port']
            )
            print(f"Connected to remote ChromaDB at {source_config['host']}:{source_config['port']}")
        else:
            settings = Settings(
                anonymized_telemetry=False,
                allow_reset=True,
                is_persistent=True,
                persist_directory=source_config
            )
            source_client = chromadb.Client(settings)
            print(f"Connected to local ChromaDB at {source_config}")
        # List collections (informational only)
        collections = source_client.list_collections()
        print(f"Found {len(collections)} collections in source")
        for coll in collections:
            print(f"- {coll.name}")
        # Try to get the memory collection
        # NOTE(review): newer chromadb versions may raise something other
        # than ValueError here; verify against the installed client version.
        try:
            source_collection = source_client.get_collection(
                name="memory_collection",
                embedding_function=embedding_function
            )
            print("Found source memory collection")
        except ValueError as e:
            print(f"Error accessing source collection: {str(e)}")
            return
        # Get all memories from source (full collection fetched into memory)
        print("Fetching source memories...")
        results = source_collection.get()
        if not results["ids"]:
            print("No memories found in source collection")
            return
        print(f"Found {len(results['ids'])} memories to migrate")
        # Check for existing memories in target to avoid duplicates
        target_existing = target_collection.get()
        existing_ids = set(target_existing["ids"])
        # Filter out already migrated memories
        new_memories = {
            "ids": [],
            "documents": [],
            "metadatas": []
        }
        for i, memory_id in enumerate(results["ids"]):
            if memory_id not in existing_ids:
                new_memories["ids"].append(memory_id)
                new_memories["documents"].append(results["documents"][i])
                new_memories["metadatas"].append(results["metadatas"][i])
        if not new_memories["ids"]:
            print("All memories are already migrated!")
            return
        print(f"Found {len(new_memories['ids'])} new memories to migrate")
        # Import in batches of 10 to keep individual requests small
        batch_size = 10
        for i in range(0, len(new_memories['ids']), batch_size):
            batch_end = min(i + batch_size, len(new_memories['ids']))
            batch_ids = new_memories['ids'][i:batch_end]
            batch_documents = new_memories['documents'][i:batch_end]
            batch_metadatas = new_memories['metadatas'][i:batch_end]
            print(f"Migrating batch {i//batch_size + 1} ({len(batch_ids)} memories)...")
            target_collection.add(
                documents=batch_documents,
                metadatas=batch_metadatas,
                ids=batch_ids
            )
            # Small delay between batches to avoid hammering a remote server
            time.sleep(1)
        print("\nMigration complete!")
        # Verify migration by re-counting the target collection
        target_results = target_collection.get()
        print(f"Verification: {len(target_results['ids'])} total memories in target collection")
    except Exception as e:
        # Broad catch by design: this is a CLI tool that reports and exits
        # rather than raising to the caller.
        print(f"Error during migration: {str(e)}")
        print("Please ensure both ChromaDB instances are running and accessible")
if __name__ == "__main__":
    # First verify the environment (exits with status 1 on failure)
    verify_environment()
    # Example usage:
    # Local to remote migration
    # NOTE(review): the target host below is a hard-coded example IP;
    # replace it with your own ChromaDB server before running.
    migrate_memories(
        source_type='local',
        source_config=get_claude_desktop_chroma_path(),
        target_type='remote',
        target_config={'host': '16.171.169.46', 'port': 8000}
    )
    # Remote to local migration
    # migrate_memories(
    #     source_type='remote',
    #     source_config={'host': '16.171.169.46', 'port': 8000},
    #     target_type='local',
    #     target_config=get_claude_desktop_chroma_path()
    # )
```
--------------------------------------------------------------------------------
/scripts/quality/weekly_quality_review.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# scripts/quality/weekly_quality_review.sh - Weekly code quality review
#
# Usage: bash scripts/quality/weekly_quality_review.sh [--create-issue]
#
# Features:
# - Run pyscn analysis
# - Compare to last week's metrics
# - Generate markdown trend report
# - Optionally create GitHub issue if health score dropped >5%
set -e
# Colors for output
RED='\033[0;31m'
YELLOW='\033[1;33m'
GREEN='\033[0;32m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Parse arguments
CREATE_ISSUE=false
if [ "$1" = "--create-issue" ]; then
CREATE_ISSUE=true
fi
echo -e "${BLUE}=== Weekly Quality Review ===${NC}"
echo ""
# Run metrics tracking
echo "Running pyscn metrics tracking..."
if bash scripts/quality/track_pyscn_metrics.sh > /tmp/weekly_review.log 2>&1; then
echo -e "${GREEN}✓${NC} Metrics tracking complete"
else
echo -e "${RED}❌ Metrics tracking failed${NC}"
cat /tmp/weekly_review.log
exit 1
fi
# Extract current and previous metrics
CSV_FILE=".pyscn/history/metrics.csv"
if [ ! -f "$CSV_FILE" ] || [ $(wc -l < "$CSV_FILE") -lt 2 ]; then
echo -e "${YELLOW}⚠️ Insufficient data for weekly review (need at least 1 previous run)${NC}"
exit 0
fi
# Get current (last line) and previous (second to last) metrics
CURRENT_LINE=$(tail -1 "$CSV_FILE")
CURRENT_HEALTH=$(echo "$CURRENT_LINE" | cut -d',' -f3)
CURRENT_DATE=$(echo "$CURRENT_LINE" | cut -d',' -f2)
CURRENT_COMPLEXITY=$(echo "$CURRENT_LINE" | cut -d',' -f4)
CURRENT_DUPLICATION=$(echo "$CURRENT_LINE" | cut -d',' -f6)
# Find last week's metrics (7+ days ago)
SEVEN_DAYS_AGO=$(date -v-7d +%Y%m%d 2>/dev/null || date -d "7 days ago" +%Y%m%d)
PREV_LINE=$(awk -F',' -v cutoff="$SEVEN_DAYS_AGO" '$1 < cutoff {last=$0} END {print last}' "$CSV_FILE")
if [ -z "$PREV_LINE" ]; then
# Fallback to most recent previous entry if no 7-day-old entry exists
PREV_LINE=$(tail -2 "$CSV_FILE" | head -1)
fi
PREV_HEALTH=$(echo "$PREV_LINE" | cut -d',' -f3)
PREV_DATE=$(echo "$PREV_LINE" | cut -d',' -f2)
PREV_COMPLEXITY=$(echo "$PREV_LINE" | cut -d',' -f4)
PREV_DUPLICATION=$(echo "$PREV_LINE" | cut -d',' -f6)
# Calculate deltas
HEALTH_DELTA=$((CURRENT_HEALTH - PREV_HEALTH))
COMPLEXITY_DELTA=$((CURRENT_COMPLEXITY - PREV_COMPLEXITY))
DUPLICATION_DELTA=$((CURRENT_DUPLICATION - PREV_DUPLICATION))
echo ""
echo -e "${BLUE}=== Weekly Comparison ===${NC}"
echo "Period: $(echo "$PREV_DATE" | cut -d' ' -f1) → $(echo "$CURRENT_DATE" | cut -d' ' -f1)"
echo ""
echo "Health Score:"
echo " Previous: $PREV_HEALTH/100"
echo " Current: $CURRENT_HEALTH/100"
echo " Change: $([ $HEALTH_DELTA -ge 0 ] && echo "+")$HEALTH_DELTA points"
echo ""
# Determine overall trend
TREND_EMOJI="➡️"
TREND_TEXT="Stable"
if [ $HEALTH_DELTA -gt 5 ]; then
TREND_EMOJI="📈"
TREND_TEXT="Improving"
elif [ $HEALTH_DELTA -lt -5 ]; then
TREND_EMOJI="📉"
TREND_TEXT="Declining"
fi
echo -e "${TREND_EMOJI} Trend: ${TREND_TEXT}"
echo ""
# Generate markdown report
REPORT_FILE="docs/development/quality-review-$(date +%Y%m%d).md"
mkdir -p docs/development
cat > "$REPORT_FILE" <<EOF
# Weekly Quality Review - $(date +"%B %d, %Y")
## Summary
**Overall Trend:** ${TREND_EMOJI} ${TREND_TEXT}
| Metric | Previous | Current | Change |
|--------|----------|---------|--------|
| Health Score | $PREV_HEALTH/100 | $CURRENT_HEALTH/100 | $([ $HEALTH_DELTA -ge 0 ] && echo "+")$HEALTH_DELTA |
| Complexity | $PREV_COMPLEXITY/100 | $CURRENT_COMPLEXITY/100 | $([ $COMPLEXITY_DELTA -ge 0 ] && echo "+")$COMPLEXITY_DELTA |
| Duplication | $PREV_DUPLICATION/100 | $CURRENT_DUPLICATION/100 | $([ $DUPLICATION_DELTA -ge 0 ] && echo "+")$DUPLICATION_DELTA |
## Analysis Period
- **Start**: $(echo "$PREV_DATE" | cut -d' ' -f1)
- **End**: $(echo "$CURRENT_DATE" | cut -d' ' -f1)
- **Duration**: ~7 days
## Status
EOF
if [ $CURRENT_HEALTH -lt 50 ]; then
cat >> "$REPORT_FILE" <<EOF
### 🔴 Critical - Release Blocker
Health score below 50 requires immediate action:
- Cannot merge PRs until resolved
- Focus on refactoring high-complexity functions
- Remove dead code
- Address duplication
**Action Items:**
1. Review full pyscn report: \`.pyscn/reports/analyze_*.html\`
2. Create refactoring tasks for complexity >10 functions
3. Schedule refactoring sprint (target: 2 weeks)
4. Track progress in issue #240
EOF
elif [ $CURRENT_HEALTH -lt 70 ]; then
cat >> "$REPORT_FILE" <<EOF
### ⚠️ Action Required
Health score 50-69 indicates technical debt accumulation:
- Plan refactoring sprint within 2 weeks
- Review high-complexity functions
- Track improvement progress
**Recommended Actions:**
1. Identify top 5 complexity hotspots
2. Create project board for tracking
3. Allocate 20% of sprint capacity to quality improvements
EOF
else
cat >> "$REPORT_FILE" <<EOF
### ✅ Acceptable
Health score ≥70 indicates good code quality:
- Continue current development practices
- Monitor trends for regressions
- Address new issues proactively
**Maintenance:**
- Monthly quality reviews
- Track complexity trends
- Keep health score above 70
EOF
fi
# Add trend observations
cat >> "$REPORT_FILE" <<EOF
## Observations
EOF
if [ $HEALTH_DELTA -gt 5 ]; then
cat >> "$REPORT_FILE" <<EOF
- ✅ **Health score improved by $HEALTH_DELTA points** - Great progress on code quality
EOF
elif [ $HEALTH_DELTA -lt -5 ]; then
cat >> "$REPORT_FILE" <<EOF
- ⚠️ **Health score declined by ${HEALTH_DELTA#-} points** - Quality regression detected
EOF
fi
if [ $COMPLEXITY_DELTA -gt 0 ]; then
cat >> "$REPORT_FILE" <<EOF
- ⚠️ Complexity score decreased - New complex code introduced
EOF
elif [ $COMPLEXITY_DELTA -lt 0 ]; then
cat >> "$REPORT_FILE" <<EOF
- ✅ Complexity score improved - Refactoring efforts paying off
EOF
fi
if [ $DUPLICATION_DELTA -lt 0 ]; then
cat >> "$REPORT_FILE" <<EOF
- ⚠️ Code duplication increased - Review for consolidation opportunities
EOF
elif [ $DUPLICATION_DELTA -gt 0 ]; then
cat >> "$REPORT_FILE" <<EOF
- ✅ Code duplication reduced - Good refactoring work
EOF
fi
cat >> "$REPORT_FILE" <<EOF
## Next Steps
1. Review detailed pyscn report for specific issues
2. Update project board with quality improvement tasks
3. Schedule next weekly review for $(date -v+7d +"%B %d, %Y" 2>/dev/null || date -d "7 days" +"%B %d, %Y")
## Resources
- [Full pyscn Report](.pyscn/reports/)
- [Metrics History](.pyscn/history/metrics.csv)
- [Code Quality Workflow](docs/development/code-quality-workflow.md)
- [Issue #240](https://github.com/doobidoo/mcp-memory-service/issues/240) - Quality improvements tracking
EOF
echo -e "${GREEN}✓${NC} Report generated: $REPORT_FILE"
echo ""
# Create GitHub issue if significant regression and flag enabled
if [ "$CREATE_ISSUE" = true ] && [ $HEALTH_DELTA -lt -5 ]; then
if command -v gh &> /dev/null; then
echo -e "${YELLOW}Creating GitHub issue for quality regression...${NC}"
ISSUE_BODY="## Quality Regression Detected
Weekly quality review detected a significant health score decline:
**Health Score Change:** $PREV_HEALTH → $CURRENT_HEALTH (${HEALTH_DELTA} points)
### Details
$(cat "$REPORT_FILE" | sed -n '/## Summary/,/## Next Steps/p' | head -n -1)
### Action Required
1. Review full weekly report: [\`$REPORT_FILE\`]($REPORT_FILE)
2. Investigate recent changes: \`git log --since='$PREV_DATE'\`
3. Prioritize quality improvements in next sprint
### Related
- Issue #240 - Code Quality Improvements
- [pyscn Report](.pyscn/reports/)
"
gh issue create \
--title "Weekly Quality Review: Health Score Regression (${HEALTH_DELTA} points)" \
--body "$ISSUE_BODY" \
--label "technical-debt,quality"
echo -e "${GREEN}✓${NC} GitHub issue created"
else
echo -e "${YELLOW}⚠️ gh CLI not found, skipping issue creation${NC}"
fi
fi
echo ""
echo -e "${BLUE}=== Summary ===${NC}"
echo "Review Period: $(echo "$PREV_DATE" | cut -d' ' -f1) → $(echo "$CURRENT_DATE" | cut -d' ' -f1)"
echo "Health Score: $PREV_HEALTH → $CURRENT_HEALTH ($([ $HEALTH_DELTA -ge 0 ] && echo "+")$HEALTH_DELTA)"
echo "Trend: ${TREND_EMOJI} ${TREND_TEXT}"
echo ""
echo "Report: $REPORT_FILE"
echo ""
echo -e "${GREEN}✓${NC} Weekly review complete"
exit 0
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/health.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Health check endpoints for the HTTP interface.
"""
import platform
import time
from datetime import datetime, timezone
from typing import Any, Dict, Optional, TYPE_CHECKING

import psutil
from fastapi import APIRouter, Depends
from pydantic import BaseModel

from ...storage.base import MemoryStorage
from ..dependencies import get_storage
from ... import __version__
from ...config import OAUTH_ENABLED
# OAuth authentication imports (conditional)
# TYPE_CHECKING keeps the names resolvable for static analysis even when
# OAuth is disabled at runtime.
if OAUTH_ENABLED or TYPE_CHECKING:
    from ..oauth.middleware import require_read_access, AuthenticationResult
else:
    # Provide type stubs when OAuth is disabled
    AuthenticationResult = None
    require_read_access = None

# Shared router; mounted by the web application.
router = APIRouter()
class HealthResponse(BaseModel):
    """Basic health check response."""
    status: str  # Overall service status, e.g. "healthy"
    version: str  # Service version string
    timestamp: str  # ISO-8601 UTC timestamp of the check
    uptime_seconds: float  # Seconds since process startup
class DetailedHealthResponse(BaseModel):
    """Detailed health check response.

    Extends the basic health payload with storage, system, and performance
    details plus optional aggregate statistics.
    """
    status: str  # Overall service status, e.g. "healthy"
    version: str  # Service version string
    timestamp: str  # ISO-8601 UTC timestamp of the check
    uptime_seconds: float  # Seconds since process startup
    storage: Dict[str, Any]  # Backend info (type, connection status, stats)
    system: Dict[str, Any]  # Platform / CPU / memory / disk information
    performance: Dict[str, Any]  # Uptime and related performance metrics
    # Fix: the default was None but the annotation was a bare Dict; the
    # field must be Optional so the None default is type-correct.
    statistics: Optional[Dict[str, Any]] = None
# Track startup time for uptime calculation (module import time is treated
# as process start).
_startup_time = time.time()
@router.get("/health", response_model=HealthResponse)
async def health_check():
    """Return a minimal liveness payload: status, version, and uptime."""
    now = datetime.now(timezone.utc)
    uptime = time.time() - _startup_time
    return HealthResponse(
        status="healthy",
        version=__version__,
        timestamp=now.isoformat(),
        uptime_seconds=uptime,
    )
@router.get("/health/detailed", response_model=DetailedHealthResponse)
async def detailed_health_check(
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """Detailed health check with system and storage information.

    Gathers host metrics via psutil, queries the storage backend for
    statistics, and returns a DetailedHealthResponse. Storage failures are
    reported inside the payload rather than raised, so the endpoint itself
    stays "healthy".
    """
    # Get system information
    memory_info = psutil.virtual_memory()
    disk_info = psutil.disk_usage('/')
    system_info = {
        "platform": platform.system(),
        "platform_version": platform.version(),
        "python_version": platform.python_version(),
        "cpu_count": psutil.cpu_count(),
        "memory_total_gb": round(memory_info.total / (1024**3), 2),
        "memory_available_gb": round(memory_info.available / (1024**3), 2),
        "memory_percent": memory_info.percent,
        "disk_total_gb": round(disk_info.total / (1024**3), 2),
        "disk_free_gb": round(disk_info.free / (1024**3), 2),
        "disk_percent": round((disk_info.used / disk_info.total) * 100, 2)
    }
    # Get storage information (support all storage backends)
    try:
        # Get statistics from storage using universal get_stats() method
        if hasattr(storage, 'get_stats') and callable(getattr(storage, 'get_stats')):
            # All storage backends now have async get_stats()
            stats = await storage.get_stats()
        else:
            stats = {"error": "Storage backend doesn't support statistics"}
        if "error" not in stats:
            # Detect backend type from storage class or stats; substring
            # matching maps concrete class names onto the public backend ids.
            backend_name = stats.get("storage_backend", storage.__class__.__name__)
            if "sqlite" in backend_name.lower():
                backend_type = "sqlite-vec"
            elif "cloudflare" in backend_name.lower():
                backend_type = "cloudflare"
            elif "hybrid" in backend_name.lower():
                backend_type = "hybrid"
            else:
                backend_type = backend_name
            storage_info = {
                "backend": backend_type,
                "status": "connected",
                "accessible": True
            }
            # Add backend-specific information if available
            if hasattr(storage, 'db_path'):
                storage_info["database_path"] = storage.db_path
            if hasattr(storage, 'embedding_model_name'):
                storage_info["embedding_model"] = storage.embedding_model_name
            # Add sync status for hybrid backend; sync errors are reported
            # in-band so a broken sync loop doesn't fail the health check.
            if backend_type == "hybrid" and hasattr(storage, 'get_sync_status'):
                try:
                    sync_status = await storage.get_sync_status()
                    storage_info["sync_status"] = {
                        "is_running": sync_status.get('is_running', False),
                        "last_sync_time": sync_status.get('last_sync_time', 0),
                        "pending_operations": sync_status.get('pending_operations', 0),
                        "operations_processed": sync_status.get('operations_processed', 0),
                        "operations_failed": sync_status.get('operations_failed', 0),
                        # 0 means "never synced" rather than "just synced".
                        "time_since_last_sync": time.time() - sync_status.get('last_sync_time', 0) if sync_status.get('last_sync_time', 0) > 0 else 0
                    }
                except Exception as sync_err:
                    storage_info["sync_status"] = {"error": str(sync_err)}
            # Merge all stats
            storage_info.update(stats)
        else:
            storage_info = {
                "backend": storage.__class__.__name__,
                "status": "error",
                "accessible": False,
                "error": stats["error"]
            }
    except Exception as e:
        storage_info = {
            "backend": storage.__class__.__name__ if hasattr(storage, '__class__') else "unknown",
            "status": "error",
            "error": str(e)
        }
    # Performance metrics (basic for now)
    performance_info = {
        "uptime_seconds": time.time() - _startup_time,
        "uptime_formatted": format_uptime(time.time() - _startup_time)
    }
    # Extract statistics for separate field if available; defaults keep the
    # response shape stable when the backend reported an error.
    statistics = {
        "total_memories": storage_info.get("total_memories", 0),
        "unique_tags": storage_info.get("unique_tags", 0),
        "memories_this_week": storage_info.get("memories_this_week", 0),
        "database_size_mb": storage_info.get("database_size_mb", 0),
        "backend": storage_info.get("backend", "sqlite-vec")
    }
    return DetailedHealthResponse(
        status="healthy",
        version=__version__,
        timestamp=datetime.now(timezone.utc).isoformat(),
        uptime_seconds=time.time() - _startup_time,
        storage=storage_info,
        system=system_info,
        performance=performance_info,
        statistics=statistics
    )
@router.get("/health/sync-status")
async def sync_status(
    storage: MemoryStorage = Depends(get_storage),
    user: AuthenticationResult = Depends(require_read_access) if OAUTH_ENABLED else None
):
    """Report initial-sync progress for hybrid storage backends."""
    # Only hybrid storage exposes get_initial_sync_status; every other
    # backend reports a finished no-op sync so clients need no special case.
    if not hasattr(storage, 'get_initial_sync_status'):
        return {
            "sync_supported": False,
            "status": {
                "in_progress": False,
                "total": 0,
                "completed": 0,
                "finished": True,
                "progress_percentage": 100
            }
        }
    return {
        "sync_supported": True,
        "status": storage.get_initial_sync_status()
    }
def format_uptime(seconds: float) -> str:
    """Render an uptime duration as a human-readable unit string."""
    # Thresholds are (upper_bound, divisor, unit); the first match wins.
    for upper_bound, divisor, unit in (
        (60, 1, "seconds"),
        (3600, 60, "minutes"),
        (86400, 3600, "hours"),
    ):
        if seconds < upper_bound:
            return f"{seconds / divisor:.1f} {unit}"
    return f"{seconds / 86400:.1f} days"
```
--------------------------------------------------------------------------------
/scripts/migration/migrate_tags.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# scripts/migrate_tags.py
# python scripts/validate_memories.py --db-path /path/to/your/chroma_db
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from mcp_memory_service.storage.chroma import ChromaMemoryStorage
import argparse
# Module-level logger; level and format are configured in main().
logger = logging.getLogger(__name__)
async def analyze_tag_formats(metadatas):
    """Classify how tags are stored in each metadata record.

    Returns a dict counting occurrences of each format:
    json_string, raw_list, comma_string, empty, invalid.
    """
    counts = dict.fromkeys(
        ("json_string", "raw_list", "comma_string", "empty", "invalid"), 0
    )
    for record in metadatas:
        tags = record.get("tags")
        if tags is None:
            counts["empty"] += 1
        elif isinstance(tags, list):
            counts["raw_list"] += 1
        elif isinstance(tags, str):
            try:
                # A string is "json_string" only if it decodes to a list.
                parsed = json.loads(tags)
                counts["json_string" if isinstance(parsed, list) else "invalid"] += 1
            except json.JSONDecodeError:
                # Non-JSON strings with commas are legacy comma-separated tags.
                counts["comma_string" if "," in tags else "invalid"] += 1
        else:
            counts["invalid"] += 1
    return counts
async def find_invalid_tags(metadatas):
    """Collect entries whose string-typed tags fail JSON parsing."""
    invalid_entries = []
    for index, record in enumerate(metadatas):
        tags = record.get("tags")
        # Only strings can fail JSON parsing; None/lists/others are skipped.
        if not isinstance(tags, str):
            continue
        try:
            json.loads(tags)
        except json.JSONDecodeError:
            invalid_entries.append({
                "memory_id": record.get("content_hash", f"index_{index}"),
                "tags": tags,
            })
    return invalid_entries
async def backup_memories(storage):
    """Create a backup of all memories.

    Dumps every memory (id, document text, metadata) to a timestamped JSON
    file and returns the backup file path.
    """
    # Pull every memory (metadata + raw document text) from the collection.
    results = storage.collection.get(include=["metadatas", "documents"])
    backup_data = {
        "timestamp": datetime.now().isoformat(),
        "memories": [{
            "id": results["ids"][i],
            "content": results["documents"][i],
            "metadata": results["metadatas"][i]
        } for i in range(len(results["ids"]))]
    }
    # NOTE(review): backups land in ./backups relative to the current
    # working directory, not next to the database — confirm intended.
    backup_path = Path("backups")
    backup_path.mkdir(exist_ok=True)
    backup_file = backup_path / f"memory_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(backup_file, 'w') as f:
        json.dump(backup_data, f)
    return backup_file
async def validate_current_state(storage):
    """Summarize database size and tag-format health before migrating."""
    results = storage.collection.get(include=["metadatas"])
    metadatas = results["metadatas"]
    return {
        "total_memories": len(results["ids"]),
        "tag_formats": await analyze_tag_formats(metadatas),
        "invalid_tags": await find_invalid_tags(metadatas),
    }
async def migrate_tags(storage):
"""Perform the tag migration"""
results = storage.collection.get(include=["metadatas", "documents"])
migrated_count = 0
error_count = 0
for i, meta in enumerate(results["metadatas"]):
try:
# Extract current tags
current_tags = meta.get("tags", "[]")
# Normalize to list format
if isinstance(current_tags, str):
try:
# Try parsing as JSON first
tags = json.loads(current_tags)
if isinstance(tags, str):
tags = [t.strip() for t in tags.split(",")]
elif isinstance(tags, list):
tags = [str(t).strip() for t in tags]
else:
tags = []
except json.JSONDecodeError:
# Handle as comma-separated string
tags = [t.strip() for t in current_tags.split(",")]
elif isinstance(current_tags, list):
tags = [str(t).strip() for t in current_tags]
else:
tags = []
# Update with normalized format
new_meta = meta.copy()
new_meta["tags"] = json.dumps(tags)
# Update memory
storage.collection.update(
ids=[results["ids"][i]],
metadatas=[new_meta]
)
migrated_count += 1
except Exception as e:
error_count += 1
logger.error(f"Error migrating memory {results['ids'][i]}: {str(e)}")
return migrated_count, error_count
async def verify_migration(storage):
    """Re-run the tag health checks after migration and return the summary."""
    results = storage.collection.get(include=["metadatas"])
    metadatas = results["metadatas"]
    verification = {
        "total_memories": len(results["ids"]),
        "tag_formats": await analyze_tag_formats(metadatas),
        "invalid_tags": await find_invalid_tags(metadatas),
    }
    return verification
async def rollback_migration(storage, backup_file):
"""Rollback to the backup if needed"""
with open(backup_file, 'r') as f:
backup = json.load(f)
for memory in backup["memories"]:
storage.collection.update(
ids=[memory["id"]],
metadatas=[memory["metadata"]],
documents=[memory["content"]]
)
async def main():
    """Interactive migration workflow: back up, validate, migrate, verify.

    Prompts before mutating the database and offers a rollback from the
    freshly written backup if any per-memory migration errors occurred.
    """
    # Fix: os and sys are used below but were never imported at module
    # level, so main() raised NameError at runtime. Import locally.
    import os
    import sys

    # Configure logging
    log_level = os.getenv('LOG_LEVEL', 'ERROR').upper()
    logging.basicConfig(
        level=getattr(logging, log_level, logging.ERROR),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        stream=sys.stderr
    )
    # Parse command line arguments
    parser = argparse.ArgumentParser(description='Validate memory data tags')
    parser.add_argument('--db-path', required=True, help='Path to ChromaDB database')
    args = parser.parse_args()
    # Initialize storage with provided path
    logger.info(f"Connecting to database at: {args.db_path}")
    storage = ChromaMemoryStorage(args.db_path)
    # 1. Create backup so a failed migration can be reverted
    logger.info("Creating backup...")
    backup_file = await backup_memories(storage)
    logger.info(f"Backup created at: {backup_file}")
    # 2. Validate current state
    logger.info("Validating current state...")
    current_state = await validate_current_state(storage)
    logger.info("\nCurrent state:")
    logger.info(json.dumps(current_state, indent=2))
    # 3. Confirm migration before mutating anything (guard clause)
    proceed = input("\nProceed with migration? (yes/no): ")
    if proceed.lower() != 'yes':
        logger.info("Migration cancelled")
        return
    # 4. Run migration
    logger.info("Running migration...")
    migrated_count, error_count = await migrate_tags(storage)
    logger.info(f"Migration completed. Migrated: {migrated_count}, Errors: {error_count}")
    # 5. Verify migration
    logger.info("Verifying migration...")
    verification = await verify_migration(storage)
    logger.info("\nMigration verification:")
    logger.info(json.dumps(verification, indent=2))
    # 6. Offer rollback only if some memories failed to migrate
    if error_count > 0:
        rollback = input("\nErrors detected. Rollback to backup? (yes/no): ")
        if rollback.lower() == 'yes':
            logger.info("Rolling back...")
            await rollback_migration(storage, backup_file)
            logger.info("Rollback completed")
# Script entry point: run the async migration workflow.
if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/tests/integration/test_oauth_flow.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
OAuth 2.1 Dynamic Client Registration integration test.
Tests the OAuth endpoints for full flow functionality from client registration
through token acquisition and API access.
"""
import asyncio
import json
import sys
from typing import Optional
import httpx
async def test_oauth_endpoints(base_url: str = "http://localhost:8000") -> bool:
    """
    Test OAuth 2.1 endpoints for basic functionality.

    Walks the full flow in order: metadata discovery, dynamic client
    registration, the authorization redirect, the token exchange, and
    Bearer-token access to protected API endpoints.

    Args:
        base_url: Root URL of the running MCP Memory Service.

    Returns:
        True if all tests pass, False otherwise
    """
    # Local import: used only here for robust redirect-query parsing.
    from urllib.parse import parse_qs, urlparse

    print(f"Testing OAuth endpoints at {base_url}")
    print("=" * 50)
    async with httpx.AsyncClient() as client:
        try:
            # Test 1: OAuth Authorization Server Metadata
            print("1. Testing OAuth Authorization Server Metadata...")
            response = await client.get(f"{base_url}/.well-known/oauth-authorization-server/mcp")
            if response.status_code != 200:
                print(f" ❌ Failed: {response.status_code}")
                return False
            metadata = response.json()
            required_fields = ["issuer", "authorization_endpoint", "token_endpoint", "registration_endpoint"]
            for field in required_fields:
                if field not in metadata:
                    print(f" ❌ Missing required field: {field}")
                    return False
            print(" ✅ Metadata endpoint working")
            print(f" 📋 Issuer: {metadata.get('issuer')}")
            # Test 2: Client Registration
            print("\n2. Testing Dynamic Client Registration...")
            registration_data = {
                "client_name": "Test Client",
                "redirect_uris": ["https://example.com/callback"],
                "grant_types": ["authorization_code"],
                "response_types": ["code"]
            }
            response = await client.post(
                f"{base_url}/oauth/register",
                json=registration_data
            )
            if response.status_code != 201:
                print(f" ❌ Registration failed: {response.status_code}")
                print(f" Response: {response.text}")
                return False
            client_info = response.json()
            client_id = client_info.get("client_id")
            client_secret = client_info.get("client_secret")
            if not client_id or not client_secret:
                print(" ❌ Missing client credentials in response")
                return False
            print(" ✅ Client registration successful")
            print(f" 📋 Client ID: {client_id}")
            # Test 3: Authorization Endpoint (expect redirect)
            print("\n3. Testing Authorization Endpoint...")
            auth_url = f"{base_url}/oauth/authorize"
            auth_params = {
                "response_type": "code",
                "client_id": client_id,
                "redirect_uri": "https://example.com/callback",
                "state": "test_state_123"
            }
            response = await client.get(auth_url, params=auth_params, follow_redirects=False)
            if response.status_code not in [302, 307]:
                print(f" ❌ Authorization failed: {response.status_code}")
                print(f" Response: {response.text}")
                return False
            location = response.headers.get("location", "")
            # Fix: parse the redirect query with urllib instead of naive
            # substring/split checks. Authorization codes may legally contain
            # '=' (base64 padding), which the old split("=")[1] truncated,
            # and the substring state test could match the wrong parameter.
            redirect_query = parse_qs(urlparse(location).query)
            if not redirect_query.get("code") or redirect_query.get("state") != ["test_state_123"]:
                print(f" ❌ Invalid redirect: {location}")
                return False
            print(" ✅ Authorization endpoint working")
            print(f" 📋 Redirect URL: {location[:100]}...")
            # Extract authorization code from the parsed redirect query
            auth_code = redirect_query["code"][0]
            if not auth_code:
                print(" ❌ No authorization code in redirect")
                return False
            # Test 4: Token Endpoint
            print("\n4. Testing Token Endpoint...")
            token_data = {
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": "https://example.com/callback",
                "client_id": client_id,
                "client_secret": client_secret
            }
            response = await client.post(
                f"{base_url}/oauth/token",
                data=token_data,
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            if response.status_code != 200:
                print(f" ❌ Token request failed: {response.status_code}")
                print(f" Response: {response.text}")
                return False
            token_response = response.json()
            access_token = token_response.get("access_token")
            if not access_token:
                print(" ❌ No access token in response")
                return False
            print(" ✅ Token endpoint working")
            print(f" 📋 Token type: {token_response.get('token_type')}")
            print(f" 📋 Expires in: {token_response.get('expires_in')} seconds")
            # Test 5: Protected Resource Access
            print("\n5. Testing Protected API Endpoints...")
            headers = {"Authorization": f"Bearer {access_token}"}
            # Test health endpoint (should be public, no auth required)
            response = await client.get(f"{base_url}/api/health")
            if response.status_code == 200:
                print(" ✅ Public health endpoint accessible")
            else:
                print(f" ❌ Health endpoint failed: {response.status_code}")
            # Test protected memories endpoint (requires read access)
            response = await client.get(f"{base_url}/api/memories", headers=headers)
            if response.status_code == 200:
                print(" ✅ Protected memories endpoint accessible with Bearer token")
            else:
                print(f" ❌ Protected memories endpoint failed: {response.status_code}")
            # Test protected search endpoint (requires read access)
            search_data = {"query": "test search", "n_results": 5}
            response = await client.post(f"{base_url}/api/search", json=search_data, headers=headers)
            if response.status_code in [200, 404]:  # 404 is OK if no memories exist
                print(" ✅ Protected search endpoint accessible with Bearer token")
            else:
                print(f" ❌ Protected search endpoint failed: {response.status_code}")
            # Test accessing protected endpoint without token (should fail)
            response = await client.get(f"{base_url}/api/memories")
            if response.status_code == 401:
                print(" ✅ Protected endpoint correctly rejects unauthenticated requests")
            else:
                print(f" ⚠️ Protected endpoint security test inconclusive: {response.status_code}")
            print("\n" + "=" * 50)
            print("🎉 All OAuth 2.1 tests passed!")
            print("✅ Ready for Claude Code HTTP transport integration")
            print("✅ API endpoints properly protected with OAuth authentication")
            return True
        except Exception as e:
            print(f"\n❌ Test failed with exception: {e}")
            return False
async def main():
    """Parse the optional base-URL argument, run the suite, and exit 0/1."""
    base_url = sys.argv[1] if len(sys.argv) > 1 else "http://localhost:8000"
    print("OAuth 2.1 Dynamic Client Registration Test")
    print("==========================================")
    print(f"Target: {base_url}")
    print()
    print("Make sure the MCP Memory Service is running with OAuth enabled:")
    print(" export MCP_OAUTH_ENABLED=true")
    print(" uv run memory server --http")
    print()
    success = await test_oauth_endpoints(base_url)
    # Guard clause: report failure and exit non-zero first.
    if not success:
        print("\n💥 OAuth tests failed - check implementation")
        sys.exit(1)
    print("\n🚀 OAuth implementation is ready!")
    sys.exit(0)
# Script entry point: run the async OAuth test suite.
if __name__ == "__main__":
    asyncio.run(main())
```
--------------------------------------------------------------------------------
/docs/api/memory-metadata-api.md:
--------------------------------------------------------------------------------
```markdown
# Memory Metadata Enhancement API
## Overview
The Memory Metadata Enhancement API provides efficient memory metadata updates without requiring complete memory recreation. This addresses the core limitation identified in Issue #10 where updating memory metadata required deleting and recreating entire memory entries.
## API Method
### `update_memory_metadata`
Updates memory metadata while preserving the original memory content, embeddings, and optionally timestamps.
**Signature:**
```python
async def update_memory_metadata(
content_hash: str,
updates: Dict[str, Any],
preserve_timestamps: bool = True
) -> Tuple[bool, str]
```
**Parameters:**
- `content_hash` (string, required): The content hash of the memory to update
- `updates` (object, required): Dictionary of metadata fields to update
- `preserve_timestamps` (boolean, optional): Whether to preserve original created_at timestamp (default: true)
**Returns:**
- `success` (boolean): Whether the update was successful
- `message` (string): Summary of updated fields or error message
## Supported Update Fields
### Core Metadata Fields
1. **tags** (array of strings)
- Replaces existing tags completely
- Example: `"tags": ["important", "reference", "new-tag"]`
2. **memory_type** (string)
- Updates the memory type classification
- Example: `"memory_type": "reminder"`
3. **metadata** (object)
- Merges with existing custom metadata
- Example: `"metadata": {"priority": "high", "due_date": "2024-01-15"}`
### Custom Fields
Any other fields not in the protected list can be updated directly:
- `"priority": "urgent"`
- `"status": "active"`
- `"category": "work"`
- Custom application-specific fields
### Protected Fields
These fields cannot be modified through this API:
- `content` - Memory content is immutable
- `content_hash` - Content hash is immutable
- `embedding` - Embeddings are preserved automatically
- `created_at` / `created_at_iso` - Preserved unless `preserve_timestamps=false`
- Internal timestamp fields (`timestamp`, `timestamp_float`, `timestamp_str`)
## Usage Examples
### Example 1: Add Tags to Memory
```json
{
"content_hash": "abc123def456...",
"updates": {
"tags": ["important", "reference", "project-alpha"]
}
}
```
### Example 2: Update Memory Type and Custom Metadata
```json
{
"content_hash": "abc123def456...",
"updates": {
"memory_type": "reminder",
"metadata": {
"priority": "high",
"due_date": "2024-01-15",
"assignee": "[email protected]"
}
}
}
```
### Example 3: Update Custom Fields Directly
```json
{
"content_hash": "abc123def456...",
"updates": {
"priority": "urgent",
"status": "active",
"category": "work",
"last_reviewed": "2024-01-10"
}
}
```
### Example 4: Update with Timestamp Reset
```json
{
"content_hash": "abc123def456...",
"updates": {
"tags": ["archived", "completed"]
},
"preserve_timestamps": false
}
```
## Timestamp Behavior
### Default Behavior (preserve_timestamps=true)
- `created_at` and `created_at_iso` are preserved from original memory
- `updated_at` and `updated_at_iso` are set to current time
- Legacy timestamp fields are updated for backward compatibility
### Reset Behavior (preserve_timestamps=false)
- All timestamp fields are set to current time
- Useful for marking memories as "refreshed" or "re-activated"
## Implementation Details
### Storage Layer
The API is implemented in the storage abstraction layer:
1. **Base Storage Interface** (`storage/base.py`)
- Abstract method definition
- Consistent interface across storage backends
2. **ChromaDB Implementation** (`storage/chroma.py`)
- Efficient upsert operation preserving embeddings
- Metadata merging with validation
- Timestamp synchronization
3. **Future Storage Backends**
- sqlite-vec implementation will follow same interface
- Other storage backends can implement consistently
### MCP Protocol Integration
The API is exposed via the MCP protocol:
1. **Tool Registration** - Available as `update_memory_metadata` tool
2. **Input Validation** - Comprehensive parameter validation
3. **Error Handling** - Clear error messages for debugging
4. **Logging** - Detailed operation logging for monitoring
## Performance Benefits
### Efficiency Gains
1. **No Content Re-processing**
- Original content remains unchanged
- No need to regenerate embeddings
- Preserves vector database relationships
2. **Minimal Network Transfer**
- Only metadata changes are transmitted
- Reduced bandwidth usage
- Faster operation completion
3. **Database Optimization**
- Single update operation vs delete+insert
- Maintains database indices and relationships
- Reduces transaction overhead
### Resource Savings
- **Memory Usage**: No need to load full memory content
- **CPU Usage**: No embedding regeneration required
- **Storage I/O**: Minimal database operations
- **Network**: Reduced data transfer
## Error Handling
### Common Error Scenarios
1. **Memory Not Found**
```
Error: Memory with hash abc123... not found
```
2. **Invalid Updates Format**
```
Error: updates must be a dictionary
```
3. **Invalid Tags Format**
```
Error: Tags must be provided as a list of strings
```
4. **Storage Not Initialized**
```
Error: Collection not initialized, cannot update memory metadata
```
### Error Recovery
- Detailed error messages for debugging
- Transaction rollback on failures
- Original memory remains unchanged on errors
- Logging for troubleshooting
## Migration and Compatibility
### Backward Compatibility
- Existing memories work without modification
- Legacy timestamp fields are maintained
- No breaking changes to existing APIs
### Migration Strategy
1. **Immediate Availability** - API available immediately after deployment
2. **Gradual Adoption** - Can be adopted incrementally
3. **Fallback Support** - Original store/delete pattern still works
4. **Validation** - Comprehensive testing before production use
## Use Cases
### Memory Organization
1. **Tag Management**
- Add organizational tags over time
- Categorize memories as understanding improves
- Apply bulk tagging for organization
2. **Priority Updates**
- Mark memories as high/low priority
- Update urgency as contexts change
- Implement memory lifecycle management
3. **Status Tracking**
- Track memory processing status
- Mark memories as reviewed/processed
- Implement workflow states
### Advanced Features
1. **Memory Linking**
- Add relationship metadata
- Create memory hierarchies
- Implement reference systems
2. **Time-to-Live Management**
- Add expiration metadata
- Implement memory aging
- Schedule automatic cleanup
3. **Access Control**
- Add ownership metadata
- Implement sharing controls
- Track access permissions
## Testing and Validation
### Unit Tests
- Comprehensive test coverage for all update scenarios
- Error condition testing
- Timestamp behavior validation
- Metadata merging verification
### Integration Tests
- End-to-end MCP protocol testing
- Storage backend compatibility testing
- Performance benchmarking
- Cross-platform validation
### Performance Testing
- Large dataset updates
- Concurrent update operations
- Memory usage monitoring
- Response time measurement
## Future Enhancements
### Planned Improvements
1. **Batch Updates** - Update multiple memories in single operation
2. **Conditional Updates** - Update only if conditions are met
3. **Metadata Validation** - Schema validation for metadata fields
4. **Update History** - Track metadata change history
5. **Selective Updates** - Update only specific metadata fields
### Storage Backend Support
- sqlite-vec implementation (Issue #40)
- Other vector database backends
- Consistent API across all backends
- Performance optimization per backend
## Conclusion
The Memory Metadata Enhancement API provides a robust, efficient solution for memory metadata management. It enables sophisticated memory organization features while maintaining excellent performance and backward compatibility.
This implementation forms the foundation for advanced memory management features like re-tagging systems (Issue #45) and memory consolidation (Issue #11).
```
--------------------------------------------------------------------------------
/scripts/installation/setup_cloudflare_resources.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Automated Cloudflare resource setup for MCP Memory Service.
This script creates the required Cloudflare resources using the HTTP API.
"""
import os
import sys
import asyncio
import json
import logging
from typing import Dict, Any, Optional
import httpx
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CloudflareSetup:
    """Provision the Cloudflare resources required by MCP Memory Service.

    Idempotently creates a Vectorize index, a D1 database, and an optional
    R2 bucket via the Cloudflare HTTP API, and verifies Workers AI access.
    All requests share a single authenticated httpx.AsyncClient; call
    close() when finished.
    """

    def __init__(self, api_token: str, account_id: str):
        # Bearer token needs Vectorize/D1/Workers AI/R2 edit permissions.
        self.api_token = api_token
        self.account_id = account_id
        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
        # Lazily-created shared HTTP client; see _get_client().
        self.client = None

    async def _get_client(self) -> "httpx.AsyncClient":
        """Return the shared authenticated client, creating it on first use."""
        if self.client is None:
            headers = {
                "Authorization": f"Bearer {self.api_token}",
                "Content-Type": "application/json"
            }
            self.client = httpx.AsyncClient(headers=headers, timeout=30.0)
        return self.client

    async def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        """Make an authenticated request to the Cloudflare API.

        Returns:
            The decoded JSON body for HTTP 200/201 responses.

        Raises:
            httpx.HTTPStatusError: for 4xx/5xx responses.
            RuntimeError: for any other unexpected status (e.g. 202/204).
                Previously such statuses fell through to response.json(),
                which could fail confusingly on an empty body.
        """
        client = await self._get_client()
        response = await client.request(method, url, **kwargs)
        if response.status_code not in (200, 201):
            logger.error(f"API request failed: {response.status_code} {response.text}")
            response.raise_for_status()
            # raise_for_status() is a no-op for non-error statuses, so fail
            # explicitly instead of attempting to decode an unexpected body.
            raise RuntimeError(
                f"Unexpected Cloudflare API status {response.status_code} for {method} {url}"
            )
        return response.json()

    async def create_vectorize_index(self, name: str = "mcp-memory-index") -> str:
        """Create the Vectorize index if missing and return its name."""
        logger.info(f"Creating Vectorize index: {name}")
        # Idempotency: a successful GET means the index already exists.
        try:
            url = f"{self.base_url}/vectorize/indexes/{name}"
            result = await self._make_request("GET", url)
            if result.get("success"):
                logger.info(f"Vectorize index {name} already exists")
                return name
        except httpx.HTTPStatusError as e:
            # 404 means "not found" -> fall through to creation;
            # any other status is a real failure.
            if e.response.status_code != 404:
                raise
        # Create new index. 768 dims / cosine matches the bge-base-en-v1.5
        # embedding model checked in verify_workers_ai_access().
        url = f"{self.base_url}/vectorize/indexes"
        payload = {
            "name": name,
            "config": {
                "dimensions": 768,
                "metric": "cosine"
            }
        }
        result = await self._make_request("POST", url, json=payload)
        if result.get("success"):
            logger.info(f"✅ Created Vectorize index: {name}")
            return name
        raise ValueError(f"Failed to create Vectorize index: {result}")

    async def create_d1_database(self, name: str = "mcp-memory-db") -> str:
        """Create the D1 database if missing and return its UUID."""
        logger.info(f"Creating D1 database: {name}")
        # D1 has no per-name GET endpoint here, so list and scan by name.
        url = f"{self.base_url}/d1/database"
        result = await self._make_request("GET", url)
        if result.get("success"):
            for db in result.get("result", []):
                if db.get("name") == name:
                    db_id = db.get("uuid")
                    logger.info(f"D1 database {name} already exists with ID: {db_id}")
                    return db_id
        # Create new database
        payload = {"name": name}
        result = await self._make_request("POST", url, json=payload)
        if result.get("success"):
            db_id = result["result"]["uuid"]
            logger.info(f"✅ Created D1 database: {name} (ID: {db_id})")
            return db_id
        raise ValueError(f"Failed to create D1 database: {result}")

    async def create_r2_bucket(self, name: str = "mcp-memory-content") -> str:
        """Create the R2 bucket if missing and return its name."""
        logger.info(f"Creating R2 bucket: {name}")
        # Idempotency check, mirroring create_vectorize_index().
        try:
            url = f"{self.base_url}/r2/buckets/{name}"
            result = await self._make_request("GET", url)
            if result.get("success"):
                logger.info(f"R2 bucket {name} already exists")
                return name
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 404:
                raise
        # Create new bucket
        url = f"{self.base_url}/r2/buckets"
        payload = {"name": name}
        result = await self._make_request("POST", url, json=payload)
        if result.get("success"):
            logger.info(f"✅ Created R2 bucket: {name}")
            return name
        raise ValueError(f"Failed to create R2 bucket: {result}")

    async def verify_workers_ai_access(self) -> bool:
        """Return True if the embedding model can be invoked via Workers AI."""
        logger.info("Verifying Workers AI access...")
        # Exercise the same embedding model the memory service uses at runtime.
        url = f"{self.base_url}/ai/run/@cf/baai/bge-base-en-v1.5"
        payload = {"text": ["test embedding"]}
        try:
            result = await self._make_request("POST", url, json=payload)
            if result.get("success"):
                logger.info("✅ Workers AI access verified")
                return True
            logger.warning(f"Workers AI test failed: {result}")
            return False
        except Exception as e:
            # Verification is advisory: report limited access, don't abort setup.
            logger.warning(f"Workers AI verification failed: {e}")
            return False

    async def close(self):
        """Close the shared HTTP client, if one was created."""
        if self.client:
            await self.client.aclose()
async def main() -> bool:
    """Interactive setup routine.

    Reads Cloudflare credentials from the environment, provisions the
    required resources (Vectorize index, D1 database, optional R2 bucket),
    verifies Workers AI access, and prints the environment variables the
    memory service needs.

    Returns:
        True on success, False when credentials are missing or setup fails.
    """
    print("🚀 Cloudflare Backend Setup for MCP Memory Service")
    print("=" * 55)
    # Check for required environment variables
    api_token = os.getenv("CLOUDFLARE_API_TOKEN")
    account_id = os.getenv("CLOUDFLARE_ACCOUNT_ID")
    if not api_token:
        print("❌ CLOUDFLARE_API_TOKEN environment variable not set")
        print("Please create an API token at: https://dash.cloudflare.com/profile/api-tokens")
        print("Required permissions: Vectorize:Edit, D1:Edit, Workers AI:Edit, R2:Edit")
        return False
    if not account_id:
        print("❌ CLOUDFLARE_ACCOUNT_ID environment variable not set")
        print("You can find your account ID in the Cloudflare dashboard sidebar")
        return False
    setup = CloudflareSetup(api_token, account_id)
    try:
        # Create resources
        vectorize_index = await setup.create_vectorize_index()
        d1_database_id = await setup.create_d1_database()
        # R2 bucket is optional
        r2_bucket = None
        # NOTE(review): blocking input() inside a coroutine stalls the event
        # loop; acceptable in this interactive one-shot script.
        create_r2 = input("\n🪣 Create R2 bucket for large content storage? (y/N): ").lower().strip()
        if create_r2 in ['y', 'yes']:
            try:
                r2_bucket = await setup.create_r2_bucket()
            except Exception as e:
                # R2 failure is non-fatal: continue without large-content storage.
                logger.warning(f"Failed to create R2 bucket: {e}")
                logger.warning("Continuing without R2 storage...")
        # Verify Workers AI
        ai_available = await setup.verify_workers_ai_access()
        print("\n🎉 Setup Complete!")
        print("=" * 20)
        print(f"Vectorize Index: {vectorize_index}")
        print(f"D1 Database ID: {d1_database_id}")
        print(f"R2 Bucket: {r2_bucket or 'Not configured'}")
        print(f"Workers AI: {'Available' if ai_available else 'Limited access'}")
        print("\n📝 Environment Variables:")
        print("=" * 25)
        # Only the first 10 characters of the token are echoed back.
        print(f"export CLOUDFLARE_API_TOKEN=\"{api_token[:10]}...\"")
        print(f"export CLOUDFLARE_ACCOUNT_ID=\"{account_id}\"")
        print(f"export CLOUDFLARE_VECTORIZE_INDEX=\"{vectorize_index}\"")
        print(f"export CLOUDFLARE_D1_DATABASE_ID=\"{d1_database_id}\"")
        if r2_bucket:
            print(f"export CLOUDFLARE_R2_BUCKET=\"{r2_bucket}\"")
        print("export MCP_MEMORY_STORAGE_BACKEND=\"cloudflare\"")
        print("\n🧪 Test the setup:")
        print("python test_cloudflare_backend.py")
        return True
    except Exception as e:
        logger.error(f"Setup failed: {e}")
        return False
    finally:
        # Always release the HTTP client, even when setup failed.
        await setup.close()

if __name__ == "__main__":
    # Exit code mirrors the boolean result for CI/scripting use.
    success = asyncio.run(main())
    sys.exit(0 if success else 1)
```
--------------------------------------------------------------------------------
/docs/assets/images/project-infographic.svg:
--------------------------------------------------------------------------------
```
<svg width="800" height="1200" viewBox="0 0 800 1200" xmlns="http://www.w3.org/2000/svg">
<!-- Background -->
<rect width="800" height="1200" fill="#f8f9fa"/>
<!-- Header -->
<rect width="800" height="120" fill="#1a1a1a"/>
<text x="400" y="60" font-family="Arial, sans-serif" font-size="36" font-weight="bold" fill="white" text-anchor="middle">MCP Memory Service</text>
<text x="400" y="90" font-family="Arial, sans-serif" font-size="18" fill="#888" text-anchor="middle">Production-Ready Knowledge Management Platform</text>
<!-- Performance Metrics Section -->
<g transform="translate(0, 140)">
<text x="400" y="30" font-family="Arial, sans-serif" font-size="24" font-weight="bold" fill="#333" text-anchor="middle">Performance Metrics</text>
<!-- Metric Cards -->
<g transform="translate(50, 60)">
<!-- Card 1 -->
<rect x="0" y="0" width="160" height="100" rx="10" fill="#e3f2fd" stroke="#2196f3" stroke-width="2"/>
<text x="80" y="35" font-family="Arial, sans-serif" font-size="32" font-weight="bold" fill="#1976d2" text-anchor="middle">319+</text>
<text x="80" y="60" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Memories</text>
<text x="80" y="80" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Managed</text>
<!-- Card 2 -->
<rect x="190" y="0" width="160" height="100" rx="10" fill="#e8f5e9" stroke="#4caf50" stroke-width="2"/>
<text x="270" y="35" font-family="Arial, sans-serif" font-size="32" font-weight="bold" fill="#388e3c" text-anchor="middle">828ms</text>
<text x="270" y="60" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Avg Query</text>
<text x="270" y="80" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Time</text>
<!-- Card 3 -->
<rect x="380" y="0" width="160" height="100" rx="10" fill="#fff3e0" stroke="#ff9800" stroke-width="2"/>
<text x="460" y="35" font-family="Arial, sans-serif" font-size="32" font-weight="bold" fill="#f57c00" text-anchor="middle">100%</text>
<text x="460" y="60" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Cache Hit</text>
<text x="460" y="80" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Ratio</text>
<!-- Card 4 -->
<rect x="570" y="0" width="160" height="100" rx="10" fill="#fce4ec" stroke="#e91e63" stroke-width="2"/>
<text x="650" y="35" font-family="Arial, sans-serif" font-size="32" font-weight="bold" fill="#c2185b" text-anchor="middle">20MB</text>
<text x="650" y="60" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Efficient</text>
<text x="650" y="80" font-family="Arial, sans-serif" font-size="14" fill="#555" text-anchor="middle">Storage</text>
</g>
</g>
<!-- Features Section -->
<g transform="translate(0, 380)">
<text x="400" y="30" font-family="Arial, sans-serif" font-size="24" font-weight="bold" fill="#333" text-anchor="middle">16 Comprehensive Operations</text>
<!-- Feature Categories -->
<g transform="translate(50, 60)">
<!-- Memory Operations -->
<rect x="0" y="0" width="220" height="180" rx="10" fill="#f5f5f5" stroke="#999" stroke-width="1"/>
<text x="110" y="25" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="#333" text-anchor="middle">Memory Operations</text>
<text x="15" y="50" font-family="Arial, sans-serif" font-size="14" fill="#555">• store_memory</text>
<text x="15" y="70" font-family="Arial, sans-serif" font-size="14" fill="#555">• retrieve_memory</text>
<text x="15" y="90" font-family="Arial, sans-serif" font-size="14" fill="#555">• search_by_tag</text>
<text x="15" y="110" font-family="Arial, sans-serif" font-size="14" fill="#555">• delete_memory</text>
<text x="15" y="130" font-family="Arial, sans-serif" font-size="14" fill="#555">• update_metadata</text>
<text x="15" y="150" font-family="Arial, sans-serif" font-size="14" fill="#555">• exact_match_retrieve</text>
<!-- Database Management -->
<rect x="250" y="0" width="220" height="180" rx="10" fill="#f5f5f5" stroke="#999" stroke-width="1"/>
<text x="360" y="25" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="#333" text-anchor="middle">Database Management</text>
<text x="265" y="50" font-family="Arial, sans-serif" font-size="14" fill="#555">• create_backup</text>
<text x="265" y="70" font-family="Arial, sans-serif" font-size="14" fill="#555">• optimize_db</text>
<text x="265" y="90" font-family="Arial, sans-serif" font-size="14" fill="#555">• check_health</text>
<text x="265" y="110" font-family="Arial, sans-serif" font-size="14" fill="#555">• get_stats</text>
<text x="265" y="130" font-family="Arial, sans-serif" font-size="14" fill="#555">• cleanup_duplicates</text>
<!-- Advanced Features -->
<rect x="500" y="0" width="200" height="180" rx="10" fill="#f5f5f5" stroke="#999" stroke-width="1"/>
<text x="600" y="25" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="#333" text-anchor="middle">Advanced Features</text>
<text x="515" y="50" font-family="Arial, sans-serif" font-size="14" fill="#555">• debug_retrieve</text>
<text x="515" y="70" font-family="Arial, sans-serif" font-size="14" fill="#555">• recall_memory</text>
<text x="515" y="90" font-family="Arial, sans-serif" font-size="14" fill="#555">• delete_by_timeframe</text>
<text x="515" y="110" font-family="Arial, sans-serif" font-size="14" fill="#555">• check_embedding</text>
</g>
</g>
<!-- Architecture -->
<g transform="translate(0, 650)">
<text x="400" y="30" font-family="Arial, sans-serif" font-size="24" font-weight="bold" fill="#333" text-anchor="middle">Architecture Stack</text>
<g transform="translate(150, 60)">
<!-- Stack layers -->
<rect x="0" y="0" width="500" height="50" rx="5" fill="#4a90e2" stroke="#357abd" stroke-width="2"/>
<text x="250" y="30" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="white" text-anchor="middle">React Dashboard + Real-time Statistics</text>
<rect x="0" y="60" width="500" height="50" rx="5" fill="#5cb85c" stroke="#449d44" stroke-width="2"/>
<text x="250" y="90" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="white" text-anchor="middle">MCP Protocol (stdin/stdout)</text>
<rect x="0" y="120" width="500" height="50" rx="5" fill="#f0ad4e" stroke="#ec971f" stroke-width="2"/>
<text x="250" y="150" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="white" text-anchor="middle">Python Server + Sentence Transformers</text>
<rect x="0" y="180" width="500" height="50" rx="5" fill="#d9534f" stroke="#c9302c" stroke-width="2"/>
<text x="250" y="210" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="white" text-anchor="middle">ChromaDB Vector Storage</text>
</g>
</g>
<!-- Sponsorship CTA -->
<g transform="translate(0, 950)">
<rect x="50" y="0" width="700" height="200" rx="15" fill="#1a1a1a"/>
<text x="400" y="40" font-family="Arial, sans-serif" font-size="28" font-weight="bold" fill="white" text-anchor="middle">Support Open Source Development</text>
<text x="400" y="80" font-family="Arial, sans-serif" font-size="16" fill="#ccc" text-anchor="middle">Your sponsorship enables:</text>
<text x="200" y="110" font-family="Arial, sans-serif" font-size="14" fill="#aaa">✓ New feature development</text>
<text x="200" y="135" font-family="Arial, sans-serif" font-size="14" fill="#aaa">✓ Bug fixes & maintenance</text>
<text x="450" y="110" font-family="Arial, sans-serif" font-size="14" fill="#aaa">✓ Documentation improvements</text>
<text x="450" y="135" font-family="Arial, sans-serif" font-size="14" fill="#aaa">✓ Community support</text>
<rect x="300" y="155" width="200" height="35" rx="20" fill="#ea4aaa" stroke="none"/>
<text x="400" y="178" font-family="Arial, sans-serif" font-size="16" font-weight="bold" fill="white" text-anchor="middle">Become a Sponsor</text>
</g>
</svg>
```
--------------------------------------------------------------------------------
/scripts/development/verify_hybrid_sync.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Comprehensive verification of hybrid storage background sync functionality.
"""
import asyncio
import sys
import tempfile
import os
import time
from unittest.mock import patch
sys.path.insert(0, 'src')
from mcp_memory_service.storage.hybrid import HybridMemoryStorage
from mcp_memory_service.models.memory import Memory
import hashlib
class DetailedMockCloudflare:
    """In-memory stand-in for the Cloudflare backend.

    Records every call in ``operation_log`` as ``(op, [hash,] timestamp)``
    tuples so tests can inspect exactly which operations reached the
    secondary and in what order. A small sleep simulates network latency.
    """

    def __init__(self, **kwargs):
        self.memories = {}        # content_hash -> memory object
        self.operation_log = []   # chronological record of all calls
        self.initialized = False
        self.delay = 0.01         # simulated network round-trip (seconds)

    async def initialize(self):
        self.initialized = True
        self.operation_log.append(('init', time.time()))

    async def store(self, memory):
        # Simulate the network hop before recording the write.
        await asyncio.sleep(self.delay)
        self.memories[memory.content_hash] = memory
        self.operation_log.append(('store', memory.content_hash, time.time()))
        return True, "Stored"

    async def delete(self, content_hash):
        await asyncio.sleep(self.delay)
        # Deleting a missing hash is not an error; the log entry is
        # appended either way, matching the original behavior.
        self.memories.pop(content_hash, None)
        self.operation_log.append(('delete', content_hash, time.time()))
        return True, "Deleted"

    async def update_memory_metadata(self, content_hash, updates, preserve_timestamps=True):
        await asyncio.sleep(self.delay)
        self.operation_log.append(('update', content_hash, time.time()))
        return True, "Updated"

    async def get_stats(self):
        return {"total": len(self.memories)}

    async def close(self):
        self.operation_log.append(('close', time.time()))
async def verify_sync():
    """Exercise the hybrid backend's background sync end to end.

    Four scenarios against a real SQLite-vec primary and a mocked
    Cloudflare secondary: (1) stores are queued locally, (2) the
    background task syncs them automatically in batches, (3) deletes
    propagate to the secondary, (4) force_sync completes remaining work.
    Progress is printed as a human-readable report.
    """
    print("🔍 HYBRID STORAGE BACKGROUND SYNC VERIFICATION")
    print("=" * 60)
    # Temporary SQLite file for the primary backend; removed in the finally.
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
        db_path = tmp.name
    try:
        # Placeholder credentials; never used because CloudflareStorage is
        # patched with DetailedMockCloudflare below.
        config = {
            'api_token': 'test',
            'account_id': 'test',
            'vectorize_index': 'test',
            'd1_database_id': 'test'
        }
        with patch('mcp_memory_service.storage.hybrid.CloudflareStorage', DetailedMockCloudflare):
            # Initialize with short sync interval
            storage = HybridMemoryStorage(
                sqlite_db_path=db_path,
                cloudflare_config=config,
                sync_interval=0.5,  # 500ms for quick testing
                batch_size=2
            )
            await storage.initialize()
            print("✅ Hybrid storage initialized with background sync")
            print(f" • Primary: SQLite-vec (local)")
            print(f" • Secondary: Mock Cloudflare (simulated)")
            print(f" • Sync interval: 0.5 seconds")
            print(f" • Batch size: 2 operations")
            print()
            # TEST 1: Store operations are queued
            print("📝 TEST 1: Store Operations Queuing")
            print("-" * 40)
            memories = []
            for i in range(4):
                # time.time() in the content keeps each hash unique per run.
                content = f"Sync test memory #{i+1} at {time.time()}"
                memory = Memory(
                    content=content,
                    content_hash=hashlib.sha256(content.encode()).hexdigest(),
                    tags=['sync-verify'],
                    memory_type='test'
                )
                memories.append(memory)
                start = time.time()
                success, msg = await storage.store(memory)
                elapsed = (time.time() - start) * 1000
                # Local store should be fast: the cloud write is deferred.
                print(f" Memory #{i+1}: ✅ stored in {elapsed:.1f}ms (local)")
            # Check initial queue
            status = await storage.sync_service.get_sync_status()
            print(f"\n 📊 Queue status after stores:")
            print(f" • Queued operations: {status['queue_size']}")
            print(f" • Processed: {status['stats']['operations_processed']}")
            # TEST 2: Wait for automatic background sync
            print("\n⏳ TEST 2: Automatic Background Sync")
            print("-" * 40)
            print(" Waiting 1.5 seconds for automatic sync...")
            # 1.5s covers several 0.5s sync intervals.
            await asyncio.sleep(1.5)
            status = await storage.sync_service.get_sync_status()
            mock_log = storage.secondary.operation_log
            print(f"\n 📊 After automatic sync:")
            print(f" • Queue remaining: {status['queue_size']}")
            print(f" • Operations processed: {status['stats']['operations_processed']}")
            print(f" • Mock Cloudflare received: {len([op for op in mock_log if op[0] == 'store'])} stores")
            # TEST 3: Delete operation
            print("\n🗑️ TEST 3: Delete Operation Sync")
            print("-" * 40)
            delete_hash = memories[0].content_hash
            success, msg = await storage.delete(delete_hash)
            print(f" Delete operation: ✅ (local)")
            await asyncio.sleep(1)  # Wait for sync
            delete_ops = [op for op in mock_log if op[0] == 'delete']
            print(f" Mock Cloudflare received: {len(delete_ops)} delete operation(s)")
            # TEST 4: Force sync
            print("\n🔄 TEST 4: Force Sync")
            print("-" * 40)
            # Add more memories
            for i in range(2):
                content = f"Force sync test #{i+1}"
                memory = Memory(
                    content=content,
                    content_hash=hashlib.sha256(content.encode()).hexdigest(),
                    tags=['force-sync'],
                    memory_type='test'
                )
                await storage.store(memory)
            print(f" Added 2 more memories")
            # Force sync
            result = await storage.force_sync()
            print(f"\n Force sync result:")
            print(f" • Status: {result['status']}")
            print(f" • Primary memories: {result['primary_memories']}")
            print(f" • Synced to secondary: {result['synced_to_secondary']}")
            print(f" • Duration: {result.get('duration', 0):.3f}s")
            # Final verification
            print("\n✅ FINAL VERIFICATION")
            print("-" * 40)
            final_status = await storage.sync_service.get_sync_status()
            final_mock_ops = storage.secondary.operation_log
            print(f" Sync service statistics:")
            print(f" • Total operations processed: {final_status['stats']['operations_processed']}")
            print(f" • Failed operations: {final_status['stats'].get('operations_failed', 0)}")
            print(f" • Cloudflare available: {final_status['cloudflare_available']}")
            print(f"\n Mock Cloudflare operations log:")
            store_count = len([op for op in final_mock_ops if op[0] == 'store'])
            delete_count = len([op for op in final_mock_ops if op[0] == 'delete'])
            update_count = len([op for op in final_mock_ops if op[0] == 'update'])
            print(f" • Store operations: {store_count}")
            print(f" • Delete operations: {delete_count}")
            print(f" • Update operations: {update_count}")
            # NOTE(review): the -2 intends to exclude 'init' and 'close', but
            # storage.close() hasn't run yet here, so only 'init' is in the
            # log and this subtracts one too many — TODO confirm intent.
            print(f" • Total operations: {len(final_mock_ops) - 2}")  # Exclude init and close
            # Verify memory counts match
            primary_count = len(await storage.primary.get_all_memories())
            secondary_count = len(storage.secondary.memories)
            print(f"\n Memory count verification:")
            print(f" • Primary (SQLite-vec): {primary_count}")
            print(f" • Secondary (Mock CF): {secondary_count}")
            print(f" • Match: {'✅ YES' if primary_count == secondary_count else '❌ NO'}")
            await storage.close()
            print("\n" + "=" * 60)
            print("🎉 BACKGROUND SYNC VERIFICATION COMPLETE")
            print("\nSummary: The hybrid storage backend is working correctly!")
            print(" ✅ Store operations are queued for background sync")
            print(" ✅ Automatic sync processes operations in batches")
            print(" ✅ Delete operations are synced to secondary")
            print(" ✅ Force sync ensures complete synchronization")
            print(" ✅ Both backends maintain consistency")
    finally:
        # Best-effort cleanup of the temporary database file.
        if os.path.exists(db_path):
            os.unlink(db_path)

if __name__ == "__main__":
    asyncio.run(verify_sync())
```
--------------------------------------------------------------------------------
/tests/integration/test_server_handlers.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for MCP handler methods in server.py.
These tests verify that the MCP handlers correctly transform MemoryService
responses to MCP TextContent format, particularly after the fix for issue #198.
"""
import pytest
from mcp import types
from mcp_memory_service.server import MemoryServer
class TestHandleStoreMemory:
    """Tests for the handle_store_memory MCP handler."""

    @pytest.mark.asyncio
    async def test_store_memory_success(self):
        """A valid store request yields one TextContent success message with a hash."""
        server = MemoryServer()
        args = {
            "content": "Test memory content for integration test",
            "metadata": {"tags": ["test", "integration"], "type": "note"},
        }
        result = await server.handle_store_memory(args)
        assert isinstance(result, list) and len(result) == 1
        item = result[0]
        assert isinstance(item, types.TextContent)
        lowered = item.text.lower()
        assert "successfully" in lowered
        assert "hash:" in lowered
        assert "..." in item.text  # hash is truncated in the message

    @pytest.mark.asyncio
    async def test_store_memory_chunked(self):
        """Content above the auto-split threshold (> 1500 chars) is stored as chunks."""
        server = MemoryServer()
        long_content = "This is a very long memory content. " * 100
        result = await server.handle_store_memory({
            "content": long_content,
            "metadata": {"tags": ["test"], "type": "note"},
        })
        assert isinstance(result, list) and len(result) == 1
        item = result[0]
        assert isinstance(item, types.TextContent)
        lowered = item.text.lower()
        assert "chunk" in lowered
        assert "successfully" in lowered

    @pytest.mark.asyncio
    async def test_store_memory_empty_content(self):
        """Empty content is rejected with an error mentioning the requirement."""
        server = MemoryServer()
        result = await server.handle_store_memory({"content": "", "metadata": {}})
        assert isinstance(result, list) and len(result) == 1
        lowered = result[0].text.lower()
        assert "error" in lowered
        assert "required" in lowered

    @pytest.mark.asyncio
    async def test_store_memory_missing_content(self):
        """A request without a content key is rejected with an error."""
        server = MemoryServer()
        result = await server.handle_store_memory({"metadata": {"tags": ["test"]}})
        assert isinstance(result, list) and len(result) == 1
        assert "error" in result[0].text.lower()

    @pytest.mark.asyncio
    async def test_store_memory_with_tags_string(self):
        """Comma-separated tag strings are accepted in place of arrays."""
        server = MemoryServer()
        result = await server.handle_store_memory({
            "content": "Test with string tags",
            "metadata": {"tags": "test,integration,string-tags", "type": "note"},
        })
        assert isinstance(result, list) and len(result) == 1
        assert "successfully" in result[0].text.lower()

    @pytest.mark.asyncio
    async def test_store_memory_default_type(self):
        """Storing without an explicit type falls back to the default type."""
        server = MemoryServer()
        result = await server.handle_store_memory({
            "content": "Memory without explicit type",
            "metadata": {"tags": ["test"]},
        })
        assert isinstance(result, list) and len(result) == 1
        assert "successfully" in result[0].text.lower()
class TestHandleRetrieveMemory:
    """Tests for the handle_retrieve_memory MCP handler."""

    @pytest.mark.asyncio
    async def test_retrieve_memory_success(self):
        """Semantic retrieval returns the previously stored memory."""
        server = MemoryServer()
        # Seed a memory so the query has something to match.
        await server.handle_store_memory({
            "content": "Searchable test memory for retrieval",
            "metadata": {"tags": ["retrieval-test"], "type": "note"},
        })
        result = await server.handle_retrieve_memory({
            "query": "searchable test memory",
            "n_results": 5,
        })
        assert isinstance(result, list) and len(result) == 1
        item = result[0]
        assert isinstance(item, types.TextContent)
        lowered = item.text.lower()
        assert ("searchable test memory" in lowered) or ("retrieval-test" in lowered)

    @pytest.mark.asyncio
    async def test_retrieve_memory_missing_query(self):
        """Omitting the query parameter yields an error naming the parameter."""
        server = MemoryServer()
        result = await server.handle_retrieve_memory({"n_results": 5})
        assert isinstance(result, list) and len(result) == 1
        lowered = result[0].text.lower()
        assert "error" in lowered
        assert "query" in lowered
class TestHandleSearchByTag:
    """Tests for the handle_search_by_tag MCP handler."""

    @pytest.mark.asyncio
    async def test_search_by_tag_success(self):
        """Tag search returns memories carrying the requested tag."""
        server = MemoryServer()
        # Seed a memory with a distinctive tag to search for.
        await server.handle_store_memory({
            "content": "Memory with unique tag for search",
            "metadata": {"tags": ["unique-search-tag"], "type": "note"},
        })
        result = await server.handle_search_by_tag({"tags": ["unique-search-tag"]})
        assert isinstance(result, list) and len(result) == 1
        item = result[0]
        assert isinstance(item, types.TextContent)
        lowered = item.text.lower()
        assert ("unique-search-tag" in lowered) or ("memory with unique tag" in lowered)

    @pytest.mark.asyncio
    async def test_search_by_tag_missing_tags(self):
        """Omitting the tags parameter yields an error naming the parameter."""
        server = MemoryServer()
        result = await server.handle_search_by_tag({})
        assert isinstance(result, list) and len(result) == 1
        lowered = result[0].text.lower()
        assert "error" in lowered
        assert "tags" in lowered
# Regression test for issue #198
class TestIssue198Regression:
    """Regression coverage for issue #198 (KeyError in response formatting)."""

    @pytest.mark.asyncio
    async def test_no_keyerror_on_store_success(self):
        """A successful store must not surface KeyError: 'message'."""
        server = MemoryServer()
        # Before the fix, this call raised KeyError: 'message'.
        result = await server.handle_store_memory({
            "content": "Test for issue 198 regression",
            "metadata": {"tags": ["issue-198"], "type": "test"},
        })
        assert isinstance(result, list) and len(result) == 1
        text = result[0].text
        assert "successfully" in text.lower()
        # The pre-fix failure mode surfaced as exactly this string.
        assert text != "Error storing memory: 'message'"

    @pytest.mark.asyncio
    async def test_error_handling_without_keyerror(self):
        """Error paths must report the real error, not a KeyError artifact."""
        server = MemoryServer()
        # Empty content triggers the error path.
        result = await server.handle_store_memory({"content": "", "metadata": {}})
        assert isinstance(result, list) and len(result) == 1
        text = result[0].text
        assert "error" in text.lower()
        assert "'message'" not in text
```
--------------------------------------------------------------------------------
/.github/workflows/dev-setup-validation.yml:
--------------------------------------------------------------------------------
```yaml
name: Development Setup Validation
# Test the development setup procedures and stale venv prevention mechanisms
on:
push:
branches: [ main, develop, release/** ]
paths:
- 'scripts/validation/check_dev_setup.py'
- 'scripts/installation/install.py'
- 'scripts/hooks/pre-commit'
- 'src/mcp_memory_service/__init__.py'
- 'pyproject.toml'
- '.github/workflows/dev-setup-validation.yml'
pull_request:
branches: [ main, develop ]
paths:
- 'scripts/validation/check_dev_setup.py'
- 'scripts/installation/install.py'
- 'scripts/hooks/pre-commit'
- 'src/mcp_memory_service/__init__.py'
- 'pyproject.toml'
workflow_dispatch:
jobs:
test-editable-install:
name: Test Editable Install Detection
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Test editable install workflow
run: |
# Create virtual environment
python -m venv test_venv
source test_venv/bin/activate
# Install in editable mode
pip install -e .
# Verify editable install is detected
python scripts/validation/check_dev_setup.py
# Should exit 0 (success)
if [ $? -ne 0 ]; then
echo "ERROR: Editable install not detected correctly"
exit 1
fi
echo "✅ Editable install detection works correctly"
- name: Verify version consistency check
run: |
source test_venv/bin/activate
# Get source version
SOURCE_VERSION=$(grep '__version__' src/mcp_memory_service/__init__.py | cut -d'"' -f2)
# Get installed version
INSTALLED_VERSION=$(python -c "import mcp_memory_service; print(mcp_memory_service.__version__)")
echo "Source version: $SOURCE_VERSION"
echo "Installed version: $INSTALLED_VERSION"
if [ "$SOURCE_VERSION" != "$INSTALLED_VERSION" ]; then
echo "ERROR: Version mismatch despite editable install"
exit 1
fi
echo "✅ Version consistency check passed"
test-non-editable-detection:
name: Test Non-Editable Install Detection
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Test non-editable install detection
run: |
# Create virtual environment
python -m venv bad_venv
source bad_venv/bin/activate
# Install WITHOUT editable mode (this is the problem case)
pip install .
# Run detection script - should FAIL (exit 1)
EXIT_CODE=0
python scripts/validation/check_dev_setup.py || EXIT_CODE=$?
# We expect failure (exit 1) because it's not editable
if [ $EXIT_CODE -eq 0 ]; then
echo "ERROR: Non-editable install was not detected as a problem"
exit 1
fi
echo "✅ Non-editable install correctly detected as problematic"
test-version-mismatch-detection:
name: Test Version Mismatch Detection
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.10
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Test version mismatch scenario
run: |
# Create virtual environment
python -m venv mismatch_venv
source mismatch_venv/bin/activate
# Install current version
pip install .
# Simulate version change in source (the stale venv scenario)
# Save original version
ORIGINAL_VERSION=$(grep '__version__' src/mcp_memory_service/__init__.py)
# Change source version temporarily
sed -i 's/__version__ = ".*"/__version__ = "99.99.99"/' src/mcp_memory_service/__init__.py
# Run detection script - should FAIL because versions don't match
EXIT_CODE=0
python scripts/validation/check_dev_setup.py || EXIT_CODE=$?
# Restore original version
echo "$ORIGINAL_VERSION" | sed 's/.*\(__version__.*\)/\1/' > temp_version
sed -i "s/__version__ = .*$/$(cat temp_version)/" src/mcp_memory_service/__init__.py
rm temp_version
# We expect failure (exit 1) because of version mismatch
if [ $EXIT_CODE -eq 0 ]; then
echo "ERROR: Version mismatch was not detected"
exit 1
fi
echo "✅ Version mismatch correctly detected"
  # Verifies that scripts/installation/install.py recognizes a developer
  # checkout (presence of a .git directory) via detect_development_context().
  test-install-py-developer-detection:
    name: Test install.py Developer Detection
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python 3.10
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Verify .git directory is present
        run: |
          # Detection relies on .git existing in the workspace checkout
          if [ ! -d ".git" ]; then
            echo "ERROR: .git directory not found (developer detection won't work)"
            exit 1
          fi
          echo "✅ .git directory present for developer detection"
      - name: Test developer context detection
        run: |
          # Create test script to check if developer detection works
          python3 << 'EOF'
          import sys
          sys.path.insert(0, 'scripts/installation')
          # Import the install script's detection function
          import install
          # Test developer detection
          is_dev = install.detect_development_context()
          if not is_dev:
              print("ERROR: Developer context not detected despite .git directory")
              sys.exit(1)
          print("✅ Developer context detection works correctly")
          EOF
  # Smoke-tests the runtime version consistency check exposed by the server
  # (check_version_consistency) after an editable install.
  test-runtime-version-warning:
    name: Test Runtime Version Warning
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python 3.10
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Test version check function
        run: |
          # Create virtual environment
          python -m venv runtime_venv
          source runtime_venv/bin/activate
          # Install in editable mode
          pip install -e .
          # Test the runtime version check function
          python3 << 'EOF'
          from mcp_memory_service.server import check_version_consistency
          import logging
          # Set up logging to see warnings
          logging.basicConfig(level=logging.WARNING)
          print("Testing version check function...")
          check_version_consistency()
          print("✅ Version check function executed without errors")
          EOF
  # Aggregates the results of all validation jobs; runs even when upstream
  # jobs fail (if: always()) so a single summary is always reported.
  summary:
    name: Validation Summary
    runs-on: ubuntu-latest
    needs: [test-editable-install, test-non-editable-detection, test-version-mismatch-detection, test-install-py-developer-detection, test-runtime-version-warning]
    if: always()
    steps:
      - name: Check all tests passed
        run: |
          echo "Development Setup Validation Results:"
          echo "======================================"
          # Fail the summary unless every needed job reported success
          if [ "${{ needs.test-editable-install.result }}" == "success" ] && \
             [ "${{ needs.test-non-editable-detection.result }}" == "success" ] && \
             [ "${{ needs.test-version-mismatch-detection.result }}" == "success" ] && \
             [ "${{ needs.test-install-py-developer-detection.result }}" == "success" ] && \
             [ "${{ needs.test-runtime-version-warning.result }}" == "success" ]; then
            echo "✅ All development setup validation tests passed!"
            exit 0
          else
            echo "❌ Some validation tests failed"
            echo "Editable Install: ${{ needs.test-editable-install.result }}"
            echo "Non-Editable Detection: ${{ needs.test-non-editable-detection.result }}"
            echo "Version Mismatch Detection: ${{ needs.test-version-mismatch-detection.result }}"
            echo "install.py Developer Detection: ${{ needs.test-install-py-developer-detection.result }}"
            echo "Runtime Version Warning: ${{ needs.test-runtime-version-warning.result }}"
            exit 1
          fi
```
--------------------------------------------------------------------------------
/src/mcp_memory_service/web/api/consolidation.py:
--------------------------------------------------------------------------------
```python
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Consolidation API endpoints for HTTP server.
Provides RESTful HTTP access to memory consolidation operations
including manual triggers and scheduler status queries.
"""
import logging
from typing import Dict, Any, Optional
from datetime import datetime
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/consolidation", tags=["consolidation"])
class ConsolidationRequest(BaseModel):
    """Request model for triggering consolidation."""
    # Horizon is validated downstream; an invalid value surfaces as HTTP 400
    # via the ValueError handler in trigger_consolidation.
    time_horizon: str = Field(
        default="weekly",
        description="Time horizon for consolidation (daily, weekly, monthly, quarterly, yearly)"
    )
class ConsolidationResponse(BaseModel):
    """Response model for consolidation operations.

    Mirrors the fields of the result NamedTuple returned by
    _consolidate_async (serialized via _asdict() in trigger_consolidation).
    """
    status: str = Field(description="Operation status (completed, running, failed)")
    horizon: str = Field(description="Time horizon that was consolidated")
    processed: int = Field(description="Number of memories processed")
    compressed: int = Field(description="Number of memories compressed")
    forgotten: int = Field(description="Number of memories forgotten/archived")
    duration: float = Field(description="Operation duration in seconds")
class SchedulerStatusResponse(BaseModel):
    """Response model for scheduler status.

    Next-run timestamps are ISO-8601 strings (converted from epoch seconds
    in get_scheduler_status) or None when no run is scheduled.
    """
    running: bool = Field(description="Whether scheduler is active")
    next_daily: Optional[str] = Field(None, description="Next daily run time (ISO format)")
    next_weekly: Optional[str] = Field(None, description="Next weekly run time (ISO format)")
    next_monthly: Optional[str] = Field(None, description="Next monthly run time (ISO format)")
    jobs_executed: int = Field(description="Total successful jobs executed")
    jobs_failed: int = Field(description="Total failed jobs")
class RecommendationsResponse(BaseModel):
    """Response model for consolidation recommendations."""
    recommendation: str = Field(description="Recommendation status")
    memory_count: int = Field(description="Total memories in system")
    # Builtin generic list[str] requires Python 3.9+.
    reasons: list[str] = Field(description="List of recommendation reasons")
    estimated_duration: float = Field(description="Estimated duration in seconds")
@router.post("/trigger", response_model=ConsolidationResponse)
async def trigger_consolidation(request: ConsolidationRequest) -> Dict[str, Any]:
    """
    Trigger a consolidation operation manually.

    This endpoint initiates a consolidation run for the specified time horizon.
    The operation runs asynchronously and returns immediately with the result.

    Args:
        request: ConsolidationRequest with time_horizon

    Returns:
        ConsolidationResponse with operation metrics

    Raises:
        HTTPException: 400 for an invalid horizon, 503 when the consolidator
            is unavailable, 500 for any other failure.

    Example:
        POST /api/consolidation/trigger
        {
            "time_horizon": "weekly"
        }

        Response:
        {
            "status": "completed",
            "horizon": "weekly",
            "processed": 2418,
            "compressed": 156,
            "forgotten": 43,
            "duration": 24.2
        }
    """
    try:
        # Imported lazily to avoid circular imports at module load time.
        from ...api.operations import _consolidate_async

        # Delegate to the shared async implementation.
        result = await _consolidate_async(request.time_horizon)
        # Result is a NamedTuple; convert to dict for the HTTP response.
        return result._asdict()
    except ValueError as e:
        # Invalid time horizon; chain the cause so tracebacks stay useful.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except RuntimeError as e:
        # Consolidator not available
        raise HTTPException(status_code=503, detail=str(e)) from e
    except Exception as e:
        # logger.exception records the full traceback for unexpected failures.
        logger.exception("Consolidation trigger failed")
        raise HTTPException(status_code=500, detail=f"Consolidation failed: {e}") from e
@router.get("/status", response_model=SchedulerStatusResponse)
async def get_scheduler_status() -> Dict[str, Any]:
    """
    Get consolidation scheduler status and next run times.

    Returns information about the scheduler state including next
    scheduled runs for each time horizon and execution statistics.

    Returns:
        SchedulerStatusResponse with scheduler state

    Raises:
        HTTPException: 500 when the status cannot be retrieved.

    Example:
        GET /api/consolidation/status

        Response:
        {
            "running": true,
            "next_daily": "2025-11-10T02:00:00+01:00",
            "next_weekly": "2025-11-16T03:00:00+01:00",
            "next_monthly": "2025-12-01T04:00:00+01:00",
            "jobs_executed": 42,
            "jobs_failed": 0
        }
    """
    try:
        # Imported lazily to avoid circular imports at module load time.
        from ...api.operations import _scheduler_status_async

        result = await _scheduler_status_async()

        # Convert epoch timestamps to ISO-8601 strings for the HTTP response.
        # NOTE(review): fromtimestamp() produces naive local-time datetimes;
        # confirm clients expect server-local time rather than UTC.
        def _iso(ts):
            # None/0 timestamps mean "no run scheduled".
            return datetime.fromtimestamp(ts).isoformat() if ts else None

        return {
            "running": result.running,
            "next_daily": _iso(result.next_daily),
            "next_weekly": _iso(result.next_weekly),
            "next_monthly": _iso(result.next_monthly),
            "jobs_executed": result.jobs_executed,
            "jobs_failed": result.jobs_failed
        }
    except Exception as e:
        logger.exception("Failed to get scheduler status")
        raise HTTPException(status_code=500, detail=f"Failed to get status: {e}") from e
@router.get("/recommendations/{time_horizon}", response_model=RecommendationsResponse)
async def get_recommendations(time_horizon: str) -> Dict[str, Any]:
    """
    Get consolidation recommendations for a specific time horizon.

    Analyzes the current memory state and provides recommendations
    on whether consolidation would be beneficial.

    Args:
        time_horizon: Time horizon to analyze (daily, weekly, monthly, quarterly, yearly)

    Returns:
        RecommendationsResponse with recommendation details

    Raises:
        HTTPException: 400 for an invalid horizon, 503 when the consolidator
            is unavailable, 500 for any other failure.

    Example:
        GET /api/consolidation/recommendations/weekly

        Response:
        {
            "recommendation": "CONSOLIDATION_BENEFICIAL",
            "memory_count": 2418,
            "reasons": [
                "Consider running compression to reduce memory usage",
                "Many old memories present - consider forgetting/archival",
                "Good candidate for association discovery"
            ],
            "estimated_duration": 24.2
        }
    """
    try:
        # Imported lazily to avoid circular imports at module load time.
        from ...api.client import get_consolidator

        # Validate the path parameter before touching the consolidator.
        valid_horizons = ['daily', 'weekly', 'monthly', 'quarterly', 'yearly']
        if time_horizon not in valid_horizons:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid time_horizon. Must be one of: {', '.join(valid_horizons)}"
            )

        consolidator = get_consolidator()
        if consolidator is None:
            raise HTTPException(
                status_code=503,
                detail="Consolidator not available. Check server configuration."
            )

        recommendations = await consolidator.get_consolidation_recommendations(time_horizon)

        # Normalize to the response schema with safe defaults for missing keys.
        return {
            "recommendation": recommendations.get("recommendation", "UNKNOWN"),
            "memory_count": recommendations.get("memory_count", 0),
            "reasons": recommendations.get("reasons", []),
            "estimated_duration": recommendations.get("estimated_duration_seconds", 0.0)
        }
    except HTTPException:
        # Re-raise deliberate HTTP errors unchanged (don't wrap as 500).
        raise
    except Exception as e:
        logger.exception("Failed to get recommendations")
        raise HTTPException(status_code=500, detail=f"Failed to get recommendations: {e}") from e
```
--------------------------------------------------------------------------------
/scripts/server/run_http_server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run the MCP Memory Service HTTP server.
This script starts the FastAPI server with uvicorn.
"""
import os
import sys
import logging
import asyncio
import tempfile
import subprocess
from datetime import datetime, timedelta
# Add the src directory to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), 'src'))
def generate_self_signed_cert():
    """Generate (or reuse) a self-signed certificate for development HTTPS.

    Certificates are cached under the system temp directory and reused when
    they remain valid for at least 7 more days.

    Returns:
        Tuple of (cert_file, key_file) paths, or (None, None) on failure.
    """
    try:
        # Create temporary directory for certificates
        cert_dir = os.path.join(tempfile.gettempdir(), 'mcp-memory-certs')
        os.makedirs(cert_dir, exist_ok=True)
        cert_file = os.path.join(cert_dir, 'cert.pem')
        key_file = os.path.join(cert_dir, 'key.pem')

        # Check if certificates already exist and are still valid
        if os.path.exists(cert_file) and os.path.exists(key_file):
            try:
                # Check certificate expiration via openssl
                result = subprocess.run([
                    'openssl', 'x509', '-in', cert_file, '-noout', '-enddate'
                ], capture_output=True, text=True, check=True)
                # Output is "notAfter=<date>"; parse the date portion.
                end_date_str = result.stdout.split('=')[1].strip()
                end_date = datetime.strptime(end_date_str, '%b %d %H:%M:%S %Y %Z')
                # If certificate expires in more than 7 days, reuse it
                if end_date > datetime.now() + timedelta(days=7):
                    print(f"Using existing self-signed certificate: {cert_file}")
                    return cert_file, key_file
            except Exception:
                pass  # Fall through to generate new certificate

        print("Generating self-signed certificate for HTTPS...")
        # Generate private key
        subprocess.run([
            'openssl', 'genrsa', '-out', key_file, '2048'
        ], check=True, capture_output=True)

        # Generate certificate with Subject Alternative Names for better compatibility
        # Get local IP addresses dynamically
        import socket
        local_ips = []
        try:
            # Connecting a UDP socket to a public address reveals the primary
            # local IP without sending any traffic.
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            local_ips.append(s.getsockname()[0])
            s.close()
        except Exception:
            pass

        # Build SAN list with common names and detected IPs
        san_entries = [
            "DNS:memory.local",
            "DNS:localhost",
            "DNS:*.local",
            "IP:127.0.0.1",
            "IP:::1"  # IPv6 localhost
        ]
        # Add detected local IPs
        for ip in local_ips:
            if ip not in ["127.0.0.1"]:
                san_entries.append(f"IP:{ip}")

        # Add additional IPs from environment variable if specified.
        additional_ips = os.getenv('MCP_SSL_ADDITIONAL_IPS', '')
        if additional_ips:
            # BUGFIX: split(':', 1) keeps the full address; plain split(':')[1]
            # returned '' for IPv6 entries like "IP:::1", breaking dedup.
            # Also hoisted out of the loop (was rebuilt per iteration).
            existing_ips = [entry.split(':', 1)[1] for entry in san_entries if entry.startswith('IP:')]
            for ip in additional_ips.split(','):
                ip = ip.strip()
                if ip and ip not in existing_ips:
                    san_entries.append(f"IP:{ip}")
                    existing_ips.append(ip)

        # Add additional hostnames from environment variable if specified
        additional_hostnames = os.getenv('MCP_SSL_ADDITIONAL_HOSTNAMES', '')
        if additional_hostnames:
            for hostname in additional_hostnames.split(','):
                hostname = hostname.strip()
                if hostname and f"DNS:{hostname}" not in san_entries:
                    san_entries.append(f"DNS:{hostname}")

        san_string = ",".join(san_entries)
        print(f"Generating certificate with SANs: {san_string}")
        subprocess.run([
            'openssl', 'req', '-new', '-x509', '-key', key_file, '-out', cert_file,
            '-days', '365', '-subj', '/C=US/ST=Local/L=Local/O=MCP Memory Service/CN=memory.local',
            '-addext', f'subjectAltName={san_string}'
        ], check=True, capture_output=True)

        print(f"Generated self-signed certificate: {cert_file}")
        print("WARNING: This is a development certificate. Use proper certificates in production.")
        return cert_file, key_file
    except subprocess.CalledProcessError as e:
        print(f"Error generating certificate: {e}")
        print("Make sure OpenSSL is installed and available in PATH")
        return None, None
    except Exception as e:
        print(f"Unexpected error generating certificate: {e}")
        return None, None
def main():
    """Run the HTTP server.

    Configures logging and environment defaults, resolves SSL settings
    (provided certs, generated self-signed certs, or plain HTTP fallback),
    then starts uvicorn. Exits with status 1 on missing dependencies or
    startup failure.
    """
    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Set default environment variables for testing
    os.environ.setdefault('MCP_HTTP_ENABLED', 'true')
    # Don't override MCP_MEMORY_STORAGE_BACKEND - respect .env and environment settings
    # os.environ.setdefault('MCP_MEMORY_STORAGE_BACKEND', 'sqlite_vec')
    os.environ.setdefault('LOG_LEVEL', 'INFO')

    try:
        import uvicorn
        from mcp_memory_service.web.app import app
        from mcp_memory_service.config import (
            HTTP_HOST, HTTP_PORT, HTTPS_ENABLED, SSL_CERT_FILE, SSL_KEY_FILE
        )

        # SSL configuration
        ssl_keyfile = None
        ssl_certfile = None
        protocol = "http"

        if HTTPS_ENABLED:
            protocol = "https"
            if SSL_CERT_FILE and SSL_KEY_FILE:
                # Use provided certificates; both files must exist.
                if os.path.exists(SSL_CERT_FILE) and os.path.exists(SSL_KEY_FILE):
                    ssl_certfile = SSL_CERT_FILE
                    ssl_keyfile = SSL_KEY_FILE
                    print(f"Using provided SSL certificates: {SSL_CERT_FILE}")
                else:
                    print("Error: Provided SSL certificates not found!")
                    print(f"Cert file: {SSL_CERT_FILE}")
                    print(f"Key file: {SSL_KEY_FILE}")
                    sys.exit(1)
            else:
                # Generate self-signed certificate; fall back to HTTP on failure.
                ssl_certfile, ssl_keyfile = generate_self_signed_cert()
                if not ssl_certfile or not ssl_keyfile:
                    print("Failed to generate SSL certificate. Falling back to HTTP.")
                    protocol = "http"
                    ssl_certfile = ssl_keyfile = None

        # Display startup information
        host_display = HTTP_HOST if HTTP_HOST != '0.0.0.0' else 'localhost'
        print(f"Starting MCP Memory Service {protocol.upper()} server on {HTTP_HOST}:{HTTP_PORT}")
        print(f"Dashboard: {protocol}://{host_display}:{HTTP_PORT}")
        print(f"API Docs: {protocol}://{host_display}:{HTTP_PORT}/api/docs")
        if protocol == "https":
            print(f"SSL Certificate: {ssl_certfile}")
            print(f"SSL Key: {ssl_keyfile}")
            print("NOTE: Browsers may show security warnings for self-signed certificates")
        print("Press Ctrl+C to stop")

        # Start uvicorn server
        uvicorn_kwargs = {
            "app": app,
            "host": HTTP_HOST,
            "port": HTTP_PORT,
            "log_level": "info",
            "access_log": True
        }
        if ssl_certfile and ssl_keyfile:
            uvicorn_kwargs["ssl_certfile"] = ssl_certfile
            uvicorn_kwargs["ssl_keyfile"] = ssl_keyfile

        uvicorn.run(**uvicorn_kwargs)
    except ImportError as e:
        print("Error: Missing dependencies. Please run 'python install.py' first.")
        print(f"Details: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error starting server: {e}")
        sys.exit(1)
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/scripts/sync/import_memories.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
# Copyright 2024 Heinrich Krupp
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Import memories from JSON exports into SQLite-vec database.
This script imports memories from one or more JSON export files into
a central SQLite-vec database, handling deduplication and preserving
original timestamps and metadata.
"""
import asyncio
import sys
import logging
import argparse
import json
from pathlib import Path
from datetime import datetime
# Add project src to path
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root / "src"))
from mcp_memory_service.storage.sqlite_vec import SqliteVecMemoryStorage
from mcp_memory_service.sync.importer import MemoryImporter
from mcp_memory_service.config import SQLITE_VEC_PATH, STORAGE_BACKEND
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def get_default_db_path() -> Path:
    """Return the default SQLite-vec database path for this platform."""
    # Guard clause: use the configured path when the sqlite_vec backend is active.
    if STORAGE_BACKEND == 'sqlite_vec' and SQLITE_VEC_PATH:
        return Path(SQLITE_VEC_PATH)
    # Otherwise fall back to a database file under BASE_DIR.
    from mcp_memory_service.config import BASE_DIR
    return Path(BASE_DIR) / "sqlite_vec.db"
async def import_memories(
    json_files: list,
    db_path: Path,
    deduplicate: bool = True,
    add_source_tags: bool = True,
    dry_run: bool = False
):
    """Import memories from JSON export files into a SQLite-vec database.

    Validates each file's export format, runs an analysis pass, asks for
    interactive confirmation (unless dry_run), then performs the import.

    Args:
        json_files: Paths to JSON export files.
        db_path: Target SQLite-vec database path.
        deduplicate: Skip memories whose content hash already exists.
        add_source_tags: Tag imported memories with their source machine.
        dry_run: Analyze and simulate without storing anything.

    Returns:
        True on success (no import errors), False otherwise.
    """
    logger.info(f"Starting memory import to {db_path}")
    logger.info(f"JSON files: {[str(f) for f in json_files]}")

    # Validate input files before touching the database.
    for json_file in json_files:
        if not json_file.exists():
            logger.error(f"JSON file not found: {json_file}")
            return False
        # Quick validation of JSON format
        try:
            # Explicit UTF-8: export files may be read on a platform whose
            # default encoding differs from the one that wrote them.
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if "export_metadata" not in data or "memories" not in data:
                logger.error(f"Invalid export format in {json_file}")
                return False
        except Exception as e:
            logger.error(f"Error reading {json_file}: {str(e)}")
            return False

    try:
        # Initialize storage
        logger.info("Initializing SQLite-vec storage...")
        storage = SqliteVecMemoryStorage(str(db_path))
        await storage.initialize()

        # Create importer
        importer = MemoryImporter(storage)

        # Show analysis first
        logger.info("Analyzing import files...")
        analysis = await importer.analyze_import(json_files)

        logger.info("Import Analysis:")
        logger.info(f"  Total memories to process: {analysis['total_memories']}")
        logger.info(f"  Unique memories: {analysis['unique_memories']}")
        logger.info(f"  Potential duplicates: {analysis['potential_duplicates']}")
        logger.info(f"  Import conflicts: {len(analysis['conflicts'])}")
        logger.info("  Sources:")
        for source, stats in analysis['sources'].items():
            logger.info(f"    {source}: {stats['new_memories']}/{stats['total_memories']} new memories")

        if analysis['conflicts']:
            logger.warning(f"Found {len(analysis['conflicts'])} conflicts between import files")

        # Ask for confirmation if not dry run
        if not dry_run:
            logger.info("")
            response = input("Proceed with import? (y/N): ")
            if response.lower() != 'y':
                logger.info("Import cancelled by user")
                return False

        # Perform import
        logger.info(f"{'[DRY RUN] ' if dry_run else ''}Starting import...")
        result = await importer.import_from_json(
            json_files=json_files,
            deduplicate=deduplicate,
            add_source_tags=add_source_tags,
            dry_run=dry_run
        )

        # Show results
        logger.info(f"Import {'simulation ' if dry_run else ''}completed!")
        logger.info(f"  Files processed: {result['files_processed']}")
        logger.info(f"  Total processed: {result['total_processed']}")
        logger.info(f"  Successfully imported: {result['imported']}")
        logger.info(f"  Duplicates skipped: {result['duplicates_skipped']}")
        logger.info(f"  Errors: {result['errors']}")
        logger.info("  Source breakdown:")
        for source, stats in result['sources'].items():
            logger.info(f"    {source}: {stats['imported']}/{stats['total']} imported, {stats['duplicates']} duplicates")

        if not dry_run and result['imported'] > 0:
            # Show next steps
            logger.info("")
            logger.info("Next steps:")
            logger.info("1. Verify the imported memories using the web interface or API")
            logger.info("2. Set up Litestream for ongoing synchronization")
            logger.info("3. Configure replica nodes to sync from this central database")

        return result['errors'] == 0
    except Exception as e:
        logger.error(f"Import failed: {str(e)}")
        return False
async def main():
    """Parse command-line arguments and run the memory import.

    Exits with status 0 on success, 1 on failure or missing input files.
    """
    # Compute once: used for both the argparse default and the help text
    # (previously evaluated twice).
    default_db_path = get_default_db_path()

    parser = argparse.ArgumentParser(
        description="Import memories from JSON exports into SQLite-vec database",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Import single JSON file
  python import_memories.py windows_export.json
  # Import multiple JSON files
  python import_memories.py windows_export.json macbook_export.json
  # Import to specific database
  python import_memories.py --db-path /path/to/sqlite_vec.db exports/*.json
  # Dry run to see what would be imported
  python import_memories.py --dry-run exports/*.json
  # Import without deduplication (allow duplicates)
  python import_memories.py --no-deduplicate exports/*.json
  # Import without adding source tags
  python import_memories.py --no-source-tags exports/*.json
        """
    )
    parser.add_argument(
        "json_files",
        nargs="+",
        type=Path,
        help="JSON export files to import"
    )
    parser.add_argument(
        "--db-path",
        type=Path,
        default=default_db_path,
        help=f"Path to SQLite-vec database (default: {default_db_path})"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Analyze imports without actually storing data"
    )
    parser.add_argument(
        "--no-deduplicate",
        action="store_true",
        help="Allow duplicate memories (don't skip based on content hash)"
    )
    parser.add_argument(
        "--no-source-tags",
        action="store_true",
        help="Don't add source machine tags to imported memories"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )
    args = parser.parse_args()

    # Set logging level
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Show configuration
    logger.info("Memory Import Configuration:")
    logger.info(f"  Database: {args.db_path}")
    logger.info(f"  JSON files: {[str(f) for f in args.json_files]}")
    logger.info(f"  Dry run: {args.dry_run}")
    logger.info(f"  Deduplicate: {not args.no_deduplicate}")
    logger.info(f"  Add source tags: {not args.no_source_tags}")
    logger.info("")

    # Validate JSON files exist before starting the import.
    missing_files = [f for f in args.json_files if not f.exists()]
    if missing_files:
        logger.error(f"Missing JSON files: {missing_files}")
        sys.exit(1)

    # Run import
    success = await import_memories(
        json_files=args.json_files,
        db_path=args.db_path,
        deduplicate=not args.no_deduplicate,
        add_source_tags=not args.no_source_tags,
        dry_run=args.dry_run
    )
    sys.exit(0 if success else 1)
if __name__ == "__main__":
asyncio.run(main())
```