This is page 5 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/maverick_mcp/providers/dependencies.py:
--------------------------------------------------------------------------------

```python
"""
Dependency injection utilities for Maverick-MCP.

This module provides dependency injection support for routers and other components,
enabling clean separation of concerns and improved testability.
"""

import logging
from functools import lru_cache

from maverick_mcp.providers.factories.config_factory import ConfigurationFactory
from maverick_mcp.providers.factories.provider_factory import ProviderFactory
from maverick_mcp.providers.interfaces.cache import ICacheManager
from maverick_mcp.providers.interfaces.config import IConfigurationProvider
from maverick_mcp.providers.interfaces.macro_data import IMacroDataProvider
from maverick_mcp.providers.interfaces.market_data import IMarketDataProvider
from maverick_mcp.providers.interfaces.persistence import IDataPersistence
from maverick_mcp.providers.interfaces.stock_data import (
    IStockDataFetcher,
    IStockScreener,
)

logger = logging.getLogger(__name__)

# Global provider factory instance
_provider_factory: ProviderFactory | None = None


def get_provider_factory() -> ProviderFactory:
    """
    Get the global provider factory instance.

    This function implements the singleton pattern to ensure a single
    factory instance is used throughout the application.

    Returns:
        ProviderFactory instance
    """
    global _provider_factory

    if _provider_factory is None:
        config = ConfigurationFactory.auto_detect_config()
        _provider_factory = ProviderFactory(config)
        logger.debug("Global provider factory initialized")

    return _provider_factory


def set_provider_factory(factory: ProviderFactory) -> None:
    """
    Set the global provider factory instance.

    This is primarily used for testing to inject a custom factory.

    Args:
        factory: ProviderFactory instance to use globally
    """
    global _provider_factory
    _provider_factory = factory
    logger.debug("Global provider factory overridden")


def reset_provider_factory() -> None:
    """
    Reset the global provider factory to None.

    This forces re-initialization on the next access, which is useful
    for testing or configuration changes.
    """
    global _provider_factory
    _provider_factory = None
    logger.debug("Global provider factory reset")


# Dependency injection functions for use with FastAPI Depends() or similar


def get_configuration() -> IConfigurationProvider:
    """
    Get configuration provider dependency.

    Returns:
        IConfigurationProvider instance
    """
    return get_provider_factory()._config


def get_cache_manager() -> ICacheManager:
    """
    Get cache manager dependency.

    Returns:
        ICacheManager instance
    """
    return get_provider_factory().get_cache_manager()


def get_persistence() -> IDataPersistence:
    """
    Get persistence layer dependency.

    Returns:
        IDataPersistence instance
    """
    return get_provider_factory().get_persistence()


def get_stock_data_fetcher() -> IStockDataFetcher:
    """
    Get stock data fetcher dependency.

    Returns:
        IStockDataFetcher instance
    """
    return get_provider_factory().get_stock_data_fetcher()


def get_stock_screener() -> IStockScreener:
    """
    Get stock screener dependency.

    Returns:
        IStockScreener instance
    """
    return get_provider_factory().get_stock_screener()


def get_market_data_provider() -> IMarketDataProvider:
    """
    Get market data provider dependency.

    Returns:
        IMarketDataProvider instance
    """
    return get_provider_factory().get_market_data_provider()


def get_macro_data_provider() -> IMacroDataProvider:
    """
    Get macro data provider dependency.

    Returns:
        IMacroDataProvider instance
    """
    return get_provider_factory().get_macro_data_provider()


# Context manager for dependency overrides (useful for testing)


class DependencyOverride:
    """
    Context manager for temporarily overriding dependencies.

    This is primarily useful for testing where you want to inject
    mock implementations for specific test cases.
    """

    def __init__(self, **overrides):
        """
        Initialize dependency override context.

        Args:
            **overrides: Keyword arguments mapping dependency names to override instances
        """
        self.overrides = overrides
        self.original_factory = None
        self.original_providers = {}

    def __enter__(self):
        """Enter the context and apply overrides."""
        global _provider_factory

        # Save original state
        self.original_factory = _provider_factory

        if _provider_factory is not None:
            # Save original provider instances
            for key in self.overrides:
                attr_name = f"_{key}"
                if hasattr(_provider_factory, attr_name):
                    self.original_providers[key] = getattr(_provider_factory, attr_name)

            # Apply overrides
            for key, override in self.overrides.items():
                attr_name = f"_{key}"
                if hasattr(_provider_factory, attr_name):
                    setattr(_provider_factory, attr_name, override)
                else:
                    logger.warning(f"Unknown dependency override: {key}")

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context and restore original dependencies."""
        global _provider_factory

        if _provider_factory is not None:
            # Restore original provider instances
            for key, original in self.original_providers.items():
                attr_name = f"_{key}"
                setattr(_provider_factory, attr_name, original)

        # Restore original factory
        _provider_factory = self.original_factory


# Utility functions for testing


def create_test_dependencies(**overrides) -> dict:
    """
    Create a dictionary of test dependencies with optional overrides.

    This is useful for creating dependencies for testing without
    affecting the global state.

    Args:
        **overrides: Keyword arguments for dependency overrides

    Returns:
        Dictionary mapping dependency names to instances
    """
    config = ConfigurationFactory.create_test_config()
    factory = ProviderFactory(config)

    dependencies = {
        "configuration": config,
        "cache_manager": factory.get_cache_manager(),
        "persistence": factory.get_persistence(),
        "stock_data_fetcher": factory.get_stock_data_fetcher(),
        "stock_screener": factory.get_stock_screener(),
        "market_data_provider": factory.get_market_data_provider(),
        "macro_data_provider": factory.get_macro_data_provider(),
    }

    # Apply any overrides
    dependencies.update(overrides)

    return dependencies


def validate_dependencies() -> list[str]:
    """
    Validate that all dependencies are properly configured.

    Returns:
        List of validation errors (empty if valid)
    """
    try:
        factory = get_provider_factory()
        return factory.validate_configuration()
    except Exception as e:
        return [f"Failed to validate dependencies: {e}"]


# Caching decorators for expensive dependency creation


@lru_cache(maxsize=1)
def get_cached_configuration() -> IConfigurationProvider:
    """Get cached configuration provider (singleton)."""
    return get_configuration()


@lru_cache(maxsize=1)
def get_cached_cache_manager() -> ICacheManager:
    """Get cached cache manager (singleton)."""
    return get_cache_manager()


@lru_cache(maxsize=1)
def get_cached_persistence() -> IDataPersistence:
    """Get cached persistence layer (singleton)."""
    return get_persistence()


# Helper functions for router integration


def inject_dependencies(**dependency_overrides):
    """
    Decorator for injecting dependencies into router functions.

    This decorator can be used to automatically inject dependencies
    into router functions without requiring explicit Depends() calls.

    Args:
        **dependency_overrides: Optional dependency overrides

    Returns:
        Decorator function
    """

    def decorator(func):
        def wrapper(*args, **kwargs):
            # Inject dependencies as keyword arguments
            if "stock_data_fetcher" not in kwargs:
                kwargs["stock_data_fetcher"] = dependency_overrides.get(
                    "stock_data_fetcher", get_stock_data_fetcher()
                )

            if "cache_manager" not in kwargs:
                kwargs["cache_manager"] = dependency_overrides.get(
                    "cache_manager", get_cache_manager()
                )

            if "config" not in kwargs:
                kwargs["config"] = dependency_overrides.get(
                    "config", get_configuration()
                )

            return func(*args, **kwargs)

        return wrapper

    return decorator


def get_dependencies_for_testing() -> dict:
    """
    Get a set of dependencies configured for testing.

    Returns:
        Dictionary of test-configured dependencies
    """
    return create_test_dependencies()

```
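A minimal usage sketch for the override utilities above, assuming the test configuration can be built without external services. `FakeStockDataFetcher` is illustrative and not part of the repository; the real mock implementations live under `maverick_mcp/providers/mocks/`.

```python
from maverick_mcp.providers.dependencies import (
    DependencyOverride,
    create_test_dependencies,
)


class FakeStockDataFetcher:
    """Illustrative stand-in for IStockDataFetcher used only in this sketch."""

    def get_stock_data(self, ticker: str, **kwargs) -> dict:
        return {"ticker": ticker, "close": 123.45}


# Build an isolated dependency set for a test, overriding only the fetcher.
deps = create_test_dependencies(stock_data_fetcher=FakeStockDataFetcher())
assert deps["stock_data_fetcher"].get_stock_data("MSFT")["close"] == 123.45

# Temporarily patch the global factory (works when the factory caches providers
# on underscore-prefixed attributes, which is what DependencyOverride expects).
with DependencyOverride(stock_data_fetcher=FakeStockDataFetcher()):
    pass  # code under test would resolve the fake via get_stock_data_fetcher()
```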

--------------------------------------------------------------------------------
/maverick_mcp/data/session_management.py:
--------------------------------------------------------------------------------

```python
"""
Enhanced database session management with context managers.

This module provides robust context managers for database session management
that guarantee proper cleanup, automatic rollback on errors, and connection
pool monitoring to prevent connection leaks.

Addresses Issue #55: Implement Proper Database Session Management with Context Managers
"""

import logging
from collections.abc import AsyncGenerator, Generator
from contextlib import asynccontextmanager, contextmanager
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session

from maverick_mcp.data.models import (
    SessionLocal,
    _get_async_session_factory,
)

logger = logging.getLogger(__name__)


@contextmanager
def get_db_session() -> Generator[Session, None, None]:
    """
    Enhanced sync database session context manager.

    Provides:
    - Automatic session cleanup
    - Auto-commit on success
    - Auto-rollback on exceptions
    - Guaranteed session.close() even if commit/rollback fails

    Usage:
        with get_db_session() as session:
            # Perform database operations
            result = session.query(Model).all()
            # Session is automatically committed and closed

    Returns:
        Database session that will be properly managed

    Raises:
        Exception: Re-raises any database exceptions after rollback
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
        logger.debug("Database session committed successfully")
    except Exception as e:
        session.rollback()
        logger.warning(f"Database session rolled back due to error: {e}")
        raise
    finally:
        session.close()
        logger.debug("Database session closed")


@asynccontextmanager
async def get_async_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Enhanced async database session context manager.

    Provides:
    - Automatic session cleanup for async operations
    - Auto-commit on success
    - Auto-rollback on exceptions
    - Guaranteed session.close() even if commit/rollback fails

    Usage:
        async with get_async_db_session() as session:
            # Perform async database operations
            result = await session.execute(query)
            # Session is automatically committed and closed

    Returns:
        Async database session that will be properly managed

    Raises:
        Exception: Re-raises any database exceptions after rollback
    """
    async_session_factory = _get_async_session_factory()

    async with async_session_factory() as session:
        try:
            yield session
            await session.commit()
            logger.debug("Async database session committed successfully")
        except Exception as e:
            await session.rollback()
            logger.warning(f"Async database session rolled back due to error: {e}")
            raise
        finally:
            await session.close()
            logger.debug("Async database session closed")


@contextmanager
def get_db_session_read_only() -> Generator[Session, None, None]:
    """
    Enhanced sync database session context manager for read-only operations.

    Optimized for read-only operations:
    - No auto-commit (read-only)
    - Rollback on any exception
    - Guaranteed session cleanup

    Usage:
        with get_db_session_read_only() as session:
            # Perform read-only database operations
            result = session.query(Model).all()
            # Session is automatically closed (no commit)

    Returns:
        Database session configured for read-only operations

    Raises:
        Exception: Re-raises any database exceptions after rollback
    """
    session = SessionLocal()
    try:
        yield session
        # No commit for read-only operations
        logger.debug("Read-only database session completed successfully")
    except Exception as e:
        session.rollback()
        logger.warning(f"Read-only database session rolled back due to error: {e}")
        raise
    finally:
        session.close()
        logger.debug("Read-only database session closed")


@asynccontextmanager
async def get_async_db_session_read_only() -> AsyncGenerator[AsyncSession, None]:
    """
    Enhanced async database session context manager for read-only operations.

    Optimized for read-only operations:
    - No auto-commit (read-only)
    - Rollback on any exception
    - Guaranteed session cleanup

    Usage:
        async with get_async_db_session_read_only() as session:
            # Perform read-only async database operations
            result = await session.execute(query)
            # Session is automatically closed (no commit)

    Returns:
        Async database session configured for read-only operations

    Raises:
        Exception: Re-raises any database exceptions after rollback
    """
    async_session_factory = _get_async_session_factory()

    async with async_session_factory() as session:
        try:
            yield session
            # No commit for read-only operations
            logger.debug("Read-only async database session completed successfully")
        except Exception as e:
            await session.rollback()
            logger.warning(
                f"Read-only async database session rolled back due to error: {e}"
            )
            raise
        finally:
            await session.close()
            logger.debug("Read-only async database session closed")


def get_connection_pool_status() -> dict[str, Any]:
    """
    Get current connection pool status for monitoring.

    Returns:
        Dictionary containing pool metrics:
        - pool_size: Current pool size
        - checked_in: Number of connections currently checked in
        - checked_out: Number of connections currently checked out
        - overflow: Number of connections beyond pool_size
        - invalid: Number of invalid connections
    """
    from maverick_mcp.data.models import engine

    pool = engine.pool

    return {
        "pool_size": getattr(pool, "size", lambda: 0)(),
        "checked_in": getattr(pool, "checkedin", lambda: 0)(),
        "checked_out": getattr(pool, "checkedout", lambda: 0)(),
        "overflow": getattr(pool, "overflow", lambda: 0)(),
        "invalid": getattr(pool, "invalid", lambda: 0)(),
        "pool_status": "healthy"
        if getattr(pool, "checkedout", lambda: 0)()
        < getattr(pool, "size", lambda: 10)() * 0.8
        else "warning",
    }


async def get_async_connection_pool_status() -> dict[str, Any]:
    """
    Get current async connection pool status for monitoring.

    Returns:
        Dictionary containing async pool metrics
    """
    from maverick_mcp.data.models import _get_async_engine

    engine = _get_async_engine()
    pool = engine.pool

    return {
        "pool_size": getattr(pool, "size", lambda: 0)(),
        "checked_in": getattr(pool, "checkedin", lambda: 0)(),
        "checked_out": getattr(pool, "checkedout", lambda: 0)(),
        "overflow": getattr(pool, "overflow", lambda: 0)(),
        "invalid": getattr(pool, "invalid", lambda: 0)(),
        "pool_status": "healthy"
        if getattr(pool, "checkedout", lambda: 0)()
        < getattr(pool, "size", lambda: 10)() * 0.8
        else "warning",
    }


def check_connection_pool_health() -> bool:
    """
    Check if connection pool is healthy.

    Returns:
        True if pool is healthy, False if approaching limits
    """
    try:
        status = get_connection_pool_status()
        pool_utilization = (
            status["checked_out"] / status["pool_size"]
            if status["pool_size"] > 0
            else 0
        )

        # Consider unhealthy if > 80% utilization
        if pool_utilization > 0.8:
            logger.warning(f"High connection pool utilization: {pool_utilization:.2%}")
            return False

        # Check for invalid connections
        if status["invalid"] > 0:
            logger.warning(f"Invalid connections detected: {status['invalid']}")
            return False

        return True

    except Exception as e:
        logger.error(f"Failed to check connection pool health: {e}")
        return False


async def check_async_connection_pool_health() -> bool:
    """
    Check if async connection pool is healthy.

    Returns:
        True if pool is healthy, False if approaching limits
    """
    try:
        status = await get_async_connection_pool_status()
        pool_utilization = (
            status["checked_out"] / status["pool_size"]
            if status["pool_size"] > 0
            else 0
        )

        # Consider unhealthy if > 80% utilization
        if pool_utilization > 0.8:
            logger.warning(
                f"High async connection pool utilization: {pool_utilization:.2%}"
            )
            return False

        # Check for invalid connections
        if status["invalid"] > 0:
            logger.warning(f"Invalid async connections detected: {status['invalid']}")
            return False

        return True

    except Exception as e:
        logger.error(f"Failed to check async connection pool health: {e}")
        return False

```
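A brief usage sketch for the session helpers and pool health check above, assuming a configured database; the queried model and the `SELECT 1` probe are placeholders.

```python
import asyncio

from sqlalchemy import text

from maverick_mcp.data.session_management import (
    check_connection_pool_health,
    get_async_db_session_read_only,
    get_db_session_read_only,
)


def count_rows(model) -> int:
    # Read-only work: no commit is issued and the session is always closed.
    with get_db_session_read_only() as session:
        return session.query(model).count()


async def ping_db() -> bool:
    async with get_async_db_session_read_only() as session:
        await session.execute(text("SELECT 1"))
        return True


if not check_connection_pool_health():
    # Over ~80% utilization or invalid connections detected: consider a larger
    # pool or look for leaked sessions that are never closed.
    print("connection pool approaching limits")

asyncio.run(ping_db())
```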

--------------------------------------------------------------------------------
/maverick_mcp/tests/test_mcp_tool_fixes.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test suite to verify the three critical MCP tool fixes are working properly.

This test validates that the fixes for:
1. Research returning empty results (API keys not passed to DeepResearchAgent)
2. Portfolio risk analysis cryptic "'high'" error (DataFrame validation and column case)
3. External API key hard dependency (graceful degradation)

All continue to work correctly after code changes.

## Issues Fixed

### Issue #1: Research Returning Empty Results
- **Root Cause**: API keys weren't passed from settings to DeepResearchAgent constructor
- **Files Modified**:
  - `maverick_mcp/api/routers/research.py:line 35-40` - Added API key parameters
  - `maverick_mcp/providers/llm_factory.py:line 30` - Fixed temperature and streaming
- **Fix**: Pass exa_api_key and tavily_api_key to DeepResearchAgent, fix LLM config

### Issue #2: Portfolio Risk Analysis "'high'" Error
- **Root Cause**: DataFrame column name case mismatch and date range problems
- **Files Modified**: `maverick_mcp/api/routers/portfolio.py:line 66-84`
- **Fixes**:
  - Added DataFrame validation before column access
  - Fixed column name case sensitivity (High/Low/Close vs high/low/close)
  - Used explicit date range to avoid weekend/holiday data fetch issues

### Issue #3: External API Key Hard Dependency
- **Root Cause**: Hard failure when EXTERNAL_DATA_API_KEY not configured
- **Files Modified**: `maverick_mcp/api/routers/data.py:line 244-253`
- **Fix**: Graceful degradation with informative fallback message

## Running This Test

```bash
# Via Makefile (recommended)
make test-fixes

# Direct execution
uv run python maverick_mcp/tests/test_mcp_tool_fixes.py

# Via pytest (if environment allows)
pytest maverick_mcp/tests/test_fixes_validation.py
```

This test should be run after any changes to ensure the MCP tool fixes remain intact.
"""

import asyncio
import os

from maverick_mcp.api.routers.data import get_stock_info
from maverick_mcp.api.routers.portfolio import risk_adjusted_analysis
from maverick_mcp.validation.data import GetStockInfoRequest


def test_portfolio_risk_analysis():
    """
    Test Issue #2: Portfolio risk analysis (formerly returned cryptic 'high' error).

    This test validates:
    - DataFrame is properly retrieved with correct columns
    - Column name case sensitivity is handled correctly
    - Date range calculation avoids weekend/holiday issues
    - Risk calculations complete successfully
    """
    print("🧪 Testing portfolio risk analysis (Issue #2)...")
    try:
        # First test what data we actually get from the provider
        from datetime import UTC, datetime, timedelta

        from maverick_mcp.api.routers.portfolio import stock_provider

        print("   Debugging: Testing data provider directly...")
        end_date = (datetime.now(UTC) - timedelta(days=7)).strftime("%Y-%m-%d")
        start_date = (datetime.now(UTC) - timedelta(days=365)).strftime("%Y-%m-%d")
        df = stock_provider.get_stock_data(
            "MSFT", start_date=start_date, end_date=end_date
        )

        print(f"   DataFrame shape: {df.shape}")
        print(f"   DataFrame columns: {list(df.columns)}")
        print(f"   DataFrame empty: {df.empty}")
        if not df.empty:
            print(f"   Sample data (last 3 rows):\n{df.tail(3)}")

        # Now test the actual function
        result = risk_adjusted_analysis("MSFT", 75.0)
        if "error" in result:
            # If still error, try string conversion
            result = risk_adjusted_analysis("MSFT", "75")
            if "error" in result:
                print(f"❌ Still has error: {result}")
                return False

        print(
            f"✅ Success! Current price: ${result.get('current_price')}, Risk level: {result.get('risk_level')}"
        )
        print(
            f"   Position sizing: ${result.get('position_sizing', {}).get('suggested_position_size')}"
        )
        print(f"   Strategy type: {result.get('analysis', {}).get('strategy_type')}")
        return True
    except Exception as e:
        print(f"❌ Exception: {e}")
        return False


def test_stock_info_external_api():
    """
    Test Issue #3: Stock info requiring EXTERNAL_DATA_API_KEY.

    This test validates:
    - External API dependency is optional
    - Graceful fallback when EXTERNAL_DATA_API_KEY not configured
    - Core stock info functionality still works
    """
    print("\n🧪 Testing stock info external API handling (Issue #3)...")
    try:
        request = GetStockInfoRequest(ticker="MSFT")
        result = get_stock_info(request)
        if "error" in result and "Invalid API key" in str(result.get("error")):
            print(f"❌ Still failing on external API: {result}")
            return False
        else:
            print(f"✅ Success! Company: {result.get('company', {}).get('name')}")
            print(
                f"   Current price: ${result.get('market_data', {}).get('current_price')}"
            )
            return True
    except Exception as e:
        print(f"❌ Exception: {e}")
        return False


async def test_research_empty_results():
    """
    Test Issue #1: Research returning empty results.

    This test validates:
    - DeepResearchAgent is created with API keys from settings
    - Search providers are properly initialized
    - API keys are correctly passed through the configuration chain
    """
    print("\n🧪 Testing research functionality (Issue #1)...")
    try:
        # Import the research function
        from maverick_mcp.api.routers.research import get_research_agent

        # Test that the research agent can be created with API keys
        agent = get_research_agent()

        # Check if API keys are available in environment
        exa_key = os.getenv("EXA_API_KEY")
        tavily_key = os.getenv("TAVILY_API_KEY")

        print(f"   API keys available: EXA={bool(exa_key)}, TAVILY={bool(tavily_key)}")

        # Check if the agent has search providers (indicates API keys were passed correctly)
        if hasattr(agent, "search_providers") and len(agent.search_providers) > 0:
            print(
                f"✅ Research agent created with {len(agent.search_providers)} search providers!"
            )

            # Try to access the provider API keys to verify they're configured
            providers_configured = 0
            for provider in agent.search_providers:
                if hasattr(provider, "api_key") and provider.api_key:
                    providers_configured += 1

            if providers_configured > 0:
                print(
                    f"✅ {providers_configured} search providers have API keys configured"
                )
                return True
            else:
                print("❌ Search providers missing API keys")
                return False
        else:
            print("❌ Research agent has no search providers configured")
            return False
    except Exception as e:
        print(f"❌ Exception: {e}")
        return False


def test_llm_configuration():
    """
    Test LLM configuration fixes.

    This test validates:
    - LLM can be created successfully
    - Temperature and streaming settings are compatible with gpt-5-mini
    - LLM can handle basic queries without errors
    """
    print("\n🧪 Testing LLM configuration...")
    try:
        from maverick_mcp.providers.llm_factory import get_llm

        print("   Creating LLM instance...")
        llm = get_llm()
        print(f"   LLM created: {type(llm).__name__}")

        # Test a simple query to ensure it works
        print("   Testing LLM query...")
        response = llm.invoke("What is 2+2?")
        print(f"✅ LLM response: {response.content}")
        return True
    except Exception as e:
        print(f"❌ LLM test failed: {e}")
        return False


def main():
    """Run comprehensive test suite for MCP tool fixes."""
    print("🚀 Testing MCP Tool Fixes")
    print("=" * 50)

    results = []

    # Test portfolio risk analysis
    results.append(test_portfolio_risk_analysis())

    # Test stock info external API handling
    results.append(test_stock_info_external_api())

    # Test research functionality
    results.append(asyncio.run(test_research_empty_results()))

    # Test LLM configuration
    results.append(test_llm_configuration())

    print("\n" + "=" * 50)
    print("📊 Test Results Summary:")
    print(f"✅ Passed: {sum(results)}/{len(results)}")
    print(f"❌ Failed: {len(results) - sum(results)}/{len(results)}")

    if all(results):
        print("\n🎉 All MCP tool fixes are working correctly!")
        print("\nFixed Issues:")
        print("1. ✅ Research tools return actual content (API keys properly passed)")
        print(
            "2. ✅ Portfolio risk analysis works (DataFrame validation & column case)"
        )
        print("3. ✅ Stock info graceful fallback (external API optional)")
        print("4. ✅ LLM configuration compatible (temperature & streaming)")
    else:
        print("\n⚠️  Some issues remain to be fixed.")
        print("Please check the individual test results above.")

    return all(results)


if __name__ == "__main__":
    import sys

    success = main()
    sys.exit(0 if success else 1)

```
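As a rough illustration of the graceful-degradation fix described for Issue #3, the sketch below shows the general pattern of treating the external key as optional; the function name, return shape, and fallback message are hypothetical and do not mirror `maverick_mcp/api/routers/data.py` exactly.

```python
import os


def fetch_external_metrics(ticker: str) -> dict:
    """Return external metrics when the optional key is set, else a fallback payload."""
    api_key = os.getenv("EXTERNAL_DATA_API_KEY")
    if not api_key:
        # Degrade gracefully instead of raising, so core stock info keeps working.
        return {
            "ticker": ticker,
            "external_metrics": None,
            "message": "EXTERNAL_DATA_API_KEY not configured; returning core data only",
        }
    # Placeholder for the real external call.
    return {"ticker": ticker, "external_metrics": {"key_used": True}}


print(fetch_external_metrics("MSFT").get("message", "external metrics available"))
```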

--------------------------------------------------------------------------------
/maverick_mcp/backtesting/visualization.py:
--------------------------------------------------------------------------------

```python
import base64
import io
import logging

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib.colors import LinearSegmentedColormap
from matplotlib.figure import Figure

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def set_chart_style(theme: str = "light") -> None:
    """
    Set matplotlib style based on theme.

    Args:
        theme (str): Chart theme, either 'light' or 'dark'
    """
    plt.style.use("seaborn")

    if theme == "dark":
        plt.style.use("dark_background")
        plt.rcParams["axes.facecolor"] = "#1E1E1E"
        plt.rcParams["figure.facecolor"] = "#121212"
        text_color = "white"
    else:
        plt.rcParams["axes.facecolor"] = "white"
        plt.rcParams["figure.facecolor"] = "white"
        text_color = "black"

    plt.rcParams["font.size"] = 10
    plt.rcParams["axes.labelcolor"] = text_color
    plt.rcParams["xtick.color"] = text_color
    plt.rcParams["ytick.color"] = text_color
    plt.rcParams["text.color"] = text_color


def image_to_base64(fig: Figure, dpi: int = 100, max_width: int = 800) -> str:
    """
    Convert matplotlib figure to base64 encoded PNG.

    Args:
        fig (Figure): Matplotlib figure to convert
        dpi (int): Dots per inch for resolution
        max_width (int): Maximum width in pixels

    Returns:
        str: Base64 encoded image
    """
    try:
        # Adjust figure size to maintain aspect ratio
        width, height = fig.get_size_inches()
        aspect_ratio = height / width

        # Resize if wider than max_width
        if width * dpi > max_width:
            width = max_width / dpi
            height = width * aspect_ratio
            fig.set_size_inches(width, height)

        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=dpi, bbox_inches="tight")
        buf.seek(0)
        base64_image = base64.b64encode(buf.getvalue()).decode("utf-8")
        plt.close(fig)
        return base64_image
    except Exception as e:
        logger.error(f"Error converting image to base64: {e}")
        return ""


def generate_equity_curve(
    returns: pd.Series,
    drawdown: pd.Series | None = None,
    title: str = "Equity Curve",
    theme: str = "light",
) -> str:
    """
    Generate equity curve with optional drawdown subplot.

    Args:
        returns (pd.Series): Cumulative returns series
        drawdown (pd.Series, optional): Drawdown series
        title (str): Chart title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        fig, (ax1, ax2) = plt.subplots(
            2, 1, figsize=(10, 6), gridspec_kw={"height_ratios": [3, 1]}
        )

        # Equity curve
        returns.plot(ax=ax1, linewidth=2, color="blue")
        ax1.set_title(title)
        ax1.set_xlabel("")
        ax1.set_ylabel("Cumulative Returns")
        ax1.grid(True, linestyle="--", alpha=0.7)

        # Drawdown subplot
        if drawdown is not None:
            drawdown.plot(ax=ax2, linewidth=2, color="red")
            ax2.set_title("Maximum Drawdown")
            ax2.set_ylabel("Drawdown (%)")
            ax2.grid(True, linestyle="--", alpha=0.7)

        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating equity curve: {e}")
        return ""


def generate_trade_scatter(
    prices: pd.Series,
    trades: pd.DataFrame,
    title: str = "Trade Scatter Plot",
    theme: str = "light",
) -> str:
    """
    Generate trade scatter plot on price chart.

    Args:
        prices (pd.Series): Price series
        trades (pd.DataFrame): Trades DataFrame with entry/exit points
        title (str): Chart title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot price
        prices.plot(ax=ax, linewidth=1, label="Price", color="blue")

        # Plot entry/exit points
        entry_trades = trades[trades["type"] == "entry"]
        exit_trades = trades[trades["type"] == "exit"]

        ax.scatter(
            entry_trades.index,
            entry_trades["price"],
            color="green",
            marker="^",
            label="Entry",
            s=100,
        )
        ax.scatter(
            exit_trades.index,
            exit_trades["price"],
            color="red",
            marker="v",
            label="Exit",
            s=100,
        )

        ax.set_title(title)
        ax.set_xlabel("Date")
        ax.set_ylabel("Price")
        ax.legend()
        ax.grid(True, linestyle="--", alpha=0.7)

        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating trade scatter plot: {e}")
        return ""


def generate_optimization_heatmap(
    param_results: dict[str, dict[str, float]],
    title: str = "Parameter Optimization",
    theme: str = "light",
) -> str:
    """
    Generate heatmap for parameter optimization results.

    Args:
        param_results (Dict): Dictionary of parameter combinations and performance
        title (str): Chart title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        # Prepare data for heatmap
        params = list(param_results.keys())
        results = [list(result.values()) for result in param_results.values()]

        fig, ax = plt.subplots(figsize=(10, 8))

        # Custom colormap
        cmap = LinearSegmentedColormap.from_list(
            "performance", ["red", "yellow", "green"]
        )

        sns.heatmap(
            results,
            annot=True,
            cmap=cmap,
            xticklabels=params,
            yticklabels=params,
            ax=ax,
            fmt=".2f",
        )

        ax.set_title(title)
        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating optimization heatmap: {e}")
        return ""


def generate_portfolio_allocation(
    allocations: dict[str, float],
    title: str = "Portfolio Allocation",
    theme: str = "light",
) -> str:
    """
    Generate portfolio allocation pie chart.

    Args:
        allocations (Dict): Dictionary of symbol allocations
        title (str): Chart title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        fig, ax = plt.subplots(figsize=(8, 8))

        symbols = list(allocations.keys())
        weights = list(allocations.values())

        # Color palette
        colors = plt.cm.Pastel1(np.linspace(0, 1, len(symbols)))

        ax.pie(
            weights,
            labels=symbols,
            colors=colors,
            autopct="%1.1f%%",
            startangle=90,
            pctdistance=0.85,
        )
        ax.set_title(title)

        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating portfolio allocation chart: {e}")
        return ""


def generate_strategy_comparison(
    strategies: dict[str, pd.Series],
    title: str = "Strategy Comparison",
    theme: str = "light",
) -> str:
    """
    Generate strategy comparison chart.

    Args:
        strategies (Dict): Dictionary of strategy returns
        title (str): Chart title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        fig, ax = plt.subplots(figsize=(10, 6))

        for name, returns in strategies.items():
            returns.plot(ax=ax, label=name, linewidth=2)

        ax.set_title(title)
        ax.set_xlabel("Date")
        ax.set_ylabel("Cumulative Returns")
        ax.legend()
        ax.grid(True, linestyle="--", alpha=0.7)

        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating strategy comparison chart: {e}")
        return ""


def generate_performance_dashboard(
    metrics: dict[str, float | str],
    title: str = "Performance Dashboard",
    theme: str = "light",
) -> str:
    """
    Generate performance metrics dashboard as a table image.

    Args:
        metrics (Dict): Dictionary of performance metrics
        title (str): Dashboard title
        theme (str): Chart theme

    Returns:
        str: Base64 encoded image
    """
    set_chart_style(theme)

    try:
        fig, ax = plt.subplots(figsize=(8, 6))
        ax.axis("off")

        # Prepare table data
        metric_names = list(metrics.keys())
        metric_values = [str(val) for val in metrics.values()]

        table = ax.table(
            cellText=[metric_names, metric_values], loc="center", cellLoc="center"
        )
        table.auto_set_font_size(False)
        table.set_fontsize(10)

        ax.set_title(title)
        plt.tight_layout()
        return image_to_base64(fig)
    except Exception as e:
        logger.error(f"Error generating performance dashboard: {e}")
        return ""

```
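A short sketch driving `generate_equity_curve` with synthetic data; it assumes a matplotlib version that still accepts the `"seaborn"` style name used by `set_chart_style`, and the returned string is empty if chart generation fails.

```python
import numpy as np
import pandas as pd

from maverick_mcp.backtesting.visualization import generate_equity_curve

rng = np.random.default_rng(42)
dates = pd.date_range("2024-01-01", periods=250, freq="B")
daily = pd.Series(rng.normal(0.0005, 0.01, len(dates)), index=dates)

equity = (1 + daily).cumprod()
drawdown = (equity / equity.cummax() - 1) * 100  # drawdown in percent

img_b64 = generate_equity_curve(
    equity, drawdown=drawdown, title="Demo Strategy", theme="dark"
)
print(len(img_b64) if img_b64 else "chart generation failed (see logs)")
```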

--------------------------------------------------------------------------------
/maverick_mcp/utils/mcp_logging.py:
--------------------------------------------------------------------------------

```python
"""
Integration of structured logging with FastMCP.

This module provides:
- Automatic request context capture from MCP
- Tool execution logging
- Performance monitoring
- Error tracking
"""

import functools
import time
from collections.abc import Callable
from typing import Any

from fastmcp import Context
from fastmcp.exceptions import ToolError

from .logging import (
    PerformanceMonitor,
    get_logger,
    log_cache_operation,
    log_database_query,
    log_external_api_call,
    request_id_var,
    request_start_var,
    tool_name_var,
    user_id_var,
)


def with_logging(tool_name: str | None = None):
    """
    Decorator for FastMCP tools that adds structured logging.

    Automatically logs:
    - Tool invocation with parameters
    - Execution time
    - Success/failure status
    - Context information (request ID, user)

    Example:
        @mcp.tool()
        @with_logging()
        async def fetch_stock_data(context: Context, ticker: str) -> dict:
            # Tool implementation
            pass
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Extract context
            context = None
            for arg in args:
                if isinstance(arg, Context):
                    context = arg
                    break

            # Get tool name
            actual_tool_name = tool_name or func.__name__

            # Set context variables
            if context:
                # Extract request ID from context metadata if available
                request_id = getattr(context, "request_id", None) or str(time.time())
                request_id_var.set(request_id)

                # Extract user info if available
                user_id = getattr(context, "user_id", None)
                if user_id:
                    user_id_var.set(user_id)

            tool_name_var.set(actual_tool_name)
            request_start_var.set(time.time())

            # Get logger
            logger = get_logger(f"maverick_mcp.tools.{actual_tool_name}")

            # Log tool invocation
            logger.info(
                f"Tool invoked: {actual_tool_name}",
                extra={
                    "tool_name": actual_tool_name,
                    "has_context": context is not None,
                    "args_count": len(args),
                    "kwargs_keys": list(kwargs.keys()),
                },
            )

            try:
                # Use context's progress callback if available
                if context and hasattr(context, "report_progress"):
                    await context.report_progress(
                        progress=0, total=100, message=f"Starting {actual_tool_name}"
                    )

                # Execute the tool
                with PerformanceMonitor(f"tool_{actual_tool_name}"):
                    result = await func(*args, **kwargs)

                # Log success
                logger.info(
                    f"Tool completed: {actual_tool_name}",
                    extra={"tool_name": actual_tool_name, "status": "success"},
                )

                # Report completion
                if context and hasattr(context, "report_progress"):
                    await context.report_progress(
                        progress=100, total=100, message=f"Completed {actual_tool_name}"
                    )

                return result

            except ToolError as e:
                # Log tool-specific error
                logger.warning(
                    f"Tool error in {actual_tool_name}: {str(e)}",
                    extra={
                        "tool_name": actual_tool_name,
                        "status": "tool_error",
                        "error_message": str(e),
                    },
                )
                raise

            except Exception as e:
                # Log unexpected error
                logger.error(
                    f"Unexpected error in {actual_tool_name}: {str(e)}",
                    exc_info=True,
                    extra={
                        "tool_name": actual_tool_name,
                        "status": "error",
                        "error_type": type(e).__name__,
                    },
                )
                raise

            finally:
                # Clear context vars
                request_id_var.set(None)
                tool_name_var.set(None)
                user_id_var.set(None)
                request_start_var.set(None)

        return wrapper

    return decorator


def log_mcp_context(context: Context, operation: str, **extra):
    """
    Log information from MCP context.

    Args:
        context: FastMCP context object
        operation: Description of the operation
        **extra: Additional fields to log
    """
    logger = get_logger("maverick_mcp.context")

    log_data = {
        "operation": operation,
        "has_request_id": hasattr(context, "request_id"),
        "can_report_progress": hasattr(context, "report_progress"),
        "can_log": hasattr(context, "info"),
    }

    # Add any extra fields
    log_data.update(extra)

    logger.info(f"MCP Context: {operation}", extra=log_data)


class LoggingStockDataProvider:
    """
    Wrapper for StockDataProvider that adds logging.

    This demonstrates how to add logging to existing classes.
    """

    def __init__(self, provider):
        self.provider = provider
        self.logger = get_logger("maverick_mcp.providers.stock_data")

    async def get_stock_data(
        self, ticker: str, start_date: str, end_date: str, **kwargs
    ):
        """Get stock data with logging."""
        with PerformanceMonitor(f"fetch_stock_data_{ticker}"):
            # Check cache first
            cache_key = f"stock:{ticker}:{start_date}:{end_date}"

            # Log cache check
            start = time.time()
            cached_data = await self._check_cache(cache_key)
            cache_duration = int((time.time() - start) * 1000)

            if cached_data:
                log_cache_operation(
                    "get", cache_key, hit=True, duration_ms=cache_duration
                )
                return cached_data
            else:
                log_cache_operation(
                    "get", cache_key, hit=False, duration_ms=cache_duration
                )

            # Fetch from provider
            try:
                start = time.time()
                data = await self.provider.get_stock_data(
                    ticker, start_date, end_date, **kwargs
                )
                api_duration = int((time.time() - start) * 1000)

                log_external_api_call(
                    service="yfinance",
                    endpoint=f"/quote/{ticker}",
                    method="GET",
                    status_code=200,
                    duration_ms=api_duration,
                )

                # Cache the result
                await self._set_cache(cache_key, data)

                return data

            except Exception as e:
                log_external_api_call(
                    service="yfinance",
                    endpoint=f"/quote/{ticker}",
                    method="GET",
                    error=str(e),
                )
                raise

    async def _check_cache(self, key: str):
        """Check cache (placeholder)."""
        # This would integrate with actual cache
        return None

    async def _set_cache(self, key: str, data: Any):
        """Set cache (placeholder)."""
        # This would integrate with actual cache
        pass


# SQL query logging wrapper
class LoggingSession:
    """Wrapper for SQLAlchemy session that logs queries."""

    def __init__(self, session):
        self.session = session
        self.logger = get_logger("maverick_mcp.database")

    def execute(self, query, params=None):
        """Execute query with logging."""
        start = time.time()
        try:
            result = self.session.execute(query, params)
            duration = int((time.time() - start) * 1000)
            log_database_query(str(query), params, duration)
            return result
        except Exception as e:
            duration = int((time.time() - start) * 1000)
            log_database_query(str(query), params, duration)
            self.logger.error(
                f"Database query failed: {str(e)}",
                extra={"query": str(query)[:200], "error_type": type(e).__name__},
            )
            raise

    def __getattr__(self, name):
        """Proxy other methods to the wrapped session."""
        return getattr(self.session, name)
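
# Usage sketch (illustrative, not wired up anywhere in this module): wrap an existing
# SQLAlchemy session so every query issued through it is timed and logged, while all
# other attributes proxy to the wrapped session via __getattr__. Assumes SessionLocal
# is the application's session factory.
#
#     from sqlalchemy import text
#     from maverick_mcp.data.models import SessionLocal
#
#     session = LoggingSession(SessionLocal())
#     try:
#         session.execute(text("SELECT 1"))
#     finally:
#         session.close()  # proxied to the real session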


# Example usage in routers
def setup_router_logging(router):
    """
    Add logging middleware to a FastMCP router.

    This should be called when setting up routers.
    """
    logger = get_logger(f"maverick_mcp.routers.{router.__class__.__name__}")

    # Log router initialization
    logger.info(
        "Router initialized",
        extra={
            "router_class": router.__class__.__name__,
            "tool_count": len(getattr(router, "tools", [])),
        },
    )

    # Add middleware to log all requests (if supported by FastMCP)
    # This is a placeholder for when FastMCP supports middleware
    pass

```

--------------------------------------------------------------------------------
/maverick_mcp/api/routers/performance.py:
--------------------------------------------------------------------------------

```python
"""
Performance monitoring router for Maverick-MCP.

This router provides endpoints for monitoring system performance,
including Redis connection health, cache performance, query optimization,
and database index analysis.
"""

import logging
from typing import Any

from fastmcp import FastMCP
from pydantic import Field

from maverick_mcp.tools.performance_monitoring import (
    analyze_database_indexes,
    clear_performance_caches,
    get_cache_performance_metrics,
    get_comprehensive_performance_report,
    get_query_performance_metrics,
    get_redis_connection_health,
    optimize_cache_settings,
)
from maverick_mcp.validation.base import BaseRequest, BaseResponse

logger = logging.getLogger(__name__)

# Create router
performance_router = FastMCP("Performance_Monitoring")


# Request/Response Models
class PerformanceHealthRequest(BaseRequest):
    """Request model for performance health check."""

    include_detailed_metrics: bool = Field(
        default=False, description="Include detailed metrics in the response"
    )


class CacheClearRequest(BaseRequest):
    """Request model for cache clearing operations."""

    cache_types: list[str] | None = Field(
        default=None,
        description="Types of caches to clear: stock_data, screening, market_data, all",
    )


class PerformanceMetricsResponse(BaseResponse):
    """Response model for performance metrics."""

    metrics: dict[str, Any] = Field(description="Performance metrics data")


class PerformanceReportResponse(BaseResponse):
    """Response model for comprehensive performance report."""

    overall_health_score: float = Field(
        description="Overall system health score (0-100)"
    )
    component_scores: dict[str, float] = Field(
        description="Individual component scores"
    )
    recommendations: list[str] = Field(
        description="Performance improvement recommendations"
    )
    detailed_metrics: dict[str, Any] | None = Field(description="Detailed metrics data")


async def get_system_performance_health(
    request: PerformanceHealthRequest,
) -> PerformanceReportResponse:
    """
    Get comprehensive system performance health report.

    This tool provides an overall health assessment of the MaverickMCP system,
    including Redis connectivity, cache performance, database query metrics,
    and index usage analysis. Use this for general system health monitoring.

    Args:
        request: Performance health check request

    Returns:
        Comprehensive performance health report with scores and recommendations
    """
    try:
        logger.info("Generating comprehensive performance health report")

        # Get comprehensive performance report
        report = await get_comprehensive_performance_report()

        if "error" in report:
            return PerformanceReportResponse(
                overall_health_score=0.0,
                component_scores={},
                recommendations=[f"System health check failed: {report['error']}"],
                detailed_metrics=None,
            )

        # Extract main components
        overall_score = report.get("overall_health_score", 0.0)
        component_scores = report.get("component_scores", {})
        recommendations = report.get("recommendations", [])
        detailed_metrics = (
            report.get("detailed_metrics") if request.include_detailed_metrics else None
        )

        logger.info(
            f"Performance health report generated: overall score {overall_score}"
        )

        return PerformanceReportResponse(
            overall_health_score=overall_score,
            component_scores=component_scores,
            recommendations=recommendations,
            detailed_metrics=detailed_metrics,
        )

    except Exception as e:
        logger.error(f"Error getting system performance health: {e}")
        return PerformanceReportResponse(
            overall_health_score=0.0,
            component_scores={},
            recommendations=[f"Failed to assess system health: {str(e)}"],
            detailed_metrics=None,
        )


async def get_redis_health_status() -> PerformanceMetricsResponse:
    """
    Get Redis connection pool health and performance metrics.

    This tool provides detailed information about Redis connectivity,
    connection pool status, operation latency, and basic health tests.
    Use this when diagnosing Redis-related performance issues.

    Returns:
        Redis health status and connection metrics
    """
    try:
        logger.info("Checking Redis connection health")

        redis_health = await get_redis_connection_health()

        return PerformanceMetricsResponse(metrics=redis_health)

    except Exception as e:
        logger.error(f"Error getting Redis health status: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


async def get_cache_performance_status() -> PerformanceMetricsResponse:
    """
    Get cache performance metrics and optimization suggestions.

    This tool provides cache hit/miss ratios, operation latencies,
    Redis memory usage, and performance test results. Use this
    to optimize caching strategies and identify cache bottlenecks.

    Returns:
        Cache performance metrics and test results
    """
    try:
        logger.info("Getting cache performance metrics")

        cache_metrics = await get_cache_performance_metrics()

        return PerformanceMetricsResponse(metrics=cache_metrics)

    except Exception as e:
        logger.error(f"Error getting cache performance status: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


async def get_database_performance_status() -> PerformanceMetricsResponse:
    """
    Get database query performance metrics and connection pool status.

    This tool provides database query statistics, slow query detection,
    connection pool metrics, and database health tests. Use this to
    identify database performance bottlenecks and optimization opportunities.

    Returns:
        Database performance metrics and query statistics
    """
    try:
        logger.info("Getting database performance metrics")

        query_metrics = await get_query_performance_metrics()

        return PerformanceMetricsResponse(metrics=query_metrics)

    except Exception as e:
        logger.error(f"Error getting database performance status: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


async def analyze_database_index_usage() -> PerformanceMetricsResponse:
    """
    Analyze database index usage and provide optimization recommendations.

    This tool examines database index usage statistics, identifies missing
    indexes, analyzes table scan patterns, and provides specific recommendations
    for database performance optimization. Use this for database tuning.

    Returns:
        Database index analysis and optimization recommendations
    """
    try:
        logger.info("Analyzing database index usage")

        index_analysis = await analyze_database_indexes()

        return PerformanceMetricsResponse(metrics=index_analysis)

    except Exception as e:
        logger.error(f"Error analyzing database index usage: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


async def optimize_cache_configuration() -> PerformanceMetricsResponse:
    """
    Analyze cache usage patterns and recommend optimal configuration.

    This tool analyzes current cache hit rates, memory usage, and access
    patterns to recommend optimal TTL values, cache sizes, and configuration
    settings for maximum performance. Use this for cache tuning.

    Returns:
        Cache optimization analysis and recommended settings
    """
    try:
        logger.info("Optimizing cache configuration")

        optimization_analysis = await optimize_cache_settings()

        return PerformanceMetricsResponse(metrics=optimization_analysis)

    except Exception as e:
        logger.error(f"Error optimizing cache configuration: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


async def clear_system_caches(
    request: CacheClearRequest,
) -> PerformanceMetricsResponse:
    """
    Clear specific performance caches for maintenance or testing.

    This tool allows selective clearing of different cache types:
    - stock_data: Stock price and company information caches
    - screening: Maverick and trending stock screening caches
    - market_data: High volume and market analysis caches
    - all: Clear all performance caches

    Use this for cache maintenance, testing, or when stale data is suspected.

    Args:
        request: Cache clearing request with specific cache types

    Returns:
        Cache clearing results and statistics
    """
    try:
        cache_types = request.cache_types or ["all"]
        logger.info(f"Clearing performance caches: {cache_types}")

        clear_results = await clear_performance_caches(cache_types)

        return PerformanceMetricsResponse(metrics=clear_results)

    except Exception as e:
        logger.error(f"Error clearing system caches: {e}")
        return PerformanceMetricsResponse(metrics={"error": str(e)})


# Router configuration
def get_performance_router():
    """Get the configured performance monitoring router."""
    return performance_router
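

# Registration sketch (an assumption, not the project's actual wiring): FastMCP's
# `tool()` decorator factory can attach the async handlers above to the router. The
# real registration may happen elsewhere, e.g. in the main server setup.
def register_performance_tools() -> FastMCP:
    """Attach the performance handlers to the router as MCP tools (illustrative)."""
    for handler in (
        get_system_performance_health,
        get_redis_health_status,
        get_cache_performance_status,
        get_database_performance_status,
        analyze_database_index_usage,
        optimize_cache_configuration,
        clear_system_caches,
    ):
        performance_router.tool()(handler)
    return performance_router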

```

--------------------------------------------------------------------------------
/scripts/test_tiingo_loader.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script for the Tiingo data loader.

This script performs basic validation that the loader components work correctly
without requiring an actual API call or database connection.
"""

import sys
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, patch

# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))

from scripts.load_tiingo_data import (
    SP500_SYMBOLS,
    ProgressTracker,
    RateLimiter,
    TiingoDataLoader,
)
from scripts.tiingo_config import (
    SCREENING_CONFIGS,
    SYMBOL_LISTS,
    TiingoConfig,
    get_config_for_environment,
)


class TestProgressTracker(unittest.TestCase):
    """Test the progress tracking functionality."""

    def setUp(self):
        self.tracker = ProgressTracker("test_progress.json")

    def test_initialization(self):
        """Test that progress tracker initializes correctly."""
        self.assertEqual(self.tracker.processed_symbols, 0)
        self.assertEqual(self.tracker.successful_symbols, 0)
        self.assertEqual(len(self.tracker.failed_symbols), 0)
        self.assertEqual(len(self.tracker.completed_symbols), 0)

    def test_update_progress_success(self):
        """Test updating progress for successful symbol."""
        self.tracker.total_symbols = 5
        self.tracker.update_progress("AAPL", True)

        self.assertEqual(self.tracker.processed_symbols, 1)
        self.assertEqual(self.tracker.successful_symbols, 1)
        self.assertIn("AAPL", self.tracker.completed_symbols)
        self.assertEqual(len(self.tracker.failed_symbols), 0)

    def test_update_progress_failure(self):
        """Test updating progress for failed symbol."""
        self.tracker.total_symbols = 5
        self.tracker.update_progress("BADSTOCK", False, "Not found")

        self.assertEqual(self.tracker.processed_symbols, 1)
        self.assertEqual(self.tracker.successful_symbols, 0)
        self.assertIn("BADSTOCK", self.tracker.failed_symbols)
        self.assertEqual(len(self.tracker.errors), 1)


class TestRateLimiter(unittest.TestCase):
    """Test the rate limiting functionality."""

    def test_initialization(self):
        """Test rate limiter initialization."""
        limiter = RateLimiter(3600)  # 1 request per second
        self.assertEqual(limiter.max_requests, 3600)
        self.assertEqual(limiter.min_interval, 1.0)

    def test_tiingo_rate_limit(self):
        """Test Tiingo-specific rate limit calculation."""
        limiter = RateLimiter(2400)  # Tiingo free tier
        expected_interval = 3600.0 / 2400  # 1.5 seconds
        self.assertEqual(limiter.min_interval, expected_interval)


class TestTiingoConfig(unittest.TestCase):
    """Test configuration management."""

    def test_default_config(self):
        """Test default configuration values."""
        config = TiingoConfig()

        self.assertEqual(config.rate_limit_per_hour, 2400)
        self.assertEqual(config.max_retries, 3)
        self.assertEqual(config.default_batch_size, 50)
        self.assertEqual(config.rsi_period, 14)
        self.assertIsInstance(config.sma_periods, list)
        self.assertIn(50, config.sma_periods)
        self.assertIn(200, config.sma_periods)

    def test_environment_configs(self):
        """Test environment-specific configurations."""
        dev_config = get_config_for_environment("development")
        prod_config = get_config_for_environment("production")
        test_config = get_config_for_environment("testing")

        # Production should have higher limits
        self.assertGreaterEqual(
            prod_config.max_concurrent_requests, dev_config.max_concurrent_requests
        )
        self.assertGreaterEqual(
            prod_config.default_batch_size, dev_config.default_batch_size
        )

        # Test should have lower limits
        self.assertLessEqual(
            test_config.max_concurrent_requests, dev_config.max_concurrent_requests
        )
        self.assertLessEqual(
            test_config.default_batch_size, dev_config.default_batch_size
        )

    def test_symbol_lists(self):
        """Test that symbol lists are properly configured."""
        self.assertIn("sp500_top_100", SYMBOL_LISTS)
        self.assertIn("nasdaq_100", SYMBOL_LISTS)
        self.assertIn("dow_30", SYMBOL_LISTS)

        # Check that lists have reasonable sizes
        self.assertGreater(len(SYMBOL_LISTS["sp500_top_100"]), 50)
        self.assertLess(len(SYMBOL_LISTS["dow_30"]), 35)

    def test_screening_configs(self):
        """Test screening algorithm configurations."""
        maverick_config = SCREENING_CONFIGS["maverick_momentum"]

        self.assertIn("min_momentum_score", maverick_config)
        self.assertIn("scoring_weights", maverick_config)
        self.assertIsInstance(maverick_config["scoring_weights"], dict)


# IsolatedAsyncioTestCase is required so the async context-manager test below is
# actually awaited; under plain TestCase it would never run.
class TestTiingoDataLoader(unittest.IsolatedAsyncioTestCase):
    """Test the main TiingoDataLoader class."""

    @patch.dict("os.environ", {"TIINGO_API_TOKEN": "test_token"})
    def test_initialization(self):
        """Test loader initialization."""
        loader = TiingoDataLoader(batch_size=25, max_concurrent=3)

        self.assertEqual(loader.batch_size, 25)
        self.assertEqual(loader.max_concurrent, 3)
        self.assertEqual(loader.api_token, "test_token")
        self.assertIsNotNone(loader.rate_limiter)

    def test_initialization_without_token(self):
        """Test that loader fails without API token."""
        with patch.dict("os.environ", {}, clear=True):
            with self.assertRaises(ValueError):
                TiingoDataLoader()

    @patch("aiohttp.ClientSession")
    async def test_context_manager(self, mock_session_class):
        """Test async context manager functionality."""
        # AsyncMock lets awaited calls such as session.close() resolve correctly
        mock_session = AsyncMock()
        mock_session_class.return_value = mock_session

        with patch.dict("os.environ", {"TIINGO_API_TOKEN": "test_token"}):
            async with TiingoDataLoader() as loader:
                self.assertIsNotNone(loader.session)

            # Session should be closed after context exit
            mock_session.close.assert_called_once()


class TestSymbolValidation(unittest.TestCase):
    """Test symbol validation and processing."""

    def test_sp500_symbols(self):
        """Test that S&P 500 symbols are valid."""
        self.assertIsInstance(SP500_SYMBOLS, list)
        self.assertGreater(len(SP500_SYMBOLS), 90)  # Should have at least 90 symbols

        # Check that symbols are uppercase strings
        for symbol in SP500_SYMBOLS[:10]:  # Check first 10
            self.assertIsInstance(symbol, str)
            self.assertEqual(symbol, symbol.upper())
            self.assertGreater(len(symbol), 0)
            self.assertLess(len(symbol), 10)  # Reasonable symbol length


class TestUtilityFunctions(unittest.TestCase):
    """Test utility functions."""

    def test_symbol_file_content(self):
        """Test the format that would be expected in symbol files."""
        # Test comma-separated format
        test_content = "AAPL,MSFT,GOOGL\nTSLA,NVDA\n# Comment\nAMZN"
        lines = test_content.split("\n")

        symbols = []
        for line in lines:
            line = line.strip()
            if line and not line.startswith("#"):
                line_symbols = [s.strip().upper() for s in line.split(",")]
                symbols.extend(line_symbols)

        expected = ["AAPL", "MSFT", "GOOGL", "TSLA", "NVDA", "AMZN"]
        self.assertEqual(symbols, expected)


def run_basic_validation():
    """Run basic validation without external dependencies."""
    print("🧪 Running basic validation tests...")

    # Test imports
    try:
        from scripts.load_tiingo_data import ProgressTracker
        from scripts.tiingo_config import SYMBOL_LISTS, TiingoConfig

        print("✅ All imports successful")
    except ImportError as e:
        print(f"❌ Import error: {e}")
        return False

    # Test configuration
    try:
        config = TiingoConfig()
        assert config.rate_limit_per_hour == 2400
        assert len(config.sma_periods) > 0
        print("✅ Configuration validation passed")
    except Exception as e:
        print(f"❌ Configuration error: {e}")
        return False

    # Test symbol lists
    try:
        assert len(SP500_SYMBOLS) > 90
        assert len(SYMBOL_LISTS["sp500_top_100"]) > 90
        assert all(isinstance(s, str) for s in SP500_SYMBOLS[:10])
        print("✅ Symbol list validation passed")
    except Exception as e:
        print(f"❌ Symbol list error: {e}")
        return False

    # Test progress tracker
    try:
        tracker = ProgressTracker("test.json")
        tracker.update_progress("TEST", True)
        assert tracker.successful_symbols == 1
        assert "TEST" in tracker.completed_symbols
        print("✅ Progress tracker validation passed")
    except Exception as e:
        print(f"❌ Progress tracker error: {e}")
        return False

    print("🎉 All basic validations passed!")
    return True


if __name__ == "__main__":
    print("Tiingo Data Loader Test Suite")
    print("=" * 40)

    # Run basic validation first
    if not run_basic_validation():
        sys.exit(1)

    # Run unit tests
    print("\n🧪 Running unit tests...")
    unittest.main(verbosity=2, exit=False)

    print("\n✅ Test suite completed!")

```

--------------------------------------------------------------------------------
/scripts/INSTALLATION_GUIDE.md:
--------------------------------------------------------------------------------

```markdown
# Tiingo Data Loader Installation Guide

This guide will help you set up and use the comprehensive Tiingo data loader for Maverick-MCP.

## 📋 What You Get

The Tiingo data loader provides:

- **Comprehensive Data Loading**: Fetch stock metadata and OHLCV price data from the Tiingo API
- **Technical Indicators**: 50+ indicators calculated using pandas-ta
- **Screening Algorithms**: Built-in Maverick, Bear Market, and Supply/Demand screens
- **Progress Tracking**: Resume interrupted loads with checkpoint files
- **Performance Optimized**: Async operations with rate limiting and batch processing
- **Production Ready**: Error handling, logging, and database optimization

## 🚀 Quick Start

### 1. Check Your Setup
```bash
cd /path/to/maverick-mcp
python3 scripts/validate_setup.py
```

This will show you exactly what needs to be installed or configured.

### 2. Install Dependencies
```bash
# Install required Python packages
pip install -r scripts/requirements_tiingo.txt

# Or install individually:
pip install aiohttp pandas pandas-ta sqlalchemy psycopg2-binary
```

### 3. Get Tiingo API Token
1. Sign up at [tiingo.com](https://www.tiingo.com) (free tier gives 2400 requests/hour)
2. Get your API token from the dashboard
3. Set environment variable:
```bash
export TIINGO_API_TOKEN=your_token_here
```

### 4. Configure Database
```bash
# Set your database URL
export DATABASE_URL=postgresql://user:password@localhost/maverick_mcp

# Or use existing environment variables
export POSTGRES_URL=postgresql://user:password@localhost/maverick_mcp
```

### 5. Verify Setup
```bash
python3 scripts/validate_setup.py
```
You should see "🎉 Setup validation PASSED!"

## 📊 Usage Examples

### Load Sample Stocks
```bash
# Load 5 popular stocks with 2 years of data
python3 scripts/load_tiingo_data.py --symbols AAPL,MSFT,GOOGL,AMZN,TSLA --years 2 --calculate-indicators
```

### Load S&P 500 (Top 100)
```bash
# Load top 100 S&P 500 stocks with screening
python3 scripts/load_tiingo_data.py --sp500 --years 1 --run-screening
```

### Load from File
```bash
# Create symbol file
echo -e "AAPL\nMSFT\nGOOGL\nTSLA\nNVDA" > my_stocks.txt

# Load from file
python3 scripts/load_tiingo_data.py --file my_stocks.txt --calculate-indicators --run-screening
```

### Interactive Examples
```bash
# Run guided examples
python3 scripts/load_example.py
```

## 🏗️ Architecture

### Files Created
- **`load_tiingo_data.py`**: Main comprehensive data loader script
- **`tiingo_config.py`**: Configuration settings and symbol lists  
- **`load_example.py`**: Interactive examples and tutorials
- **`validate_setup.py`**: Setup validation and dependency checking
- **`test_tiingo_loader.py`**: Unit tests and validation
- **`requirements_tiingo.txt`**: Python package requirements
- **`README_TIINGO_LOADER.md`**: Comprehensive documentation

### Data Flow
1. **Fetch Metadata**: Get stock information from Tiingo
2. **Load Prices**: Download historical OHLCV data
3. **Calculate Indicators**: Compute 50+ technical indicators
4. **Store Data**: Bulk insert into Maverick-MCP database tables
5. **Run Screens**: Execute screening algorithms
6. **Track Progress**: Save checkpoints for resume capability
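
The same flow can be driven programmatically. A minimal sketch, assuming only the
`TiingoDataLoader` async context manager and its `load_symbol_data(symbol, start_date)`
method shown in the Custom Workflows section below; screening is normally triggered
separately via `--run-screening`:

```python
import asyncio

from scripts.load_tiingo_data import TiingoDataLoader


async def load_a_few_symbols():
    # Steps 1-4: fetch metadata, load prices, compute indicators, store to the database
    async with TiingoDataLoader() as loader:
        for symbol in ["AAPL", "MSFT", "GOOGL"]:
            await loader.load_symbol_data(symbol, "2023-01-01")


asyncio.run(load_a_few_symbols())
```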

### Database Tables
- **`mcp_stocks`**: Basic stock information
- **`mcp_price_cache`**: Historical OHLCV price data  
- **`mcp_technical_cache`**: Calculated technical indicators
- **`mcp_maverick_stocks`**: Momentum screening results
- **`mcp_maverick_bear_stocks`**: Bear market screening results
- **`mcp_supply_demand_breakouts`**: Supply/demand pattern results

## ⚙️ Configuration Options

### Environment Variables
```bash
# Required
export TIINGO_API_TOKEN=your_token
export DATABASE_URL=postgresql://user:pass@localhost/db

# Optional
export DB_POOL_SIZE=20
export DB_ECHO=false
export ENVIRONMENT=development
```

### Symbol Sources
- **S&P 500**: `--sp500` (top 100) or `--sp500-full` (all 500)
- **Custom**: `--symbols AAPL,MSFT,GOOGL`
- **File**: `--file symbols.txt`
- **All Supported**: `--supported` (3000+ symbols)

### Performance Tuning
- **Batch Size**: `--batch-size 100` (default: 50)
- **Concurrency**: `--max-concurrent 10` (default: 5)
- **Date Range**: `--years 5` or `--start-date 2020-01-01`

### Processing Options
- **Technical Indicators**: `--calculate-indicators` (default: on)
- **Screening**: `--run-screening` (run after data load)
- **Resume**: `--resume` (continue from checkpoint)

## 📈 Technical Indicators

### Trend Indicators
- Simple Moving Averages (SMA 20, 50, 150, 200)
- Exponential Moving Average (EMA 21)
- Average Directional Index (ADX 14)

### Momentum Indicators  
- Relative Strength Index (RSI 14)
- MACD (12, 26, 9)
- Stochastic Oscillator (14, 3, 3)
- Relative Strength Rating vs Market

### Volatility Indicators
- Average True Range (ATR 14)
- Bollinger Bands (20, 2.0)
- Average Daily Range percentage

### Volume Indicators
- Volume Moving Averages
- Volume Ratio vs Average
- Volume-Weighted Average Price (VWAP)

### Custom Indicators
- Price Momentum (10, 20 period)
- Bollinger Band Squeeze Detection
- Position vs Moving Averages

## 🔍 Screening Algorithms

### Maverick Momentum Screen
**Criteria:**
- Price > 21-day EMA
- EMA-21 > SMA-50  
- SMA-50 > SMA-200
- Relative Strength > 70
- Volume > 500K daily

**Scoring:** 0-10 points based on strength of signals
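
Expressed as a dataframe filter, the criteria look roughly like the sketch below.
The column names (`close`, `ema_21`, `sma_50`, `sma_200`, `momentum_score`, `volume`)
are illustrative, not the loader's actual schema:

```python
import pandas as pd


def maverick_momentum_candidates(df: pd.DataFrame) -> pd.DataFrame:
    """Return rows passing the momentum criteria (hypothetical column names)."""
    mask = (
        (df["close"] > df["ema_21"])
        & (df["ema_21"] > df["sma_50"])
        & (df["sma_50"] > df["sma_200"])
        & (df["momentum_score"] > 70)
        & (df["volume"] > 500_000)
    )
    return df[mask]
```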

### Bear Market Screen
**Criteria:**
- Price < 21-day EMA
- EMA-21 < SMA-50
- Relative Strength < 30
- High volume on declines

**Use Case:** Short candidates or stocks to avoid

### Supply/Demand Breakout Screen  
**Criteria:**
- Price > SMA-50 and SMA-200
- Strong relative strength (>60)
- Accumulation patterns
- Institutional buying signals

**Use Case:** Stocks with strong fundamental demand

## 🚨 Troubleshooting

### Common Issues

#### 1. Missing Dependencies
```bash
# Error: ModuleNotFoundError: No module named 'aiohttp'
pip install -r scripts/requirements_tiingo.txt
```

#### 2. API Rate Limiting
```bash
# Reduce concurrency if getting rate limited
python3 scripts/load_tiingo_data.py --symbols AAPL --max-concurrent 2
```

#### 3. Database Connection Issues
```bash
# Test database connection
python3 -c "
from maverick_mcp.data.models import SessionLocal
with SessionLocal() as session:
    print('Database connection OK')
"
```

#### 4. Memory Issues
```bash
# Reduce batch size for large loads
python3 scripts/load_tiingo_data.py --sp500 --batch-size 25 --max-concurrent 3
```

#### 5. Checkpoint File Corruption
```bash
# Remove corrupted checkpoint and restart
rm load_progress.json
python3 scripts/load_tiingo_data.py --symbols AAPL,MSFT
```

### Getting Help
1. **Validation Script**: `python3 scripts/validate_setup.py`
2. **Check Logs**: `tail -f tiingo_data_loader.log`
3. **Test Individual Components**: `python3 scripts/test_tiingo_loader.py`
4. **Interactive Examples**: `python3 scripts/load_example.py`

## 🎯 Best Practices

### For Development
```bash
# Start small for testing
python3 scripts/load_tiingo_data.py --symbols AAPL,MSFT --years 0.5 --batch-size 10

# Use checkpoints for large loads
python3 scripts/load_tiingo_data.py --sp500 --checkpoint-file dev_progress.json
```

### For Production
```bash
# Higher performance settings
python3 scripts/load_tiingo_data.py --sp500-full \
    --batch-size 100 \
    --max-concurrent 10 \
    --years 2 \
    --run-screening

# Schedule regular updates
# Add to crontab: 0 18 * * 1-5 /path/to/load_script.sh
```

### For Resume Operations
```bash
# Always use checkpoints for large operations
python3 scripts/load_tiingo_data.py --supported --checkpoint-file full_load.json

# If interrupted, resume with:
python3 scripts/load_tiingo_data.py --resume --checkpoint-file full_load.json
```

## 📊 Performance Benchmarks

**Typical Loading Times (on modern hardware):**
- 10 symbols, 1 year: 2-3 minutes
- 100 symbols, 2 years: 15-20 minutes  
- 500 symbols, 2 years: 1-2 hours
- 3000+ symbols, 2 years: 6-12 hours

**Rate Limits:**
- Tiingo Free: 2400 requests/hour
- Recommended: 5 concurrent requests max
- With indicators: ~1.5 seconds per symbol

## 🔗 Integration

### With Maverick-MCP API
The loaded data is immediately available through:
- `/api/v1/stocks` - Stock metadata
- `/api/v1/prices/{symbol}` - Price data
- `/api/v1/technical/{symbol}` - Technical indicators
- `/api/v1/screening/*` - Screening results

### With MCP Tools
- `get_stock_analysis` - Uses loaded data
- `run_screening` - Operates on cached data
- `portfolio_analysis` - Leverages technical indicators

### Custom Workflows
```python
# Example: Load data then run custom analysis
import asyncio

from maverick_mcp.data.models import SessionLocal
from scripts.load_tiingo_data import TiingoDataLoader


async def load_and_analyze():
    async with TiingoDataLoader() as loader:
        await loader.load_symbol_data("AAPL", "2023-01-01")

    with SessionLocal() as session:
        # Your custom analysis here
        pass


asyncio.run(load_and_analyze())
```

## 🎉 Success!

Once setup is complete, you should be able to:

1. ✅ Load market data from Tiingo efficiently
2. ✅ Calculate comprehensive technical indicators  
3. ✅ Run sophisticated screening algorithms
4. ✅ Resume interrupted loads seamlessly
5. ✅ Access all data through Maverick-MCP APIs
6. ✅ Build custom trading strategies

**Next Steps:**
- Explore the interactive examples: `python3 scripts/load_example.py`
- Read the full documentation: `scripts/README_TIINGO_LOADER.md`
- Set up automated daily updates
- Customize screening algorithms for your strategy
```

--------------------------------------------------------------------------------
/maverick_mcp/utils/quick_cache.py:
--------------------------------------------------------------------------------

```python
"""
Quick in-memory cache decorator for development.

This module provides a simple LRU cache decorator with TTL support
to avoid repeated API calls during development and testing.
"""

import asyncio
import functools
import hashlib
import json
import time
from collections import OrderedDict
from collections.abc import Callable
from typing import Any, TypeVar

from maverick_mcp.config.settings import settings
from maverick_mcp.utils.logging import get_logger

logger = get_logger(__name__)

T = TypeVar("T")


class QuickCache:
    """Simple in-memory LRU cache with TTL support."""

    def __init__(self, max_size: int = 1000):
        self.cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()
        self.max_size = max_size
        self.hits = 0
        self.misses = 0
        self._lock = asyncio.Lock()

    def make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """Generate a cache key from function name and arguments."""
        # Convert args and kwargs to a stable string representation
        key_data = {
            "func": func_name,
            "args": args,
            "kwargs": sorted(kwargs.items()),
        }
        key_str = json.dumps(key_data, sort_keys=True, default=str)
        # Use hash for shorter keys
        return hashlib.md5(key_str.encode()).hexdigest()

    async def get(self, key: str) -> Any | None:
        """Get value from cache if not expired."""
        async with self._lock:
            if key in self.cache:
                value, expiry = self.cache[key]
                if time.time() < expiry:
                    # Move to end (LRU)
                    self.cache.move_to_end(key)
                    self.hits += 1
                    return value
                else:
                    # Expired, remove it
                    del self.cache[key]

            self.misses += 1
            return None

    async def set(self, key: str, value: Any, ttl_seconds: float):
        """Set value in cache with TTL."""
        async with self._lock:
            expiry = time.time() + ttl_seconds

            # Remove oldest if at capacity
            if len(self.cache) >= self.max_size:
                self.cache.popitem(last=False)

            self.cache[key] = (value, expiry)

    def get_stats(self) -> dict[str, Any]:
        """Get cache statistics."""
        total = self.hits + self.misses
        hit_rate = (self.hits / total * 100) if total > 0 else 0

        return {
            "hits": self.hits,
            "misses": self.misses,
            "total": total,
            "hit_rate": round(hit_rate, 2),
            "size": len(self.cache),
            "max_size": self.max_size,
        }

    def clear(self):
        """Clear the cache."""
        self.cache.clear()
        self.hits = 0
        self.misses = 0


# Global cache instance
_cache = QuickCache()


def quick_cache(
    ttl_seconds: float = 300,  # 5 minutes default
    max_size: int = 1000,
    key_prefix: str = "",
    log_stats: bool | None = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for in-memory caching with TTL.

    Args:
        ttl_seconds: Time to live in seconds (default: 300)
        max_size: Maximum cache size (default: 1000)
        key_prefix: Optional prefix for cache keys
        log_stats: Whether to log cache statistics (default: settings.api.debug)

    Usage:
        @quick_cache(ttl_seconds=60)
        async def expensive_api_call(symbol: str):
            return await fetch_data(symbol)

        @quick_cache(ttl_seconds=300, key_prefix="stock_data")
        def get_stock_info(symbol: str, period: str):
            return fetch_stock_data(symbol, period)
    """
    if log_stats is None:
        log_stats = settings.api.debug

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Update global cache size if specified
        if max_size != _cache.max_size:
            _cache.max_size = max_size

        @functools.wraps(func)
        async def async_wrapper(*args: Any, **kwargs: Any) -> T:
            # Generate cache key
            cache_key = _cache.make_key(
                f"{key_prefix}:{func.__name__}" if key_prefix else func.__name__,
                args,
                kwargs,
            )

            # Try to get from cache
            cached_value = await _cache.get(cache_key)
            if cached_value is not None:
                if log_stats:
                    stats = _cache.get_stats()
                    logger.debug(
                        f"Cache HIT for {func.__name__}",
                        extra={
                            "function": func.__name__,
                            "cache_key": cache_key[:8] + "...",
                            "hit_rate": stats["hit_rate"],
                            "cache_size": stats["size"],
                        },
                    )
                return cached_value

            # Cache miss - execute function
            if log_stats:
                logger.debug(
                    f"Cache MISS for {func.__name__}",
                    extra={
                        "function": func.__name__,
                        "cache_key": cache_key[:8] + "...",
                    },
                )

            # Execute the function
            start_time = time.time()
            # func is guaranteed to be async since we're in async_wrapper
            result = await func(*args, **kwargs)  # type: ignore[misc]
            execution_time = time.time() - start_time

            # Cache the result
            await _cache.set(cache_key, result, ttl_seconds)

            if log_stats:
                stats = _cache.get_stats()
                logger.debug(
                    f"Cached result for {func.__name__}",
                    extra={
                        "function": func.__name__,
                        "execution_time": round(execution_time, 3),
                        "ttl_seconds": ttl_seconds,
                        "cache_stats": stats,
                    },
                )

            return result

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> T:
            # For sync functions, we need to run the async cache operations
            # in a thread to avoid blocking
            loop_policy = asyncio.get_event_loop_policy()
            try:
                previous_loop = loop_policy.get_event_loop()
            except RuntimeError:
                previous_loop = None

            loop = loop_policy.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                cache_key = _cache.make_key(
                    f"{key_prefix}:{func.__name__}" if key_prefix else func.__name__,
                    args,
                    kwargs,
                )

                # Try to get from cache (sync version)
                cached_value = loop.run_until_complete(_cache.get(cache_key))
                if cached_value is not None:
                    if log_stats:
                        stats = _cache.get_stats()
                        logger.debug(
                            f"Cache HIT for {func.__name__}",
                            extra={
                                "function": func.__name__,
                                "hit_rate": stats["hit_rate"],
                            },
                        )
                    return cached_value

                # Cache miss
                result = func(*args, **kwargs)

                # Cache the result
                loop.run_until_complete(_cache.set(cache_key, result, ttl_seconds))

                return result
            finally:
                loop.close()
                if previous_loop is not None:
                    asyncio.set_event_loop(previous_loop)
                else:
                    asyncio.set_event_loop(None)

        # Return appropriate wrapper based on function type
        if asyncio.iscoroutinefunction(func):
            return async_wrapper  # type: ignore[return-value]
        else:
            return sync_wrapper

    return decorator


def get_cache_stats() -> dict[str, Any]:
    """Get global cache statistics."""
    return _cache.get_stats()


def clear_cache():
    """Clear the global cache."""
    _cache.clear()
    logger.info("Cache cleared")


# Convenience decorators with common TTLs
cache_1min = functools.partial(quick_cache, ttl_seconds=60)
cache_5min = functools.partial(quick_cache, ttl_seconds=300)
cache_15min = functools.partial(quick_cache, ttl_seconds=900)
cache_1hour = functools.partial(quick_cache, ttl_seconds=3600)


# Example usage for API calls
@quick_cache(ttl_seconds=300, key_prefix="stock")
async def cached_stock_data(symbol: str, start_date: str, end_date: str) -> dict:
    """Example of caching stock data API calls."""
    # This would normally make an expensive API call
    logger.info(f"Fetching stock data for {symbol}")
    # Simulate API call
    await asyncio.sleep(0.1)
    return {
        "symbol": symbol,
        "start": start_date,
        "end": end_date,
        "data": "mock_data",
    }


# Cache management commands for development
if settings.api.debug:

    @quick_cache(ttl_seconds=1)  # Very short TTL for testing
    def test_cache_function(value: str) -> str:
        """Test function for cache debugging."""
        return f"processed_{value}_{time.time()}"

```

--------------------------------------------------------------------------------
/docs/COST_BASIS_SPECIFICATION.md:
--------------------------------------------------------------------------------

```markdown
# Cost Basis Specification for Portfolio Management

## 1. Overview

This document specifies the cost basis tracking algorithm for MaverickMCP's portfolio management system. The system uses the **Average Cost Method** for educational simplicity.

## 2. Cost Basis Method: Average Cost

### Definition
The average cost method calculates the cost basis by taking the total cost of all shares purchased and dividing by the total number of shares owned.

### Formula
```
Average Cost Basis = Total Cost of All Shares / Total Number of Shares
```

### Why Average Cost?
1. **Simplicity**: Easiest to understand for educational purposes
2. **Consistency**: Matches existing `PortfolioManager` implementation
3. **No Tax Complexity**: Avoids FIFO/LIFO tax accounting rules
4. **Educational Focus**: Appropriate for learning, not tax optimization

## 3. Edge Cases and Handling

### 3.1 Multiple Purchases at Different Prices

**Scenario**: User buys same stock multiple times at different prices

**Example**:
```
Purchase 1: 10 shares @ $150.00 = $1,500.00
Purchase 2: 10 shares @ $170.00 = $1,700.00
Result: 20 shares @ $160.00 average cost = $3,200.00 total
```

**Algorithm**:
```python
new_total_shares = existing_shares + new_shares
new_total_cost = existing_total_cost + (new_shares * new_price)
new_average_cost = new_total_cost / new_total_shares
```

**Precision**: Use Decimal type throughout, round final result to 4 decimal places
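
A runnable sketch of the averaging step using `Decimal` as specified (the function
name is illustrative, not the production API):

```python
from decimal import ROUND_HALF_UP, Decimal


def add_shares(shares: Decimal, total_cost: Decimal, new_shares: Decimal, new_price: Decimal):
    """Return (total_shares, total_cost, average_cost) after a purchase."""
    if new_shares <= 0 or new_price <= 0:
        raise ValueError("Shares and price must be positive")
    total_shares = shares + new_shares
    total_cost = total_cost + new_shares * new_price
    average = (total_cost / total_shares).quantize(Decimal("0.0001"), rounding=ROUND_HALF_UP)
    return total_shares, total_cost, average


# Example above: 10 @ $150.00 then 10 @ $170.00 -> 20 shares @ $160.00 ($3,200.00 total)
print(add_shares(Decimal("10"), Decimal("1500.00"), Decimal("10"), Decimal("170.00")))
```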

### 3.2 Partial Position Sales

**Scenario**: User sells portion of position

**Example**:
```
Holding: 20 shares @ $160.00 average cost = $3,200.00 total
Sell: 10 shares
Result: 10 shares @ $160.00 average cost = $1,600.00 total
```

**Algorithm**:
```python
new_shares = existing_shares - sold_shares
new_total_cost = new_shares * average_cost_basis
# Average cost basis remains unchanged
```

**Important**: Average cost basis does NOT change on partial sales
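
The corresponding removal step, again as an illustrative sketch:

```python
from decimal import Decimal


def remove_shares(shares: Decimal, average_cost: Decimal, sold: Decimal):
    """Return (remaining_shares, remaining_total_cost); per-share cost basis is unchanged."""
    if sold <= 0:
        raise ValueError("Shares to remove must be positive")
    if sold > shares:
        raise ValueError("Cannot sell more shares than held")
    remaining = shares - sold
    return remaining, remaining * average_cost


# Example above: 20 shares @ $160.00, sell 10 -> 10 shares, $1,600.00 total cost
print(remove_shares(Decimal("20"), Decimal("160.00"), Decimal("10")))
```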

### 3.3 Full Position Close

**Scenario**: User sells all shares

**Algorithm**:
```python
if sold_shares >= existing_shares:
    # Remove position entirely from portfolio
    position = None
```

**Database**: Delete PortfolioPosition row

### 3.4 Zero or Negative Shares

**Validation Rules**:
- Shares to add: Must be > 0
- Shares to remove: Must be > 0
- Result after removal: Must be >= 0

**Error Handling**:
```python
if new_shares <= 0:
    raise ValueError("Invalid share quantity")
```

### 3.5 Zero or Negative Prices

**Validation Rules**:
- Purchase price: Must be > 0
- Sell price: Optional (not used in cost basis calculation)

### 3.6 Fractional Shares

**Support**: YES - Use Numeric(20, 8) for up to 8 decimal places

**Example**:
```
Purchase: 10.5 shares @ $150.25 = $1,577.625
Valid and supported
```

### 3.7 Rounding and Precision

**Database Storage**:
- Shares: `Numeric(20, 8)` - 8 decimal places
- Prices: `Numeric(12, 4)` - 4 decimal places (cents precision)
- Total Cost: `Numeric(20, 4)` - 4 decimal places

**Calculation Precision**:
- Use Python `Decimal` type throughout calculations
- Only round when storing to database or displaying to user
- Never use float for financial calculations

**Rounding Rules**:
```python
from decimal import Decimal, ROUND_HALF_UP

# For display (2 decimal places)
display_value = value.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)

# For database storage (4 decimal places for prices)
db_price = price.quantize(Decimal('0.0001'), rounding=ROUND_HALF_UP)

# For database storage (8 decimal places for shares)
db_shares = shares.quantize(Decimal('0.00000001'), rounding=ROUND_HALF_UP)
```

### 3.8 Division by Zero

**Scenario**: Calculating average when shares = 0

**Prevention**:
```python
if total_shares == 0:
    raise ValueError("Cannot calculate average cost with zero shares")
```

**Should never occur** due to validation preventing zero-share positions

## 4. P&L Calculation

### Unrealized P&L Formula
```
Current Value = Shares × Current Price
Unrealized P&L = Current Value - Total Cost
P&L Percentage = (Unrealized P&L / Total Cost) × 100
```

### Example
```
Position: 20 shares @ $160.00 cost basis = $3,200.00 total cost
Current Price: $175.50
Current Value: 20 × $175.50 = $3,510.00
Unrealized P&L: $3,510.00 - $3,200.00 = $310.00
P&L %: ($310.00 / $3,200.00) × 100 = 9.69%
```
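
The example can be checked directly with `Decimal` arithmetic (variable names are
illustrative):

```python
from decimal import Decimal

shares = Decimal("20")
total_cost = Decimal("3200.00")
current_price = Decimal("175.50")

current_value = shares * current_price       # 3510.00
unrealized_pnl = current_value - total_cost  # 310.00
pnl_pct = unrealized_pnl / total_cost * 100  # 9.6875 -> displayed as 9.69%
```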

### Edge Cases
- **Current price unavailable**: Use cost basis as fallback
- **Zero cost basis**: Return 0% (should never occur with validation)

## 5. Database Constraints

### Unique Constraint
```sql
UNIQUE (portfolio_id, ticker)
```
**Rationale**: One position per ticker per portfolio

### Check Constraints (Optional - Enforce in Application Layer)
```python
# Application-level validation (preferred)
assert shares > 0, "Shares must be positive"
assert average_cost_basis > 0, "Cost basis must be positive"
assert total_cost > 0, "Total cost must be positive"
```

## 6. Concurrency Considerations

### Single-User System
- No concurrent writes expected (personal use)
- Database-level unique constraints prevent duplicates
- SQLAlchemy sessions with auto-rollback handle errors

### Future Multi-User Support
- Would require row-level locking: `SELECT FOR UPDATE`
- Optimistic concurrency with version column
- Currently not needed for personal use

## 7. Performance Benchmarks

### Expected Performance (100 Positions, 1000 Transactions)
- Add position: < 10ms (with database write)
- Calculate portfolio value: < 50ms (without live prices)
- Calculate portfolio value with live prices: < 2s (network bound)

### Optimization Strategies
- Batch price fetches for portfolio valuation
- Cache live prices (5-minute expiry)
- Use database indexes for ticker lookups
- Lazy-load positions only when needed

## 8. Migration Strategy

### Initial Migration (014_add_portfolio_models)
```sql
CREATE TABLE mcp_portfolios (
    id UUID PRIMARY KEY,
    user_id VARCHAR(50) DEFAULT 'default',
    name VARCHAR(200) DEFAULT 'My Portfolio',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE TABLE mcp_portfolio_positions (
    id UUID PRIMARY KEY,
    portfolio_id UUID REFERENCES mcp_portfolios(id) ON DELETE CASCADE,
    ticker VARCHAR(20) NOT NULL,
    shares NUMERIC(20, 8) NOT NULL,
    average_cost_basis NUMERIC(12, 4) NOT NULL,
    total_cost NUMERIC(20, 4) NOT NULL,
    purchase_date TIMESTAMP WITH TIME ZONE NOT NULL,
    notes TEXT,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    UNIQUE(portfolio_id, ticker)
);
```

### Data Migration from PortfolioManager (Future)
If users have existing portfolio JSON files:
```python
def migrate_from_json(json_file: str) -> None:
    """Migrate existing portfolio from JSON to database."""
    # Load JSON portfolio
    # Create UserPortfolio
    # Create PortfolioPositions for each holding
    # Verify cost basis calculations match
```

## 9. Testing Requirements

### Unit Tests (Domain Layer)
- ✅ Add shares: Multiple purchases, average calculation
- ✅ Remove shares: Partial removal, full removal
- ✅ P&L calculation: Various price scenarios
- ✅ Edge cases: Zero shares, negative values, division by zero
- ✅ Precision: Decimal arithmetic accuracy

### Integration Tests (Database Layer)
- ✅ CRUD operations: Create, read, update, delete positions
- ✅ Unique constraint: Prevent duplicate tickers
- ✅ Cascade delete: Portfolio deletion removes positions
- ✅ Transaction rollback: Error handling

### Property-Based Tests
- ✅ Adding and removing shares always maintains valid state
- ✅ Average cost formula always correct
- ✅ P&L calculations always sum correctly

## 10. Example Scenarios

### Scenario 1: Build Position Over Time
```
Day 1: Buy 10 AAPL @ $150.00
  - Shares: 10, Avg Cost: $150.00, Total: $1,500.00

Day 30: Buy 5 AAPL @ $160.00
  - Shares: 15, Avg Cost: $153.33, Total: $2,300.00

Day 60: Buy 10 AAPL @ $147.00
  - Shares: 25, Avg Cost: $150.80, Total: $3,770.00
```

### Scenario 2: Take Profits
```
Start: 25 AAPL @ $150.80 = $3,770.00
Current Price: $175.50
Unrealized P&L: +$617.50 (+16.38%)

Sell 10 shares @ $175.50 (realized gain: $247.00)
Remaining: 15 AAPL @ $150.80 = $2,262.00
Current Value @ $175.50: $2,632.50
Unrealized P&L: +$370.50 (+16.38% - same percentage)
```

### Scenario 3: Dollar-Cost Averaging
```
Monthly purchases of $1,000:
Month 1: 6.67 shares @ $150.00 = $1,000.00
Month 2: 6.25 shares @ $160.00 = $1,000.00
Month 3: 6.90 shares @ $145.00 = $1,000.00
Total: 19.82 shares @ $151.36 avg = $3,000.00
```
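
The scenario arithmetic can be verified directly, using the rounded monthly share
counts shown above:

```python
from decimal import ROUND_HALF_UP, Decimal

total_invested = Decimal("3000.00")
total_shares = Decimal("6.67") + Decimal("6.25") + Decimal("6.90")  # 19.82
average = (total_invested / total_shares).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(total_shares, average)  # 19.82 151.36
```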

## 11. Compliance and Disclaimers

### Educational Purpose
This cost basis tracking is for **educational purposes only** and should not be used for tax reporting.

### Tax Reporting
Users should consult tax professionals and use official brokerage cost basis reporting for tax purposes.

### Disclaimers in Tools
All portfolio tools include:
```
DISCLAIMER: This portfolio tracking is for educational purposes only and does not
constitute investment advice. All investments carry risk of loss. Consult qualified
financial and tax professionals for investment and tax advice.
```

## 12. References

- **IRS Publication 550**: Investment Income and Expenses
- **Existing Code**: `maverick_mcp/tools/portfolio_manager.py` (average cost implementation)
- **Financial Precision**: IEEE 754 vs Decimal arithmetic
- **SQLAlchemy Numeric**: Column type documentation

---

**Document Version**: 1.0
**Last Updated**: 2025-11-01
**Author**: Portfolio Personalization Feature Team

```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Pytest configuration for MaverickMCP integration testing.

This module sets up test containers for PostgreSQL and Redis to enable
real integration testing without mocking database or cache dependencies.
"""

# Set test environment before any other imports
import os

os.environ["MAVERICK_TEST_ENV"] = "true"

import asyncio
import sys
from collections.abc import AsyncGenerator, Generator

import pytest
from httpx import ASGITransport, AsyncClient
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from testcontainers.postgres import PostgresContainer
from testcontainers.redis import RedisContainer

# Add the parent directory to the path to enable imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from maverick_mcp.api.api_server import create_api_app

# Import all models to ensure they're registered with Base
from maverick_mcp.data.models import get_db
from maverick_mcp.database.base import Base


# Container fixtures (session scope for efficiency)
@pytest.fixture(scope="session")
def postgres_container():
    """Create a PostgreSQL test container for the test session."""
    with PostgresContainer("postgres:15-alpine") as postgres:
        postgres.with_env("POSTGRES_PASSWORD", "test")
        postgres.with_env("POSTGRES_USER", "test")
        postgres.with_env("POSTGRES_DB", "test")
        yield postgres


@pytest.fixture(scope="session")
def redis_container():
    """Create a Redis test container for the test session."""
    with RedisContainer("redis:7-alpine") as redis:
        yield redis


# Database setup fixtures
@pytest.fixture(scope="session")
def database_url(postgres_container: PostgresContainer) -> str:
    """Get the database URL from the test container."""
    return postgres_container.get_connection_url()


@pytest.fixture(scope="session")
def redis_url(redis_container: RedisContainer) -> str:
    """Get the Redis URL from the test container."""
    host = redis_container.get_container_host_ip()
    port = redis_container.get_exposed_port(6379)
    return f"redis://{host}:{port}/0"


@pytest.fixture(scope="session")
def engine(database_url: str):
    """Create a SQLAlchemy engine for the test database."""
    engine = create_engine(database_url)

    # Create all tables in proper order, handling duplicate errors
    try:
        Base.metadata.create_all(bind=engine, checkfirst=True)
    except Exception as e:
        # Only ignore duplicate table/index errors, attempt partial creation
        if "already exists" in str(e) or "DuplicateTable" in str(type(e)):
            # Try to create tables individually
            for _table_name, table in Base.metadata.tables.items():
                try:
                    table.create(bind=engine, checkfirst=True)
                except Exception as table_error:
                    if "already exists" not in str(table_error):
                        # Re-raise non-duplicate errors
                        raise table_error
        else:
            raise

    yield engine

    # Drop all tables after tests
    try:
        Base.metadata.drop_all(bind=engine)
    except Exception:
        # Ignore errors when dropping tables
        pass


@pytest.fixture(scope="function")
def db_session(engine) -> Generator[Session, None, None]:
    """Create a database session for each test."""
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    session = SessionLocal()
    try:
        yield session
    finally:
        session.rollback()
        session.close()


# Environment setup
@pytest.fixture(scope="session", autouse=True)
def setup_test_env(database_url: str, redis_url: str):
    """Set up test environment variables."""
    os.environ["DATABASE_URL"] = database_url
    os.environ["REDIS_URL"] = redis_url
    os.environ["ENVIRONMENT"] = "test"
    os.environ["AUTH_ENABLED"] = "true"
    os.environ["LOG_LEVEL"] = "INFO"
    # Use test JWT keys
    os.environ["JWT_PRIVATE_KEY"] = """-----BEGIN PRIVATE KEY-----
MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShAgEAAoIBAQCONQjZiRlHlDGO
XHjbUyfyQhDWJsvzeaXtFcDGw0qCY+AITiCBVBukzDWf/1wGJ/lhdYX5c1DuNVXq
+JDFY15RjcR9fCbxtiEeuSJ2sh3hVDrQ1BmAWAUV4cFUJXAxC+1PmcqCQEGwfzUi
89Jq76hLyMtxlia2OefN+Cv3hKp37PKrPkdv3SU/moXs5RM5hx01E2dQELzl7X39
O+vzhI4EvIILFqCBKbSv4ylHADrFZH6MiFjhxdPZNdoLbUs5mBjjFXhLOtjFiHRx
6hTYdb6q6fUBWaKtG9jyXs6q8J1lxovgsNHwXCDGeIAaWtmK4V0mRrRfKPFeArwD
Ez5A0rxtAgMBAAECgf9lbytBbqZMN/lkx28p1uf5jlHGNSUBd/3nkqFxfFj7c53l
oMYpXLzsBOQK7tI3iEI8ne6ICbkflg2CkedpYf7pwtnAxUHM91GtWbMLMTa5loaN
wG8nwSNrkHC2toTl0vfdK05pX/NeNUFkZJm8ISLlhi20Y7MSlWamAbrdM4B3/6uM
EXYBSOV2u50g8a3pytsp/dvdkXgJ0BroztJM3FMtY52vUaF3D7xesqv6gS0sxpbn
NyOl8hk9SQhEI3L0p/daozuXjNa3y2p4R0h9+ibEnUlNeREFGkIOAt1F6pClLjAh
elOkYkm4uG0LE8GkKYtiTUrMouYvplPla/ryS8ECgYEAxSga2KYIOCglSyDdvXw6
tkkiNDvNj2v02EFxV4X8TzDdmKPoGUQ+fUTua8j/kclfZ1C/AMwyt4e1S14mbk0A
R/jat49uoXNqT8qVAWvbekLTLXwTfmubrfvOUnrlya13PZ9F5pE7Fxw4FARALP8n
MK/5Tg+WFqY/m027em1MKKUCgYEAuKZ5eAy24gsfSPUlakfnz90oUcB5ETITvpc5
hn6yAlvPdnjqm4MM+mx2zEGT2764BfYED3Qt5A9+9ayI6lynZlpigdOrqJTktsXP
XVxyKdzHS4Z8AknjDTIt9cISkPZMmnMxMfY68+EuH1ZWf2rGy5jaIJMFIBXLt+iI
xKHwMikCgYARPNpsCsg5MLliAjOg95Wijm5hJsFoQsYbik1Am8RdoCYfzGTkoKTe
CwLVhbNiqbqfq92nUjM0/LaLKmYtyqm1oTpuRiokD5VB+LJid22vGNyh43FI4luw
MI3vhDNHGNWOG7je2d/Su3LjvSNnS7+/cANaId67iDmTeI5lu9ymyQKBgGbRpD/Z
7JgwE0qf3yawRX+0qXfkUkXl+aKeOJUQxXSUxRA2QoU30yk67mfMeFXbfEMte5NT
YR5mFo8cdNzznO9ckw+x2xszVawEt/RHvvZajssaZsErfXfioj7/wzDfRUaXsCQe
9TLKB9HBVMb8oRfL1GJhG3CDUn3kyQudFNAJAoGBAJNTpD53wyyPor7RpPXy1huD
UwLk4MGD0X6AGzl7m5ZS7VppTrM0WLgCDICetyc35yjQto3lrlr7Wer33gIZRe+g
QFbUNCZrfvHzFj5Ug9gLwj7V+7hfEk+Obx0azY2C7UT9lbDI+rpn6TT10kuN3KZN
VLVde7wz9h17BALhp84I
-----END PRIVATE KEY-----"""
    os.environ["JWT_PUBLIC_KEY"] = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAjjUI2YkZR5Qxjlx421Mn
8kIQ1ibL83ml7RXAxsNKgmPgCE4ggVQbpMw1n/9cBif5YXWF+XNQ7jVV6viQxWNe
UY3EfXwm8bYhHrkidrId4VQ60NQZgFgFFeHBVCVwMQvtT5nKgkBBsH81IvPSau+o
S8jLcZYmtjnnzfgr94Sqd+zyqz5Hb90lP5qF7OUTOYcdNRNnUBC85e19/Tvr84SO
BLyCCxaggSm0r+MpRwA6xWR+jIhY4cXT2TXaC21LOZgY4xV4SzrYxYh0ceoU2HW+
qun1AVmirRvY8l7OqvCdZcaL4LDR8FwgxniAGlrZiuFdJka0XyjxXgK8AxM+QNK8
bQIDAQAB
-----END PUBLIC KEY-----"""
    yield
    # Clean up (optional)


# FastAPI test client fixtures
@pytest.fixture(scope="function")
async def app(db_session: Session):
    """Create a FastAPI app instance for testing."""
    app = create_api_app()

    # Override the database dependency
    def override_get_db():
        try:
            yield db_session
        finally:
            pass

    app.dependency_overrides[get_db] = override_get_db

    yield app

    # Clean up overrides
    app.dependency_overrides.clear()


@pytest.fixture(scope="function")
async def client(app) -> AsyncGenerator[AsyncClient, None]:
    """Create an async HTTP client for testing API endpoints."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client


# Authentication fixtures (disabled for personal use)
@pytest.fixture
async def test_user(db_session: Session):
    """Create a test user for authenticated scenarios (legacy billing disabled)."""
    # Auth disabled for personal use - return None
    # All auth-related imports and functionality removed
    return None


@pytest.fixture
async def auth_headers(client: AsyncClient, test_user):
    """Get authentication headers for a test user (disabled for personal use)."""
    # Auth disabled for personal use - return empty headers
    return {}


# Event loop configuration for async tests
@pytest.fixture(scope="session")
def event_loop():
    """Create an event loop for the test session."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()


# Mock fixtures for external APIs
@pytest.fixture
def vcr_config():
    """Configure VCR for recording/replaying HTTP requests."""
    return {
        "filter_headers": ["authorization", "api-key", "x-api-key"],
        "filter_query_parameters": ["apikey", "token"],
        "filter_post_data_parameters": ["api_key", "token"],
        "record_mode": "once",  # Record once, then replay
        "match_on": ["method", "scheme", "host", "port", "path", "query"],
    }


# Utility fixtures
@pytest.fixture
def sample_stock_data():
    """Provide sample stock data for testing."""
    from datetime import datetime

    import numpy as np
    import pandas as pd

    dates = pd.date_range(end=datetime.now(), periods=100, freq="D")
    data = {
        "Open": np.random.uniform(100, 200, 100),
        "High": np.random.uniform(100, 200, 100),
        "Low": np.random.uniform(100, 200, 100),
        "Close": np.random.uniform(100, 200, 100),
        "Volume": np.random.randint(1000000, 10000000, 100),
    }
    df = pd.DataFrame(data, index=dates)
    # Ensure High >= Open, Close, Low and Low <= Open, Close, High
    df["High"] = df[["Open", "High", "Close"]].max(axis=1)
    df["Low"] = df[["Open", "Low", "Close"]].min(axis=1)
    return df


# Performance testing utilities
@pytest.fixture
def benchmark_timer():
    """Simple timer for performance benchmarking."""
    import time

    class Timer:
        def __init__(self):
            self.start_time = None
            self.elapsed = None

        def __enter__(self):
            self.start_time = time.time()
            return self

        def __exit__(self, *args):
            self.elapsed = time.time() - self.start_time

    return Timer

```
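
A minimal sketch of how a test might consume these fixtures, assuming pytest-asyncio is configured; the `/health` path and the tolerant status check are illustrative assumptions, not part of the actual suite:

```python
import pytest


@pytest.mark.asyncio
async def test_health_endpoint(client):
    """Illustrative: drive the async client fixture against a hypothetical /health route."""
    response = await client.get("/health")  # path is an assumption; adjust to a real route
    assert response.status_code in (200, 404)


def test_sample_data_is_consistent(sample_stock_data):
    """The synthetic OHLCV frame keeps High >= Low on every row."""
    assert (sample_stock_data["High"] >= sample_stock_data["Low"]).all()
```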

--------------------------------------------------------------------------------
/maverick_mcp/providers/factories/config_factory.py:
--------------------------------------------------------------------------------

```python
"""
Configuration factory for creating configuration providers.

This module provides factory methods for creating different types of
configuration providers based on the environment or requirements.
"""

import logging

from maverick_mcp.providers.interfaces.config import (
    EnvironmentConfigurationProvider,
    IConfigurationProvider,
)

logger = logging.getLogger(__name__)


class ConfigurationFactory:
    """
    Factory class for creating configuration provider instances.

    This factory provides methods to create different types of configuration
    providers based on the deployment environment or specific requirements.
    """

    @staticmethod
    def create_environment_config() -> IConfigurationProvider:
        """
        Create a configuration provider that reads from environment variables.

        Returns:
            Environment-based configuration provider
        """
        logger.debug("Creating environment configuration provider")
        return EnvironmentConfigurationProvider()

    @staticmethod
    def create_test_config(
        overrides: dict[str, str] | None = None,
    ) -> IConfigurationProvider:
        """
        Create a configuration provider for testing with optional overrides.

        Args:
            overrides: Dictionary of configuration overrides for testing

        Returns:
            Test configuration provider
        """
        logger.debug("Creating test configuration provider")

        # Create a test implementation that uses safe defaults
        class TestConfigurationProvider:
            def __init__(self, overrides: dict[str, str] | None = None):
                self._overrides = overrides or {}
                self._defaults = {
                    "DATABASE_URL": "sqlite:///:memory:",
                    "REDIS_HOST": "localhost",
                    "REDIS_PORT": "6379",
                    "REDIS_DB": "1",  # Use different DB for tests
                    "CACHE_ENABLED": "false",  # Disable cache in tests by default
                    "LOG_LEVEL": "DEBUG",
                    "ENVIRONMENT": "test",
                    "REQUEST_TIMEOUT": "5",
                    "MAX_RETRIES": "1",
                    "DB_POOL_SIZE": "1",
                    "DB_MAX_OVERFLOW": "0",
                }

            def get_database_url(self) -> str:
                return self._overrides.get(
                    "DATABASE_URL", self._defaults["DATABASE_URL"]
                )

            def get_redis_host(self) -> str:
                return self._overrides.get("REDIS_HOST", self._defaults["REDIS_HOST"])

            def get_redis_port(self) -> int:
                return int(
                    self._overrides.get("REDIS_PORT", self._defaults["REDIS_PORT"])
                )

            def get_redis_db(self) -> int:
                return int(self._overrides.get("REDIS_DB", self._defaults["REDIS_DB"]))

            def get_redis_password(self) -> str | None:
                password = self._overrides.get("REDIS_PASSWORD", "")
                return password if password else None

            def get_redis_ssl(self) -> bool:
                return self._overrides.get("REDIS_SSL", "false").lower() == "true"

            def is_cache_enabled(self) -> bool:
                return (
                    self._overrides.get(
                        "CACHE_ENABLED", self._defaults["CACHE_ENABLED"]
                    ).lower()
                    == "true"
                )

            def get_cache_ttl(self) -> int:
                return int(
                    self._overrides.get("CACHE_TTL_SECONDS", "300")
                )  # 5 minutes for tests

            def get_fred_api_key(self) -> str:
                return self._overrides.get("FRED_API_KEY", "")

            def get_external_api_key(self) -> str:
                return self._overrides.get("CAPITAL_COMPANION_API_KEY", "")

            def get_tiingo_api_key(self) -> str:
                return self._overrides.get("TIINGO_API_KEY", "")

            def get_log_level(self) -> str:
                return self._overrides.get("LOG_LEVEL", self._defaults["LOG_LEVEL"])

            def is_development_mode(self) -> bool:
                env = self._overrides.get(
                    "ENVIRONMENT", self._defaults["ENVIRONMENT"]
                ).lower()
                return env in ("development", "dev", "test")

            def is_production_mode(self) -> bool:
                env = self._overrides.get(
                    "ENVIRONMENT", self._defaults["ENVIRONMENT"]
                ).lower()
                return env in ("production", "prod")

            def get_request_timeout(self) -> int:
                return int(
                    self._overrides.get(
                        "REQUEST_TIMEOUT", self._defaults["REQUEST_TIMEOUT"]
                    )
                )

            def get_max_retries(self) -> int:
                return int(
                    self._overrides.get("MAX_RETRIES", self._defaults["MAX_RETRIES"])
                )

            def get_pool_size(self) -> int:
                return int(
                    self._overrides.get("DB_POOL_SIZE", self._defaults["DB_POOL_SIZE"])
                )

            def get_max_overflow(self) -> int:
                return int(
                    self._overrides.get(
                        "DB_MAX_OVERFLOW", self._defaults["DB_MAX_OVERFLOW"]
                    )
                )

            def get_config_value(self, key: str, default=None):
                return self._overrides.get(key, self._defaults.get(key, default))

            def set_config_value(self, key: str, value) -> None:
                self._overrides[key] = str(value)

            def get_all_config(self) -> dict[str, str]:
                config = self._defaults.copy()
                config.update(self._overrides)
                return config

            def reload_config(self) -> None:
                pass  # No-op for test config

        return TestConfigurationProvider(overrides)

    @staticmethod
    def create_production_config() -> IConfigurationProvider:
        """
        Create a configuration provider optimized for production.

        Returns:
            Production-optimized configuration provider
        """
        logger.debug("Creating production configuration provider")

        # For now, use the environment provider but could be enhanced with
        # additional validation, secret management, etc.
        config = EnvironmentConfigurationProvider()

        # Validate production requirements
        errors = []
        if not config.get_database_url().startswith(("postgresql://", "mysql://")):
            errors.append("Production requires PostgreSQL or MySQL database")

        if config.is_development_mode():
            logger.warning("Running production config in development mode")

        if errors:
            error_msg = "Production configuration validation failed: " + ", ".join(
                errors
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        return config

    @staticmethod
    def create_development_config() -> IConfigurationProvider:
        """
        Create a configuration provider optimized for development.

        Returns:
            Development-optimized configuration provider
        """
        logger.debug("Creating development configuration provider")
        return EnvironmentConfigurationProvider()

    @staticmethod
    def auto_detect_config() -> IConfigurationProvider:
        """
        Auto-detect the appropriate configuration provider based on environment.

        Returns:
            Appropriate configuration provider for the current environment
        """
        # Check environment variables to determine the mode
        import os

        environment = os.getenv("ENVIRONMENT", "development").lower()

        if environment in ("production", "prod"):
            return ConfigurationFactory.create_production_config()
        elif environment in ("test", "testing"):
            return ConfigurationFactory.create_test_config()
        else:
            return ConfigurationFactory.create_development_config()

    @staticmethod
    def validate_config(config: IConfigurationProvider) -> list[str]:
        """
        Validate a configuration provider for common issues.

        Args:
            config: Configuration provider to validate

        Returns:
            List of validation errors (empty if valid)
        """
        errors = []

        # Check required configuration
        if not config.get_database_url():
            errors.append("Database URL is required")

        # Check production-specific requirements
        if config.is_production_mode():
            if config.get_database_url().startswith("sqlite://"):
                errors.append("SQLite is not recommended for production")

        # Check cache configuration consistency
        if config.is_cache_enabled():
            if not config.get_redis_host():
                errors.append("Redis host is required when caching is enabled")

            if config.get_redis_port() <= 0 or config.get_redis_port() > 65535:
                errors.append("Invalid Redis port number")

        # Check timeout values
        if config.get_request_timeout() <= 0:
            errors.append("Request timeout must be positive")

        if config.get_max_retries() < 0:
            errors.append("Max retries cannot be negative")

        return errors

```
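
A short usage sketch of the factory, grounded in the methods above; the override values are illustrative:

```python
from maverick_mcp.providers.factories.config_factory import ConfigurationFactory

# Build an isolated test configuration with explicit overrides.
test_config = ConfigurationFactory.create_test_config(
    overrides={"CACHE_ENABLED": "true", "REDIS_HOST": "127.0.0.1"}
)
assert test_config.is_cache_enabled()

# Surface common misconfigurations before wiring providers together.
problems = ConfigurationFactory.validate_config(test_config)
if problems:
    raise RuntimeError(f"Configuration invalid: {problems}")

# Or let the factory pick a provider from ENVIRONMENT (note: the production path
# raises unless DATABASE_URL points at PostgreSQL or MySQL).
config = ConfigurationFactory.auto_detect_config()
```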

--------------------------------------------------------------------------------
/maverick_mcp/api/routers/screening_parallel.py:
--------------------------------------------------------------------------------

```python
"""
Enhanced parallel screening router for Maverick-MCP.

This router provides parallel versions of screening operations
for significantly improved performance.
"""

import time
from functools import partial
from typing import Any

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel, Field

from maverick_mcp.utils.logging import get_logger
from maverick_mcp.utils.parallel_screening import (
    make_parallel_safe,
    parallel_screen_async,
)

logger = get_logger(__name__)

# Create router
router = APIRouter(
    prefix="/screening/parallel",
    tags=["parallel_screening"],
)


class ParallelScreeningRequest(BaseModel):
    """Request model for parallel screening."""

    symbols: list[str] = Field(..., description="List of symbols to screen")
    strategy: str = Field("momentum", description="Screening strategy to use")
    max_workers: int | None = Field(None, description="Maximum parallel workers")
    min_score: float = Field(70.0, ge=0, le=100, description="Minimum score to pass")


class ScreeningResult(BaseModel):
    """Individual screening result."""

    symbol: str
    passed: bool
    score: float
    metrics: dict[str, Any]


class ParallelScreeningResponse(BaseModel):
    """Response model for parallel screening."""

    status: str
    total_symbols: int
    passed_count: int
    execution_time: float
    results: list[ScreeningResult]
    speedup_factor: float


# Module-level screening functions (required for multiprocessing)
@make_parallel_safe
def screen_momentum_parallel(symbol: str, min_score: float = 70.0) -> dict[str, Any]:
    """Momentum screening function for parallel execution."""
    from maverick_mcp.core.technical_analysis import (
        calculate_macd,
        calculate_rsi,
        calculate_sma,
    )
    from maverick_mcp.providers.stock_data import StockDataProvider

    try:
        provider = StockDataProvider(use_cache=False)
        data = provider.get_stock_data(symbol, "2023-06-01", "2024-01-01")

        if len(data) < 50:
            return {"symbol": symbol, "passed": False, "score": 0}

        # Calculate indicators
        current_price = data["Close"].iloc[-1]
        sma_20 = calculate_sma(data, 20).iloc[-1]
        sma_50 = calculate_sma(data, 50).iloc[-1]
        rsi = calculate_rsi(data, 14).iloc[-1]
        macd_line, signal_line, _ = calculate_macd(data)

        # Calculate score
        score = 0.0
        if current_price > sma_20:
            score += 25
        if current_price > sma_50:
            score += 25
        if 40 <= rsi <= 70:
            score += 25
        if macd_line.iloc[-1] > signal_line.iloc[-1]:
            score += 25

        return {
            "symbol": symbol,
            "passed": score >= min_score,
            "score": score,
            "metrics": {
                "price": round(current_price, 2),
                "sma_20": round(sma_20, 2),
                "sma_50": round(sma_50, 2),
                "rsi": round(rsi, 2),
                "above_sma_20": current_price > sma_20,
                "above_sma_50": current_price > sma_50,
                "macd_bullish": macd_line.iloc[-1] > signal_line.iloc[-1],
            },
        }

    except Exception as e:
        logger.error(f"Error screening {symbol}: {e}")
        return {"symbol": symbol, "passed": False, "score": 0, "error": str(e)}


@make_parallel_safe
def screen_value_parallel(symbol: str, min_score: float = 70.0) -> dict[str, Any]:
    """Value screening function for parallel execution."""
    from maverick_mcp.core.technical_analysis import calculate_rsi, calculate_sma
    from maverick_mcp.providers.stock_data import StockDataProvider

    try:
        provider = StockDataProvider(use_cache=False)
        data = provider.get_stock_data(symbol, "2023-01-01", "2024-01-01")

        if len(data) < 200:
            return {"symbol": symbol, "passed": False, "score": 0}

        # Calculate value metrics
        current_price = data["Close"].iloc[-1]
        sma_200 = calculate_sma(data, 200).iloc[-1]
        year_high = data["High"].max()
        year_low = data["Low"].min()
        price_range_position = (current_price - year_low) / (year_high - year_low)

        # RSI for oversold conditions
        rsi = calculate_rsi(data, 14).iloc[-1]

        # Value scoring
        score = 0.0
        if current_price < sma_200 * 0.95:  # 5% below 200 SMA
            score += 30
        if price_range_position < 0.3:  # Lower 30% of range
            score += 30
        if rsi < 35:  # Oversold
            score += 20
        if current_price < year_high * 0.7:  # 30% off highs
            score += 20

        return {
            "symbol": symbol,
            "passed": score >= min_score,
            "score": score,
            "metrics": {
                "price": round(current_price, 2),
                "sma_200": round(sma_200, 2),
                "year_high": round(year_high, 2),
                "year_low": round(year_low, 2),
                "rsi": round(rsi, 2),
                "discount_from_high": round((1 - current_price / year_high) * 100, 2),
                "below_sma_200": current_price < sma_200,
            },
        }

    except Exception as e:
        logger.error(f"Error screening {symbol}: {e}")
        return {"symbol": symbol, "passed": False, "score": 0, "error": str(e)}


# Screening strategy mapping
SCREENING_STRATEGIES = {
    "momentum": screen_momentum_parallel,
    "value": screen_value_parallel,
}


@router.post("/screen", response_model=ParallelScreeningResponse)
async def parallel_screen_stocks(request: ParallelScreeningRequest):
    """
    Screen multiple stocks in parallel for improved performance.

    This endpoint uses multiprocessing to analyze multiple stocks
    simultaneously, providing up to 4x speedup compared to sequential
    processing.
    """
    start_time = time.time()

    # Get screening function
    screening_func = SCREENING_STRATEGIES.get(request.strategy)
    if not screening_func:
        raise HTTPException(
            status_code=400,
            detail=f"Unknown strategy: {request.strategy}. "
            f"Available: {list(SCREENING_STRATEGIES.keys())}",
        )

    # Bind min_score with functools.partial so the callable stays picklable for
    # multiprocessing workers (a locally defined closure cannot be pickled).
    screen_func = partial(screening_func, min_score=request.min_score)

    try:
        # Run parallel screening
        results = await parallel_screen_async(
            symbols=request.symbols,
            screening_func=screen_func,
            max_workers=request.max_workers,
            batch_size=10,
        )

        # Calculate execution time and speedup
        execution_time = time.time() - start_time
        sequential_estimate = len(request.symbols) * 0.5  # Assume 0.5s per symbol
        speedup_factor = sequential_estimate / execution_time

        # Format results
        formatted_results = [
            ScreeningResult(
                symbol=r["symbol"],
                passed=r.get("passed", False),
                score=r.get("score", 0),
                metrics=r.get("metrics", {}),
            )
            for r in results
        ]

        passed_count = sum(1 for r in results if r.get("passed", False))

        return ParallelScreeningResponse(
            status="success",
            total_symbols=len(request.symbols),
            passed_count=passed_count,
            execution_time=round(execution_time, 2),
            results=formatted_results,
            speedup_factor=round(speedup_factor, 2),
        )

    except Exception as e:
        logger.error(f"Parallel screening error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/benchmark")
async def benchmark_parallel_screening(
    symbols: list[str] = Query(..., description="Symbols to benchmark"),
    strategy: str = Query("momentum", description="Strategy to benchmark"),
):
    """
    Benchmark parallel vs sequential screening performance.

    Useful for demonstrating the performance improvements.
    """

    screening_func = SCREENING_STRATEGIES.get(strategy)
    if not screening_func:
        raise HTTPException(status_code=400, detail=f"Unknown strategy: {strategy}")

    # Sequential timing
    sequential_start = time.time()
    sequential_results = []
    for symbol in symbols[:5]:  # Limit sequential test
        result = screening_func(symbol)
        sequential_results.append(result)
    sequential_time = (time.time() - sequential_start) * (
        len(symbols) / 5
    )  # Extrapolate

    # Parallel timing
    parallel_start = time.time()
    parallel_results = await parallel_screen_async(
        symbols=symbols,
        screening_func=screening_func,
        max_workers=4,
    )
    parallel_time = time.time() - parallel_start

    return {
        "symbols_count": len(symbols),
        "sequential_time_estimate": round(sequential_time, 2),
        "parallel_time_actual": round(parallel_time, 2),
        "speedup_factor": round(sequential_time / parallel_time, 2),
        "parallel_results_count": len(parallel_results),
        "performance_gain": f"{round((sequential_time / parallel_time - 1) * 100, 1)}%",
    }


@router.get("/progress/{task_id}")
async def get_screening_progress(task_id: str):
    """
    Get progress of a running screening task.

    For future implementation with background tasks.
    """
    # TODO: Implement with background task queue
    return {
        "task_id": task_id,
        "status": "not_implemented",
        "message": "Background task tracking coming soon",
    }

```
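
A client-side sketch of calling the parallel screening endpoint; the base URL and symbol list are assumptions for illustration:

```python
import httpx

payload = {
    "symbols": ["AAPL", "MSFT", "NVDA", "AMD"],  # illustrative watchlist
    "strategy": "momentum",
    "min_score": 70.0,
}

# Assumes the API server is reachable locally; adjust base_url to your deployment.
with httpx.Client(base_url="http://localhost:8000") as client:
    response = client.post("/screening/parallel/screen", json=payload, timeout=60.0)
    response.raise_for_status()
    report = response.json()

print(f"{report['passed_count']}/{report['total_symbols']} passed "
      f"in {report['execution_time']}s (speedup {report['speedup_factor']}x)")
for result in report["results"]:
    flag = "PASS" if result["passed"] else "----"
    print(f"{flag} {result['symbol']}: score={result['score']}")
```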

--------------------------------------------------------------------------------
/scripts/run-migrations.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

# MaverickMCP Database Migration Script
# This script manages database migrations separately from server startup

set -e

# Colors
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
BLUE='\033[0;34m'
NC='\033[0m'

# Change to project root
cd "$(dirname "$0")/.."

# Load environment variables
if [ -f .env ]; then
    source .env
else
    echo -e "${RED}Error: .env file not found${NC}"
    exit 1
fi

# Function to display usage
usage() {
    echo -e "${BLUE}MaverickMCP Database Migration Tool${NC}"
    echo ""
    echo "Usage: $0 [command] [options]"
    echo ""
    echo "Commands:"
    echo "  status      Show current migration status"
    echo "  upgrade     Apply all pending migrations"
    echo "  downgrade   Downgrade to a specific revision"
    echo "  history     Show migration history"
    echo "  validate    Validate migration files"
    echo "  backup      Create database backup before migration"
    echo ""
    echo "Options:"
    echo "  -r, --revision <rev>   Target revision for downgrade"
    echo "  -n, --dry-run         Show what would be done without applying"
    echo "  -f, --force           Skip confirmation prompts"
    echo "  -h, --help            Show this help message"
    echo ""
    echo "Examples:"
    echo "  $0 status                    # Check migration status"
    echo "  $0 upgrade                   # Apply all migrations"
    echo "  $0 upgrade --dry-run         # Preview migrations"
    echo "  $0 downgrade -r 001          # Downgrade to revision 001"
    echo "  $0 backup                    # Create backup"
}

# Function to check database connection
check_database() {
    echo -e "${YELLOW}Checking database connection...${NC}"
    
    if [ -z "$DATABASE_URL" ]; then
        echo -e "${RED}Error: DATABASE_URL not set${NC}"
        exit 1
    fi
    
    # Extract database name from URL
    DB_NAME=$(echo $DATABASE_URL | sed -n 's/.*\/\([^?]*\).*/\1/p')
    
    # Test connection with Python
    uv run python -c "
import sys
from sqlalchemy import create_engine, text
try:
    engine = create_engine('$DATABASE_URL')
    with engine.connect() as conn:
        result = conn.execute(text('SELECT 1'))
        print('Database connection successful')
except Exception as e:
    print(f'Database connection failed: {e}')
    sys.exit(1)
" || exit 1
    
    echo -e "${GREEN}✓ Connected to database: $DB_NAME${NC}"
}

# Function to validate alembic configuration
validate_alembic() {
    if [ ! -f alembic.ini ]; then
        echo -e "${RED}Error: alembic.ini not found${NC}"
        exit 1
    fi
    
    if [ ! -d alembic/versions ]; then
        echo -e "${RED}Error: alembic/versions directory not found${NC}"
        exit 1
    fi
    
    echo -e "${GREEN}✓ Alembic configuration validated${NC}"
}

# Function to show migration status
show_status() {
    echo -e "${BLUE}Current Migration Status${NC}"
    echo "========================"
    
    # Show current revision
    echo -e "\n${YELLOW}Current database revision:${NC}"
    alembic current 2>/dev/null || echo "No migrations applied"
    
    # Show pending migrations
    echo -e "\n${YELLOW}Pending migrations:${NC}"
    alembic heads 2>/dev/null || echo "No pending migrations"
    
    # Count migration files
    MIGRATION_COUNT=$(find alembic/versions -name "*.py" | grep -v __pycache__ | wc -l)
    echo -e "\n${YELLOW}Total migration files:${NC} $MIGRATION_COUNT"
}

# Function to show migration history
show_history() {
    echo -e "${BLUE}Migration History${NC}"
    echo "================="
    alembic history --verbose
}

# Function to validate migrations
validate_migrations() {
    echo -e "${BLUE}Validating Migrations${NC}"
    echo "===================="
    
    # Check for duplicate revisions
    echo -e "${YELLOW}Checking for duplicate revisions...${NC}"
    DUPLICATES=$(find alembic/versions -name "*.py" -exec grep -H "^revision = " {} \; | 
                 grep -v __pycache__ | 
                 awk -F: '{print $2}' | 
                 sort | uniq -d)
    
    if [ -n "$DUPLICATES" ]; then
        echo -e "${RED}Error: Duplicate revisions found:${NC}"
        echo "$DUPLICATES"
        exit 1
    else
        echo -e "${GREEN}✓ No duplicate revisions${NC}"
    fi
    
    # Check for broken dependencies
    echo -e "${YELLOW}Checking migration dependencies...${NC}"
    uv run python -c "
from alembic.config import Config
from alembic.script import ScriptDirectory
config = Config('alembic.ini')
script_dir = ScriptDirectory.from_config(config)
try:
    script_dir.walk_revisions()
    print('✓ All migration dependencies valid')
except Exception as e:
    print(f'Error: {e}')
    exit(1)
" || exit 1
}

# Function to create database backup
create_backup() {
    echo -e "${BLUE}Creating Database Backup${NC}"
    echo "======================"
    
    # Extract connection details from DATABASE_URL
    DB_HOST=$(echo $DATABASE_URL | sed -n 's/.*@\([^:]*\):.*/\1/p')
    DB_NAME=$(echo $DATABASE_URL | sed -n 's/.*\/\([^?]*\).*/\1/p')
    
    BACKUP_FILE="backups/db_backup_$(date +%Y%m%d_%H%M%S).sql"
    mkdir -p backups
    
    echo -e "${YELLOW}Creating backup: $BACKUP_FILE${NC}"
    
    # Use pg_dump if PostgreSQL
    if [[ $DATABASE_URL == *"postgresql"* ]]; then
        pg_dump $DATABASE_URL > $BACKUP_FILE
    else
        echo -e "${RED}Backup not implemented for this database type${NC}"
        exit 1
    fi
    
    if [ -f $BACKUP_FILE ]; then
        SIZE=$(du -h $BACKUP_FILE | cut -f1)
        echo -e "${GREEN}✓ Backup created: $BACKUP_FILE ($SIZE)${NC}"
    else
        echo -e "${RED}Error: Backup failed${NC}"
        exit 1
    fi
}

# Function to apply migrations
apply_migrations() {
    local DRY_RUN=$1
    local FORCE=$2
    
    echo -e "${BLUE}Applying Migrations${NC}"
    echo "=================="
    
    # Show pending migrations
    echo -e "${YELLOW}Checking for pending migrations...${NC}"
    # grep -c prints "0" (and exits non-zero) when nothing matches, so swallow the
    # exit status rather than echoing a second "0" into the variable
    PENDING=$(alembic upgrade head --sql 2>/dev/null | grep -c "UPDATE alembic_version" || true)
    
    if [ "$PENDING" -eq "0" ]; then
        echo -e "${GREEN}✓ Database is up to date${NC}"
        return 0
    fi
    
    echo -e "${YELLOW}Found pending migrations${NC}"
    
    # Dry run mode
    if [ "$DRY_RUN" == "true" ]; then
        echo -e "\n${YELLOW}SQL to be executed:${NC}"
        alembic upgrade head --sql
        return 0
    fi
    
    # Confirmation prompt
    if [ "$FORCE" != "true" ]; then
        echo -e "\n${YELLOW}Do you want to apply these migrations? (y/N)${NC}"
        read -r response
        if [[ ! "$response" =~ ^[Yy]$ ]]; then
            echo -e "${RED}Migration cancelled${NC}"
            exit 0
        fi
    fi
    
    # Apply migrations
    echo -e "\n${YELLOW}Applying migrations...${NC}"
    alembic upgrade head
    
    echo -e "${GREEN}✓ Migrations applied successfully${NC}"
    
    # Show new status
    echo -e "\n${YELLOW}New database revision:${NC}"
    alembic current
}

# Function to downgrade
downgrade_migration() {
    local REVISION=$1
    local DRY_RUN=$2
    local FORCE=$3
    
    echo -e "${BLUE}Downgrading Migration${NC}"
    echo "==================="
    
    if [ -z "$REVISION" ]; then
        echo -e "${RED}Error: Revision required for downgrade${NC}"
        usage
        exit 1
    fi
    
    # Show current revision
    echo -e "${YELLOW}Current revision:${NC}"
    alembic current
    
    # Dry run mode
    if [ "$DRY_RUN" == "true" ]; then
        echo -e "\n${YELLOW}SQL to be executed:${NC}"
        alembic downgrade $REVISION --sql
        return 0
    fi
    
    # Confirmation prompt
    if [ "$FORCE" != "true" ]; then
        echo -e "\n${RED}WARNING: This will downgrade the database to revision $REVISION${NC}"
        echo -e "${YELLOW}Do you want to continue? (y/N)${NC}"
        read -r response
        if [[ ! "$response" =~ ^[Yy]$ ]]; then
            echo -e "${RED}Downgrade cancelled${NC}"
            exit 0
        fi
    fi
    
    # Create backup first
    create_backup
    
    # Apply downgrade
    echo -e "\n${YELLOW}Downgrading to revision $REVISION...${NC}"
    alembic downgrade $REVISION
    
    echo -e "${GREEN}✓ Downgrade completed successfully${NC}"
    
    # Show new status
    echo -e "\n${YELLOW}New database revision:${NC}"
    alembic current
}

# Parse command line arguments
COMMAND=""
REVISION=""
DRY_RUN=false
FORCE=false

while [[ $# -gt 0 ]]; do
    case $1 in
        status|upgrade|downgrade|history|validate|backup)
            COMMAND=$1
            shift
            ;;
        -r|--revision)
            REVISION="$2"
            shift 2
            ;;
        -n|--dry-run)
            DRY_RUN=true
            shift
            ;;
        -f|--force)
            FORCE=true
            shift
            ;;
        -h|--help)
            usage
            exit 0
            ;;
        *)
            echo -e "${RED}Unknown option: $1${NC}"
            usage
            exit 1
            ;;
    esac
done

# Validate environment
check_database
validate_alembic

# Execute command
case $COMMAND in
    status)
        show_status
        ;;
    upgrade)
        apply_migrations $DRY_RUN $FORCE
        ;;
    downgrade)
        downgrade_migration $REVISION $DRY_RUN $FORCE
        ;;
    history)
        show_history
        ;;
    validate)
        validate_migrations
        ;;
    backup)
        create_backup
        ;;
    *)
        echo -e "${RED}Error: Command required${NC}"
        usage
        exit 1
        ;;
esac

echo -e "\n${GREEN}Migration tool completed successfully${NC}"
```

--------------------------------------------------------------------------------
/maverick_mcp/config/validation.py:
--------------------------------------------------------------------------------

```python
"""
Environment configuration validation for MaverickMCP.

This module validates all required environment variables and configuration
settings at startup to prevent runtime errors in production.
"""

import os
import sys
from typing import Any
from urllib.parse import urlparse

from maverick_mcp.config.settings import settings
from maverick_mcp.utils.logging import get_logger

logger = get_logger(__name__)


class ConfigurationError(Exception):
    """Raised when configuration validation fails."""

    pass


class EnvironmentValidator:
    """Validates environment configuration at startup."""

    def __init__(self):
        self.errors: list[str] = []
        self.warnings: list[str] = []
        self.validated_vars: set[str] = set()

    def validate_all(self) -> bool:
        """
        Run all validation checks.

        Returns:
            True if validation passes, False otherwise
        """
        logger.info("Starting environment validation...")

        # Core settings
        self._validate_core_settings()

        # Database settings
        self._validate_database_settings()

        # Redis settings
        self._validate_redis_settings()

        # API settings
        self._validate_api_settings()

        # External service settings
        self._validate_external_services()

        # Report results
        self._report_results()

        return len(self.errors) == 0

    def _validate_core_settings(self):
        """Validate core application settings."""
        # App name
        if not settings.app_name:
            self.errors.append("APP_NAME is required")

        # Environment
        if settings.environment not in ["development", "staging", "production"]:
            self.warnings.append(
                f"Unknown environment: {settings.environment}. "
                "Expected: development, staging, or production"
            )

        # Production-specific checks
        if settings.environment == "production":
            if settings.api.debug:
                self.errors.append("DEBUG must be false in production")

    def _validate_database_settings(self):
        """Validate database configuration."""
        if not settings.database.url:
            self.errors.append("DATABASE_URL is required")
            return

        # Parse and validate URL
        try:
            parsed = urlparse(settings.database.url)

            if not parsed.scheme:
                self.errors.append("DATABASE_URL missing scheme")
                return

            # SQLite validation (for personal use)
            if parsed.scheme == "sqlite":
                if not parsed.path:
                    self.errors.append("SQLite DATABASE_URL missing database path")
                return

            # PostgreSQL validation (for production)
            if parsed.scheme.startswith("postgresql"):
                if not parsed.hostname:
                    self.errors.append("PostgreSQL DATABASE_URL missing hostname")

                if not parsed.path or parsed.path == "/":
                    self.errors.append("PostgreSQL DATABASE_URL missing database name")
            else:
                self.warnings.append(
                    f"Database scheme: {parsed.scheme}. "
                    "MaverickMCP supports SQLite (personal use) and PostgreSQL (production)."
                )

            # Production-specific PostgreSQL checks
            if settings.environment == "production" and parsed.scheme.startswith(
                "postgresql"
            ):
                if parsed.hostname in ["localhost", "127.0.0.1"]:
                    self.warnings.append(
                        "Using localhost database in production. "
                        "Consider using a managed database service."
                    )

                # SSL mode check
                query_params = dict(
                    param.split("=")
                    for param in (parsed.query.split("&") if parsed.query else [])
                )
                if query_params.get("sslmode") != "require":
                    self.warnings.append(
                        "DATABASE_URL should use sslmode=require in production"
                    )

        except Exception as e:
            self.errors.append(f"Invalid DATABASE_URL format: {e}")

    def _validate_redis_settings(self):
        """Validate Redis configuration."""
        redis_url = settings.redis.url

        if not redis_url:
            self.warnings.append(
                "Redis URL not configured. Performance may be impacted."
            )
            return

        # Production Redis checks
        if settings.environment == "production":
            if "localhost" in redis_url or "127.0.0.1" in redis_url:
                self.warnings.append(
                    "Using localhost Redis in production. "
                    "Consider using a managed Redis service."
                )

            if settings.redis.password is None:
                self.warnings.append(
                    "Consider using password-protected Redis in production"
                )

            if not settings.redis.ssl:
                self.warnings.append("Consider using SSL for Redis in production")

    def _validate_api_settings(self):
        """Validate API settings."""
        # CORS origins
        if settings.environment == "production":
            if "*" in settings.api.cors_origins:
                self.errors.append(
                    "CORS wildcard (*) not allowed in production. "
                    "Set specific allowed origins."
                )

            if not settings.api.cors_origins:
                self.warnings.append("No CORS origins configured")
            else:
                # Validate each origin
                for origin in settings.api.cors_origins:
                    if (
                        origin.startswith("http://")
                        and origin != "http://localhost:3000"
                    ):
                        self.warnings.append(
                            f"Insecure HTTP origin in production: {origin}"
                        )

        # Rate limiting validation - check environment variables directly
        rate_limit_per_ip = os.getenv("RATE_LIMIT_PER_IP")
        if rate_limit_per_ip:
            try:
                if int(rate_limit_per_ip) <= 0:
                    self.errors.append("RATE_LIMIT_PER_IP must be positive")
            except ValueError:
                self.errors.append("RATE_LIMIT_PER_IP must be a valid integer")

    def _validate_external_services(self):
        """Validate external service configurations."""
        # Email service (if configured)
        if os.getenv("MAILGUN_API_KEY"):
            if not os.getenv("MAILGUN_DOMAIN"):
                self.errors.append(
                    "MAILGUN_DOMAIN required when MAILGUN_API_KEY is set"
                )

            if not os.getenv("MAILGUN_FROM_EMAIL"):
                self.warnings.append("MAILGUN_FROM_EMAIL not set, using default")

        # Monitoring services
        if settings.environment == "production":
            if not os.getenv("SENTRY_DSN"):
                self.warnings.append(
                    "SENTRY_DSN not configured. Error tracking is disabled."
                )

        # Optional API keys
        optional_keys = [
            "FRED_API_KEY",
            "TIINGO_API_KEY",
            "OPENAI_API_KEY",
            "ANTHROPIC_API_KEY",
            "CAPITAL_COMPANION_API_KEY",
        ]

        missing_optional = [key for key in optional_keys if not os.getenv(key)]
        if missing_optional:
            self.warnings.append(
                f"Optional API keys not configured: {', '.join(missing_optional)}. "
                "Some features may be limited."
            )

    def _report_results(self):
        """Report validation results."""
        if self.errors:
            logger.error(
                f"Environment validation failed with {len(self.errors)} errors:"
            )
            for error in self.errors:
                logger.error(f"  ✗ {error}")

        if self.warnings:
            logger.warning(
                f"Environment validation found {len(self.warnings)} warnings:"
            )
            for warning in self.warnings:
                logger.warning(f"  ⚠ {warning}")

        if not self.errors and not self.warnings:
            logger.info("✓ Environment validation passed successfully")
        elif not self.errors:
            logger.info(
                f"✓ Environment validation passed with {len(self.warnings)} warnings"
            )

    def get_status_dict(self) -> dict[str, Any]:
        """Get validation status as a dictionary."""
        return {
            "valid": len(self.errors) == 0,
            "errors": self.errors,
            "warnings": self.warnings,
            "environment": settings.environment,
            "auth_enabled": False,
        }


def validate_environment(fail_on_error: bool = True) -> bool:
    """
    Validate environment configuration.

    Args:
        fail_on_error: If True, exit process on validation errors

    Returns:
        True if validation passes, False otherwise
    """
    validator = EnvironmentValidator()
    is_valid = validator.validate_all()

    if not is_valid and fail_on_error:
        logger.error("Environment validation failed. Exiting...")
        sys.exit(1)

    return is_valid


def get_validation_status() -> dict[str, Any]:
    """Get current validation status without failing."""
    validator = EnvironmentValidator()
    validator.validate_all()
    return validator.get_status_dict()

```
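
A minimal startup sketch using the helpers above; the print-based reporting is illustrative:

```python
from maverick_mcp.config.validation import get_validation_status, validate_environment

# Inspect the result without killing the process (useful for a health endpoint).
status = get_validation_status()
print(f"valid={status['valid']}, errors={status['errors']}, warnings={status['warnings']}")

# Fail-fast variant for server startup: exits the process if any hard error is found.
validate_environment(fail_on_error=True)
```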

--------------------------------------------------------------------------------
/tests/test_stock_data_fetching_service.py:
--------------------------------------------------------------------------------

```python
"""
Tests for StockDataFetchingService.
"""

from unittest.mock import Mock, patch

import pandas as pd
import pytest

from maverick_mcp.infrastructure.data_fetching import StockDataFetchingService


class TestStockDataFetchingService:
    """Test cases for StockDataFetchingService."""

    def setup_method(self):
        """Set up test fixtures."""
        self.service = StockDataFetchingService(timeout=30, max_retries=3)

    def test_init(self):
        """Test service initialization."""
        assert self.service.timeout == 30
        assert self.service.max_retries == 3

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_stock_data_with_period(self, mock_ticker_class):
        """Test fetching stock data with period parameter."""
        # Mock data
        mock_data = pd.DataFrame(
            {
                "Open": [150.0, 151.0],
                "High": [152.0, 153.0],
                "Low": [149.0, 150.0],
                "Close": [151.0, 152.0],
                "Volume": [1000000, 1100000],
            },
            index=pd.date_range("2024-01-01", periods=2),
        )

        mock_ticker = Mock()
        mock_ticker.history.return_value = mock_data
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_stock_data("AAPL", period="1mo")

        # Assertions
        assert not result.empty
        assert len(result) == 2
        assert list(result.columns) == ["Open", "High", "Low", "Close", "Volume"]
        assert result.index.name == "Date"
        mock_ticker.history.assert_called_once_with(period="1mo", interval="1d")

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_stock_data_with_dates(self, mock_ticker_class):
        """Test fetching stock data with start and end dates."""
        # Mock data
        mock_data = pd.DataFrame(
            {
                "Open": [150.0, 151.0],
                "High": [152.0, 153.0],
                "Low": [149.0, 150.0],
                "Close": [151.0, 152.0],
                "Volume": [1000000, 1100000],
            },
            index=pd.date_range("2024-01-01", periods=2),
        )

        mock_ticker = Mock()
        mock_ticker.history.return_value = mock_data
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_stock_data(
            "AAPL", start_date="2024-01-01", end_date="2024-01-02"
        )

        # Assertions
        assert not result.empty
        assert len(result) == 2
        mock_ticker.history.assert_called_once_with(
            start="2024-01-01", end="2024-01-02", interval="1d"
        )

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_stock_data_empty_response(self, mock_ticker_class):
        """Test handling of empty response from data source."""
        mock_ticker = Mock()
        mock_ticker.history.return_value = pd.DataFrame()
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_stock_data("INVALID")

        # Assertions
        assert result.empty  # Should return empty DataFrame with correct columns
        assert list(result.columns) == ["Open", "High", "Low", "Close", "Volume"]

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_stock_data_missing_columns(self, mock_ticker_class):
        """Test handling of missing columns in response."""
        # Mock data missing some columns
        mock_data = pd.DataFrame(
            {
                "Open": [150.0, 151.0],
                "Close": [151.0, 152.0],
                # Missing High, Low, Volume
            },
            index=pd.date_range("2024-01-01", periods=2),
        )

        mock_ticker = Mock()
        mock_ticker.history.return_value = mock_data
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_stock_data("AAPL")

        # Assertions
        assert not result.empty
        assert "High" in result.columns
        assert "Low" in result.columns
        assert "Volume" in result.columns
        # Check that missing columns are filled with appropriate defaults
        assert (result["Volume"] == 0).all()
        assert (result["High"] == 0.0).all()

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_stock_info(self, mock_ticker_class):
        """Test fetching stock information."""
        mock_info = {
            "longName": "Apple Inc.",
            "sector": "Technology",
            "industry": "Consumer Electronics",
        }

        mock_ticker = Mock()
        mock_ticker.info = mock_info
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_stock_info("AAPL")

        # Assertions
        assert result == mock_info

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_realtime_data_success(self, mock_ticker_class):
        """Test successful real-time data fetching."""
        # Mock history data
        mock_history = pd.DataFrame(
            {
                "Close": [150.0],
                "Volume": [1000000],
            },
            index=pd.date_range("2024-01-01", periods=1),
        )

        # Mock info data
        mock_info = {"previousClose": 149.0}

        mock_ticker = Mock()
        mock_ticker.history.return_value = mock_history
        mock_ticker.info = mock_info
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_realtime_data("AAPL")

        # Assertions
        assert result is not None
        assert result["symbol"] == "AAPL"
        assert result["price"] == 150.0
        assert result["change"] == 1.0
        assert result["change_percent"] == pytest.approx(0.67, rel=1e-1)
        assert result["volume"] == 1000000
        assert result["is_real_time"] is False

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_realtime_data_empty(self, mock_ticker_class):
        """Test real-time data fetching with empty response."""
        mock_ticker = Mock()
        mock_ticker.history.return_value = pd.DataFrame()
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_realtime_data("INVALID")

        # Assertions
        assert result is None

    def test_fetch_multiple_realtime_data(self):
        """Test fetching real-time data for multiple symbols."""
        with patch.object(self.service, "fetch_realtime_data") as mock_fetch:
            # Mock responses
            mock_fetch.side_effect = [
                {"symbol": "AAPL", "price": 150.0},
                None,  # Failed for INVALID
                {"symbol": "MSFT", "price": 300.0},
            ]

            # Test
            result = self.service.fetch_multiple_realtime_data(
                ["AAPL", "INVALID", "MSFT"]
            )

            # Assertions
            assert len(result) == 2  # Only successful fetches
            assert "AAPL" in result
            assert "MSFT" in result
            assert "INVALID" not in result

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_fetch_news(self, mock_ticker_class):
        """Test fetching news data."""
        mock_news = [
            {
                "title": "Apple Reports Strong Earnings",
                "publisher": "Reuters",
                "link": "https://example.com",
                "providerPublishTime": 1640995200,  # Unix timestamp
                "type": "STORY",
            }
        ]

        mock_ticker = Mock()
        mock_ticker.news = mock_news
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.fetch_news("AAPL", limit=1)

        # Assertions
        assert not result.empty
        assert len(result) == 1
        assert result.iloc[0]["title"] == "Apple Reports Strong Earnings"
        assert "providerPublishTime" in result.columns

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_check_if_etf_true(self, mock_ticker_class):
        """Test ETF check returning True."""
        mock_info = {"quoteType": "ETF"}

        mock_ticker = Mock()
        mock_ticker.info = mock_info
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.check_if_etf("SPY")

        # Assertions
        assert result is True

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_check_if_etf_false(self, mock_ticker_class):
        """Test ETF check returning False."""
        mock_info = {"quoteType": "EQUITY"}

        mock_ticker = Mock()
        mock_ticker.info = mock_info
        mock_ticker_class.return_value = mock_ticker

        # Test
        result = self.service.check_if_etf("AAPL")

        # Assertions
        assert result is False

    @patch("maverick_mcp.infrastructure.data_fetching.stock_data_service.yf.Ticker")
    def test_check_if_etf_fallback(self, mock_ticker_class):
        """Test ETF check using fallback logic."""
        mock_info = {}  # No quoteType

        mock_ticker = Mock()
        mock_ticker.info = mock_info
        mock_ticker_class.return_value = mock_ticker

        # Test with known ETF symbol
        result = self.service.check_if_etf("QQQ")

        # Assertions
        assert result is True

```
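
A small sketch of invoking the service directly, mirroring the call shapes exercised by the tests above; live network access through yfinance is assumed:

```python
from maverick_mcp.infrastructure.data_fetching import StockDataFetchingService

service = StockDataFetchingService(timeout=30, max_retries=3)

# Same call shape the tests mock: returns an OHLCV DataFrame indexed by Date.
df = service.fetch_stock_data("AAPL", period="1mo")
print(df[["Open", "Close", "Volume"]].tail())

# Real-time snapshot (returns None if the symbol cannot be fetched).
quote = service.fetch_realtime_data("AAPL")
if quote:
    print(f"{quote['symbol']}: {quote['price']} ({quote['change_percent']}%)")
```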

--------------------------------------------------------------------------------
/examples/monitoring_example.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example demonstrating MaverickMCP monitoring and observability features.

This example shows how to:
1. Enable monitoring and tracing
2. Use monitoring utilities in your code
3. Access monitoring endpoints
4. View metrics and traces
"""

import asyncio
import os
import time
from contextlib import asynccontextmanager

# Set environment variables for monitoring
os.environ.update(
    {
        "OTEL_TRACING_ENABLED": "true",
        "JAEGER_ENDPOINT": "http://localhost:14268/api/traces",
        "SENTRY_DSN": "",  # Optional: add your Sentry DSN
    }
)

from maverick_mcp.utils.logging import get_logger
from maverick_mcp.utils.monitoring import (
    track_cache_operation,
    track_external_api_call,
    track_tool_usage,
    update_performance_metrics,
)
from maverick_mcp.utils.tracing import (
    trace_cache_operation,
    trace_database_query,
    trace_external_api_call,
    trace_operation,
)

logger = get_logger(__name__)


@asynccontextmanager
async def example_database_operation(query_type: str, table: str):
    """Example of monitoring a database operation."""
    with trace_database_query(query_type, table, f"SELECT * FROM {table}"):
        start_time = time.time()
        try:
            # Simulate database operation
            await asyncio.sleep(0.1)
            yield "mock_result"

            # Track successful operation
            duration = time.time() - start_time
            from maverick_mcp.utils.monitoring import track_database_query

            track_database_query(query_type, table, duration, "success")

        except Exception:
            # Track failed operation
            duration = time.time() - start_time
            from maverick_mcp.utils.monitoring import track_database_query

            track_database_query(query_type, table, duration, "error")
            raise


@asynccontextmanager
async def example_external_api_call(service: str, endpoint: str):
    """Example of monitoring an external API call."""
    with trace_external_api_call(service, endpoint):
        start_time = time.time()
        try:
            # Simulate external API call
            await asyncio.sleep(0.2)
            status_code = 200
            yield {"data": "mock_response"}

            # Track successful API call
            duration = time.time() - start_time
            track_external_api_call(service, endpoint, "GET", status_code, duration)

        except Exception as e:
            # Track failed API call
            duration = time.time() - start_time
            track_external_api_call(
                service, endpoint, "GET", 500, duration, str(type(e).__name__)
            )
            raise


@asynccontextmanager
async def example_cache_operation(key: str):
    """Example of monitoring a cache operation."""
    with trace_cache_operation("get", "redis"):
        try:
            # Simulate cache operation
            await asyncio.sleep(0.01)
            hit = True  # Simulate cache hit
            yield "cached_value"

            # Track cache operation
            track_cache_operation("redis", "get", hit, key.split(":")[0])

        except Exception:
            # Track cache miss/error
            track_cache_operation("redis", "get", False, key.split(":")[0])
            raise


async def example_tool_execution(tool_name: str, user_id: str):
    """Example of monitoring a tool execution."""
    with trace_operation(
        f"tool.{tool_name}", {"tool.name": tool_name, "user.id": user_id}
    ):
        start_time = time.time()

        try:
            # Simulate tool execution with some operations
            logger.info(f"Executing tool: {tool_name}", extra={"user_id": user_id})

            # Example: Database query
            async with example_database_operation("SELECT", "stocks") as db_result:
                logger.info(f"Database query result: {db_result}")

            # Example: External API call
            async with example_external_api_call(
                "yahoo_finance", "/quote/AAPL"
            ) as api_result:
                logger.info(f"API call result: {api_result}")

            # Example: Cache operation
            async with example_cache_operation("stock:AAPL:price") as cache_result:
                logger.info(f"Cache result: {cache_result}")

            # Simulate processing time
            await asyncio.sleep(0.5)

            # Track successful tool execution
            duration = time.time() - start_time
            track_tool_usage(
                tool_name=tool_name,
                user_id=user_id,
                duration=duration,
                status="success",
                complexity="standard",
            )

            return {
                "status": "success",
                "data": "Tool execution completed",
                "duration_ms": int(duration * 1000),
            }

        except Exception as e:
            # Track failed tool execution
            duration = time.time() - start_time
            from maverick_mcp.utils.monitoring import track_tool_error

            track_tool_error(tool_name, type(e).__name__, "standard")

            logger.error(f"Tool execution failed: {tool_name}", exc_info=True)
            raise


async def demonstrate_monitoring():
    """Demonstrate various monitoring features."""
    logger.info("Starting monitoring demonstration...")

    # Initialize monitoring (this would normally be done by the server)
    from maverick_mcp.utils.monitoring import initialize_monitoring
    from maverick_mcp.utils.tracing import initialize_tracing

    initialize_monitoring()
    initialize_tracing()

    # Example 1: Tool execution monitoring
    logger.info("=== Example 1: Tool Execution Monitoring ===")
    result = await example_tool_execution("get_stock_data", "user123")
    print(f"Tool result: {result}")

    # Example 2: Performance metrics
    logger.info("=== Example 2: Performance Metrics ===")
    update_performance_metrics()
    print("Performance metrics updated")

    # Example 3: Multiple tool executions for metrics
    logger.info("=== Example 3: Multiple Tool Executions ===")
    tools = ["get_technical_analysis", "screen_stocks", "get_portfolio_data"]
    users = ["user123", "user456", "user789"]

    for i in range(5):
        tool = tools[i % len(tools)]
        user = users[i % len(users)]
        try:
            result = await example_tool_execution(tool, user)
            print(f"Tool {tool} for {user}: {result['status']}")
        except Exception as e:
            print(f"Tool {tool} for {user}: FAILED - {e}")

        # Small delay between executions
        await asyncio.sleep(0.1)

    # Example 4: Error scenarios
    logger.info("=== Example 4: Error Scenarios ===")
    try:
        # Simulate a tool that fails
        with trace_operation("tool.failing_tool", {"tool.name": "failing_tool"}):
            raise ValueError("Simulated tool failure")
    except ValueError as e:
        logger.error(f"Expected error: {e}")
        from maverick_mcp.utils.monitoring import track_tool_error

        track_tool_error("failing_tool", "ValueError", "standard")

    # Example 5: Security events
    logger.info("=== Example 5: Security Events ===")
    from maverick_mcp.utils.monitoring import (
        track_authentication,
        track_security_violation,
    )

    # Simulate authentication attempts
    track_authentication("bearer_token", "success", "Mozilla/5.0")
    track_authentication("bearer_token", "failure", "suspicious-bot/1.0")

    # Simulate security violation
    track_security_violation("invalid_token", "high")

    print("Security events tracked")

    # Example 6: Business metrics
    logger.info("=== Example 6: Business Metrics ===")
    from maverick_mcp.utils.monitoring import (
        track_user_session,
        update_active_users,
    )

    # Simulate engagement events
    track_user_session("registered", "api_key", duration=360.0)
    track_user_session("anonymous", "public", duration=120.0)
    update_active_users(daily_count=42, monthly_count=156)

    print("Business metrics tracked")

    logger.info("Monitoring demonstration completed!")
    print("\n=== Check Your Monitoring Stack ===")
    print("1. Prometheus metrics: http://localhost:9090")
    print("2. Grafana dashboards: http://localhost:3000")
    print("3. Jaeger traces: http://localhost:16686")
    print("4. MaverickMCP metrics: http://localhost:8000/metrics")
    print("5. MaverickMCP health: http://localhost:8000/health")


def print_monitoring_setup_instructions():
    """Print instructions for setting up the monitoring stack."""
    print("=== MaverickMCP Monitoring Setup ===")
    print()
    print("1. Start the monitoring stack:")
    print("   cd monitoring/")
    print("   docker-compose -f docker-compose.monitoring.yml up -d")
    print()
    print("2. Install monitoring dependencies:")
    print("   pip install prometheus-client opentelemetry-distro sentry-sdk")
    print()
    print("3. Set environment variables:")
    print("   export OTEL_TRACING_ENABLED=true")
    print("   export JAEGER_ENDPOINT=http://localhost:14268/api/traces")
    print()
    print("4. Start MaverickMCP:")
    print("   make dev")
    print()
    print("5. Access monitoring services:")
    print("   - Grafana: http://localhost:3000 (admin/admin)")
    print("   - Prometheus: http://localhost:9090")
    print("   - Jaeger: http://localhost:16686")
    print("   - MaverickMCP metrics: http://localhost:8000/metrics")
    print()


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "--setup":
        print_monitoring_setup_instructions()
    else:
        # Run the monitoring demonstration
        asyncio.run(demonstrate_monitoring())

```
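
After the demonstration finishes, the Prometheus text exposition served by the MCP process can be spot-checked directly. The following is a minimal sketch using only the standard library, assuming the server is running locally on port 8000 as in the URLs printed above; filtering on the "tool" substring is purely illustrative.

```python
"""Spot-check the /metrics endpoint after running the monitoring example.

Minimal sketch: assumes MaverickMCP is serving metrics at localhost:8000 as in
the example output above; the "tool" substring filter is illustrative only.
"""

from urllib.request import urlopen

with urlopen("http://localhost:8000/metrics", timeout=5) as resp:
    body = resp.read().decode("utf-8")

# Print only non-comment metric lines that mention "tool" to keep output short.
for line in body.splitlines():
    if line and not line.startswith("#") and "tool" in line:
        print(line)
```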

--------------------------------------------------------------------------------
/maverick_mcp/tests/test_technical_analysis.py:
--------------------------------------------------------------------------------

```python
"""
Tests for technical analysis module.
"""

import unittest
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

from maverick_mcp.core.technical_analysis import (
    add_technical_indicators,
    analyze_bollinger_bands,
    analyze_macd,
    analyze_rsi,
    analyze_stochastic,
    analyze_trend,
    analyze_volume,
    generate_outlook,
    identify_chart_patterns,
    identify_resistance_levels,
    identify_support_levels,
)


def create_test_dataframe(days=100):
    """Create a test dataframe with price data."""
    date_today = datetime.now()
    dates = [date_today - timedelta(days=i) for i in range(days)]
    dates.reverse()  # Oldest first

    # Start with a seed value and generate slightly random walk data
    np.random.seed(42)  # For reproducibility

    close_price = 100.0
    prices = []
    volumes = []

    for _ in range(days):
        # Simulate some volatility with random noise and a slight upward trend
        pct_change = np.random.normal(0.0005, 0.015)  # mean, std dev
        close_price = close_price * (1 + pct_change)

        # Calculate OHLC and volume
        open_price = close_price * (1 + np.random.normal(0, 0.005))
        high_price = max(open_price, close_price) * (
            1 + abs(np.random.normal(0, 0.008))
        )
        low_price = min(open_price, close_price) * (1 - abs(np.random.normal(0, 0.008)))
        volume = int(np.random.normal(1000000, 300000))

        prices.append([open_price, high_price, low_price, close_price])
        volumes.append(volume)

    # Create DataFrame
    df = pd.DataFrame(
        prices, index=pd.DatetimeIndex(dates), columns=["open", "high", "low", "close"]
    )
    df["volume"] = volumes

    return df


class TestTechnicalAnalysis(unittest.TestCase):
    """Test cases for technical analysis functions."""

    def setUp(self):
        """Set up test fixtures."""
        self.df = create_test_dataframe(days=200)
        self.df_with_indicators = add_technical_indicators(self.df)

    def test_add_technical_indicators(self):
        """Test that indicators are added to the dataframe."""
        result = add_technical_indicators(self.df)

        # Check that all expected columns are present
        expected_columns = [
            "open",
            "high",
            "low",
            "close",
            "volume",
            "ema_21",
            "sma_50",
            "sma_200",
            "rsi",
            "macd_12_26_9",
            "macds_12_26_9",
            "macdh_12_26_9",
            "sma_20",
            "stdev",
            "bbu_20_2.0",
            "bbl_20_2.0",
            "atr",
            "stochk_14_3_3",
            "stochd_14_3_3",
            "adx_14",
        ]

        for col in expected_columns:
            self.assertIn(col, result.columns)

        # Check that NaN values are only in the beginning (for moving windows)
        self.assertTrue(
            pd.notna(result["sma_200"].iloc[199])
            or isinstance(result["sma_200"].iloc[199], float)
        )

    def test_identify_support_levels(self):
        """Test identification of support levels."""
        support_levels = identify_support_levels(self.df_with_indicators)

        # We expect at least one support level
        self.assertGreater(len(support_levels), 0)

        # Support levels should be sorted
        self.assertEqual(support_levels, sorted(support_levels))

        # Support levels should be below current price
        current_price = self.df_with_indicators["close"].iloc[-1]
        self.assertLessEqual(support_levels[0], current_price)

    def test_identify_resistance_levels(self):
        """Test identification of resistance levels."""
        resistance_levels = identify_resistance_levels(self.df_with_indicators)

        # We expect at least one resistance level
        self.assertGreater(len(resistance_levels), 0)

        # Resistance levels should be sorted
        self.assertEqual(resistance_levels, sorted(resistance_levels))

        # At least one resistance level should be above current price
        current_price = self.df_with_indicators["close"].iloc[-1]
        self.assertGreaterEqual(resistance_levels[-1], current_price)

    def test_analyze_trend(self):
        """Test trend analysis function."""
        trend = analyze_trend(self.df_with_indicators)

        # Check that trend is an integer between 0 and 7 (trend strength score)
        self.assertIsInstance(trend, int)
        self.assertGreaterEqual(trend, 0)
        self.assertLessEqual(trend, 7)

    def test_analyze_rsi(self):
        """Test RSI analysis function."""
        rsi_analysis = analyze_rsi(self.df_with_indicators)

        # Check that analysis contains expected keys
        expected_keys = ["current", "signal", "description"]
        for key in expected_keys:
            self.assertIn(key, rsi_analysis)

        # Check value ranges
        self.assertGreaterEqual(rsi_analysis["current"], 0)
        self.assertLessEqual(rsi_analysis["current"], 100)

        # Check signal values
        self.assertIn(
            rsi_analysis["signal"], ["bullish", "bearish", "overbought", "oversold"]
        )

    def test_analyze_macd(self):
        """Test MACD analysis function."""
        macd_analysis = analyze_macd(self.df_with_indicators)

        # Check that analysis contains expected keys
        expected_keys = [
            "macd",
            "signal",
            "histogram",
            "indicator",
            "crossover",
            "description",
        ]
        for key in expected_keys:
            self.assertIn(key, macd_analysis)

        # Check signal values
        self.assertIn(
            macd_analysis["indicator"],
            ["bullish", "bearish", "improving", "weakening", "neutral"],
        )

        self.assertIn(
            macd_analysis["crossover"],
            [
                "bullish crossover detected",
                "bearish crossover detected",
                "no recent crossover",
            ],
        )

    def test_generate_outlook(self):
        """Test outlook generation function."""
        # First, get required analyses
        trend = analyze_trend(self.df_with_indicators)
        rsi_analysis = analyze_rsi(self.df_with_indicators)
        macd_analysis = analyze_macd(self.df_with_indicators)
        stoch_analysis = analyze_stochastic(self.df_with_indicators)

        # Generate outlook
        trend_direction = (
            "bullish" if trend > 3 else "bearish" if trend < -3 else "neutral"
        )
        outlook = generate_outlook(
            self.df_with_indicators,
            trend_direction,
            rsi_analysis,
            macd_analysis,
            stoch_analysis,
        )

        # Check output
        self.assertIn(
            outlook,
            [
                "strongly bullish",
                "moderately bullish",
                "strongly bearish",
                "moderately bearish",
                "neutral",
            ],
        )

    def test_analyze_bollinger_bands(self):
        """Test Bollinger Bands analysis function."""
        bb_analysis = analyze_bollinger_bands(self.df_with_indicators)

        # Check that analysis contains expected keys
        expected_keys = [
            "upper_band",
            "middle_band",
            "lower_band",
            "position",
            "signal",
            "volatility",
            "description",
        ]
        for key in expected_keys:
            self.assertIn(key, bb_analysis)

        # Check value types
        self.assertIsInstance(bb_analysis["upper_band"], float)
        self.assertIsInstance(bb_analysis["middle_band"], float)
        self.assertIsInstance(bb_analysis["lower_band"], float)
        self.assertIsInstance(bb_analysis["position"], str)
        self.assertIsInstance(bb_analysis["signal"], str)
        self.assertIsInstance(bb_analysis["volatility"], str)
        self.assertIsInstance(bb_analysis["description"], str)

        # Check plausible signal values
        self.assertIn(
            bb_analysis["signal"], ["overbought", "oversold", "bullish", "bearish"]
        )

    def test_analyze_volume(self):
        """Test volume analysis function."""
        volume_analysis = analyze_volume(self.df_with_indicators)

        # Check that analysis contains expected keys
        expected_keys = ["current", "average", "ratio", "description", "signal"]
        for key in expected_keys:
            self.assertIn(key, volume_analysis)

        # Check value types
        self.assertIsInstance(volume_analysis["current"], int)
        self.assertIsInstance(volume_analysis["average"], int)
        self.assertIsInstance(volume_analysis["ratio"], float)
        self.assertIsInstance(volume_analysis["description"], str)
        self.assertIsInstance(volume_analysis["signal"], str)

        # Check plausible signal values
        self.assertIn(
            volume_analysis["signal"],
            [
                "bullish (high volume on up move)",
                "bearish (high volume on down move)",
                "weak conviction",
                "neutral",
            ],
        )

    def test_identify_chart_patterns(self):
        """Test chart pattern identification function."""
        patterns = identify_chart_patterns(self.df_with_indicators)

        # Should return a list
        self.assertIsInstance(patterns, list)

        # All elements should be strings
        for pattern in patterns:
            self.assertIsInstance(pattern, str)

        # All patterns should be from the known set
        known_patterns = [
            "Double Bottom (W)",
            "Double Top (M)",
            "Bullish Flag/Pennant",
            "Bearish Flag/Pennant",
        ]
        for pattern in patterns:
            self.assertIn(pattern, known_patterns)


if __name__ == "__main__":
    unittest.main()

```
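
The synthetic random-walk helper above is also convenient for poking at the indicator pipeline outside the test runner. A minimal sketch, reusing the helper from this test module; the printed values are synthetic data, not market prices.

```python
# Minimal sketch reusing the synthetic-data helper from the test module above.
# Output is random-walk data; column names follow the expected list in the tests.
from maverick_mcp.core.technical_analysis import add_technical_indicators
from maverick_mcp.tests.test_technical_analysis import create_test_dataframe

df = create_test_dataframe(days=250)
enriched = add_technical_indicators(df)

print(enriched.columns.tolist())
print(enriched[["close", "rsi", "sma_50", "sma_200"]].tail())
```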

--------------------------------------------------------------------------------
/maverick_mcp/api/services/portfolio_service.py:
--------------------------------------------------------------------------------

```python
"""
Portfolio service for MaverickMCP API.

Handles portfolio analysis, watchlist management, and portfolio-related operations.
Extracted from server.py to improve code organization and maintainability.
"""

from typing import Any

from .base_service import BaseService


class PortfolioService(BaseService):
    """
    Service class for portfolio operations.

    Provides portfolio summaries, watchlist management, and portfolio analysis functionality.
    """

    def register_tools(self):
        """Register portfolio tools with MCP."""

        @self.mcp.tool()
        async def get_user_portfolio_summary() -> dict[str, Any]:
            """
            Get comprehensive portfolio summary for the authenticated user.

            Requires authentication. Provides detailed portfolio analytics including
            holdings, performance metrics, risk analysis, and recommendations.

            Returns:
                Dictionary containing comprehensive portfolio analysis
            """
            return await self._get_user_portfolio_summary()

        @self.mcp.tool()
        async def get_watchlist(limit: int = 20) -> dict[str, Any]:
            """
            Get user's stock watchlist with current prices and key metrics.

            Args:
                limit: Maximum number of stocks to return (1-100, default: 20)

            Returns:
                Dictionary containing watchlist stocks with current market data
            """
            return await self._get_watchlist(limit)

    async def _get_user_portfolio_summary(self) -> dict[str, Any]:
        """Get user portfolio summary implementation."""
        if not self.is_auth_enabled():
            return {
                "error": "Authentication required for portfolio access",
                "auth_required": True,
            }

        try:
            # TODO: Implement actual portfolio retrieval from database
            # This would integrate with user portfolio data

            # Placeholder portfolio data
            portfolio_summary = {
                "account_info": {
                    "account_value": 125_450.67,
                    "cash_balance": 12_340.50,
                    "invested_amount": 113_110.17,
                    "currency": "USD",
                },
                "performance": {
                    "total_return": 15_450.67,
                    "total_return_pct": 14.05,
                    "day_change": -523.45,
                    "day_change_pct": -0.42,
                    "ytd_return": 8_950.23,
                    "ytd_return_pct": 7.68,
                },
                "holdings": [
                    {
                        "symbol": "AAPL",
                        "name": "Apple Inc.",
                        "shares": 50,
                        "avg_cost": 150.25,
                        "current_price": 175.80,
                        "market_value": 8_790.00,
                        "unrealized_gain": 1_277.50,
                        "unrealized_gain_pct": 17.00,
                        "weight": 7.01,
                    },
                    {
                        "symbol": "MSFT",
                        "name": "Microsoft Corporation",
                        "shares": 25,
                        "avg_cost": 280.50,
                        "current_price": 310.45,
                        "market_value": 7_761.25,
                        "unrealized_gain": 748.75,
                        "unrealized_gain_pct": 10.67,
                        "weight": 6.19,
                    },
                    # ... more holdings
                ],
                "asset_allocation": {
                    "stocks": 85.5,
                    "cash": 9.8,
                    "bonds": 4.7,
                },
                "sector_allocation": {
                    "Technology": 35.2,
                    "Healthcare": 18.5,
                    "Financial Services": 12.3,
                    "Consumer Cyclical": 10.8,
                    "Other": 23.2,
                },
                "risk_metrics": {
                    "beta": 1.15,
                    "sharpe_ratio": 1.42,
                    "max_drawdown": -8.5,
                    "volatility": 16.8,
                },
                "recommendations": [
                    "Consider rebalancing technology allocation (currently 35.2%)",
                    "Increase cash position for upcoming opportunities",
                    "Review underperforming positions",
                ],
                "last_updated": self._get_current_timestamp(),
            }

            self.log_tool_usage("get_user_portfolio_summary")

            return portfolio_summary

        except Exception as e:
            self.logger.error(f"Failed to get portfolio summary: {e}")
            return {
                "error": f"Failed to retrieve portfolio summary: {str(e)}",
                "auth_required": self.is_auth_enabled(),
            }

    async def _get_watchlist(self, limit: int = 20) -> dict[str, Any]:
        """Get watchlist implementation."""
        # Validate limit
        if not isinstance(limit, int) or limit < 1 or limit > 100:
            return {
                "error": "Limit must be an integer between 1 and 100",
                "provided_limit": limit,
            }

        try:
            from maverick_mcp.providers.stock_data import StockDataProvider

            # TODO: Get actual user watchlist from database
            # For now, use a sample watchlist
            watchlist_symbols = [
                "AAPL",
                "MSFT",
                "GOOGL",
                "AMZN",
                "TSLA",
                "NVDA",
                "META",
                "NFLX",
                "ADBE",
                "CRM",
                "ORCL",
                "INTC",
                "AMD",
                "PYPL",
                "ZOOM",
            ]

            # Limit the symbols
            limited_symbols = watchlist_symbols[:limit]

            stock_provider = StockDataProvider()
            watchlist_data = []

            for symbol in limited_symbols:
                try:
                    # Get current stock data
                    stock_info = await stock_provider.get_stock_info_async(symbol)

                    # Get price data for trend analysis
                    price_data = await stock_provider.get_stock_data_async(
                        symbol, days=30
                    )

                    if not price_data.empty:
                        current_price = price_data["Close"].iloc[-1]
                        prev_close = (
                            price_data["Close"].iloc[-2]
                            if len(price_data) > 1
                            else current_price
                        )
                        day_change = current_price - prev_close
                        day_change_pct = (
                            (day_change / prev_close) * 100 if prev_close != 0 else 0
                        )

                        # Calculate 30-day trend
                        thirty_day_change = current_price - price_data["Close"].iloc[0]
                        thirty_day_change_pct = (
                            thirty_day_change / price_data["Close"].iloc[0]
                        ) * 100

                        watchlist_item = {
                            "symbol": symbol,
                            "name": stock_info.get(
                                "longName", stock_info.get("shortName", symbol)
                            ),
                            "current_price": round(current_price, 2),
                            "day_change": round(day_change, 2),
                            "day_change_pct": round(day_change_pct, 2),
                            "thirty_day_change": round(thirty_day_change, 2),
                            "thirty_day_change_pct": round(thirty_day_change_pct, 2),
                            "volume": int(price_data["Volume"].iloc[-1]),
                            "market_cap": stock_info.get("marketCap"),
                            "pe_ratio": stock_info.get("trailingPE"),
                            "sector": stock_info.get("sector"),
                            "industry": stock_info.get("industry"),
                        }

                        watchlist_data.append(watchlist_item)

                except Exception as e:
                    self.logger.warning(f"Failed to get data for {symbol}: {e}")
                    continue

            result = {
                "watchlist": watchlist_data,
                "total_symbols": len(watchlist_data),
                "requested_limit": limit,
                "market_status": "open",  # This would be determined by market hours
                "last_updated": self._get_current_timestamp(),
                "summary": {
                    "gainers": len(
                        [item for item in watchlist_data if item["day_change_pct"] > 0]
                    ),
                    "losers": len(
                        [item for item in watchlist_data if item["day_change_pct"] < 0]
                    ),
                    "unchanged": len(
                        [item for item in watchlist_data if item["day_change_pct"] == 0]
                    ),
                },
            }

            self.log_tool_usage(
                "get_watchlist", limit=limit, symbols_returned=len(watchlist_data)
            )

            return result

        except Exception as e:
            self.logger.error(f"Failed to get watchlist: {e}")
            return {
                "error": f"Failed to retrieve watchlist: {str(e)}",
                "requested_limit": limit,
            }

    def _get_current_timestamp(self) -> str:
        """Get current timestamp in ISO format."""
        from datetime import UTC, datetime

        return datetime.now(UTC).isoformat()

```
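
PortfolioService only defines and registers tools; the FastMCP instance it decorates against is wired in by BaseService as `self.mcp`. Below is a minimal wiring sketch, assuming the `fastmcp` import path and that BaseService's constructor accepts the server instance (the actual signature lives in base_service.py, which is not shown on this page).

```python
# Minimal wiring sketch. Assumptions: FastMCP comes from the fastmcp package,
# and BaseService.__init__ accepts the server instance; the real constructor
# is defined in base_service.py (not shown on this page).
from fastmcp import FastMCP

from maverick_mcp.api.services.portfolio_service import PortfolioService

mcp = FastMCP("maverick-mcp-portfolio-demo")

service = PortfolioService(mcp)  # hypothetical constructor usage
service.register_tools()  # exposes get_user_portfolio_summary and get_watchlist
```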

--------------------------------------------------------------------------------
/maverick_mcp/providers/mocks/mock_persistence.py:
--------------------------------------------------------------------------------

```python
"""
Mock data persistence implementation for testing.
"""

from datetime import datetime
from typing import Any

import pandas as pd
from sqlalchemy.orm import Session


class MockSession:
    """Mock SQLAlchemy session for testing."""

    def __init__(self):
        self.closed = False
        self.committed = False
        self.rolled_back = False

    def close(self):
        self.closed = True

    def commit(self):
        self.committed = True

    def rollback(self):
        self.rolled_back = True


class MockDataPersistence:
    """
    Mock implementation of IDataPersistence for testing.
    """

    def __init__(self):
        """Initialize the mock persistence layer."""
        self._price_data: dict[str, pd.DataFrame] = {}
        self._stock_records: dict[str, dict[str, Any]] = {}
        self._screening_results: dict[str, list[dict[str, Any]]] = {}
        self._call_log: list[dict[str, Any]] = []

    async def get_session(self) -> Session:
        """Get a mock database session."""
        self._log_call("get_session", {})
        return MockSession()

    async def get_read_only_session(self) -> Session:
        """Get a mock read-only database session."""
        self._log_call("get_read_only_session", {})
        return MockSession()

    async def save_price_data(
        self, session: Session, symbol: str, data: pd.DataFrame
    ) -> int:
        """Save mock price data."""
        self._log_call("save_price_data", {"symbol": symbol, "data_length": len(data)})

        symbol = symbol.upper()

        # Store the data
        if symbol in self._price_data:
            # Merge with existing data
            existing_data = self._price_data[symbol]
            combined = pd.concat([existing_data, data])
            # Remove duplicates and sort
            combined = combined[~combined.index.duplicated(keep="last")].sort_index()
            self._price_data[symbol] = combined
        else:
            self._price_data[symbol] = data.copy()

        return len(data)

    async def get_price_data(
        self,
        session: Session,
        symbol: str,
        start_date: str,
        end_date: str,
    ) -> pd.DataFrame:
        """Retrieve mock price data."""
        self._log_call(
            "get_price_data",
            {
                "symbol": symbol,
                "start_date": start_date,
                "end_date": end_date,
            },
        )

        symbol = symbol.upper()

        if symbol not in self._price_data:
            # Return empty DataFrame with proper columns
            return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"])

        data = self._price_data[symbol].copy()

        # Filter by date range
        if start_date:
            data = data[data.index >= start_date]
        if end_date:
            data = data[data.index <= end_date]

        return data

    async def get_or_create_stock(self, session: Session, symbol: str) -> Any:
        """Get or create a mock stock record."""
        self._log_call("get_or_create_stock", {"symbol": symbol})

        symbol = symbol.upper()

        if symbol not in self._stock_records:
            self._stock_records[symbol] = {
                "symbol": symbol,
                "company_name": f"{symbol} Inc.",
                "sector": "Technology",
                "industry": "Software",
                "exchange": "NASDAQ",
                "currency": "USD",
                "country": "US",
                "created_at": datetime.now(),
            }

        return self._stock_records[symbol]

    async def save_screening_results(
        self,
        session: Session,
        screening_type: str,
        results: list[dict[str, Any]],
    ) -> int:
        """Save mock screening results."""
        self._log_call(
            "save_screening_results",
            {
                "screening_type": screening_type,
                "results_count": len(results),
            },
        )

        self._screening_results[screening_type] = results.copy()
        return len(results)

    async def get_screening_results(
        self,
        session: Session,
        screening_type: str,
        limit: int | None = None,
        min_score: float | None = None,
    ) -> list[dict[str, Any]]:
        """Retrieve mock screening results."""
        self._log_call(
            "get_screening_results",
            {
                "screening_type": screening_type,
                "limit": limit,
                "min_score": min_score,
            },
        )

        if screening_type not in self._screening_results:
            return self._generate_default_screening_results(screening_type)

        results = self._screening_results[screening_type].copy()

        # Apply filters
        if min_score is not None:
            if screening_type == "maverick":
                results = [
                    r for r in results if r.get("combined_score", 0) >= min_score
                ]
            elif screening_type == "bearish":
                results = [r for r in results if r.get("score", 0) >= min_score]
            elif screening_type == "trending":
                results = [
                    r for r in results if r.get("momentum_score", 0) >= min_score
                ]

        if limit is not None:
            results = results[:limit]

        return results

    async def get_latest_screening_data(self) -> dict[str, list[dict[str, Any]]]:
        """Get mock latest screening data."""
        self._log_call("get_latest_screening_data", {})

        return {
            "maverick_stocks": await self.get_screening_results(None, "maverick"),
            "maverick_bear_stocks": await self.get_screening_results(None, "bearish"),
            "supply_demand_breakouts": await self.get_screening_results(
                None, "trending"
            ),
        }

    async def check_data_freshness(self, symbol: str, max_age_hours: int = 24) -> bool:
        """Check mock data freshness."""
        self._log_call(
            "check_data_freshness", {"symbol": symbol, "max_age_hours": max_age_hours}
        )

        # For testing, assume data is fresh if it exists
        symbol = symbol.upper()
        return symbol in self._price_data

    async def bulk_save_price_data(
        self, session: Session, symbol: str, data: pd.DataFrame
    ) -> int:
        """Bulk save mock price data."""
        return await self.save_price_data(session, symbol, data)

    async def get_symbols_with_data(
        self, session: Session, limit: int | None = None
    ) -> list[str]:
        """Get mock list of symbols with data."""
        self._log_call("get_symbols_with_data", {"limit": limit})

        symbols = list(self._price_data.keys())

        if limit is not None:
            symbols = symbols[:limit]

        return symbols

    async def cleanup_old_data(self, session: Session, days_to_keep: int = 365) -> int:
        """Mock cleanup of old data."""
        self._log_call("cleanup_old_data", {"days_to_keep": days_to_keep})

        # For testing, return 0 (no cleanup performed)
        return 0

    def _generate_default_screening_results(
        self, screening_type: str
    ) -> list[dict[str, Any]]:
        """Generate default screening results for testing."""
        if screening_type == "maverick":
            return [
                {
                    "symbol": "TEST1",
                    "combined_score": 95,
                    "momentum_score": 92,
                    "pattern": "Cup with Handle",
                    "consolidation": "yes",
                    "squeeze": "firing",
                },
                {
                    "symbol": "TEST2",
                    "combined_score": 88,
                    "momentum_score": 85,
                    "pattern": "Flat Base",
                    "consolidation": "no",
                    "squeeze": "setup",
                },
            ]
        elif screening_type == "bearish":
            return [
                {
                    "symbol": "BEAR1",
                    "score": 92,
                    "momentum_score": 25,
                    "rsi_14": 28,
                    "atr_contraction": True,
                    "big_down_vol": True,
                },
            ]
        elif screening_type == "trending":
            return [
                {
                    "symbol": "TREND1",
                    "momentum_score": 95,
                    "close": 150.25,
                    "sma_50": 145.50,
                    "sma_150": 140.25,
                    "sma_200": 135.75,
                    "pattern": "Breakout",
                },
            ]
        else:
            return []

    # Testing utilities

    def _log_call(self, method: str, args: dict[str, Any]) -> None:
        """Log method calls for testing verification."""
        self._call_log.append(
            {
                "method": method,
                "args": args,
                "timestamp": datetime.now(),
            }
        )

    def get_call_log(self) -> list[dict[str, Any]]:
        """Get the log of method calls."""
        return self._call_log.copy()

    def clear_call_log(self) -> None:
        """Clear the method call log."""
        self._call_log.clear()

    def set_price_data(self, symbol: str, data: pd.DataFrame) -> None:
        """Set price data for testing."""
        self._price_data[symbol.upper()] = data

    def get_stored_price_data(self, symbol: str) -> pd.DataFrame | None:
        """Get stored price data for testing verification."""
        return self._price_data.get(symbol.upper())

    def set_screening_results(
        self, screening_type: str, results: list[dict[str, Any]]
    ) -> None:
        """Set screening results for testing."""
        self._screening_results[screening_type] = results

    def clear_all_data(self) -> None:
        """Clear all stored data."""
        self._price_data.clear()
        self._stock_records.clear()
        self._screening_results.clear()

```
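
A minimal sketch of driving the mock in a test, using only the methods defined above; the single-row DataFrame and symbol are illustrative.

```python
# Minimal sketch exercising MockDataPersistence with only the methods defined
# above; the single-row price frame is illustrative test data.
import asyncio

import pandas as pd

from maverick_mcp.providers.mocks.mock_persistence import MockDataPersistence


async def main() -> None:
    persistence = MockDataPersistence()
    frame = pd.DataFrame(
        {
            "Open": [100.0],
            "High": [101.0],
            "Low": [99.5],
            "Close": [100.5],
            "Volume": [1_000_000],
        },
        index=pd.DatetimeIndex(["2024-01-02"]),
    )

    session = await persistence.get_session()
    assert await persistence.save_price_data(session, "aapl", frame) == 1

    # Symbols are normalized to upper case internally, so "AAPL" finds the data.
    cached = await persistence.get_price_data(
        session, "AAPL", "2024-01-01", "2024-01-31"
    )
    assert not cached.empty

    # Every call is recorded for later verification.
    assert persistence.get_call_log()[0]["method"] == "get_session"


asyncio.run(main())
```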

--------------------------------------------------------------------------------
/maverick_mcp/infrastructure/caching/cache_management_service.py:
--------------------------------------------------------------------------------

```python
"""
Cache Management Service - Responsible only for cache operations.
"""

import logging

import pandas as pd
from sqlalchemy import text
from sqlalchemy.orm import Session

from maverick_mcp.data.models import (
    PriceCache,
    SessionLocal,
    Stock,
    bulk_insert_price_data,
)

logger = logging.getLogger("maverick_mcp.cache_management")


class CacheManagementService:
    """
    Service responsible ONLY for cache operations.

    This service:
    - Manages Redis and database cache layers
    - Handles cache key generation and TTL management
    - Contains no data fetching logic
    - Contains no business logic beyond caching
    """

    def __init__(self, db_session: Session | None = None, cache_days: int = 1):
        """
        Initialize the cache management service.

        Args:
            db_session: Optional database session for dependency injection
            cache_days: Number of days to cache data
        """
        self.cache_days = cache_days
        self._db_session = db_session

    def get_cached_data(
        self, symbol: str, start_date: str, end_date: str
    ) -> pd.DataFrame | None:
        """
        Get cached data from database within date range.

        Args:
            symbol: Stock ticker symbol (will be uppercased)
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format

        Returns:
            DataFrame with cached data or None if not found
        """
        symbol = symbol.upper()
        session, should_close = self._get_db_session()

        try:
            logger.info(f"Checking cache for {symbol} from {start_date} to {end_date}")

            # Get whatever data exists in the range
            df = PriceCache.get_price_data(session, symbol, start_date, end_date)

            if df.empty:
                logger.info(f"No cached data found for {symbol}")
                return None

            logger.info(f"Found {len(df)} cached records for {symbol}")

            # Normalize the data to match expected format
            df = self._normalize_cached_data(df)
            return df

        except Exception as e:
            logger.error(f"Error getting cached data for {symbol}: {e}")
            return None
        finally:
            if should_close:
                session.close()

    def cache_data(self, symbol: str, df: pd.DataFrame) -> bool:
        """
        Cache price data in the database.

        Args:
            symbol: Stock ticker symbol
            df: DataFrame with price data

        Returns:
            True if caching was successful, False otherwise
        """
        if df.empty:
            logger.info(f"Empty DataFrame provided for {symbol}, skipping cache")
            return True

        symbol = symbol.upper()
        session, should_close = self._get_db_session()

        try:
            logger.info(f"Caching {len(df)} records for {symbol}")

            # Ensure stock exists in database
            self._ensure_stock_exists(session, symbol)

            # Prepare DataFrame for caching
            cache_df = self._prepare_data_for_cache(df)

            # Insert data
            count = bulk_insert_price_data(session, symbol, cache_df)
            if count == 0:
                logger.info(
                    f"No new records cached for {symbol} (data may already exist)"
                )
            else:
                logger.info(
                    f"Successfully cached {count} new price records for {symbol}"
                )

            return True

        except Exception as e:
            logger.error(f"Error caching price data for {symbol}: {e}", exc_info=True)
            session.rollback()
            return False
        finally:
            if should_close:
                session.close()

    def invalidate_cache(self, symbol: str, start_date: str, end_date: str) -> bool:
        """
        Invalidate cached data for a symbol within a date range.

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format

        Returns:
            True if invalidation was successful, False otherwise
        """
        symbol = symbol.upper()
        session, should_close = self._get_db_session()

        try:
            logger.info(
                f"Invalidating cache for {symbol} from {start_date} to {end_date}"
            )

            # Delete cached data in the specified range
            deleted_count = PriceCache.delete_price_data(
                session, symbol, start_date, end_date
            )
            logger.info(f"Invalidated {deleted_count} cached records for {symbol}")

            return True

        except Exception as e:
            logger.error(f"Error invalidating cache for {symbol}: {e}")
            return False
        finally:
            if should_close:
                session.close()

    def get_cache_stats(self, symbol: str) -> dict:
        """
        Get cache statistics for a symbol.

        Args:
            symbol: Stock ticker symbol

        Returns:
            Dictionary with cache statistics
        """
        symbol = symbol.upper()
        session, should_close = self._get_db_session()

        try:
            stats = PriceCache.get_cache_stats(session, symbol)
            return {
                "symbol": symbol,
                "total_records": stats.get("total_records", 0),
                "date_range": stats.get("date_range", {}),
                "last_updated": stats.get("last_updated"),
            }
        except Exception as e:
            logger.error(f"Error getting cache stats for {symbol}: {e}")
            return {
                "symbol": symbol,
                "total_records": 0,
                "date_range": {},
                "last_updated": None,
            }
        finally:
            if should_close:
                session.close()

    def _get_db_session(self) -> tuple[Session, bool]:
        """
        Get a database session.

        Returns:
            Tuple of (session, should_close) where should_close indicates
            whether the caller should close the session.
        """
        # Use injected session if available - should NOT be closed
        if self._db_session:
            return self._db_session, False

        # Otherwise, create a new session - should be closed
        try:
            session = SessionLocal()
            return session, True
        except Exception as e:
            logger.error(f"Failed to get database session: {e}", exc_info=True)
            raise

    def _normalize_cached_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Normalize cached data to match expected format.

        Args:
            df: Raw DataFrame from cache

        Returns:
            Normalized DataFrame
        """
        # Add expected columns for compatibility
        for col in ["Dividends", "Stock Splits"]:
            if col not in df.columns:
                df[col] = 0.0

        # Ensure column names match yfinance format
        column_mapping = {
            "open": "Open",
            "high": "High",
            "low": "Low",
            "close": "Close",
            "volume": "Volume",
        }
        df.rename(columns=column_mapping, inplace=True)

        # Ensure proper data types to match yfinance
        for col in ["Open", "High", "Low", "Close"]:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors="coerce").astype("float64")

        # Convert volume to int
        if "Volume" in df.columns:
            df["Volume"] = (
                pd.to_numeric(df["Volume"], errors="coerce").fillna(0).astype("int64")
            )

        return df

    def _prepare_data_for_cache(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Prepare DataFrame for caching by normalizing column names.

        Args:
            df: DataFrame to prepare

        Returns:
            Prepared DataFrame
        """
        cache_df = df.copy()

        # Ensure proper column names for database
        column_mapping = {
            "Open": "open",
            "High": "high",
            "Low": "low",
            "Close": "close",
            "Volume": "volume",
        }
        cache_df.rename(columns=column_mapping, inplace=True)

        logger.debug(
            f"DataFrame columns after preparation: {cache_df.columns.tolist()}"
        )
        logger.debug(f"DataFrame shape: {cache_df.shape}")

        return cache_df

    def _ensure_stock_exists(self, session: Session, symbol: str) -> Stock:
        """
        Ensure a stock exists in the database.

        Args:
            session: Database session
            symbol: Stock ticker symbol

        Returns:
            Stock object
        """
        try:
            stock = Stock.get_or_create(session, symbol)
            return stock
        except Exception as e:
            logger.error(f"Error ensuring stock {symbol} exists: {e}")
            raise

    def check_cache_health(self) -> dict:
        """
        Check the health of the cache system.

        Returns:
            Dictionary with cache health information
        """
        try:
            session, should_close = self._get_db_session()
            try:
                # Test basic database connectivity (text() keeps the raw SQL
                # valid under SQLAlchemy 2.x string-execution rules)
                result = session.execute(text("SELECT 1"))
                result.fetchone()

                # Get basic cache statistics
                total_records = session.query(PriceCache).count()

                return {
                    "status": "healthy",
                    "database_connection": True,
                    "total_cached_records": total_records,
                }
            finally:
                if should_close:
                    session.close()

        except Exception as e:
            logger.error(f"Cache health check failed: {e}")
            return {
                "status": "unhealthy",
                "database_connection": False,
                "error": str(e),
            }

```
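
A minimal usage sketch for the cache service, assuming SessionLocal is already configured (for example via DATABASE_URL) and a yfinance-style DataFrame; the price values are illustrative.

```python
# Minimal usage sketch. Assumes SessionLocal can create sessions (e.g. a
# configured DATABASE_URL); the single-row yfinance-style frame is illustrative.
import pandas as pd

from maverick_mcp.infrastructure.caching.cache_management_service import (
    CacheManagementService,
)

service = CacheManagementService(cache_days=1)

frame = pd.DataFrame(
    {
        "Open": [189.0],
        "High": [191.2],
        "Low": [188.4],
        "Close": [190.5],
        "Volume": [55_000_000],
    },
    index=pd.DatetimeIndex(["2024-01-02"]),
)

if service.cache_data("AAPL", frame):
    cached = service.get_cached_data("AAPL", "2024-01-01", "2024-01-31")
    print(cached if cached is not None else "cache miss")

print(service.check_cache_health())
```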