This is page 20 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/ml/feature_engineering.py:
--------------------------------------------------------------------------------

```python
"""Feature engineering for ML trading strategies."""

import logging
from typing import Any

import numpy as np
import pandas as pd
import pandas_ta as ta
from pandas import DataFrame, Series
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler

logger = logging.getLogger(__name__)


class FeatureExtractor:
    """Extract technical and statistical features for ML models."""

    def __init__(self, lookback_periods: list[int] | None = None):
        """Initialize feature extractor.

        Args:
            lookback_periods: Lookback periods for rolling features
        """
        self.lookback_periods = lookback_periods or [5, 10, 20, 50]
        self.scaler = StandardScaler()

    def extract_price_features(self, data: DataFrame) -> DataFrame:
        """Extract price-based features.

        Args:
            data: OHLCV price data

        Returns:
            DataFrame with price features
        """
        features = pd.DataFrame(index=data.index)

        # Normalize column names to handle both cases
        high = data.get("high", data.get("High"))
        low = data.get("low", data.get("Low"))
        close = data.get("close", data.get("Close"))
        open_ = data.get("open", data.get("Open"))

        # Safe division helper function
        def safe_divide(numerator, denominator, default=0.0):
            """Safely divide two values, handling None, NaN, and zero cases."""
            if numerator is None or denominator is None:
                return default
            # Convert to numpy arrays to handle pandas Series
            num = np.asarray(numerator)
            den = np.asarray(denominator)
            # Use numpy divide with where condition for safety
            return np.divide(
                num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
            )
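
        # Note: safe_divide returns a NumPy array rather than a pandas Series, so
        # the column assignments below are positional rather than index-aligned.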

        # Price ratios and spreads with safe division
        features["high_low_ratio"] = safe_divide(high, low, 1.0)
        features["close_open_ratio"] = safe_divide(close, open_, 1.0)
        features["hl_spread"] = (
            safe_divide(high - low, close, 0.0)
            if high is not None and low is not None and close is not None
            else 0.0
        )
        features["co_spread"] = (
            safe_divide(close - open_, open_, 0.0)
            if close is not None and open_ is not None
            else 0.0
        )

        # Returns with safe calculation
        if close is not None:
            features["returns"] = close.pct_change().fillna(0)
            # Safe log returns calculation
            price_ratio = safe_divide(close, close.shift(1), 1.0)
            features["log_returns"] = np.log(
                np.maximum(price_ratio, 1e-8)
            )  # Prevent log(0)
        else:
            features["returns"] = 0
            features["log_returns"] = 0

        # Volume features with safe calculations
        volume = data.get("volume", data.get("Volume"))
        if volume is not None and close is not None:
            volume_ma = volume.rolling(20).mean()
            features["volume_ma_ratio"] = safe_divide(volume, volume_ma, 1.0)
            features["price_volume"] = close * volume
            features["volume_returns"] = volume.pct_change().fillna(0)
        else:
            features["volume_ma_ratio"] = 1.0
            features["price_volume"] = 0.0
            features["volume_returns"] = 0.0

        return features

    def extract_technical_features(self, data: DataFrame) -> DataFrame:
        """Extract technical indicator features.

        Args:
            data: OHLCV price data

        Returns:
            DataFrame with technical features
        """
        features = pd.DataFrame(index=data.index)

        # Normalize column names
        close = data.get("close", data.get("Close"))
        high = data.get("high", data.get("High"))
        low = data.get("low", data.get("Low"))

        # Safe division helper (duplicated from extract_price_features)
        def safe_divide(numerator, denominator, default=0.0):
            """Safely divide two values, handling None, NaN, and zero cases."""
            if numerator is None or denominator is None:
                return default
            # Convert to numpy arrays to handle pandas Series
            num = np.asarray(numerator)
            den = np.asarray(denominator)
            # Use numpy divide with where condition for safety
            return np.divide(
                num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
            )

        # Moving averages with safe calculations
        for period in self.lookback_periods:
            if close is not None:
                sma = ta.sma(close, length=period)
                ema = ta.ema(close, length=period)

                features[f"sma_{period}_ratio"] = safe_divide(close, sma, 1.0)
                features[f"ema_{period}_ratio"] = safe_divide(close, ema, 1.0)
                features[f"sma_ema_diff_{period}"] = (
                    safe_divide(sma - ema, close, 0.0)
                    if sma is not None and ema is not None
                    else 0.0
                )
            else:
                features[f"sma_{period}_ratio"] = 1.0
                features[f"ema_{period}_ratio"] = 1.0
                features[f"sma_ema_diff_{period}"] = 0.0

        # RSI
        rsi = ta.rsi(close, length=14)
        features["rsi"] = rsi
        features["rsi_oversold"] = (rsi < 30).astype(int)
        features["rsi_overbought"] = (rsi > 70).astype(int)

        # MACD
        macd = ta.macd(close)
        if macd is not None and not macd.empty:
            macd_cols = macd.columns
            macd_col = [
                col
                for col in macd_cols
                if "MACD" in col and "h" not in col and "s" not in col.lower()
            ]
            signal_col = [
                col for col in macd_cols if "signal" in col.lower() or "MACDs" in col
            ]
            hist_col = [
                col for col in macd_cols if "hist" in col.lower() or "MACDh" in col
            ]

            if macd_col:
                features["macd"] = macd[macd_col[0]]
            else:
                features["macd"] = 0

            if signal_col:
                features["macd_signal"] = macd[signal_col[0]]
            else:
                features["macd_signal"] = 0

            if hist_col:
                features["macd_histogram"] = macd[hist_col[0]]
            else:
                features["macd_histogram"] = 0

            features["macd_bullish"] = (
                features["macd"] > features["macd_signal"]
            ).astype(int)
        else:
            features["macd"] = 0
            features["macd_signal"] = 0
            features["macd_histogram"] = 0
            features["macd_bullish"] = 0

        # Bollinger Bands
        bb = ta.bbands(close, length=20)
        if bb is not None and not bb.empty:
            # Handle different pandas_ta versions that may have different column names
            bb_cols = bb.columns
            upper_col = [
                col for col in bb_cols if "BBU" in col or "upper" in col.lower()
            ]
            middle_col = [
                col for col in bb_cols if "BBM" in col or "middle" in col.lower()
            ]
            lower_col = [
                col for col in bb_cols if "BBL" in col or "lower" in col.lower()
            ]

            if upper_col and middle_col and lower_col:
                features["bb_upper"] = bb[upper_col[0]]
                features["bb_middle"] = bb[middle_col[0]]
                features["bb_lower"] = bb[lower_col[0]]

                # Safe BB position calculation
                bb_width = features["bb_upper"] - features["bb_lower"]
                features["bb_position"] = safe_divide(
                    close - features["bb_lower"], bb_width, 0.5
                )
                features["bb_squeeze"] = safe_divide(
                    bb_width, features["bb_middle"], 0.1
                )
            else:
                # Fallback to manual calculation with safe operations
                if close is not None:
                    sma_20 = close.rolling(20).mean()
                    std_20 = close.rolling(20).std()
                    features["bb_upper"] = sma_20 + (std_20 * 2)
                    features["bb_middle"] = sma_20
                    features["bb_lower"] = sma_20 - (std_20 * 2)

                    # Safe BB calculations
                    bb_width = features["bb_upper"] - features["bb_lower"]
                    features["bb_position"] = safe_divide(
                        close - features["bb_lower"], bb_width, 0.5
                    )
                    features["bb_squeeze"] = safe_divide(
                        bb_width, features["bb_middle"], 0.1
                    )
                else:
                    features["bb_upper"] = 0
                    features["bb_middle"] = 0
                    features["bb_lower"] = 0
                    features["bb_position"] = 0.5
                    features["bb_squeeze"] = 0.1
        else:
            # Manual calculation fallback with safe operations
            if close is not None:
                sma_20 = close.rolling(20).mean()
                std_20 = close.rolling(20).std()
                features["bb_upper"] = sma_20 + (std_20 * 2)
                features["bb_middle"] = sma_20
                features["bb_lower"] = sma_20 - (std_20 * 2)

                # Safe BB calculations
                bb_width = features["bb_upper"] - features["bb_lower"]
                features["bb_position"] = safe_divide(
                    close - features["bb_lower"], bb_width, 0.5
                )
                features["bb_squeeze"] = safe_divide(
                    bb_width, features["bb_middle"], 0.1
                )
            else:
                features["bb_upper"] = 0
                features["bb_middle"] = 0
                features["bb_lower"] = 0
                features["bb_position"] = 0.5
                features["bb_squeeze"] = 0.1

        # Stochastic
        stoch = ta.stoch(high, low, close)
        if stoch is not None and not stoch.empty:
            stoch_cols = stoch.columns
            k_col = [col for col in stoch_cols if "k" in col.lower()]
            d_col = [col for col in stoch_cols if "d" in col.lower()]

            if k_col:
                features["stoch_k"] = stoch[k_col[0]]
            else:
                features["stoch_k"] = 50

            if d_col:
                features["stoch_d"] = stoch[d_col[0]]
            else:
                features["stoch_d"] = 50
        else:
            features["stoch_k"] = 50
            features["stoch_d"] = 50

        # ATR (Average True Range) with safe calculation
        if high is not None and low is not None and close is not None:
            features["atr"] = ta.atr(high, low, close)
            features["atr_ratio"] = safe_divide(
                features["atr"], close, 0.02
            )  # Default 2% ATR ratio
        else:
            features["atr"] = 0
            features["atr_ratio"] = 0.02

        return features

    def extract_statistical_features(self, data: DataFrame) -> DataFrame:
        """Extract statistical features.

        Args:
            data: OHLCV price data

        Returns:
            DataFrame with statistical features
        """
        features = pd.DataFrame(index=data.index)

        # Safe division helper function
        def safe_divide(numerator, denominator, default=0.0):
            """Safely divide two values, handling None, NaN, and zero cases."""
            if numerator is None or denominator is None:
                return default
            # Convert to numpy arrays to handle pandas Series
            num = np.asarray(numerator)
            den = np.asarray(denominator)
            # Use numpy divide with where condition for safety
            return np.divide(
                num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
            )

        # Rolling statistics
        for period in self.lookback_periods:
            returns = data["close"].pct_change()

            # Volatility with safe calculations
            vol_short = returns.rolling(period).std()
            vol_long = returns.rolling(period * 2).std()
            features[f"volatility_{period}"] = vol_short
            features[f"volatility_ratio_{period}"] = safe_divide(
                vol_short, vol_long, 1.0
            )

            # Skewness and Kurtosis
            features[f"skewness_{period}"] = returns.rolling(period).skew()
            features[f"kurtosis_{period}"] = returns.rolling(period).kurt()

            # Min/Max ratios with safe division
            if "high" in data.columns and "low" in data.columns:
                rolling_high = data["high"].rolling(period).max()
                rolling_low = data["low"].rolling(period).min()
                features[f"high_ratio_{period}"] = safe_divide(
                    data["close"], rolling_high, 1.0
                )
                features[f"low_ratio_{period}"] = safe_divide(
                    data["close"], rolling_low, 1.0
                )
            else:
                features[f"high_ratio_{period}"] = 1.0
                features[f"low_ratio_{period}"] = 1.0

            # Momentum features with safe division
            features[f"momentum_{period}"] = safe_divide(
                data["close"], data["close"].shift(period), 1.0
            )
            features[f"roc_{period}"] = data["close"].pct_change(periods=period)

        return features

    def extract_microstructure_features(self, data: DataFrame) -> DataFrame:
        """Extract market microstructure features.

        Args:
            data: OHLCV price data

        Returns:
            DataFrame with microstructure features
        """
        features = pd.DataFrame(index=data.index)

        # Safe division helper function
        def safe_divide(numerator, denominator, default=0.0):
            """Safely divide two values, handling None, NaN, and zero cases."""
            if numerator is None or denominator is None:
                return default
            # Convert to numpy arrays to handle pandas Series
            num = np.asarray(numerator)
            den = np.asarray(denominator)
            # Use numpy divide with where condition for safety
            return np.divide(
                num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
            )

        # Bid-ask spread proxy (high-low spread) with safe calculation
        if "high" in data.columns and "low" in data.columns:
            mid_price = (data["high"] + data["low"]) / 2
            features["spread_proxy"] = safe_divide(
                data["high"] - data["low"], mid_price, 0.02
            )
        else:
            features["spread_proxy"] = 0.02

        # Price impact measures with safe calculations
        if "volume" in data.columns:
            returns_abs = abs(data["close"].pct_change())
            features["amihud_illiquidity"] = safe_divide(
                returns_abs, data["volume"], 0.0
            )

            if "high" in data.columns and "low" in data.columns:
                features["volume_weighted_price"] = (
                    data["high"] + data["low"] + data["close"]
                ) / 3
            else:
                features["volume_weighted_price"] = data["close"]
        else:
            features["amihud_illiquidity"] = 0.0
            features["volume_weighted_price"] = data.get("close", 0.0)

        # Intraday patterns with safe calculations
        if "open" in data.columns and "close" in data.columns:
            prev_close = data["close"].shift(1)
            features["open_gap"] = safe_divide(
                data["open"] - prev_close, prev_close, 0.0
            )
        else:
            features["open_gap"] = 0.0

        if "high" in data.columns and "low" in data.columns and "close" in data.columns:
            features["close_to_high"] = safe_divide(
                data["high"] - data["close"], data["close"], 0.0
            )
            features["close_to_low"] = safe_divide(
                data["close"] - data["low"], data["close"], 0.0
            )
        else:
            features["close_to_high"] = 0.0
            features["close_to_low"] = 0.0

        return features

    def create_target_variable(
        self, data: DataFrame, forward_periods: int = 5, threshold: float = 0.02
    ) -> Series:
        """Create target variable for classification.

        Args:
            data: Price data
            forward_periods: Number of periods to look forward
            threshold: Return threshold for classification

        Returns:
            Target variable (0: sell, 1: hold, 2: buy)
        """
        close = data.get("close", data.get("Close"))
        forward_returns = close.pct_change(periods=forward_periods).shift(
            -forward_periods
        )

        target = pd.Series(1, index=data.index)  # Default to hold
        target[forward_returns > threshold] = 2  # Buy
        target[forward_returns < -threshold] = 0  # Sell

        return target

    def extract_all_features(self, data: DataFrame) -> DataFrame:
        """Extract all features for ML model.

        Args:
            data: OHLCV price data

        Returns:
            DataFrame with all features
        """
        try:
            # Validate input data
            if data is None or data.empty:
                logger.warning("Empty or None data provided to extract_all_features")
                return pd.DataFrame()

            # Extract all feature types with individual error handling
            feature_dfs = []

            try:
                price_features = self.extract_price_features(data)
                if not price_features.empty:
                    feature_dfs.append(price_features)
            except Exception as e:
                logger.warning(f"Failed to extract price features: {e}")
                # Create empty DataFrame with same index as fallback
                price_features = pd.DataFrame(index=data.index)

            try:
                technical_features = self.extract_technical_features(data)
                if not technical_features.empty:
                    feature_dfs.append(technical_features)
            except Exception as e:
                logger.warning(f"Failed to extract technical features: {e}")

            try:
                statistical_features = self.extract_statistical_features(data)
                if not statistical_features.empty:
                    feature_dfs.append(statistical_features)
            except Exception as e:
                logger.warning(f"Failed to extract statistical features: {e}")

            try:
                microstructure_features = self.extract_microstructure_features(data)
                if not microstructure_features.empty:
                    feature_dfs.append(microstructure_features)
            except Exception as e:
                logger.warning(f"Failed to extract microstructure features: {e}")

            # Combine all successfully extracted features
            if feature_dfs:
                all_features = pd.concat(feature_dfs, axis=1)
            else:
                # Fallback: create minimal feature set
                logger.warning(
                    "No features extracted successfully, creating minimal fallback features"
                )
                all_features = pd.DataFrame(
                    {
                        "returns": data.get("close", pd.Series(0, index=data.index))
                        .pct_change()
                        .fillna(0),
                        "close": data.get("close", pd.Series(0, index=data.index)),
                    },
                    index=data.index,
                )

            # Handle missing values with robust method
            if not all_features.empty:
                # Forward fill, then backward fill, then zero fill
                all_features = all_features.ffill().bfill().fillna(0)

                # Replace any infinite values
                all_features = all_features.replace([np.inf, -np.inf], 0)

                logger.info(
                    f"Extracted {len(all_features.columns)} features for {len(all_features)} data points"
                )
            else:
                logger.warning("No features could be extracted")

            return all_features

        except Exception as e:
            logger.error(f"Critical error extracting features: {e}")
            # Return minimal fallback instead of raising
            return pd.DataFrame(
                {
                    "returns": pd.Series(
                        0, index=data.index if data is not None else [0]
                    ),
                    "close": pd.Series(
                        0, index=data.index if data is not None else [0]
                    ),
                }
            )


class MLPredictor:
    """Machine learning predictor for trading signals."""

    def __init__(self, model_type: str = "random_forest", **model_params):
        """Initialize ML predictor.

        Args:
            model_type: Type of ML model to use
            **model_params: Model parameters
        """
        self.model_type = model_type
        self.model_params = model_params
        self.model = None
        self.scaler = StandardScaler()
        self.feature_extractor = FeatureExtractor()
        self.is_trained = False
        self.feature_names: list[str] = []

    def _create_model(self):
        """Create ML model based on type."""
        if self.model_type == "random_forest":
            self.model = RandomForestClassifier(
                n_estimators=self.model_params.get("n_estimators", 100),
                max_depth=self.model_params.get("max_depth", 10),
                random_state=self.model_params.get("random_state", 42),
                **{
                    k: v
                    for k, v in self.model_params.items()
                    if k not in ["n_estimators", "max_depth", "random_state"]
                },
            )
        else:
            raise ValueError(f"Unsupported model type: {self.model_type}")

    def prepare_data(
        self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
    ) -> tuple[DataFrame, Series]:
        """Prepare features and target for training.

        Args:
            data: OHLCV price data
            target_periods: Periods to look forward for target
            return_threshold: Return threshold for classification

        Returns:
            Tuple of (features, target)
        """
        # Extract features
        features = self.feature_extractor.extract_all_features(data)

        # Create target variable
        target = self.feature_extractor.create_target_variable(
            data, target_periods, return_threshold
        )

        # Align features and target (remove NaN values)
        valid_idx = features.dropna().index.intersection(target.dropna().index)
        features = features.loc[valid_idx]
        target = target.loc[valid_idx]

        return features, target

    def train(
        self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
    ) -> dict[str, Any]:
        """Train the ML model.

        Args:
            data: OHLCV price data
            target_periods: Periods to look forward for target
            return_threshold: Return threshold for classification

        Returns:
            Training metrics
        """
        try:
            # Prepare data
            features, target = self.prepare_data(data, target_periods, return_threshold)

            if len(features) == 0:
                raise ValueError("No valid training data available")

            # Create and train model
            self._create_model()

            # Scale features
            features_scaled = self.scaler.fit_transform(features)

            # Train model
            self.model.fit(features_scaled, target)
            self.feature_names = list(features.columns)
            self.is_trained = True

            # Calculate training metrics
            train_score = self.model.score(features_scaled, target)

            # Convert numpy int64 to Python int for JSON serialization
            target_dist = target.value_counts().to_dict()
            target_dist = {int(k): int(v) for k, v in target_dist.items()}

            metrics = {
                "train_accuracy": float(
                    train_score
                ),  # Convert numpy float to Python float
                "n_samples": int(len(features)),
                "n_features": int(len(features.columns)),
                "target_distribution": target_dist,
            }

            # Feature importance (if available)
            if hasattr(self.model, "feature_importances_"):
                # Convert numpy floats to Python floats
                feature_importance = {
                    str(col): float(imp)
                    for col, imp in zip(
                        features.columns, self.model.feature_importances_, strict=False
                    )
                }
                metrics["feature_importance"] = feature_importance

            logger.info(f"Model trained successfully: {metrics}")
            return metrics

        except Exception as e:
            logger.error(f"Error training model: {e}")
            raise

    def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
        """Generate trading signals using the trained model.

        Alias for predict() to match the expected interface.

        Args:
            data: OHLCV price data

        Returns:
            Tuple of (entry_signals, exit_signals)
        """
        return self.predict(data)

    def predict(self, data: DataFrame) -> tuple[Series, Series]:
        """Generate trading signals using the trained model.

        Args:
            data: OHLCV price data

        Returns:
            Tuple of (entry_signals, exit_signals)
        """
        if not self.is_trained:
            raise ValueError("Model must be trained before making predictions")

        try:
            # Extract features
            features = self.feature_extractor.extract_all_features(data)

            # Handle missing values
            features = features.ffill().fillna(0)

            # Scale features
            features_scaled = self.scaler.transform(features)

            # Make predictions
            predictions = self.model.predict(features_scaled)
            prediction_proba = self.model.predict_proba(features_scaled)
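            # Note: predict_proba columns follow self.model.classes_; the indexing
            # below assumes all three labels (0, 1, 2) appeared in the training data.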

            # Convert to signals
            predictions_series = pd.Series(predictions, index=features.index)

            # Entry signals (buy predictions with high confidence)
            entry_signals = (predictions_series == 2) & (
                pd.Series(prediction_proba[:, 2], index=features.index) > 0.6
            )

            # Exit signals (sell predictions or low confidence holds)
            exit_signals = (predictions_series == 0) | (
                (predictions_series == 1)
                & (pd.Series(prediction_proba[:, 1], index=features.index) < 0.4)
            )

            return entry_signals, exit_signals

        except Exception as e:
            logger.error(f"Error making predictions: {e}")
            raise

    def get_feature_importance(self) -> dict[str, float]:
        """Get feature importance from trained model.

        Returns:
            Dictionary of feature importance scores
        """
        if not self.is_trained or not hasattr(self.model, "feature_importances_"):
            return {}

        # Use the feature names captured during training; extracting features from
        # an empty DataFrame would return no columns.
        return dict(
            zip(self.feature_names, self.model.feature_importances_, strict=False)
        )

    def update_model(
        self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
    ) -> dict[str, Any]:
        """Update model with new data (online learning simulation).

        Args:
            data: New OHLCV price data
            target_periods: Periods to look forward for target
            return_threshold: Return threshold for classification

        Returns:
            Update metrics
        """
        try:
            # For now, retrain the model with all data
            # In production, this could use partial_fit for online learning
            return self.train(data, target_periods, return_threshold)

        except Exception as e:
            logger.error(f"Error updating model: {e}")
            raise

```
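
A minimal usage sketch (not part of the repository) that exercises `MLPredictor` end to end. The synthetic random-walk OHLCV frame, the date range, and the model parameters below are illustrative assumptions; only the class and method names come from the module above.

```python
"""Illustrative sketch: train MLPredictor on synthetic OHLCV data."""

import numpy as np
import pandas as pd

from maverick_mcp.backtesting.strategies.ml.feature_engineering import MLPredictor

# Build a synthetic daily OHLCV frame (geometric random walk) for demonstration only.
rng = np.random.default_rng(42)
index = pd.date_range("2022-01-03", periods=500, freq="B")
close = pd.Series(
    100 * np.exp(np.cumsum(rng.normal(0.0, 0.01, len(index)))), index=index
)
data = pd.DataFrame(
    {
        "open": close.shift(1).fillna(close.iloc[0]),
        "high": close * (1 + rng.uniform(0.0, 0.01, len(index))),
        "low": close * (1 - rng.uniform(0.0, 0.01, len(index))),
        "close": close,
        "volume": rng.integers(1_000_000, 5_000_000, len(index)),
    }
)

# Train a random forest on the extracted features and label forward returns.
predictor = MLPredictor(model_type="random_forest", n_estimators=50, max_depth=5)
metrics = predictor.train(data, target_periods=5, return_threshold=0.02)
print(f"train accuracy: {metrics['train_accuracy']:.2f}")
print(f"features used: {metrics['n_features']}")

# Boolean entry/exit Series aligned to the input index.
entry_signals, exit_signals = predictor.generate_signals(data)
print(f"entries: {int(entry_signals.sum())}, exits: {int(exit_signals.sum())}")
```

Note that the module imports `pandas_ta` and `scikit-learn`, so both must be installed for the sketch to run.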

--------------------------------------------------------------------------------
/maverick_mcp/backtesting/persistence.py:
--------------------------------------------------------------------------------

```python
"""
Backtesting persistence layer for saving and retrieving backtest results.

This module provides comprehensive database operations for backtest results,
including saving VectorBT results, querying historical tests, and comparing
multiple backtests with proper error handling.
"""

import logging
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation
from typing import Any
from uuid import UUID, uuid4

import pandas as pd
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from maverick_mcp.data.models import (
    BacktestResult,
    BacktestTrade,
    OptimizationResult,
    SessionLocal,
    WalkForwardTest,
)

logger = logging.getLogger(__name__)


class BacktestPersistenceError(Exception):
    """Custom exception for backtest persistence operations."""

    pass


class BacktestPersistenceManager:
    """Manages persistence of backtesting results with comprehensive error handling."""

    def __init__(self, session: Session | None = None):
        """Initialize persistence manager.

        Args:
            session: Optional SQLAlchemy session. If None, creates a new one.
        """
        self.session = session
        self._owns_session = session is None

    def __enter__(self):
        """Context manager entry."""
        if self._owns_session:
            self.session = SessionLocal()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with proper cleanup."""
        if self._owns_session and self.session:
            if exc_type is None:
                self.session.commit()
            else:
                self.session.rollback()
            self.session.close()
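
    # Illustrative usage sketch (assumption: `results` is a VectorBTEngine output
    # dict with "symbol", "strategy", "parameters", and "metrics" keys):
    #
    #     with BacktestPersistenceManager() as manager:
    #         backtest_id = manager.save_backtest_result(results, execution_time=2.5)
    #         saved = manager.get_backtest_by_id(backtest_id)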

    def save_backtest_result(
        self,
        vectorbt_results: dict[str, Any],
        execution_time: float | None = None,
        notes: str | None = None,
    ) -> str:
        """
        Save VectorBT backtest results to database.

        Args:
            vectorbt_results: Results dictionary from VectorBTEngine
            execution_time: Time taken to run the backtest in seconds
            notes: Optional user notes

        Returns:
            UUID string of the saved backtest

        Raises:
            BacktestPersistenceError: If saving fails
        """
        try:
            # Extract basic metadata
            symbol = vectorbt_results.get("symbol", "").upper()
            strategy_type = vectorbt_results.get("strategy", "")
            parameters = vectorbt_results.get("parameters", {})
            metrics = vectorbt_results.get("metrics", {})

            if not symbol or not strategy_type:
                raise BacktestPersistenceError("Symbol and strategy type are required")

            # Create backtest result record
            backtest_result = BacktestResult(
                backtest_id=uuid4(),
                symbol=symbol,
                strategy_type=strategy_type,
                backtest_date=datetime.utcnow(),
                # Date range
                start_date=pd.to_datetime(vectorbt_results.get("start_date")).date(),
                end_date=pd.to_datetime(vectorbt_results.get("end_date")).date(),
                initial_capital=Decimal(
                    str(vectorbt_results.get("initial_capital", 10000))
                ),
                # Strategy parameters
                parameters=parameters,
                # Performance metrics
                total_return=self._safe_decimal(metrics.get("total_return")),
                annualized_return=self._safe_decimal(metrics.get("annualized_return")),
                sharpe_ratio=self._safe_decimal(metrics.get("sharpe_ratio")),
                sortino_ratio=self._safe_decimal(metrics.get("sortino_ratio")),
                calmar_ratio=self._safe_decimal(metrics.get("calmar_ratio")),
                # Risk metrics
                max_drawdown=self._safe_decimal(metrics.get("max_drawdown")),
                max_drawdown_duration=metrics.get("max_drawdown_duration"),
                volatility=self._safe_decimal(metrics.get("volatility")),
                downside_volatility=self._safe_decimal(
                    metrics.get("downside_volatility")
                ),
                # Trade statistics
                total_trades=metrics.get("total_trades", 0),
                winning_trades=metrics.get("winning_trades", 0),
                losing_trades=metrics.get("losing_trades", 0),
                win_rate=self._safe_decimal(metrics.get("win_rate")),
                # P&L statistics
                profit_factor=self._safe_decimal(metrics.get("profit_factor")),
                average_win=self._safe_decimal(metrics.get("average_win")),
                average_loss=self._safe_decimal(metrics.get("average_loss")),
                largest_win=self._safe_decimal(metrics.get("largest_win")),
                largest_loss=self._safe_decimal(metrics.get("largest_loss")),
                # Portfolio values
                final_portfolio_value=self._safe_decimal(metrics.get("final_value")),
                peak_portfolio_value=self._safe_decimal(metrics.get("peak_value")),
                # Market analysis
                beta=self._safe_decimal(metrics.get("beta")),
                alpha=self._safe_decimal(metrics.get("alpha")),
                # Time series data
                equity_curve=vectorbt_results.get("equity_curve"),
                drawdown_series=vectorbt_results.get("drawdown_series"),
                # Execution metadata
                execution_time_seconds=Decimal(str(execution_time))
                if execution_time
                else None,
                data_points=len(vectorbt_results.get("equity_curve") or []),
                # Status
                status="completed",
                notes=notes,
            )

            self.session.add(backtest_result)
            self.session.flush()  # Get the ID without committing

            # Save individual trades if available
            trades_data = vectorbt_results.get("trades", [])
            if trades_data:
                self._save_trades(backtest_result.backtest_id, trades_data)

            self.session.commit()
            logger.info(f"Saved backtest result: {backtest_result.backtest_id}")

            return str(backtest_result.backtest_id)

        except SQLAlchemyError as e:
            self.session.rollback()
            logger.error(f"Database error saving backtest: {e}")
            raise BacktestPersistenceError(f"Failed to save backtest: {e}")
        except Exception as e:
            self.session.rollback()
            logger.error(f"Unexpected error saving backtest: {e}")
            raise BacktestPersistenceError(f"Unexpected error: {e}")

    def _save_trades(
        self, backtest_id: UUID, trades_data: list[dict[str, Any]]
    ) -> None:
        """Save individual trade records."""
        try:
            trades = []
            for i, trade in enumerate(trades_data, 1):
                trade_record = BacktestTrade(
                    trade_id=uuid4(),
                    backtest_id=backtest_id,
                    trade_number=i,
                    # Entry details
                    entry_date=pd.to_datetime(trade.get("entry_date")).date(),
                    entry_price=self._safe_decimal(trade.get("entry_price")),
                    entry_time=pd.to_datetime(trade.get("entry_time"))
                    if trade.get("entry_time")
                    else None,
                    # Exit details
                    exit_date=pd.to_datetime(trade.get("exit_date")).date()
                    if trade.get("exit_date")
                    else None,
                    exit_price=self._safe_decimal(trade.get("exit_price")),
                    exit_time=pd.to_datetime(trade.get("exit_time"))
                    if trade.get("exit_time")
                    else None,
                    # Position details
                    position_size=self._safe_decimal(trade.get("position_size")),
                    direction=trade.get("direction", "long"),
                    # P&L
                    pnl=self._safe_decimal(trade.get("pnl")),
                    pnl_percent=self._safe_decimal(trade.get("pnl_percent")),
                    # Risk metrics
                    mae=self._safe_decimal(trade.get("mae")),
                    mfe=self._safe_decimal(trade.get("mfe")),
                    # Duration
                    duration_days=trade.get("duration_days"),
                    duration_hours=self._safe_decimal(trade.get("duration_hours")),
                    # Exit details
                    exit_reason=trade.get("exit_reason"),
                    fees_paid=self._safe_decimal(trade.get("fees_paid")),
                    slippage_cost=self._safe_decimal(trade.get("slippage_cost")),
                )
                trades.append(trade_record)

            self.session.add_all(trades)
            logger.info(f"Saved {len(trades)} trades for backtest {backtest_id}")

        except Exception as e:
            logger.error(f"Error saving trades: {e}")
            raise

    def get_backtest_by_id(self, backtest_id: str) -> BacktestResult | None:
        """
        Retrieve a backtest by ID.

        Args:
            backtest_id: UUID string of the backtest

        Returns:
            BacktestResult or None if not found
        """
        try:
            # Convert string to UUID for database query
            if isinstance(backtest_id, str):
                backtest_uuid = UUID(backtest_id)
            else:
                backtest_uuid = backtest_id

            return (
                self.session.query(BacktestResult)
                .filter(BacktestResult.backtest_id == backtest_uuid)
                .first()
            )
        except SQLAlchemyError as e:
            logger.error(f"Error retrieving backtest {backtest_id}: {e}")
            return None
        except ValueError as e:
            logger.error(f"Invalid UUID format {backtest_id}: {e}")
            return None

    def get_backtests_by_symbol(
        self, symbol: str, strategy_type: str | None = None, limit: int = 10
    ) -> list[BacktestResult]:
        """
        Get backtests for a specific symbol.

        Args:
            symbol: Stock symbol
            strategy_type: Optional strategy filter
            limit: Maximum number of results

        Returns:
            List of BacktestResult objects
        """
        try:
            query = self.session.query(BacktestResult).filter(
                BacktestResult.symbol == symbol.upper()
            )

            if strategy_type:
                query = query.filter(BacktestResult.strategy_type == strategy_type)

            return query.order_by(desc(BacktestResult.backtest_date)).limit(limit).all()

        except SQLAlchemyError as e:
            logger.error(f"Error retrieving backtests for {symbol}: {e}")
            return []

    def get_best_performing_strategies(
        self, metric: str = "sharpe_ratio", min_trades: int = 10, limit: int = 20
    ) -> list[BacktestResult]:
        """
        Get best performing backtests by specified metric.

        Args:
            metric: Performance metric (sharpe_ratio, total_return, profit_factor)
            min_trades: Minimum number of trades required
            limit: Maximum number of results

        Returns:
            List of top performing BacktestResult objects
        """
        try:
            metric_column = getattr(BacktestResult, metric, BacktestResult.sharpe_ratio)

            return (
                self.session.query(BacktestResult)
                .filter(
                    BacktestResult.status == "completed",
                    BacktestResult.total_trades >= min_trades,
                    metric_column.isnot(None),
                )
                .order_by(desc(metric_column))
                .limit(limit)
                .all()
            )

        except SQLAlchemyError as e:
            logger.error(f"Error retrieving best performing strategies: {e}")
            return []

    def compare_strategies(
        self, backtest_ids: list[str], metrics: list[str] | None = None
    ) -> dict[str, Any]:
        """
        Compare multiple backtests across specified metrics.

        Args:
            backtest_ids: List of backtest UUID strings
            metrics: List of metrics to compare (default: common metrics)

        Returns:
            Dictionary with comparison results
        """
        if not metrics:
            metrics = [
                "total_return",
                "sharpe_ratio",
                "max_drawdown",
                "win_rate",
                "profit_factor",
                "total_trades",
            ]

        try:
            # Convert string UUIDs to UUID objects
            uuid_list = []
            for bt_id in backtest_ids:
                if isinstance(bt_id, str):
                    uuid_list.append(UUID(bt_id))
                else:
                    uuid_list.append(bt_id)

            backtests = (
                self.session.query(BacktestResult)
                .filter(BacktestResult.backtest_id.in_(uuid_list))
                .all()
            )

            if not backtests:
                return {"error": "No backtests found"}

            comparison = {"backtests": [], "summary": {}, "rankings": {}}

            # Extract data for each backtest
            for bt in backtests:
                bt_data = {
                    "backtest_id": str(bt.backtest_id),
                    "symbol": bt.symbol,
                    "strategy": bt.strategy_type,
                    "date": bt.backtest_date.isoformat(),
                    "metrics": {},
                }

                for metric in metrics:
                    value = getattr(bt, metric, None)
                    bt_data["metrics"][metric] = float(value) if value else None

                comparison["backtests"].append(bt_data)

            # Calculate rankings for each metric
            for metric in metrics:
                metric_values = [
                    (bt["backtest_id"], bt["metrics"].get(metric))
                    for bt in comparison["backtests"]
                    if bt["metrics"].get(metric) is not None
                ]

                if metric_values:
                    # Sort by metric value (descending for most metrics)
                    reverse_sort = metric != "max_drawdown"  # Lower drawdown is better
                    sorted_values = sorted(
                        metric_values, key=lambda x: x[1], reverse=reverse_sort
                    )

                    comparison["rankings"][metric] = [
                        {"backtest_id": bt_id, "value": value, "rank": i + 1}
                        for i, (bt_id, value) in enumerate(sorted_values)
                    ]

            # Summary statistics
            comparison["summary"] = {
                "total_backtests": len(backtests),
                "date_range": {
                    "earliest": min(bt.backtest_date for bt in backtests).isoformat(),
                    "latest": max(bt.backtest_date for bt in backtests).isoformat(),
                },
            }

            return comparison

        except SQLAlchemyError as e:
            logger.error(f"Error comparing strategies: {e}")
            return {"error": f"Database error: {e}"}

    def save_optimization_results(
        self,
        backtest_id: str,
        optimization_results: list[dict[str, Any]],
        objective_function: str = "sharpe_ratio",
    ) -> int:
        """
        Save parameter optimization results.

        Args:
            backtest_id: Parent backtest UUID
            optimization_results: List of optimization result dictionaries
            objective_function: Optimization objective (sharpe_ratio, total_return, etc.)

        Returns:
            Number of optimization results saved
        """
        try:
            # Convert string UUID to UUID object
            if isinstance(backtest_id, str):
                backtest_uuid = UUID(backtest_id)
            else:
                backtest_uuid = backtest_id

            optimization_records = []

            for i, result in enumerate(optimization_results, 1):
                record = OptimizationResult(
                    optimization_id=uuid4(),
                    backtest_id=backtest_uuid,
                    parameter_set=i,
                    parameters=result.get("parameters", {}),
                    objective_function=objective_function,
                    objective_value=self._safe_decimal(result.get("objective_value")),
                    total_return=self._safe_decimal(result.get("total_return")),
                    sharpe_ratio=self._safe_decimal(result.get("sharpe_ratio")),
                    max_drawdown=self._safe_decimal(result.get("max_drawdown")),
                    win_rate=self._safe_decimal(result.get("win_rate")),
                    profit_factor=self._safe_decimal(result.get("profit_factor")),
                    total_trades=result.get("total_trades"),
                    rank=result.get("rank", i),
                    is_statistically_significant=result.get(
                        "is_statistically_significant", False
                    ),
                    p_value=self._safe_decimal(result.get("p_value")),
                )
                optimization_records.append(record)

            self.session.add_all(optimization_records)
            self.session.commit()

            logger.info(f"Saved {len(optimization_records)} optimization results")
            return len(optimization_records)

        except SQLAlchemyError as e:
            self.session.rollback()
            logger.error(f"Error saving optimization results: {e}")
            raise BacktestPersistenceError(f"Failed to save optimization results: {e}")

    def save_walk_forward_test(
        self, parent_backtest_id: str, walk_forward_data: dict[str, Any]
    ) -> str:
        """
        Save walk-forward validation test results.

        Args:
            parent_backtest_id: Parent backtest UUID
            walk_forward_data: Walk-forward test data

        Returns:
            UUID string of saved walk-forward test
        """
        try:
            # Convert string UUID to UUID object
            if isinstance(parent_backtest_id, str):
                parent_uuid = UUID(parent_backtest_id)
            else:
                parent_uuid = parent_backtest_id

            wf_test = WalkForwardTest(
                walk_forward_id=uuid4(),
                parent_backtest_id=parent_uuid,
                window_size_months=walk_forward_data.get("window_size_months"),
                step_size_months=walk_forward_data.get("step_size_months"),
                # Time periods
                training_start=pd.to_datetime(
                    walk_forward_data.get("training_start")
                ).date(),
                training_end=pd.to_datetime(
                    walk_forward_data.get("training_end")
                ).date(),
                test_period_start=pd.to_datetime(
                    walk_forward_data.get("test_period_start")
                ).date(),
                test_period_end=pd.to_datetime(
                    walk_forward_data.get("test_period_end")
                ).date(),
                # Results
                optimal_parameters=walk_forward_data.get("optimal_parameters"),
                training_performance=self._safe_decimal(
                    walk_forward_data.get("training_performance")
                ),
                out_of_sample_return=self._safe_decimal(
                    walk_forward_data.get("out_of_sample_return")
                ),
                out_of_sample_sharpe=self._safe_decimal(
                    walk_forward_data.get("out_of_sample_sharpe")
                ),
                out_of_sample_drawdown=self._safe_decimal(
                    walk_forward_data.get("out_of_sample_drawdown")
                ),
                out_of_sample_trades=walk_forward_data.get("out_of_sample_trades"),
                # Performance analysis
                performance_ratio=self._safe_decimal(
                    walk_forward_data.get("performance_ratio")
                ),
                degradation_factor=self._safe_decimal(
                    walk_forward_data.get("degradation_factor")
                ),
                is_profitable=walk_forward_data.get("is_profitable"),
                is_statistically_significant=walk_forward_data.get(
                    "is_statistically_significant", False
                ),
            )

            self.session.add(wf_test)
            self.session.commit()

            logger.info(f"Saved walk-forward test: {wf_test.walk_forward_id}")
            return str(wf_test.walk_forward_id)

        except SQLAlchemyError as e:
            self.session.rollback()
            logger.error(f"Error saving walk-forward test: {e}")
            raise BacktestPersistenceError(f"Failed to save walk-forward test: {e}")

    def get_backtest_performance_summary(
        self,
        symbol: str | None = None,
        strategy_type: str | None = None,
        days_back: int = 30,
    ) -> dict[str, Any]:
        """
        Get performance summary of recent backtests.

        Args:
            symbol: Optional symbol filter
            strategy_type: Optional strategy filter
            days_back: Days to look back

        Returns:
            Dictionary with performance summary
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days_back)

            query = self.session.query(BacktestResult).filter(
                BacktestResult.backtest_date >= cutoff_date,
                BacktestResult.status == "completed",
            )

            if symbol:
                query = query.filter(BacktestResult.symbol == symbol.upper())
            if strategy_type:
                query = query.filter(BacktestResult.strategy_type == strategy_type)

            backtests = query.all()

            if not backtests:
                return {"message": "No backtests found in the specified period"}

            # Calculate summary statistics
            returns = [
                float(bt.total_return)
                for bt in backtests
                if bt.total_return is not None
            ]
            sharpe_ratios = [
                float(bt.sharpe_ratio)
                for bt in backtests
                if bt.sharpe_ratio is not None
            ]
            win_rates = [
                float(bt.win_rate) for bt in backtests if bt.win_rate is not None
            ]

            summary = {
                "period": f"Last {days_back} days",
                "total_backtests": len(backtests),
                "performance_metrics": {
                    "average_return": sum(returns) / len(returns) if returns else 0,
                    "best_return": max(returns) if returns else 0,
                    "worst_return": min(returns) if returns else 0,
                    "average_sharpe": sum(sharpe_ratios) / len(sharpe_ratios)
                    if sharpe_ratios
                    else 0,
                    "average_win_rate": sum(win_rates) / len(win_rates)
                    if win_rates
                    else 0,
                },
                "strategy_breakdown": {},
                "symbol_breakdown": {},
            }

            # Group by strategy
            strategy_groups = {}
            for bt in backtests:
                strategy = bt.strategy_type
                if strategy not in strategy_groups:
                    strategy_groups[strategy] = []
                strategy_groups[strategy].append(bt)

            for strategy, strategy_backtests in strategy_groups.items():
                strategy_returns = [
                    float(bt.total_return)
                    for bt in strategy_backtests
                    if bt.total_return is not None
                ]
                summary["strategy_breakdown"][strategy] = {
                    "count": len(strategy_backtests),
                    "average_return": sum(strategy_returns) / len(strategy_returns)
                    if strategy_returns
                    else 0,
                }

            # Group by symbol
            symbol_groups = {}
            for bt in backtests:
                symbol = bt.symbol
                if symbol not in symbol_groups:
                    symbol_groups[symbol] = []
                symbol_groups[symbol].append(bt)

            for symbol, symbol_backtests in symbol_groups.items():
                symbol_returns = [
                    float(bt.total_return)
                    for bt in symbol_backtests
                    if bt.total_return is not None
                ]
                summary["symbol_breakdown"][symbol] = {
                    "count": len(symbol_backtests),
                    "average_return": sum(symbol_returns) / len(symbol_returns)
                    if symbol_returns
                    else 0,
                }

            return summary

        except SQLAlchemyError as e:
            logger.error(f"Error generating performance summary: {e}")
            return {"error": f"Database error: {e}"}

    def delete_backtest(self, backtest_id: str) -> bool:
        """
        Delete a backtest and all associated data.

        Args:
            backtest_id: UUID string of backtest to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        try:
            # Convert string UUID to UUID object
            if isinstance(backtest_id, str):
                backtest_uuid = UUID(backtest_id)
            else:
                backtest_uuid = backtest_id

            backtest = (
                self.session.query(BacktestResult)
                .filter(BacktestResult.backtest_id == backtest_uuid)
                .first()
            )

            if not backtest:
                logger.warning(f"Backtest {backtest_id} not found")
                return False

            # Delete associated records (cascading should handle this)
            self.session.delete(backtest)
            self.session.commit()

            logger.info(f"Deleted backtest {backtest_id}")
            return True

        except SQLAlchemyError as e:
            self.session.rollback()
            logger.error(f"Error deleting backtest {backtest_id}: {e}")
            return False

    @staticmethod
    def _safe_decimal(value: Any) -> Decimal | None:
        """Safely convert value to Decimal, handling None and invalid values."""
        if value is None:
            return None
        try:
            if isinstance(value, int | float):
                return Decimal(str(value))
            elif isinstance(value, Decimal):
                return value
            else:
                return Decimal(str(float(value)))
        except (ValueError, TypeError, InvalidOperation):
            return None


def get_persistence_manager(
    session: Session | None = None,
) -> BacktestPersistenceManager:
    """
    Factory function to create a persistence manager.

    Args:
        session: Optional SQLAlchemy session

    Returns:
        BacktestPersistenceManager instance
    """
    return BacktestPersistenceManager(session)


# Convenience functions for common operations


def save_vectorbt_results(
    vectorbt_results: dict[str, Any],
    execution_time: float | None = None,
    notes: str | None = None,
) -> str:
    """
    Convenience function to save VectorBT results.

    Args:
        vectorbt_results: Results from VectorBTEngine
        execution_time: Execution time in seconds
        notes: Optional notes

    Returns:
        Backtest UUID string
    """
    with get_persistence_manager() as manager:
        return manager.save_backtest_result(vectorbt_results, execution_time, notes)


def get_recent_backtests(symbol: str, days: int = 7) -> list[BacktestResult]:
    """
    Get recent backtests for a symbol.

    Args:
        symbol: Stock symbol
        days: Number of days to look back

    Returns:
        List of recent BacktestResult objects
    """
    with get_persistence_manager() as manager:
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        return (
            manager.session.query(BacktestResult)
            .filter(
                BacktestResult.symbol == symbol.upper(),
                BacktestResult.backtest_date >= cutoff_date,
            )
            .order_by(desc(BacktestResult.backtest_date))
            .all()
        )


def find_best_strategy_for_symbol(
    symbol: str, metric: str = "sharpe_ratio"
) -> BacktestResult | None:
    """
    Find the best performing strategy for a symbol.

    Args:
        symbol: Stock symbol
        metric: Performance metric to optimize

    Returns:
        Best BacktestResult or None
    """
    with get_persistence_manager() as manager:
        candidates = manager.get_backtests_by_symbol(symbol, limit=1000)
        scored = [
            (bt, getattr(bt, metric, None))
            for bt in candidates
            if getattr(bt, metric, None) is not None
        ]
        if not scored:
            return None
        return max(scored, key=lambda pair: pair[1])[0]

```
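
For orientation, a minimal end-to-end sketch of the module-level helpers above, mirroring how the performance tests later on this page drive them. It assumes `VectorBTEngine` can be constructed with its default data provider (the tests here always inject a mock) and that the database used by `BacktestPersistenceManager` is reachable; the symbol, dates, and notes are placeholders.

```python
import asyncio

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import (
    find_best_strategy_for_symbol,
    get_recent_backtests,
    save_vectorbt_results,
)
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES


async def demo() -> None:
    engine = VectorBTEngine()  # assumes a default data provider is configured
    results = await engine.run_backtest(
        symbol="AAPL",
        strategy_type="sma_cross",
        parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
        start_date="2023-01-01",
        end_date="2023-12-31",
    )

    # Persist the run and echo the generated backtest ID
    backtest_id = save_vectorbt_results(
        results, execution_time=1.8, notes="demo run"
    )
    print("saved:", backtest_id)

    # Recent history and the best recorded strategy for the symbol
    for bt in get_recent_backtests("AAPL", days=7):
        print(bt.backtest_id, bt.strategy_type, bt.total_return)

    best = find_best_strategy_for_symbol("AAPL", metric="sharpe_ratio")
    if best is not None:
        print("best strategy:", best.strategy_type)


if __name__ == "__main__":
    asyncio.run(demo())
```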

--------------------------------------------------------------------------------
/tests/performance/test_profiling.py:
--------------------------------------------------------------------------------

```python
"""
Profiling Tests for Bottleneck Identification.

This test suite covers:
- Profile critical code paths with cProfile
- Identify slow database queries with timing
- Find memory allocation hotspots
- Document optimization opportunities
- Line-by-line profiling of key functions
- Call graph analysis for performance
- I/O bottleneck identification
- CPU-bound vs I/O-bound analysis
"""

import cProfile
import io
import logging
import pstats
import time
import tracemalloc
from collections.abc import Callable
from contextlib import contextmanager
from typing import Any
from unittest.mock import Mock

import numpy as np
import pandas as pd
import pytest

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES

logger = logging.getLogger(__name__)


class PerformanceProfiler:
    """Comprehensive performance profiler for backtesting operations."""

    def __init__(self):
        self.profiling_data = {}
        self.memory_snapshots = []

    @contextmanager
    def profile_cpu(self, operation_name: str):
        """Profile CPU usage of an operation."""
        profiler = cProfile.Profile()
        start_time = time.time()

        profiler.enable()
        try:
            yield
        finally:
            profiler.disable()
            execution_time = time.time() - start_time

            # Capture profiling stats
            stats_stream = io.StringIO()
            stats = pstats.Stats(profiler, stream=stats_stream)
            stats.sort_stats("cumulative")
            stats.print_stats(20)  # Top 20 functions

            # Merge so any memory profile already recorded for this operation is kept
            self.profiling_data.setdefault(operation_name, {}).update(
                {
                    "execution_time": execution_time,
                    "cpu_profile": stats_stream.getvalue(),
                    "stats_object": stats,
                }
            )

    @contextmanager
    def profile_memory(self, operation_name: str):
        """Profile memory usage of an operation."""
        tracemalloc.start()
        start_memory = tracemalloc.get_traced_memory()

        try:
            yield
        finally:
            current_memory, peak_memory = tracemalloc.get_traced_memory()
            tracemalloc.stop()

            memory_data = {
                "start_memory_mb": start_memory[0] / 1024 / 1024,
                "current_memory_mb": current_memory / 1024 / 1024,
                "peak_memory_mb": peak_memory / 1024 / 1024,
                "memory_growth_mb": (current_memory - start_memory[0]) / 1024 / 1024,
            }

            if operation_name in self.profiling_data:
                self.profiling_data[operation_name]["memory_profile"] = memory_data
            else:
                self.profiling_data[operation_name] = {"memory_profile": memory_data}

    def profile_database_query(
        self, query_name: str, query_func: Callable
    ) -> dict[str, Any]:
        """Profile database query performance."""
        start_time = time.time()

        try:
            result = query_func()
            execution_time = time.time() - start_time

            return {
                "query_name": query_name,
                "execution_time_ms": execution_time * 1000,
                "success": True,
                # Return the raw result so callers can reuse values such as saved IDs
                "result": result,
                "result_size": len(str(result)) if result else 0,
            }
        except Exception as e:
            execution_time = time.time() - start_time
            return {
                "query_name": query_name,
                "execution_time_ms": execution_time * 1000,
                "success": False,
                "error": str(e),
            }

    def analyze_hotspots(self, operation_name: str) -> dict[str, Any]:
        """Analyze performance hotspots from profiling data."""
        if operation_name not in self.profiling_data:
            return {"error": f"No profiling data for {operation_name}"}

        data = self.profiling_data[operation_name]
        stats = data.get("stats_object")

        if not stats:
            return {"error": "No CPU profiling stats available"}

        # Extract top functions by cumulative time
        stats.sort_stats("cumulative")
        top_functions = []

        for func_data in list(stats.stats.items())[:10]:
            func_name, (cc, nc, tt, ct, callers) = func_data
            top_functions.append(
                {
                    "function": f"{func_name[0]}:{func_name[1]}({func_name[2]})",
                    "cumulative_time": ct,
                    "total_time": tt,
                    "call_count": nc,
                    "time_per_call": ct / nc if nc > 0 else 0,
                }
            )

        # Extract top functions by self time
        stats.sort_stats("tottime")
        self_time_functions = []

        for func_data in list(stats.stats.items())[:10]:
            func_name, (cc, nc, tt, ct, callers) = func_data
            self_time_functions.append(
                {
                    "function": f"{func_name[0]}:{func_name[1]}({func_name[2]})",
                    "self_time": tt,
                    "cumulative_time": ct,
                    "call_count": nc,
                }
            )

        return {
            "operation_name": operation_name,
            "total_execution_time": data.get("execution_time", 0),
            "top_functions_by_cumulative": top_functions,
            "top_functions_by_self_time": self_time_functions,
            "memory_profile": data.get("memory_profile", {}),
        }

    def generate_optimization_report(self) -> dict[str, Any]:
        """Generate comprehensive optimization report."""
        optimization_opportunities = []
        performance_summary = {}

        for operation_name, data in self.profiling_data.items():
            analysis = self.analyze_hotspots(operation_name)

            performance_summary[operation_name] = {
                "execution_time": data.get("execution_time", 0),
                "peak_memory_mb": data.get("memory_profile", {}).get(
                    "peak_memory_mb", 0
                ),
            }

            # Identify optimization opportunities
            if "top_functions_by_cumulative" in analysis:
                for func in analysis["top_functions_by_cumulative"][
                    :3
                ]:  # Top 3 functions
                    if func["cumulative_time"] > 0.1:  # More than 100ms
                        optimization_opportunities.append(
                            {
                                "operation": operation_name,
                                "function": func["function"],
                                "issue": "High cumulative time",
                                "time": func["cumulative_time"],
                                "priority": "High"
                                if func["cumulative_time"] > 1.0
                                else "Medium",
                            }
                        )

            # Memory optimization opportunities
            memory_profile = data.get("memory_profile", {})
            if memory_profile.get("peak_memory_mb", 0) > 100:  # More than 100MB
                optimization_opportunities.append(
                    {
                        "operation": operation_name,
                        "issue": "High memory usage",
                        "memory_mb": memory_profile["peak_memory_mb"],
                        "priority": "High"
                        if memory_profile["peak_memory_mb"] > 500
                        else "Medium",
                    }
                )

        return {
            "performance_summary": performance_summary,
            "optimization_opportunities": optimization_opportunities,
            "total_operations_profiled": len(self.profiling_data),
        }


class TestPerformanceProfiling:
    """Performance profiling test suite."""

    @pytest.fixture
    async def profiling_data_provider(self):
        """Create data provider for profiling tests."""
        provider = Mock()

        def generate_profiling_data(symbol: str) -> pd.DataFrame:
            """Generate data with known performance characteristics."""
            # Generate larger dataset to create measurable performance impact
            dates = pd.date_range(
                start="2020-01-01", end="2023-12-31", freq="D"
            )  # 4 years
            np.random.seed(hash(symbol) % 1000)

            returns = np.random.normal(0.0008, 0.02, len(dates))
            prices = 100 * np.cumprod(1 + returns)

            # Add some computationally expensive operations
            high_prices = prices * np.random.uniform(1.01, 1.05, len(dates))
            low_prices = prices * np.random.uniform(0.95, 0.99, len(dates))

            # Simulate expensive volume calculations
            base_volume = np.random.randint(1000000, 10000000, len(dates))
            volume_multiplier = np.exp(
                np.random.normal(0, 0.1, len(dates))
            )  # Log-normal distribution
            volumes = (base_volume * volume_multiplier).astype(int)

            return pd.DataFrame(
                {
                    "Open": prices * np.random.uniform(0.995, 1.005, len(dates)),
                    "High": high_prices,
                    "Low": low_prices,
                    "Close": prices,
                    "Volume": volumes,
                    "Adj Close": prices,
                },
                index=dates,
            )

        provider.get_stock_data.side_effect = generate_profiling_data
        return provider

    async def test_profile_backtest_execution(self, profiling_data_provider):
        """Profile complete backtest execution to identify bottlenecks."""
        profiler = PerformanceProfiler()
        engine = VectorBTEngine(data_provider=profiling_data_provider)

        strategies_to_profile = ["sma_cross", "rsi", "macd", "bollinger"]

        for strategy in strategies_to_profile:
            with profiler.profile_cpu(f"backtest_{strategy}"):
                with profiler.profile_memory(f"backtest_{strategy}"):
                    await engine.run_backtest(
                        symbol="PROFILE_TEST",
                        strategy_type=strategy,
                        parameters=STRATEGY_TEMPLATES[strategy]["parameters"],
                        start_date="2022-01-01",
                        end_date="2023-12-31",
                    )

        # Analyze profiling results
        report = profiler.generate_optimization_report()

        # Log performance analysis
        logger.info("Backtest Execution Profiling Results:")
        for operation, summary in report["performance_summary"].items():
            logger.info(
                f"  {operation}: {summary['execution_time']:.3f}s, "
                f"{summary['peak_memory_mb']:.1f}MB peak"
            )

        # Log optimization opportunities
        if report["optimization_opportunities"]:
            logger.info("Optimization Opportunities:")
            for opportunity in report["optimization_opportunities"]:
                priority_symbol = "🔴" if opportunity["priority"] == "High" else "🟡"
                logger.info(
                    f"  {priority_symbol} {opportunity['operation']}: {opportunity['issue']}"
                )

        # Performance assertions
        max_execution_time = max(
            summary["execution_time"]
            for summary in report["performance_summary"].values()
        )
        assert max_execution_time <= 5.0, (
            f"Slowest backtest took too long: {max_execution_time:.2f}s"
        )

        high_priority_issues = [
            opp
            for opp in report["optimization_opportunities"]
            if opp["priority"] == "High"
        ]
        assert len(high_priority_issues) <= 2, (
            f"Too many high-priority performance issues: {len(high_priority_issues)}"
        )

        return report

    async def test_profile_data_loading_bottlenecks(self, profiling_data_provider):
        """Profile data loading operations to identify I/O bottlenecks."""
        profiler = PerformanceProfiler()
        engine = VectorBTEngine(data_provider=profiling_data_provider)

        symbols = ["DATA_1", "DATA_2", "DATA_3", "DATA_4", "DATA_5"]

        # Profile data loading operations
        for symbol in symbols:
            with profiler.profile_cpu(f"data_loading_{symbol}"):
                with profiler.profile_memory(f"data_loading_{symbol}"):
                    # Profile the data fetching specifically
                    await engine.get_historical_data(
                        symbol=symbol, start_date="2020-01-01", end_date="2023-12-31"
                    )

        # Analyze data loading performance
        data_loading_times = []
        data_loading_memory = []

        for symbol in symbols:
            operation_name = f"data_loading_{symbol}"
            if operation_name in profiler.profiling_data:
                data_loading_times.append(
                    profiler.profiling_data[operation_name]["execution_time"]
                )
                memory_profile = profiler.profiling_data[operation_name].get(
                    "memory_profile", {}
                )
                data_loading_memory.append(memory_profile.get("peak_memory_mb", 0))

        avg_loading_time = np.mean(data_loading_times) if data_loading_times else 0
        max_loading_time = max(data_loading_times) if data_loading_times else 0
        avg_loading_memory = np.mean(data_loading_memory) if data_loading_memory else 0

        logger.info("Data Loading Performance Analysis:")
        logger.info(f"  Average Loading Time: {avg_loading_time:.3f}s")
        logger.info(f"  Maximum Loading Time: {max_loading_time:.3f}s")
        logger.info(f"  Average Memory Usage: {avg_loading_memory:.1f}MB")

        # Performance assertions for data loading
        assert avg_loading_time <= 0.5, (
            f"Average data loading too slow: {avg_loading_time:.3f}s"
        )
        assert max_loading_time <= 1.0, (
            f"Slowest data loading too slow: {max_loading_time:.3f}s"
        )
        assert avg_loading_memory <= 50.0, (
            f"Data loading memory usage too high: {avg_loading_memory:.1f}MB"
        )

        return {
            "avg_loading_time": avg_loading_time,
            "max_loading_time": max_loading_time,
            "avg_loading_memory": avg_loading_memory,
            "individual_times": data_loading_times,
        }

    async def test_profile_database_query_performance(
        self, profiling_data_provider, db_session
    ):
        """Profile database queries to identify slow operations."""
        profiler = PerformanceProfiler()
        engine = VectorBTEngine(data_provider=profiling_data_provider)

        # Generate test data for database profiling
        test_results = []
        for i in range(10):
            result = await engine.run_backtest(
                symbol=f"DB_PROFILE_{i}",
                strategy_type="sma_cross",
                parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            test_results.append(result)

        # Profile database operations
        query_profiles = []

        with BacktestPersistenceManager(session=db_session) as persistence:
            # Profile save operations
            for i, result in enumerate(test_results):
                query_profile = profiler.profile_database_query(
                    f"save_backtest_{i}",
                    lambda r=result: persistence.save_backtest_result(
                        vectorbt_results=r,
                        execution_time=2.0,
                        notes="Database profiling test",
                    ),
                )
                query_profiles.append(query_profile)

            # Get saved IDs for retrieval profiling
            saved_ids = [qp.get("result") for qp in query_profiles if qp.get("success")]

            # Profile retrieval operations
            for i, backtest_id in enumerate(
                saved_ids[:5]
            ):  # Profile first 5 retrievals
                query_profile = profiler.profile_database_query(
                    f"retrieve_backtest_{i}",
                    lambda bid=backtest_id: persistence.get_backtest_by_id(bid),
                )
                query_profiles.append(query_profile)

            # Profile bulk query operations
            bulk_query_profile = profiler.profile_database_query(
                "bulk_query_by_strategy",
                lambda: persistence.get_backtests_by_strategy("sma_cross"),
            )
            query_profiles.append(bulk_query_profile)

        # Analyze database query performance
        save_times = [
            qp["execution_time_ms"]
            for qp in query_profiles
            if "save_backtest" in qp["query_name"] and qp["success"]
        ]
        retrieve_times = [
            qp["execution_time_ms"]
            for qp in query_profiles
            if "retrieve_backtest" in qp["query_name"] and qp["success"]
        ]

        avg_save_time = np.mean(save_times) if save_times else 0
        avg_retrieve_time = np.mean(retrieve_times) if retrieve_times else 0
        bulk_query_time = (
            bulk_query_profile["execution_time_ms"]
            if bulk_query_profile["success"]
            else 0
        )

        logger.info("Database Query Performance Analysis:")
        logger.info(f"  Average Save Time: {avg_save_time:.1f}ms")
        logger.info(f"  Average Retrieve Time: {avg_retrieve_time:.1f}ms")
        logger.info(f"  Bulk Query Time: {bulk_query_time:.1f}ms")

        # Identify slow queries
        slow_queries = [
            qp
            for qp in query_profiles
            if qp["execution_time_ms"] > 100 and qp["success"]
        ]
        logger.info(f"  Slow Queries (>100ms): {len(slow_queries)}")

        # Performance assertions for database queries
        assert avg_save_time <= 50.0, (
            f"Average save time too slow: {avg_save_time:.1f}ms"
        )
        assert avg_retrieve_time <= 20.0, (
            f"Average retrieve time too slow: {avg_retrieve_time:.1f}ms"
        )
        assert bulk_query_time <= 100.0, f"Bulk query too slow: {bulk_query_time:.1f}ms"
        assert len(slow_queries) <= 2, f"Too many slow queries: {len(slow_queries)}"

        return {
            "avg_save_time": avg_save_time,
            "avg_retrieve_time": avg_retrieve_time,
            "bulk_query_time": bulk_query_time,
            "slow_queries": len(slow_queries),
            "query_profiles": query_profiles,
        }

    async def test_profile_memory_allocation_patterns(self, profiling_data_provider):
        """Profile memory allocation patterns to identify hotspots."""
        profiler = PerformanceProfiler()
        engine = VectorBTEngine(data_provider=profiling_data_provider)

        # Test different memory usage patterns
        memory_test_cases = [
            ("small_dataset", "2023-06-01", "2023-12-31"),
            ("medium_dataset", "2022-01-01", "2023-12-31"),
            ("large_dataset", "2020-01-01", "2023-12-31"),
        ]

        memory_profiles = []

        for case_name, start_date, end_date in memory_test_cases:
            with profiler.profile_memory(f"memory_{case_name}"):
                await engine.run_backtest(
                    symbol="MEMORY_TEST",
                    strategy_type="macd",
                    parameters=STRATEGY_TEMPLATES["macd"]["parameters"],
                    start_date=start_date,
                    end_date=end_date,
                )

            memory_data = profiler.profiling_data[f"memory_{case_name}"][
                "memory_profile"
            ]
            memory_profiles.append(
                {
                    "case": case_name,
                    "peak_memory_mb": memory_data["peak_memory_mb"],
                    "memory_growth_mb": memory_data["memory_growth_mb"],
                    "data_points": len(
                        pd.date_range(start=start_date, end=end_date, freq="D")
                    ),
                }
            )

        # Analyze memory scaling
        data_points = [mp["data_points"] for mp in memory_profiles]
        peak_memories = [mp["peak_memory_mb"] for mp in memory_profiles]

        # Calculate memory efficiency (MB per 1000 data points)
        memory_efficiency = [
            (peak_mem / data_pts * 1000)
            for peak_mem, data_pts in zip(peak_memories, data_points, strict=False)
        ]

        avg_memory_efficiency = np.mean(memory_efficiency)

        logger.info("Memory Allocation Pattern Analysis:")
        for profile in memory_profiles:
            efficiency = profile["peak_memory_mb"] / profile["data_points"] * 1000
            logger.info(
                f"  {profile['case']}: {profile['peak_memory_mb']:.1f}MB peak "
                f"({efficiency:.2f} MB/1k points)"
            )

        logger.info(
            f"  Average Memory Efficiency: {avg_memory_efficiency:.2f} MB per 1000 data points"
        )

        # Memory efficiency assertions
        assert avg_memory_efficiency <= 5.0, (
            f"Memory efficiency too poor: {avg_memory_efficiency:.2f} MB/1k points"
        )
        assert max(peak_memories) <= 200.0, (
            f"Peak memory usage too high: {max(peak_memories):.1f}MB"
        )

        return {
            "memory_profiles": memory_profiles,
            "avg_memory_efficiency": avg_memory_efficiency,
            "peak_memory_usage": max(peak_memories),
        }

    async def test_profile_cpu_vs_io_bound_operations(self, profiling_data_provider):
        """Profile CPU-bound vs I/O-bound operations to optimize resource usage."""
        profiler = PerformanceProfiler()
        engine = VectorBTEngine(data_provider=profiling_data_provider)

        # Profile CPU-intensive strategy
        with profiler.profile_cpu("cpu_intensive_strategy"):
            await engine.run_backtest(
                symbol="CPU_TEST",
                strategy_type="bollinger",  # More calculations
                parameters=STRATEGY_TEMPLATES["bollinger"]["parameters"],
                start_date="2022-01-01",
                end_date="2023-12-31",
            )

        # Profile I/O-intensive operations (multiple data fetches)
        with profiler.profile_cpu("io_intensive_operations"):
            io_symbols = ["IO_1", "IO_2", "IO_3", "IO_4", "IO_5"]
            io_results = []

            for symbol in io_symbols:
                result = await engine.run_backtest(
                    symbol=symbol,
                    strategy_type="sma_cross",  # Simpler calculations
                    parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                    start_date="2023-06-01",
                    end_date="2023-12-31",
                )
                io_results.append(result)

        # Analyze CPU vs I/O characteristics
        cpu_analysis = profiler.analyze_hotspots("cpu_intensive_strategy")
        io_analysis = profiler.analyze_hotspots("io_intensive_operations")

        cpu_time = cpu_analysis.get("total_execution_time", 0)
        io_time = io_analysis.get("total_execution_time", 0)

        # Analyze function call patterns
        cpu_top_functions = cpu_analysis.get("top_functions_by_cumulative", [])
        io_top_functions = io_analysis.get("top_functions_by_cumulative", [])

        # Calculate I/O vs CPU characteristics
        cpu_bound_ratio = (
            cpu_time / (cpu_time + io_time) if (cpu_time + io_time) > 0 else 0
        )

        logger.info("CPU vs I/O Bound Analysis:")
        logger.info(f"  CPU-Intensive Operation: {cpu_time:.3f}s")
        logger.info(f"  I/O-Intensive Operations: {io_time:.3f}s")
        logger.info(f"  CPU-Bound Ratio: {cpu_bound_ratio:.2%}")

        logger.info("  Top CPU-Intensive Functions:")
        for func in cpu_top_functions[:3]:
            logger.info(f"    {func['function']}: {func['cumulative_time']:.3f}s")

        logger.info("  Top I/O-Intensive Functions:")
        for func in io_top_functions[:3]:
            logger.info(f"    {func['function']}: {func['cumulative_time']:.3f}s")

        # Performance balance assertions
        assert cpu_time <= 3.0, f"CPU-intensive operation too slow: {cpu_time:.3f}s"
        assert io_time <= 5.0, f"I/O-intensive operations too slow: {io_time:.3f}s"

        return {
            "cpu_time": cpu_time,
            "io_time": io_time,
            "cpu_bound_ratio": cpu_bound_ratio,
            "cpu_top_functions": cpu_top_functions[:5],
            "io_top_functions": io_top_functions[:5],
        }

    async def test_comprehensive_profiling_suite(
        self, profiling_data_provider, db_session
    ):
        """Run comprehensive profiling suite and generate optimization report."""
        logger.info("Starting Comprehensive Performance Profiling Suite...")

        profiling_results = {}

        # Run all profiling tests
        profiling_results[
            "backtest_execution"
        ] = await self.test_profile_backtest_execution(profiling_data_provider)
        profiling_results[
            "data_loading"
        ] = await self.test_profile_data_loading_bottlenecks(profiling_data_provider)
        profiling_results[
            "database_queries"
        ] = await self.test_profile_database_query_performance(
            profiling_data_provider, db_session
        )
        profiling_results[
            "memory_allocation"
        ] = await self.test_profile_memory_allocation_patterns(profiling_data_provider)
        profiling_results[
            "cpu_vs_io"
        ] = await self.test_profile_cpu_vs_io_bound_operations(profiling_data_provider)

        # Generate comprehensive optimization report
        optimization_report = {
            "executive_summary": {
                "profiling_areas": len(profiling_results),
                "performance_bottlenecks": [],
                "optimization_priorities": [],
            },
            "detailed_analysis": profiling_results,
        }

        # Identify key bottlenecks and priorities
        bottlenecks = []
        priorities = []

        # Analyze backtest execution performance
        backtest_report = profiling_results["backtest_execution"]
        high_priority_issues = [
            opp
            for opp in backtest_report.get("optimization_opportunities", [])
            if opp["priority"] == "High"
        ]
        if high_priority_issues:
            bottlenecks.append("High-priority performance issues in backtest execution")
            priorities.append("Optimize hot functions in strategy calculations")

        # Analyze data loading performance
        data_loading = profiling_results["data_loading"]
        if data_loading["max_loading_time"] > 0.8:
            bottlenecks.append("Slow data loading operations")
            priorities.append("Implement data caching or optimize data provider")

        # Analyze database performance
        db_performance = profiling_results["database_queries"]
        if db_performance["slow_queries"] > 1:
            bottlenecks.append("Multiple slow database queries detected")
            priorities.append("Add database indexes or optimize query patterns")

        # Analyze memory efficiency
        memory_analysis = profiling_results["memory_allocation"]
        if memory_analysis["avg_memory_efficiency"] > 3.0:
            bottlenecks.append("High memory usage per data point")
            priorities.append("Optimize memory allocation patterns")

        optimization_report["executive_summary"]["performance_bottlenecks"] = (
            bottlenecks
        )
        optimization_report["executive_summary"]["optimization_priorities"] = priorities

        # Log comprehensive report
        logger.info(
            f"\n{'=' * 60}\n"
            f"COMPREHENSIVE PROFILING REPORT\n"
            f"{'=' * 60}\n"
            f"Profiling Areas Analyzed: {len(profiling_results)}\n"
            f"Performance Bottlenecks: {len(bottlenecks)}\n"
            f"{'=' * 60}\n"
        )

        if bottlenecks:
            logger.info("🔍 PERFORMANCE BOTTLENECKS IDENTIFIED:")
            for i, bottleneck in enumerate(bottlenecks, 1):
                logger.info(f"  {i}. {bottleneck}")

        if priorities:
            logger.info("\n🎯 OPTIMIZATION PRIORITIES:")
            for i, priority in enumerate(priorities, 1):
                logger.info(f"  {i}. {priority}")

        logger.info(f"\n{'=' * 60}")

        # Assert profiling success
        assert len(bottlenecks) <= 3, (
            f"Too many performance bottlenecks identified: {len(bottlenecks)}"
        )

        return optimization_report


if __name__ == "__main__":
    # Run profiling tests
    pytest.main(
        [
            __file__,
            "-v",
            "--tb=short",
            "--asyncio-mode=auto",
            "--timeout=300",  # 5 minute timeout for profiling tests
        ]
    )

```
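
The `PerformanceProfiler` above is self-contained (cProfile plus tracemalloc), so it can also be used ad hoc outside pytest. A minimal sketch, assuming the class is importable from this test module or copied next to the code being measured:

```python
import numpy as np

from tests.performance.test_profiling import PerformanceProfiler  # or copy the class

profiler = PerformanceProfiler()

# Nest memory profiling inside CPU profiling so both land under the same key
with profiler.profile_cpu("matrix_multiply"):
    with profiler.profile_memory("matrix_multiply"):
        a = np.random.rand(500, 500)
        b = np.random.rand(500, 500)
        _ = a @ b

hotspots = profiler.analyze_hotspots("matrix_multiply")
print(f"total time: {hotspots['total_execution_time']:.3f}s")
print("peak memory (MB):", hotspots["memory_profile"].get("peak_memory_mb"))

report = profiler.generate_optimization_report()
print("operations profiled:", report["total_operations_profiled"])
```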

--------------------------------------------------------------------------------
/tests/performance/test_benchmarks.py:
--------------------------------------------------------------------------------

```python
"""
Performance Benchmarks Against Target Metrics.

This test suite covers:
- Backtest execution < 2 seconds per backtest
- Memory usage < 500MB per backtest
- Cache hit rate > 80%
- API failure rate < 0.1%
- Database query performance < 100ms
- Throughput targets (requests per second)
- Response time SLA compliance
- Resource utilization efficiency
"""

import asyncio
import gc
import logging
import os
import statistics
import time
from dataclasses import dataclass
from typing import Any
from unittest.mock import Mock, patch

import numpy as np
import pandas as pd
import psutil
import pytest

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES

logger = logging.getLogger(__name__)


@dataclass
class BenchmarkResult:
    """Data class for benchmark test results."""

    test_name: str
    target_value: float
    actual_value: float
    unit: str
    passed: bool
    margin: float
    details: dict[str, Any]


class BenchmarkTracker:
    """Track and validate performance benchmarks."""

    def __init__(self):
        self.results = []
        self.process = psutil.Process(os.getpid())

    def add_benchmark(
        self,
        test_name: str,
        target_value: float,
        actual_value: float,
        unit: str,
        comparison: str = "<=",
        details: dict[str, Any] | None = None,
    ) -> BenchmarkResult:
        """Add a benchmark result."""
        if comparison == "<=":
            passed = actual_value <= target_value
            margin = (
                (actual_value - target_value) / target_value if target_value > 0 else 0
            )
        elif comparison == ">=":
            passed = actual_value >= target_value
            margin = (
                (target_value - actual_value) / target_value if target_value > 0 else 0
            )
        else:
            raise ValueError(f"Unsupported comparison: {comparison}")

        result = BenchmarkResult(
            test_name=test_name,
            target_value=target_value,
            actual_value=actual_value,
            unit=unit,
            passed=passed,
            margin=margin,
            details=details or {},
        )

        self.results.append(result)

        status = "✓ PASS" if passed else "✗ FAIL"
        logger.info(
            f"{status} {test_name}: {actual_value:.3f}{unit} (target: {target_value}{unit})"
        )

        return result

    def get_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        return self.process.memory_info().rss / 1024 / 1024

    def get_cpu_usage(self) -> float:
        """Get current CPU usage percentage."""
        return self.process.cpu_percent()

    def summary(self) -> dict[str, Any]:
        """Generate benchmark summary."""
        total_tests = len(self.results)
        passed_tests = sum(1 for r in self.results if r.passed)
        failed_tests = total_tests - passed_tests

        return {
            "total_tests": total_tests,
            "passed_tests": passed_tests,
            "failed_tests": failed_tests,
            "pass_rate": passed_tests / total_tests if total_tests > 0 else 0,
            "results": self.results,
        }
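
# Illustrative usage (not part of the test suite): BenchmarkTracker can also record
# ad-hoc measurements outside pytest, e.g.
#
#     tracker = BenchmarkTracker()
#     tracker.add_benchmark("latency", 2.0, 1.4, "s")           # passes (<= target)
#     tracker.add_benchmark("hit_rate", 80.0, 92.0, "%", ">=")  # passes (>= target)
#     assert tracker.summary()["pass_rate"] == 1.0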


class TestPerformanceBenchmarks:
    """Performance benchmarks against target metrics."""

    @pytest.fixture
    async def benchmark_data_provider(self):
        """Create optimized data provider for benchmarks."""
        provider = Mock()

        def generate_benchmark_data(symbol: str) -> pd.DataFrame:
            """Generate optimized data for benchmarking."""
            # Use symbol hash for deterministic but varied data
            seed = hash(symbol) % 1000
            np.random.seed(seed)

            # Generate 1 year of data
            dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D")
            returns = np.random.normal(0.0008, 0.02, len(dates))
            prices = 100 * np.cumprod(1 + returns)

            return pd.DataFrame(
                {
                    "Open": prices * np.random.uniform(0.995, 1.005, len(dates)),
                    "High": prices * np.random.uniform(1.005, 1.025, len(dates)),
                    "Low": prices * np.random.uniform(0.975, 0.995, len(dates)),
                    "Close": prices,
                    "Volume": np.random.randint(1000000, 5000000, len(dates)),
                    "Adj Close": prices,
                },
                index=dates,
            )

        provider.get_stock_data.side_effect = generate_benchmark_data
        return provider

    async def test_backtest_execution_time_benchmark(self, benchmark_data_provider):
        """Test: Backtest execution < 2 seconds per backtest."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        test_cases = [
            ("AAPL", "sma_cross"),
            ("GOOGL", "rsi"),
            ("MSFT", "macd"),
            ("AMZN", "bollinger"),
            ("TSLA", "momentum"),
        ]

        execution_times = []

        for symbol, strategy in test_cases:
            parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

            start_time = time.time()
            result = await engine.run_backtest(
                symbol=symbol,
                strategy_type=strategy,
                parameters=parameters,
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            execution_time = time.time() - start_time

            execution_times.append(execution_time)

            # Individual backtest benchmark
            benchmark.add_benchmark(
                test_name=f"backtest_time_{symbol}_{strategy}",
                target_value=2.0,
                actual_value=execution_time,
                unit="s",
                comparison="<=",
                details={
                    "symbol": symbol,
                    "strategy": strategy,
                    "result_size": len(str(result)),
                },
            )

        # Overall benchmark
        avg_execution_time = statistics.mean(execution_times)
        max_execution_time = max(execution_times)

        benchmark.add_benchmark(
            test_name="avg_backtest_execution_time",
            target_value=2.0,
            actual_value=avg_execution_time,
            unit="s",
            comparison="<=",
            details={"individual_times": execution_times},
        )

        benchmark.add_benchmark(
            test_name="max_backtest_execution_time",
            target_value=3.0,  # Allow some variance
            actual_value=max_execution_time,
            unit="s",
            comparison="<=",
            details={
                "slowest_case": test_cases[execution_times.index(max_execution_time)]
            },
        )

        logger.info(
            f"Backtest Execution Time Benchmark Summary:\n"
            f"  • Average: {avg_execution_time:.3f}s\n"
            f"  • Maximum: {max_execution_time:.3f}s\n"
            f"  • Minimum: {min(execution_times):.3f}s\n"
            f"  • Standard Deviation: {statistics.stdev(execution_times):.3f}s"
        )

        return benchmark.summary()

    async def test_memory_usage_benchmark(self, benchmark_data_provider):
        """Test: Memory usage < 500MB per backtest."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        initial_memory = benchmark.get_memory_usage()
        memory_measurements = []

        test_symbols = [
            "MEM_TEST_1",
            "MEM_TEST_2",
            "MEM_TEST_3",
            "MEM_TEST_4",
            "MEM_TEST_5",
        ]

        for _i, symbol in enumerate(test_symbols):
            gc.collect()  # Force garbage collection before measurement
            pre_backtest_memory = benchmark.get_memory_usage()

            # Run backtest
            result = await engine.run_backtest(
                symbol=symbol,
                strategy_type="sma_cross",
                parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )

            post_backtest_memory = benchmark.get_memory_usage()
            memory_delta = post_backtest_memory - pre_backtest_memory

            memory_measurements.append(
                {
                    "symbol": symbol,
                    "pre_memory": pre_backtest_memory,
                    "post_memory": post_backtest_memory,
                    "delta": memory_delta,
                }
            )

            # Individual memory benchmark
            benchmark.add_benchmark(
                test_name=f"memory_usage_{symbol}",
                target_value=500.0,
                actual_value=memory_delta,
                unit="MB",
                comparison="<=",
                details={
                    "pre_memory": pre_backtest_memory,
                    "post_memory": post_backtest_memory,
                    "result_size": len(str(result)),
                },
            )

        # Overall memory benchmarks
        total_memory_growth = benchmark.get_memory_usage() - initial_memory
        avg_memory_per_backtest = (
            total_memory_growth / len(test_symbols) if test_symbols else 0
        )
        max_memory_delta = max(m["delta"] for m in memory_measurements)

        benchmark.add_benchmark(
            test_name="avg_memory_per_backtest",
            target_value=500.0,
            actual_value=avg_memory_per_backtest,
            unit="MB",
            comparison="<=",
            details={
                "total_growth": total_memory_growth,
                "measurements": memory_measurements,
            },
        )

        benchmark.add_benchmark(
            test_name="max_memory_per_backtest",
            target_value=750.0,  # Allow some variance
            actual_value=max_memory_delta,
            unit="MB",
            comparison="<=",
            details={
                "worst_case": memory_measurements[
                    next(
                        i
                        for i, m in enumerate(memory_measurements)
                        if m["delta"] == max_memory_delta
                    )
                ]
            },
        )

        logger.info(
            f"Memory Usage Benchmark Summary:\n"
            f"  • Total Growth: {total_memory_growth:.1f}MB\n"
            f"  • Avg per Backtest: {avg_memory_per_backtest:.1f}MB\n"
            f"  • Max per Backtest: {max_memory_delta:.1f}MB\n"
            f"  • Initial Memory: {initial_memory:.1f}MB"
        )

        return benchmark.summary()

    async def test_cache_hit_rate_benchmark(self, benchmark_data_provider):
        """Test: Cache hit rate > 80%."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        # Mock cache to track hits/misses
        cache_stats = {"hits": 0, "misses": 0, "total_requests": 0}

        def mock_cache_get(key):
            cache_stats["total_requests"] += 1
            # Simulate realistic cache behavior
            if cache_stats["total_requests"] <= 5:  # First few are misses
                cache_stats["misses"] += 1
                return None
            else:  # Later requests are hits
                cache_stats["hits"] += 1
                return "cached_result"

        with patch(
            "maverick_mcp.core.cache.CacheManager.get", side_effect=mock_cache_get
        ):
            # Run multiple backtests with repeated data access
            symbols = [
                "CACHE_A",
                "CACHE_B",
                "CACHE_A",
                "CACHE_B",
                "CACHE_A",
                "CACHE_C",
                "CACHE_A",
            ]

            for symbol in symbols:
                await engine.run_backtest(
                    symbol=symbol,
                    strategy_type="sma_cross",
                    parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                    start_date="2023-01-01",
                    end_date="2023-12-31",
                )

        # Calculate cache hit rate
        total_cache_requests = cache_stats["total_requests"]
        cache_hits = cache_stats["hits"]
        cache_hit_rate = (
            (cache_hits / total_cache_requests * 100) if total_cache_requests > 0 else 0
        )

        benchmark.add_benchmark(
            test_name="cache_hit_rate",
            target_value=80.0,
            actual_value=cache_hit_rate,
            unit="%",
            comparison=">=",
            details={
                "total_requests": total_cache_requests,
                "hits": cache_hits,
                "misses": cache_stats["misses"],
            },
        )

        logger.info(
            f"Cache Hit Rate Benchmark:\n"
            f"  • Total Cache Requests: {total_cache_requests}\n"
            f"  • Cache Hits: {cache_hits}\n"
            f"  • Cache Misses: {cache_stats['misses']}\n"
            f"  • Hit Rate: {cache_hit_rate:.1f}%"
        )

        return benchmark.summary()

    async def test_api_failure_rate_benchmark(self, benchmark_data_provider):
        """Test: API failure rate < 0.1%."""
        benchmark = BenchmarkTracker()

        # Mock API with occasional failures
        api_stats = {"total_calls": 0, "failures": 0}

        def mock_api_call(*args, **kwargs):
            api_stats["total_calls"] += 1
            # Simulate very low failure rate
            if api_stats["total_calls"] % 2000 == 0:  # 0.05% failure rate
                api_stats["failures"] += 1
                raise ConnectionError("Simulated API failure")
            return benchmark_data_provider.get_stock_data(*args, **kwargs)

        # Test with many API calls
        with patch.object(
            benchmark_data_provider, "get_stock_data", side_effect=mock_api_call
        ):
            engine = VectorBTEngine(data_provider=benchmark_data_provider)

            test_symbols = [
                f"API_TEST_{i}" for i in range(50)
            ]  # 50 symbols to test API reliability

            successful_backtests = 0
            failed_backtests = 0

            for symbol in test_symbols:
                try:
                    await engine.run_backtest(
                        symbol=symbol,
                        strategy_type="rsi",
                        parameters=STRATEGY_TEMPLATES["rsi"]["parameters"],
                        start_date="2023-01-01",
                        end_date="2023-12-31",
                    )
                    successful_backtests += 1
                except Exception:
                    failed_backtests += 1

        # Calculate failure rates
        total_api_calls = api_stats["total_calls"]
        api_failures = api_stats["failures"]
        api_failure_rate = (
            (api_failures / total_api_calls * 100) if total_api_calls > 0 else 0
        )

        total_backtests = successful_backtests + failed_backtests
        backtest_failure_rate = (
            (failed_backtests / total_backtests * 100) if total_backtests > 0 else 0
        )

        benchmark.add_benchmark(
            test_name="api_failure_rate",
            target_value=0.1,
            actual_value=api_failure_rate,
            unit="%",
            comparison="<=",
            details={
                "total_api_calls": total_api_calls,
                "api_failures": api_failures,
                "successful_backtests": successful_backtests,
                "failed_backtests": failed_backtests,
            },
        )

        benchmark.add_benchmark(
            test_name="backtest_success_rate",
            target_value=99.5,
            actual_value=100 - backtest_failure_rate,
            unit="%",
            comparison=">=",
            details={"backtest_failure_rate": backtest_failure_rate},
        )

        logger.info(
            f"API Reliability Benchmark:\n"
            f"  • Total API Calls: {total_api_calls}\n"
            f"  • API Failures: {api_failures}\n"
            f"  • API Failure Rate: {api_failure_rate:.3f}%\n"
            f"  • Backtest Success Rate: {100 - backtest_failure_rate:.2f}%"
        )

        return benchmark.summary()

    async def test_database_query_performance_benchmark(
        self, benchmark_data_provider, db_session
    ):
        """Test: Database query performance < 100ms."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        # Generate test data for database operations
        test_results = []
        for i in range(10):
            result = await engine.run_backtest(
                symbol=f"DB_PERF_{i}",
                strategy_type="macd",
                parameters=STRATEGY_TEMPLATES["macd"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            test_results.append(result)

        # Test database save performance
        save_times = []
        with BacktestPersistenceManager(session=db_session) as persistence:
            for result in test_results:
                start_time = time.time()
                backtest_id = persistence.save_backtest_result(
                    vectorbt_results=result,
                    execution_time=2.0,
                    notes="DB performance test",
                )
                save_time = (time.time() - start_time) * 1000  # Convert to ms
                save_times.append((backtest_id, save_time))

        # Test database query performance
        query_times = []
        with BacktestPersistenceManager(session=db_session) as persistence:
            for backtest_id, _ in save_times:
                start_time = time.time()
                persistence.get_backtest_by_id(backtest_id)
                query_time = (time.time() - start_time) * 1000  # Convert to ms
                query_times.append(query_time)

            # Test bulk query performance
            start_time = time.time()
            bulk_results = persistence.get_backtests_by_strategy("macd")
            bulk_query_time = (time.time() - start_time) * 1000

        # Calculate benchmarks
        avg_save_time = statistics.mean([t for _, t in save_times])
        max_save_time = max([t for _, t in save_times])
        avg_query_time = statistics.mean(query_times)
        max_query_time = max(query_times)

        # Add benchmarks
        benchmark.add_benchmark(
            test_name="avg_db_save_time",
            target_value=100.0,
            actual_value=avg_save_time,
            unit="ms",
            comparison="<=",
            details={"individual_times": [t for _, t in save_times]},
        )

        benchmark.add_benchmark(
            test_name="max_db_save_time",
            target_value=200.0,
            actual_value=max_save_time,
            unit="ms",
            comparison="<=",
        )

        benchmark.add_benchmark(
            test_name="avg_db_query_time",
            target_value=50.0,
            actual_value=avg_query_time,
            unit="ms",
            comparison="<=",
            details={"individual_times": query_times},
        )

        benchmark.add_benchmark(
            test_name="max_db_query_time",
            target_value=100.0,
            actual_value=max_query_time,
            unit="ms",
            comparison="<=",
        )

        benchmark.add_benchmark(
            test_name="bulk_query_time",
            target_value=200.0,
            actual_value=bulk_query_time,
            unit="ms",
            comparison="<=",
            details={"records_returned": len(bulk_results)},
        )

        logger.info(
            f"Database Performance Benchmark:\n"
            f"  • Avg Save Time: {avg_save_time:.1f}ms\n"
            f"  • Max Save Time: {max_save_time:.1f}ms\n"
            f"  • Avg Query Time: {avg_query_time:.1f}ms\n"
            f"  • Max Query Time: {max_query_time:.1f}ms\n"
            f"  • Bulk Query Time: {bulk_query_time:.1f}ms"
        )

        return benchmark.summary()

    async def test_throughput_benchmark(self, benchmark_data_provider):
        """Test: Throughput targets (requests per second)."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        # Test sequential throughput
        symbols = ["THRU_1", "THRU_2", "THRU_3", "THRU_4", "THRU_5"]
        start_time = time.time()

        for symbol in symbols:
            await engine.run_backtest(
                symbol=symbol,
                strategy_type="sma_cross",
                parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )

        sequential_time = time.time() - start_time
        sequential_throughput = len(symbols) / sequential_time

        # Test concurrent throughput
        concurrent_symbols = ["CONC_1", "CONC_2", "CONC_3", "CONC_4", "CONC_5"]
        start_time = time.time()

        concurrent_tasks = []
        for symbol in concurrent_symbols:
            task = engine.run_backtest(
                symbol=symbol,
                strategy_type="sma_cross",
                parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            concurrent_tasks.append(task)

        await asyncio.gather(*concurrent_tasks)
        concurrent_time = time.time() - start_time
        concurrent_throughput = len(concurrent_symbols) / concurrent_time

        # Benchmarks
        benchmark.add_benchmark(
            test_name="sequential_throughput",
            target_value=2.0,  # 2 backtests per second
            actual_value=sequential_throughput,
            unit="req/s",
            comparison=">=",
            details={"execution_time": sequential_time, "requests": len(symbols)},
        )

        benchmark.add_benchmark(
            test_name="concurrent_throughput",
            target_value=5.0,  # 5 backtests per second with concurrency
            actual_value=concurrent_throughput,
            unit="req/s",
            comparison=">=",
            details={
                "execution_time": concurrent_time,
                "requests": len(concurrent_symbols),
            },
        )

        # Concurrency speedup
        speedup = concurrent_throughput / sequential_throughput
        benchmark.add_benchmark(
            test_name="concurrency_speedup",
            target_value=2.0,  # At least 2x speedup
            actual_value=speedup,
            unit="x",
            comparison=">=",
            details={
                "sequential_throughput": sequential_throughput,
                "concurrent_throughput": concurrent_throughput,
            },
        )

        logger.info(
            f"Throughput Benchmark:\n"
            f"  • Sequential: {sequential_throughput:.2f} req/s\n"
            f"  • Concurrent: {concurrent_throughput:.2f} req/s\n"
            f"  • Speedup: {speedup:.2f}x"
        )

        return benchmark.summary()

    async def test_response_time_sla_benchmark(self, benchmark_data_provider):
        """Test: Response time SLA compliance."""
        benchmark = BenchmarkTracker()
        engine = VectorBTEngine(data_provider=benchmark_data_provider)

        response_times = []
        symbols = [f"SLA_{i}" for i in range(20)]

        for symbol in symbols:
            start_time = time.time()
            await engine.run_backtest(
                symbol=symbol,
                strategy_type="rsi",
                parameters=STRATEGY_TEMPLATES["rsi"]["parameters"],
                start_date="2023-01-01",
                end_date="2023-12-31",
            )
            response_time = (time.time() - start_time) * 1000  # Convert to ms
            response_times.append(response_time)

        # SLA percentile benchmarks
        p50 = np.percentile(response_times, 50)
        p95 = np.percentile(response_times, 95)
        p99 = np.percentile(response_times, 99)

        benchmark.add_benchmark(
            test_name="response_time_p50",
            target_value=1500.0,  # 1.5 seconds for 50th percentile
            actual_value=p50,
            unit="ms",
            comparison="<=",
            details={"percentile": "50th"},
        )

        benchmark.add_benchmark(
            test_name="response_time_p95",
            target_value=3000.0,  # 3 seconds for 95th percentile
            actual_value=p95,
            unit="ms",
            comparison="<=",
            details={"percentile": "95th"},
        )

        benchmark.add_benchmark(
            test_name="response_time_p99",
            target_value=5000.0,  # 5 seconds for 99th percentile
            actual_value=p99,
            unit="ms",
            comparison="<=",
            details={"percentile": "99th"},
        )

        # SLA compliance rate (percentage of requests under target)
        sla_target = 2000.0  # 2 seconds
        sla_compliant = sum(1 for t in response_times if t <= sla_target)
        sla_compliance_rate = sla_compliant / len(response_times) * 100

        benchmark.add_benchmark(
            test_name="sla_compliance_rate",
            target_value=95.0,  # 95% of requests should meet SLA
            actual_value=sla_compliance_rate,
            unit="%",
            comparison=">=",
            details={
                "sla_target_ms": sla_target,
                "compliant_requests": sla_compliant,
                "total_requests": len(response_times),
            },
        )

        logger.info(
            f"Response Time SLA Benchmark:\n"
            f"  • 50th Percentile: {p50:.1f}ms\n"
            f"  • 95th Percentile: {p95:.1f}ms\n"
            f"  • 99th Percentile: {p99:.1f}ms\n"
            f"  • SLA Compliance: {sla_compliance_rate:.1f}%"
        )

        return benchmark.summary()

    async def test_comprehensive_benchmark_suite(
        self, benchmark_data_provider, db_session
    ):
        """Run comprehensive benchmark suite and generate report."""
        logger.info("Running Comprehensive Benchmark Suite...")

        # Run all individual benchmarks
        benchmark_results = []

        benchmark_results.append(
            await self.test_backtest_execution_time_benchmark(benchmark_data_provider)
        )
        benchmark_results.append(
            await self.test_memory_usage_benchmark(benchmark_data_provider)
        )
        benchmark_results.append(
            await self.test_cache_hit_rate_benchmark(benchmark_data_provider)
        )
        benchmark_results.append(
            await self.test_api_failure_rate_benchmark(benchmark_data_provider)
        )
        benchmark_results.append(
            await self.test_database_query_performance_benchmark(
                benchmark_data_provider, db_session
            )
        )
        benchmark_results.append(
            await self.test_throughput_benchmark(benchmark_data_provider)
        )
        benchmark_results.append(
            await self.test_response_time_sla_benchmark(benchmark_data_provider)
        )

        # Aggregate results
        total_tests = sum(r["total_tests"] for r in benchmark_results)
        total_passed = sum(r["passed_tests"] for r in benchmark_results)
        total_failed = sum(r["failed_tests"] for r in benchmark_results)
        overall_pass_rate = total_passed / total_tests if total_tests > 0 else 0

        # Generate comprehensive report
        report = {
            "summary": {
                "total_tests": total_tests,
                "passed_tests": total_passed,
                "failed_tests": total_failed,
                "overall_pass_rate": overall_pass_rate,
            },
            "benchmark_suites": benchmark_results,
            "critical_failures": [
                result
                for suite in benchmark_results
                for result in suite["results"]
                if not result.passed
                and result.margin > 0.2  # More than 20% over target
            ],
        }

        logger.info(
            f"\n{'=' * 60}\n"
            f"COMPREHENSIVE BENCHMARK REPORT\n"
            f"{'=' * 60}\n"
            f"Total Tests: {total_tests}\n"
            f"Passed: {total_passed} ({overall_pass_rate:.1%})\n"
            f"Failed: {total_failed}\n"
            f"{'=' * 60}\n"
        )

        # Assert overall benchmark success
        assert overall_pass_rate >= 0.8, (
            f"Overall benchmark pass rate too low: {overall_pass_rate:.1%}"
        )
        assert len(report["critical_failures"]) == 0, (
            f"Critical benchmark failures detected: {len(report['critical_failures'])}"
        )

        return report


if __name__ == "__main__":
    # Run benchmark tests
    pytest.main(
        [
            __file__,
            "-v",
            "--tb=short",
            "--asyncio-mode=auto",
            "--timeout=300",  # 5 minute timeout for benchmarks
        ]
    )

```
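
The comprehensive report above filters `critical_failures` on `result.passed` and `result.margin`, attributes supplied by the `BenchmarkTracker` defined earlier in this module. Purely to illustrate the comparison semantics those checks assume (a `"<="` or `">="` target, with `margin` as the relative amount by which a failing result misses its target), a minimal evaluation helper might look like the sketch below; the function name and return shape are illustrative assumptions, not the project's API.

```python
def evaluate_benchmark(
    target_value: float, actual_value: float, comparison: str
) -> tuple[bool, float]:
    """Illustrative pass/margin evaluation consistent with the report's filter."""
    if comparison == "<=":
        passed = actual_value <= target_value
    elif comparison == ">=":
        passed = actual_value >= target_value
    else:
        raise ValueError(f"Unsupported comparison: {comparison}")

    # Relative miss distance: 0.0 when passing; > 0.2 means the actual value
    # missed the target by more than 20%, which the report treats as critical.
    if passed:
        margin = 0.0
    elif target_value == 0:
        margin = float("inf")
    else:
        margin = abs(actual_value - target_value) / abs(target_value)
    return passed, margin
```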

--------------------------------------------------------------------------------
/tests/integration/test_high_volume.py:
--------------------------------------------------------------------------------

```python
"""
High-Volume Integration Tests for Production Scenarios.

This test suite covers:
- Backtesting 100+ symbols processed in batches
- Multi-year historical data volumes
- Concurrent user scenarios
- Database performance under high load
- Memory management with large datasets
- Cache efficiency with large datasets
"""

import asyncio
import gc
import logging
import os
import random
import time
from datetime import datetime, timedelta
from unittest.mock import Mock

import numpy as np
import pandas as pd
import psutil
import pytest

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES

logger = logging.getLogger(__name__)

# High volume test parameters
LARGE_SYMBOL_SET = [
    # Technology
    "AAPL",
    "MSFT",
    "GOOGL",
    "AMZN",
    "META",
    "TSLA",
    "NVDA",
    "ADBE",
    "CRM",
    "ORCL",
    "NFLX",
    "INTC",
    "AMD",
    "QCOM",
    "AVGO",
    "TXN",
    "MU",
    "AMAT",
    "LRCX",
    "KLAC",
    # Finance
    "JPM",
    "BAC",
    "WFC",
    "C",
    "GS",
    "MS",
    "AXP",
    "BRK-B",
    "BLK",
    "SPGI",
    "CME",
    "ICE",
    "MCO",
    "COF",
    "USB",
    "TFC",
    "PNC",
    "SCHW",
    "CB",
    "AIG",
    # Healthcare
    "JNJ",
    "PFE",
    "ABT",
    "MRK",
    "TMO",
    "DHR",
    "BMY",
    "ABBV",
    "AMGN",
    "GILD",
    "BIIB",
    "REGN",
    "VRTX",
    "ISRG",
    "SYK",
    "BSX",
    "MDT",
    "EW",
    "HOLX",
    "RMD",
    # Consumer
    "WMT",
    "PG",
    "KO",
    "PEP",
    "COST",
    "HD",
    "MCD",
    "NKE",
    "SBUX",
    "TGT",
    "LOW",
    "DIS",
    "CMCSA",
    "VZ",
    "T",
    "TMUS",
    "CVX",
    "XOM",
    "UNH",
    "CVS",
    # Industrials
    "BA",
    "CAT",
    "DE",
    "GE",
    "HON",
    "MMM",
    "LMT",
    "RTX",
    "UNP",
    "UPS",
    "FDX",
    "WM",
    "EMR",
    "ETN",
    "PH",
    "CMI",
    "PCAR",
    "ROK",
    "DOV",
    "ITW",
    # Extended set for 100+ symbols
    "F",
    "GM",
    "FORD",
    "RIVN",
    "LCID",
    "PLTR",
    "SNOW",
    "ZM",
    "DOCU",
    "OKTA",
]

STRATEGIES_FOR_VOLUME_TEST = ["sma_cross", "rsi", "macd", "bollinger", "momentum"]


class TestHighVolumeIntegration:
    """High-volume integration tests for production scenarios."""

    @pytest.fixture
    async def high_volume_data_provider(self):
        """Create data provider with large dataset simulation."""
        provider = Mock()

        def generate_multi_year_data(symbol: str, years: int = 3) -> pd.DataFrame:
            """Generate multi-year realistic data for a symbol."""
            # Generate deterministic but varied data based on symbol hash
            symbol_seed = hash(symbol) % 10000
            np.random.seed(symbol_seed)

            # Create `years` years of business-day data (default: 3)
            start_date = datetime.now() - timedelta(days=years * 365)
            dates = pd.date_range(
                start=start_date, periods=years * 252, freq="B"
            )  # Business days

            # Generate realistic price movements
            base_price = 50 + (symbol_seed % 200)  # Base price $50-$250
            returns = np.random.normal(0.0005, 0.02, len(dates))  # Daily returns

            # Add a mild annual cyclical trend to the returns
            trend = (
                np.sin(np.arange(len(dates)) / 252 * 2 * np.pi) * 0.001
            )  # Annual cycle
            returns += trend

            # Generate prices
            prices = base_price * np.cumprod(1 + returns)

            # Create OHLCV data
            high_mult = np.random.uniform(1.005, 1.03, len(dates))
            low_mult = np.random.uniform(0.97, 0.995, len(dates))
            open_mult = np.random.uniform(0.995, 1.005, len(dates))

            volumes = np.random.randint(100000, 10000000, len(dates))

            data = pd.DataFrame(
                {
                    "Open": prices * open_mult,
                    "High": prices * high_mult,
                    "Low": prices * low_mult,
                    "Close": prices,
                    "Volume": volumes,
                    "Adj Close": prices,
                },
                index=dates,
            )

            # Ensure OHLC constraints
            data["High"] = np.maximum(
                data["High"], np.maximum(data["Open"], data["Close"])
            )
            data["Low"] = np.minimum(
                data["Low"], np.minimum(data["Open"], data["Close"])
            )

            return data

        provider.get_stock_data.side_effect = generate_multi_year_data
        return provider

    async def test_large_symbol_set_backtesting(
        self, high_volume_data_provider, benchmark_timer
    ):
        """Test backtesting with 100+ symbols."""
        symbols = LARGE_SYMBOL_SET[:100]  # Use first 100 symbols
        strategy = "sma_cross"

        engine = VectorBTEngine(data_provider=high_volume_data_provider)
        parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

        results = []
        failed_symbols = []

        # Track memory usage
        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        with benchmark_timer() as timer:
            # Process symbols in batches to manage memory
            batch_size = 20
            for i in range(0, len(symbols), batch_size):
                batch_symbols = symbols[i : i + batch_size]

                # Process batch
                batch_tasks = []
                for symbol in batch_symbols:
                    task = engine.run_backtest(
                        symbol=symbol,
                        strategy_type=strategy,
                        parameters=parameters,
                        start_date="2022-01-01",
                        end_date="2023-12-31",
                    )
                    batch_tasks.append((symbol, task))

                # Execute batch concurrently
                batch_results = await asyncio.gather(
                    *[task for _, task in batch_tasks], return_exceptions=True
                )

                # Process results
                for symbol, result in zip(
                    batch_symbols, batch_results, strict=False
                ):
                    if isinstance(result, Exception):
                        failed_symbols.append(symbol)
                        logger.error(f"✗ {symbol} failed: {result}")
                    else:
                        results.append(result)
                        if len(results) % 10 == 0:
                            logger.info(f"Processed {len(results)} symbols...")

                # Force garbage collection after each batch
                gc.collect()

                # Check memory usage
                current_memory = process.memory_info().rss / 1024 / 1024
                memory_growth = current_memory - initial_memory

                if memory_growth > 2000:  # More than 2GB growth
                    logger.warning(f"High memory usage detected: {memory_growth:.1f}MB")

        execution_time = timer.elapsed
        final_memory = process.memory_info().rss / 1024 / 1024
        total_memory_growth = final_memory - initial_memory

        # Performance assertions
        success_rate = len(results) / len(symbols)
        assert success_rate >= 0.85, f"Success rate too low: {success_rate:.1%}"
        assert execution_time < 1800, (
            f"Execution time too long: {execution_time:.1f}s"
        )  # 30 minutes max
        assert total_memory_growth < 3000, (
            f"Memory growth too high: {total_memory_growth:.1f}MB"
        )  # Max 3GB growth

        # Calculate performance metrics
        avg_execution_time = execution_time / len(symbols)
        throughput = len(results) / execution_time  # Backtests per second

        logger.info(
            f"Large Symbol Set Test Results:\n"
            f"  • Total Symbols: {len(symbols)}\n"
            f"  • Successful: {len(results)}\n"
            f"  • Failed: {len(failed_symbols)}\n"
            f"  • Success Rate: {success_rate:.1%}\n"
            f"  • Total Execution Time: {execution_time:.1f}s\n"
            f"  • Avg Time per Symbol: {avg_execution_time:.2f}s\n"
            f"  • Throughput: {throughput:.2f} backtests/second\n"
            f"  • Memory Growth: {total_memory_growth:.1f}MB\n"
            f"  • Failed Symbols: {failed_symbols[:10]}{'...' if len(failed_symbols) > 10 else ''}"
        )

        return {
            "symbols_processed": len(results),
            "execution_time": execution_time,
            "throughput": throughput,
            "memory_growth": total_memory_growth,
            "success_rate": success_rate,
        }

    async def test_multi_year_historical_data(
        self, high_volume_data_provider, benchmark_timer
    ):
        """Test with years of historical data (high data volume)."""
        symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
        strategy = "sma_cross"

        engine = VectorBTEngine(data_provider=high_volume_data_provider)
        parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

        # Test with different time periods
        time_periods = [
            ("1_year", "2023-01-01", "2023-12-31"),
            ("2_years", "2022-01-01", "2023-12-31"),
            ("3_years", "2021-01-01", "2023-12-31"),
            ("5_years", "2019-01-01", "2023-12-31"),
        ]

        period_results = {}

        for period_name, start_date, end_date in time_periods:
            with benchmark_timer() as timer:
                period_data = []

                for symbol in symbols:
                    try:
                        result = await engine.run_backtest(
                            symbol=symbol,
                            strategy_type=strategy,
                            parameters=parameters,
                            start_date=start_date,
                            end_date=end_date,
                        )
                        period_data.append(result)

                    except Exception as e:
                        logger.error(f"Failed {symbol} for {period_name}: {e}")

                execution_time = timer.elapsed

                # Calculate average data points processed
                avg_data_points = np.mean(
                    [len(r.get("equity_curve", [])) for r in period_data]
                )
                data_throughput = avg_data_points * len(period_data) / execution_time

                period_results[period_name] = {
                    "execution_time": execution_time,
                    "symbols_processed": len(period_data),
                    "avg_data_points": avg_data_points,
                    "data_throughput": data_throughput,
                }

                logger.info(
                    f"{period_name.upper()} Period Results:\n"
                    f"  • Execution Time: {execution_time:.1f}s\n"
                    f"  • Avg Data Points: {avg_data_points:.0f}\n"
                    f"  • Data Throughput: {data_throughput:.0f} points/second"
                )

        # Validate performance scales reasonably with data size
        one_year_time = period_results["1_year"]["execution_time"]
        three_year_time = period_results["3_years"]["execution_time"]

        # 3 years should not take more than 5x the time of 1 year (allow for overhead)
        time_scaling = three_year_time / one_year_time
        assert time_scaling < 5.0, f"Time scaling too poor: {time_scaling:.1f}x"

        return period_results

    async def test_concurrent_user_scenarios(
        self, high_volume_data_provider, benchmark_timer
    ):
        """Test concurrent user scenarios with multiple simultaneous backtests."""
        symbols = LARGE_SYMBOL_SET[:50]
        strategies = STRATEGIES_FOR_VOLUME_TEST

        # Simulate different user scenarios
        user_scenarios = [
            {
                "user_id": f"user_{i}",
                "symbols": random.sample(symbols, 5),
                "strategy": random.choice(strategies),
                "start_date": "2022-01-01",
                "end_date": "2023-12-31",
            }
            for i in range(20)  # Simulate 20 concurrent users
        ]

        async def simulate_user_session(scenario):
            """Simulate a single user session."""
            engine = VectorBTEngine(data_provider=high_volume_data_provider)
            parameters = STRATEGY_TEMPLATES[scenario["strategy"]]["parameters"]

            user_results = []
            session_start = time.time()

            for symbol in scenario["symbols"]:
                try:
                    result = await engine.run_backtest(
                        symbol=symbol,
                        strategy_type=scenario["strategy"],
                        parameters=parameters,
                        start_date=scenario["start_date"],
                        end_date=scenario["end_date"],
                    )
                    user_results.append(result)

                except Exception as e:
                    logger.error(f"User {scenario['user_id']} failed on {symbol}: {e}")

            session_time = time.time() - session_start

            return {
                "user_id": scenario["user_id"],
                "results": user_results,
                "session_time": session_time,
                "symbols_processed": len(user_results),
                "success_rate": len(user_results) / len(scenario["symbols"]),
            }

        # Execute all user sessions concurrently
        with benchmark_timer() as timer:
            # Use semaphore to control concurrency
            semaphore = asyncio.Semaphore(10)  # Max 10 concurrent sessions

            async def run_with_semaphore(scenario):
                async with semaphore:
                    return await simulate_user_session(scenario)

            session_results = await asyncio.gather(
                *[run_with_semaphore(scenario) for scenario in user_scenarios],
                return_exceptions=True,
            )

        total_execution_time = timer.elapsed

        # Analyze results
        successful_sessions = [r for r in session_results if isinstance(r, dict)]
        failed_sessions = len(session_results) - len(successful_sessions)

        total_backtests = sum(r["symbols_processed"] for r in successful_sessions)
        avg_session_time = np.mean([r["session_time"] for r in successful_sessions])
        avg_success_rate = np.mean([r["success_rate"] for r in successful_sessions])

        # Performance assertions
        session_success_rate = len(successful_sessions) / len(session_results)
        assert session_success_rate >= 0.8, (
            f"Session success rate too low: {session_success_rate:.1%}"
        )
        assert avg_success_rate >= 0.8, (
            f"Average backtest success rate too low: {avg_success_rate:.1%}"
        )
        assert total_execution_time < 600, (
            f"Total execution time too long: {total_execution_time:.1f}s"
        )  # 10 minutes max

        concurrent_throughput = total_backtests / total_execution_time

        logger.info(
            f"Concurrent User Scenarios Results:\n"
            f"  • Total Users: {len(user_scenarios)}\n"
            f"  • Successful Sessions: {len(successful_sessions)}\n"
            f"  • Failed Sessions: {failed_sessions}\n"
            f"  • Session Success Rate: {session_success_rate:.1%}\n"
            f"  • Total Backtests: {total_backtests}\n"
            f"  • Avg Session Time: {avg_session_time:.1f}s\n"
            f"  • Avg Backtest Success Rate: {avg_success_rate:.1%}\n"
            f"  • Total Execution Time: {total_execution_time:.1f}s\n"
            f"  • Concurrent Throughput: {concurrent_throughput:.2f} backtests/second"
        )

        return {
            "session_success_rate": session_success_rate,
            "avg_success_rate": avg_success_rate,
            "concurrent_throughput": concurrent_throughput,
            "total_execution_time": total_execution_time,
        }

    async def test_database_performance_under_load(
        self, high_volume_data_provider, db_session, benchmark_timer
    ):
        """Test database performance under high load."""
        symbols = LARGE_SYMBOL_SET[:30]  # 30 symbols for DB test
        strategy = "sma_cross"

        engine = VectorBTEngine(data_provider=high_volume_data_provider)
        parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

        # Run backtests and save to database
        backtest_results = []

        with benchmark_timer() as timer:
            # Generate backtest results
            for symbol in symbols:
                try:
                    result = await engine.run_backtest(
                        symbol=symbol,
                        strategy_type=strategy,
                        parameters=parameters,
                        start_date="2023-01-01",
                        end_date="2023-12-31",
                    )
                    backtest_results.append(result)
                except Exception as e:
                    logger.error(f"Backtest failed for {symbol}: {e}")

        backtest_generation_time = timer.elapsed

        # Test database operations under load
        with benchmark_timer() as db_timer:
            with BacktestPersistenceManager(session=db_session) as persistence:
                saved_ids = []

                # Batch save results
                for result in backtest_results:
                    try:
                        backtest_id = persistence.save_backtest_result(
                            vectorbt_results=result,
                            execution_time=2.0,
                            notes=f"High volume test - {result['symbol']}",
                        )
                        saved_ids.append(backtest_id)
                    except Exception as e:
                        logger.error(f"Save failed for {result['symbol']}: {e}")

                # Test batch retrieval
                retrieved_results = []
                for backtest_id in saved_ids:
                    try:
                        retrieved = persistence.get_backtest_by_id(backtest_id)
                        if retrieved:
                            retrieved_results.append(retrieved)
                    except Exception as e:
                        logger.error(f"Retrieval failed for {backtest_id}: {e}")

                # Test queries under load
                strategy_results = persistence.get_backtests_by_strategy(strategy)

        db_operation_time = db_timer.elapsed

        # Performance assertions
        save_success_rate = len(saved_ids) / len(backtest_results)
        retrieval_success_rate = (
            len(retrieved_results) / len(saved_ids) if saved_ids else 0
        )

        assert save_success_rate >= 0.95, (
            f"Database save success rate too low: {save_success_rate:.1%}"
        )
        assert retrieval_success_rate >= 0.95, (
            f"Database retrieval success rate too low: {retrieval_success_rate:.1%}"
        )
        assert db_operation_time < 300, (
            f"Database operations too slow: {db_operation_time:.1f}s"
        )  # 5 minutes max

        # Calculate database performance metrics
        save_throughput = len(saved_ids) / db_operation_time
        logger.info(
            f"Database Performance Under Load Results:\n"
            f"  • Backtest Generation: {backtest_generation_time:.1f}s\n"
            f"  • Database Operations: {db_operation_time:.1f}s\n"
            f"  • Backtests Generated: {len(backtest_results)}\n"
            f"  • Records Saved: {len(saved_ids)}\n"
            f"  • Records Retrieved: {len(retrieved_results)}\n"
            f"  • Save Success Rate: {save_success_rate:.1%}\n"
            f"  • Retrieval Success Rate: {retrieval_success_rate:.1%}\n"
            f"  • Save Throughput: {save_throughput:.2f} saves/second\n"
            f"  • Query Results: {len(strategy_results)} records"
        )

        return {
            "save_success_rate": save_success_rate,
            "retrieval_success_rate": retrieval_success_rate,
            "save_throughput": save_throughput,
            "db_operation_time": db_operation_time,
        }

    async def test_memory_management_large_datasets(
        self, high_volume_data_provider, benchmark_timer
    ):
        """Test memory management with large datasets."""
        symbols = LARGE_SYMBOL_SET[:25]  # 25 symbols for memory test
        strategies = STRATEGIES_FOR_VOLUME_TEST

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_snapshots = []

        engine = VectorBTEngine(data_provider=high_volume_data_provider)

        with benchmark_timer() as timer:
            for i, symbol in enumerate(symbols):
                for strategy in strategies:
                    try:
                        parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

                        # Run backtest
                        await engine.run_backtest(
                            symbol=symbol,
                            strategy_type=strategy,
                            parameters=parameters,
                            start_date="2021-01-01",  # 3 years of data
                            end_date="2023-12-31",
                        )

                        # Take memory snapshot
                        current_memory = process.memory_info().rss / 1024 / 1024
                        memory_snapshots.append(
                            {
                                "iteration": i * len(strategies)
                                + strategies.index(strategy),
                                "symbol": symbol,
                                "strategy": strategy,
                                "memory_mb": current_memory,
                                "memory_growth": current_memory - initial_memory,
                            }
                        )

                        # Force periodic garbage collection
                        if (i * len(strategies) + strategies.index(strategy)) % 10 == 0:
                            gc.collect()

                    except Exception as e:
                        logger.error(f"Failed {symbol} with {strategy}: {e}")

        execution_time = timer.elapsed
        final_memory = process.memory_info().rss / 1024 / 1024
        total_memory_growth = final_memory - initial_memory
        peak_memory = max(snapshot["memory_mb"] for snapshot in memory_snapshots)

        # Analyze memory patterns
        memory_growths = [s["memory_growth"] for s in memory_snapshots]
        avg_memory_growth = np.mean(memory_growths)
        max_memory_growth = max(memory_growths)

        # Check for memory leaks (memory should not grow linearly with iterations)
        if len(memory_snapshots) > 10:
            # Linear regression to detect memory leaks
            iterations = [s["iteration"] for s in memory_snapshots]
            memory_values = [s["memory_growth"] for s in memory_snapshots]

            # Simple linear regression
            n = len(iterations)
            sum_x = sum(iterations)
            sum_y = sum(memory_values)
            sum_xy = sum(x * y for x, y in zip(iterations, memory_values, strict=False))
            sum_xx = sum(x * x for x in iterations)

            slope = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
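
            # Illustrative alternative (same result): numpy's least-squares fit
            # returns the slope directly, e.g.
            #   slope = float(np.polyfit(iterations, memory_values, 1)[0])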

            # Memory leak detection (slope should be small)
            memory_leak_rate = slope  # MB per iteration
        else:
            memory_leak_rate = 0

        # Performance assertions
        assert total_memory_growth < 2000, (
            f"Total memory growth too high: {total_memory_growth:.1f}MB"
        )
        assert peak_memory < initial_memory + 2500, (
            f"Peak memory too high: {peak_memory:.1f}MB"
        )
        assert abs(memory_leak_rate) < 5, (
            f"Potential memory leak detected: {memory_leak_rate:.2f}MB/iteration"
        )

        logger.info(
            f"Memory Management Large Datasets Results:\n"
            f"  • Initial Memory: {initial_memory:.1f}MB\n"
            f"  • Final Memory: {final_memory:.1f}MB\n"
            f"  • Total Growth: {total_memory_growth:.1f}MB\n"
            f"  • Peak Memory: {peak_memory:.1f}MB\n"
            f"  • Avg Growth: {avg_memory_growth:.1f}MB\n"
            f"  • Max Growth: {max_memory_growth:.1f}MB\n"
            f"  • Memory Leak Rate: {memory_leak_rate:.2f}MB/iteration\n"
            f"  • Execution Time: {execution_time:.1f}s\n"
            f"  • Iterations: {len(memory_snapshots)}"
        )

        return {
            "total_memory_growth": total_memory_growth,
            "peak_memory": peak_memory,
            "memory_leak_rate": memory_leak_rate,
            "execution_time": execution_time,
            "memory_snapshots": memory_snapshots,
        }

    async def test_cache_efficiency_large_dataset(
        self, high_volume_data_provider, benchmark_timer
    ):
        """Test cache efficiency with large datasets."""
        # Test cache with repeated access patterns
        symbols = LARGE_SYMBOL_SET[:20]
        strategy = "sma_cross"

        engine = VectorBTEngine(data_provider=high_volume_data_provider)
        parameters = STRATEGY_TEMPLATES[strategy]["parameters"]

        # First pass - populate cache
        with benchmark_timer() as timer:
            first_pass_results = []
            for symbol in symbols:
                try:
                    result = await engine.run_backtest(
                        symbol=symbol,
                        strategy_type=strategy,
                        parameters=parameters,
                        start_date="2023-01-01",
                        end_date="2023-12-31",
                    )
                    first_pass_results.append(result)
                except Exception as e:
                    logger.error(f"First pass failed for {symbol}: {e}")

        first_pass_time = timer.elapsed

        # Second pass - should benefit from cache
        with benchmark_timer() as timer:
            second_pass_results = []
            for symbol in symbols:
                try:
                    result = await engine.run_backtest(
                        symbol=symbol,
                        strategy_type=strategy,
                        parameters=parameters,
                        start_date="2023-01-01",
                        end_date="2023-12-31",
                    )
                    second_pass_results.append(result)
                except Exception as e:
                    logger.error(f"Second pass failed for {symbol}: {e}")

        second_pass_time = timer.elapsed

        # Third pass - different parameters (no cache benefit)
        modified_parameters = {
            **parameters,
            "fast_period": parameters.get("fast_period", 10) + 5,
        }
        with benchmark_timer() as timer:
            third_pass_results = []
            for symbol in symbols:
                try:
                    result = await engine.run_backtest(
                        symbol=symbol,
                        strategy_type=strategy,
                        parameters=modified_parameters,
                        start_date="2023-01-01",
                        end_date="2023-12-31",
                    )
                    third_pass_results.append(result)
                except Exception as e:
                    logger.error(f"Third pass failed for {symbol}: {e}")

        third_pass_time = timer.elapsed

        # Calculate cache efficiency metrics
        cache_speedup = (
            first_pass_time / second_pass_time if second_pass_time > 0 else 1.0
        )
        no_cache_comparison = (
            first_pass_time / third_pass_time if third_pass_time > 0 else 1.0
        )

        # Cache hit rate estimation (if second pass is significantly faster)
        estimated_cache_hit_rate = max(
            0, min(1, (first_pass_time - second_pass_time) / first_pass_time)
        )

        logger.info(
            f"Cache Efficiency Large Dataset Results:\n"
            f"  • First Pass (populate): {first_pass_time:.2f}s ({len(first_pass_results)} symbols)\n"
            f"  • Second Pass (cached): {second_pass_time:.2f}s ({len(second_pass_results)} symbols)\n"
            f"  • Third Pass (no cache): {third_pass_time:.2f}s ({len(third_pass_results)} symbols)\n"
            f"  • Cache Speedup: {cache_speedup:.2f}x\n"
            f"  • No Cache Comparison: {no_cache_comparison:.2f}x\n"
            f"  • Estimated Cache Hit Rate: {estimated_cache_hit_rate:.1%}"
        )

        return {
            "first_pass_time": first_pass_time,
            "second_pass_time": second_pass_time,
            "third_pass_time": third_pass_time,
            "cache_speedup": cache_speedup,
            "estimated_cache_hit_rate": estimated_cache_hit_rate,
        }


if __name__ == "__main__":
    # Run high-volume integration tests
    pytest.main(
        [
            __file__,
            "-v",
            "--tb=short",
            "--asyncio-mode=auto",
            "--timeout=3600",  # 1 hour timeout for high-volume tests
            "--durations=20",  # Show 20 slowest tests
            "-x",  # Stop on first failure
        ]
    )

```
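
These tests rely on a `benchmark_timer` fixture that is assumed to come from a shared conftest rather than this file. A minimal sketch consistent with how it is used above (`with benchmark_timer() as timer:` plus reading `timer.elapsed` both inside and after the block) could look like the following; the class and fixture internals here are assumptions, not the project's actual implementation.

```python
import time
from contextlib import contextmanager

import pytest


class _ElapsedTimer:
    """Wall-clock timer whose `elapsed` is readable during and after the block."""

    def __init__(self) -> None:
        self._start = time.perf_counter()
        self._end: float | None = None

    @property
    def elapsed(self) -> float:
        end = self._end if self._end is not None else time.perf_counter()
        return end - self._start

    def stop(self) -> None:
        self._end = time.perf_counter()


@pytest.fixture
def benchmark_timer():
    """Return a factory so tests can write `with benchmark_timer() as timer:`."""

    @contextmanager
    def _factory():
        timer = _ElapsedTimer()
        try:
            yield timer
        finally:
            timer.stop()

    return _factory
```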

--------------------------------------------------------------------------------
/maverick_mcp/monitoring/metrics.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive Prometheus metrics for MaverickMCP backtesting system.

This module provides specialized metrics for monitoring:
- Backtesting execution performance and reliability
- Strategy performance over time
- API rate limiting and failure tracking
- Resource usage and optimization
- Anomaly detection and alerting
"""

import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from prometheus_client import (
    CollectorRegistry,
    Counter,
    Gauge,
    Histogram,
    Summary,
    generate_latest,
)

from maverick_mcp.utils.logging import get_logger

logger = get_logger(__name__)

# Custom registry for backtesting metrics to avoid conflicts
BACKTESTING_REGISTRY = CollectorRegistry()

# =============================================================================
# BACKTESTING EXECUTION METRICS
# =============================================================================

# Backtest execution counters
backtest_executions_total = Counter(
    "maverick_backtest_executions_total",
    "Total number of backtesting executions",
    ["strategy_name", "status", "symbol", "timeframe"],
    registry=BACKTESTING_REGISTRY,
)

backtest_execution_duration = Histogram(
    "maverick_backtest_execution_duration_seconds",
    "Duration of backtesting executions in seconds",
    ["strategy_name", "symbol", "timeframe", "data_size"],
    buckets=(
        0.1,
        0.5,
        1.0,
        2.5,
        5.0,
        10.0,
        30.0,
        60.0,
        120.0,
        300.0,
        600.0,
        float("inf"),
    ),
    registry=BACKTESTING_REGISTRY,
)

backtest_data_points_processed = Counter(
    "maverick_backtest_data_points_processed_total",
    "Total number of data points processed during backtesting",
    ["strategy_name", "symbol", "timeframe"],
    registry=BACKTESTING_REGISTRY,
)

backtest_memory_usage = Histogram(
    "maverick_backtest_memory_usage_mb",
    "Memory usage during backtesting in MB",
    ["strategy_name", "symbol", "complexity"],
    buckets=(10, 25, 50, 100, 250, 500, 1000, 2500, 5000, float("inf")),
    registry=BACKTESTING_REGISTRY,
)
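
# Illustrative call-site usage (hypothetical code; label values are examples only):
#   backtest_executions_total.labels(
#       strategy_name="sma_cross", status="success", symbol="AAPL", timeframe="1d"
#   ).inc()
#   backtest_execution_duration.labels(
#       strategy_name="sma_cross", symbol="AAPL", timeframe="1d", data_size="small"
#   ).observe(elapsed_seconds)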

# =============================================================================
# STRATEGY PERFORMANCE METRICS
# =============================================================================

# Strategy returns and performance
strategy_returns = Histogram(
    "maverick_strategy_returns_percent",
    "Strategy returns in percentage",
    ["strategy_name", "symbol", "period"],
    buckets=(-50, -25, -10, -5, -1, 0, 1, 5, 10, 25, 50, 100, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_sharpe_ratio = Histogram(
    "maverick_strategy_sharpe_ratio",
    "Strategy Sharpe ratio",
    ["strategy_name", "symbol", "period"],
    buckets=(-2, -1, -0.5, 0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_max_drawdown = Histogram(
    "maverick_strategy_max_drawdown_percent",
    "Maximum drawdown percentage for strategy",
    ["strategy_name", "symbol", "period"],
    buckets=(0, 5, 10, 15, 20, 30, 40, 50, 75, 100, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

strategy_win_rate = Histogram(
    "maverick_strategy_win_rate_percent",
    "Win rate percentage for strategy",
    ["strategy_name", "symbol", "period"],
    buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100),
    registry=BACKTESTING_REGISTRY,
)

strategy_trades_total = Counter(
    "maverick_strategy_trades_total",
    "Total number of trades executed by strategy",
    ["strategy_name", "symbol", "trade_type", "outcome"],
    registry=BACKTESTING_REGISTRY,
)

# Strategy execution latency
strategy_execution_latency = Summary(
    "maverick_strategy_execution_latency_seconds",
    "Strategy execution latency for signal generation",
    ["strategy_name", "complexity"],
    registry=BACKTESTING_REGISTRY,
)
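
# Illustrative call-site usage (hypothetical code): a Summary can time a block
# directly, e.g.
#   with strategy_execution_latency.labels(strategy_name="rsi", complexity="low").time():
#       ...generate signals...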

# =============================================================================
# API RATE LIMITING AND FAILURE METRICS
# =============================================================================

# API call tracking
api_calls_total = Counter(
    "maverick_api_calls_total",
    "Total API calls made to external providers",
    ["provider", "endpoint", "method", "status_code"],
    registry=BACKTESTING_REGISTRY,
)

api_call_duration = Histogram(
    "maverick_api_call_duration_seconds",
    "API call duration in seconds",
    ["provider", "endpoint"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

# Rate limiting metrics
rate_limit_hits = Counter(
    "maverick_rate_limit_hits_total",
    "Total rate limit hits by provider",
    ["provider", "endpoint", "limit_type"],
    registry=BACKTESTING_REGISTRY,
)

rate_limit_remaining = Gauge(
    "maverick_rate_limit_remaining",
    "Remaining API calls before hitting rate limit",
    ["provider", "endpoint", "window"],
    registry=BACKTESTING_REGISTRY,
)

rate_limit_reset_time = Gauge(
    "maverick_rate_limit_reset_timestamp",
    "Timestamp when rate limit resets",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)

# API failures and errors
api_failures_total = Counter(
    "maverick_api_failures_total",
    "Total API failures by error type",
    ["provider", "endpoint", "error_type", "error_code"],
    registry=BACKTESTING_REGISTRY,
)

api_retry_attempts = Counter(
    "maverick_api_retry_attempts_total",
    "Total API retry attempts",
    ["provider", "endpoint", "retry_number"],
    registry=BACKTESTING_REGISTRY,
)

# Circuit breaker metrics
circuit_breaker_state = Gauge(
    "maverick_circuit_breaker_state",
    "Circuit breaker state (0=closed, 1=open, 2=half-open)",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)

circuit_breaker_failures = Counter(
    "maverick_circuit_breaker_failures_total",
    "Circuit breaker failure count",
    ["provider", "endpoint"],
    registry=BACKTESTING_REGISTRY,
)
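
# Illustrative call-site usage (hypothetical code): set the gauge on each state
# transition using the encoding documented above, e.g.
#   circuit_breaker_state.labels(provider="<provider>", endpoint="<endpoint>").set(1)  # open
#   circuit_breaker_failures.labels(provider="<provider>", endpoint="<endpoint>").inc()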

# =============================================================================
# RESOURCE USAGE AND PERFORMANCE METRICS
# =============================================================================

# VectorBT specific metrics
vectorbt_memory_usage = Gauge(
    "maverick_vectorbt_memory_usage_mb",
    "VectorBT memory usage in MB",
    ["operation_type"],
    registry=BACKTESTING_REGISTRY,
)

vectorbt_computation_time = Histogram(
    "maverick_vectorbt_computation_time_seconds",
    "VectorBT computation time in seconds",
    ["operation_type", "data_size"],
    buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

# Database query performance
database_query_duration = Histogram(
    "maverick_database_query_duration_seconds",
    "Database query execution time",
    ["query_type", "table_name", "operation"],
    buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, float("inf")),
    registry=BACKTESTING_REGISTRY,
)

database_connection_pool_usage = Gauge(
    "maverick_database_connection_pool_usage",
    "Database connection pool usage",
    ["pool_type", "status"],
    registry=BACKTESTING_REGISTRY,
)

# Cache performance metrics
cache_operations_total = Counter(
    "maverick_cache_operations_total",
    "Total cache operations",
    ["cache_type", "operation", "status"],
    registry=BACKTESTING_REGISTRY,
)

cache_hit_ratio = Histogram(
    "maverick_cache_hit_ratio",
    "Cache hit ratio percentage",
    ["cache_type", "key_pattern"],
    buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 99, 100),
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# ANOMALY DETECTION METRICS
# =============================================================================

# Performance anomaly detection
strategy_performance_anomalies = Counter(
    "maverick_strategy_performance_anomalies_total",
    "Detected strategy performance anomalies",
    ["strategy_name", "anomaly_type", "severity"],
    registry=BACKTESTING_REGISTRY,
)

data_quality_issues = Counter(
    "maverick_data_quality_issues_total",
    "Data quality issues detected",
    ["data_source", "issue_type", "symbol"],
    registry=BACKTESTING_REGISTRY,
)

resource_usage_alerts = Counter(
    "maverick_resource_usage_alerts_total",
    "Resource usage threshold alerts",
    ["resource_type", "threshold_type"],
    registry=BACKTESTING_REGISTRY,
)

# Threshold monitoring gauges
performance_thresholds = Gauge(
    "maverick_performance_thresholds",
    "Performance monitoring thresholds",
    ["metric_name", "threshold_type"],  # threshold_type: warning, critical
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# BUSINESS METRICS FOR TRADING
# =============================================================================

# Portfolio performance metrics
portfolio_value = Gauge(
    "maverick_portfolio_value_usd",
    "Current portfolio value in USD",
    ["portfolio_id", "currency"],
    registry=BACKTESTING_REGISTRY,
)

portfolio_daily_pnl = Histogram(
    "maverick_portfolio_daily_pnl_usd",
    "Daily PnL in USD",
    ["portfolio_id", "strategy"],
    buckets=(
        -10000,
        -5000,
        -1000,
        -500,
        -100,
        0,
        100,
        500,
        1000,
        5000,
        10000,
        float("inf"),
    ),
    registry=BACKTESTING_REGISTRY,
)

active_positions = Gauge(
    "maverick_active_positions_count",
    "Number of active positions",
    ["portfolio_id", "symbol", "position_type"],
    registry=BACKTESTING_REGISTRY,
)

# =============================================================================
# METRICS COLLECTOR CLASS
# =============================================================================


@dataclass
class PerformanceThreshold:
    """Configuration for performance thresholds."""

    metric_name: str
    warning_value: float
    critical_value: float
    comparison_type: str = "greater_than"  # greater_than, less_than, equal_to


class BacktestingMetricsCollector:
    """
    Comprehensive metrics collector for backtesting operations.

    Provides high-level interfaces for tracking backtesting performance,
    strategy metrics, API usage, and anomaly detection.
    """

    def __init__(self):
        self.logger = get_logger(f"{__name__}.BacktestingMetricsCollector")
        self._anomaly_thresholds = self._initialize_default_thresholds()
        self._lock = threading.Lock()

        # Initialize performance thresholds in Prometheus
        self._setup_performance_thresholds()

        self.logger.info("Backtesting metrics collector initialized")

    def _initialize_default_thresholds(self) -> dict[str, PerformanceThreshold]:
        """Initialize default performance thresholds for anomaly detection."""
        return {
            "sharpe_ratio_low": PerformanceThreshold(
                "sharpe_ratio", 0.5, 0.0, "less_than"
            ),
            "max_drawdown_high": PerformanceThreshold(
                "max_drawdown", 20.0, 30.0, "greater_than"
            ),
            "win_rate_low": PerformanceThreshold("win_rate", 40.0, 30.0, "less_than"),
            "execution_time_high": PerformanceThreshold(
                "execution_time", 60.0, 120.0, "greater_than"
            ),
            "api_failure_rate_high": PerformanceThreshold(
                "api_failure_rate", 5.0, 10.0, "greater_than"
            ),
            "memory_usage_high": PerformanceThreshold(
                "memory_usage", 1000, 2000, "greater_than"
            ),
        }

    def _setup_performance_thresholds(self):
        """Setup performance threshold gauges."""
        for _threshold_name, threshold in self._anomaly_thresholds.items():
            performance_thresholds.labels(
                metric_name=threshold.metric_name, threshold_type="warning"
            ).set(threshold.warning_value)

            performance_thresholds.labels(
                metric_name=threshold.metric_name, threshold_type="critical"
            ).set(threshold.critical_value)

    @contextmanager
    def track_backtest_execution(
        self, strategy_name: str, symbol: str, timeframe: str, data_points: int = 0
    ):
        """
        Context manager for tracking backtest execution metrics.

        Args:
            strategy_name: Name of the trading strategy
            symbol: Trading symbol (e.g., 'AAPL')
            timeframe: Data timeframe (e.g., '1D', '1H')
            data_points: Number of data points being processed
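
        Example (illustrative; ``run_backtest`` is a placeholder):
            metrics = get_backtesting_metrics()
            with metrics.track_backtest_execution("sma_cross", "AAPL", "1D", 2500):
                run_backtest(...)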
        """
        start_time = time.time()
        start_memory = self._get_memory_usage()

        # Determine data size category
        data_size = self._categorize_data_size(data_points)

        try:
            yield

            # Success metrics
            duration = time.time() - start_time
            memory_used = self._get_memory_usage() - start_memory

            backtest_executions_total.labels(
                strategy_name=strategy_name,
                status="success",
                symbol=symbol,
                timeframe=timeframe,
            ).inc()

            backtest_execution_duration.labels(
                strategy_name=strategy_name,
                symbol=symbol,
                timeframe=timeframe,
                data_size=data_size,
            ).observe(duration)

            if data_points > 0:
                backtest_data_points_processed.labels(
                    strategy_name=strategy_name, symbol=symbol, timeframe=timeframe
                ).inc(data_points)

            if memory_used > 0:
                complexity = self._categorize_complexity(data_points, duration)
                backtest_memory_usage.labels(
                    strategy_name=strategy_name, symbol=symbol, complexity=complexity
                ).observe(memory_used)

            # Check for performance anomalies
            self._check_execution_anomalies(strategy_name, duration, memory_used)

        except Exception as e:
            # Error metrics
            backtest_executions_total.labels(
                strategy_name=strategy_name,
                status="failure",
                symbol=symbol,
                timeframe=timeframe,
            ).inc()

            self.logger.error(f"Backtest execution failed for {strategy_name}: {e}")
            raise

    def track_strategy_performance(
        self,
        strategy_name: str,
        symbol: str,
        period: str,
        returns: float,
        sharpe_ratio: float,
        max_drawdown: float,
        win_rate: float,
        total_trades: int,
        winning_trades: int,
    ):
        """
        Track comprehensive strategy performance metrics.

        Args:
            strategy_name: Name of the trading strategy
            symbol: Trading symbol
            period: Performance period (e.g., '1Y', '6M', '3M')
            returns: Total returns in percentage
            sharpe_ratio: Sharpe ratio
            max_drawdown: Maximum drawdown percentage
            win_rate: Win rate percentage
            total_trades: Total number of trades
            winning_trades: Number of winning trades
        """
        # Record performance metrics
        strategy_returns.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(returns)

        strategy_sharpe_ratio.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(sharpe_ratio)

        strategy_max_drawdown.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(max_drawdown)

        strategy_win_rate.labels(
            strategy_name=strategy_name, symbol=symbol, period=period
        ).observe(win_rate)

        # Record trade counts
        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="total",
            outcome="all",
        ).inc(total_trades)

        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="winning",
            outcome="profit",
        ).inc(winning_trades)

        losing_trades = total_trades - winning_trades
        strategy_trades_total.labels(
            strategy_name=strategy_name,
            symbol=symbol,
            trade_type="losing",
            outcome="loss",
        ).inc(losing_trades)

        # Check for performance anomalies
        self._check_strategy_anomalies(
            strategy_name, sharpe_ratio, max_drawdown, win_rate
        )

    def track_api_call(
        self,
        provider: str,
        endpoint: str,
        method: str,
        status_code: int,
        duration: float,
        error_type: str | None = None,
        remaining_calls: int | None = None,
        reset_time: datetime | None = None,
    ):
        """
        Track API call metrics including rate limiting and failures.

        Args:
            provider: API provider name (e.g., 'tiingo', 'yahoo')
            endpoint: API endpoint
            method: HTTP method
            status_code: Response status code
            duration: Request duration in seconds
            error_type: Type of error if request failed
            remaining_calls: Remaining API calls before rate limit
            reset_time: When rate limit resets
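
        Example (illustrative; the endpoint path is a placeholder):
            metrics = get_backtesting_metrics()
            metrics.track_api_call(
                provider="tiingo",
                endpoint="/daily/prices",
                method="GET",
                status_code=200,
                duration=0.42,
                remaining_calls=480,
            )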
        """
        # Basic API call tracking
        api_calls_total.labels(
            provider=provider,
            endpoint=endpoint,
            method=method,
            status_code=str(status_code),
        ).inc()

        api_call_duration.labels(provider=provider, endpoint=endpoint).observe(duration)

        # Rate limiting metrics
        if remaining_calls is not None:
            rate_limit_remaining.labels(
                provider=provider, endpoint=endpoint, window="current"
            ).set(remaining_calls)

        if reset_time is not None:
            rate_limit_reset_time.labels(provider=provider, endpoint=endpoint).set(
                reset_time.timestamp()
            )

        # Failure tracking
        if status_code >= 400:
            error_code = self._categorize_error_code(status_code)
            api_failures_total.labels(
                provider=provider,
                endpoint=endpoint,
                error_type=error_type or "unknown",
                error_code=error_code,
            ).inc()

            # Check for rate limiting
            if status_code == 429:
                rate_limit_hits.labels(
                    provider=provider, endpoint=endpoint, limit_type="requests_per_hour"
                ).inc()

        # Check for API anomalies
        self._check_api_anomalies(provider, endpoint, status_code, duration)

    def track_circuit_breaker(
        self, provider: str, endpoint: str, state: str, failure_count: int
    ):
        """Track circuit breaker state and failures."""
        state_mapping = {"closed": 0, "open": 1, "half-open": 2}
        circuit_breaker_state.labels(provider=provider, endpoint=endpoint).set(
            state_mapping.get(state, 0)
        )

        if failure_count > 0:
            circuit_breaker_failures.labels(provider=provider, endpoint=endpoint).inc(
                failure_count
            )

    def track_resource_usage(
        self,
        operation_type: str,
        memory_mb: float,
        computation_time: float,
        data_size: str = "unknown",
    ):
        """Track resource usage for VectorBT operations."""
        vectorbt_memory_usage.labels(operation_type=operation_type).set(memory_mb)

        vectorbt_computation_time.labels(
            operation_type=operation_type, data_size=data_size
        ).observe(computation_time)

        # Check for resource usage anomalies
        if memory_mb > self._anomaly_thresholds["memory_usage_high"].warning_value:
            resource_usage_alerts.labels(
                resource_type="memory",
                threshold_type="warning"
                if memory_mb
                < self._anomaly_thresholds["memory_usage_high"].critical_value
                else "critical",
            ).inc()

    def track_database_operation(
        self, query_type: str, table_name: str, operation: str, duration: float
    ):
        """Track database operation performance."""
        database_query_duration.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).observe(duration)

    def track_cache_operation(
        self, cache_type: str, operation: str, hit: bool, key_pattern: str = "general"
    ):
        """Track cache operation performance."""
        status = "hit" if hit else "miss"
        cache_operations_total.labels(
            cache_type=cache_type, operation=operation, status=status
        ).inc()

    def detect_anomaly(self, anomaly_type: str, severity: str, context: dict[str, Any]):
        """Record detected anomaly."""
        strategy_name = context.get("strategy_name", "unknown")

        strategy_performance_anomalies.labels(
            strategy_name=strategy_name, anomaly_type=anomaly_type, severity=severity
        ).inc()

        self.logger.warning(
            f"Anomaly detected: {anomaly_type} (severity: {severity})",
            extra={"context": context},
        )

    def update_portfolio_metrics(
        self,
        portfolio_id: str,
        portfolio_value_usd: float,
        daily_pnl_usd: float,
        strategy: str,
        positions: list[dict[str, Any]],
    ):
        """Update portfolio-related metrics."""
        portfolio_value.labels(portfolio_id=portfolio_id, currency="USD").set(
            portfolio_value_usd
        )

        portfolio_daily_pnl.labels(
            portfolio_id=portfolio_id, strategy=strategy
        ).observe(daily_pnl_usd)

        # Update position counts
        for position in positions:
            active_positions.labels(
                portfolio_id=portfolio_id,
                symbol=position.get("symbol", "unknown"),
                position_type=position.get("type", "long"),
            ).set(position.get("quantity", 0))

    def _get_memory_usage(self) -> float:
        """Get current memory usage in MB."""
        try:
            import psutil

            process = psutil.Process()
            return process.memory_info().rss / 1024 / 1024
        except ImportError:
            return 0.0

    def _categorize_data_size(self, data_points: int) -> str:
        """Categorize data size for metrics labeling."""
        if data_points < 100:
            return "small"
        elif data_points < 1000:
            return "medium"
        elif data_points < 10000:
            return "large"
        else:
            return "xlarge"

    def _categorize_complexity(self, data_points: int, duration: float) -> str:
        """Categorize operation complexity."""
        if data_points < 1000 and duration < 10:
            return "simple"
        elif data_points < 10000 and duration < 60:
            return "moderate"
        else:
            return "complex"

    def _categorize_error_code(self, status_code: int) -> str:
        """Categorize HTTP error codes."""
        if 400 <= status_code < 500:
            return "client_error"
        elif 500 <= status_code < 600:
            return "server_error"
        else:
            return "other"

    def _check_execution_anomalies(
        self, strategy_name: str, duration: float, memory_mb: float
    ):
        """Check for execution performance anomalies."""
        threshold = self._anomaly_thresholds["execution_time_high"]
        if duration > threshold.critical_value:
            self.detect_anomaly(
                "execution_time_high",
                "critical",
                {
                    "strategy_name": strategy_name,
                    "duration": duration,
                    "threshold": threshold.critical_value,
                },
            )
        elif duration > threshold.warning_value:
            self.detect_anomaly(
                "execution_time_high",
                "warning",
                {
                    "strategy_name": strategy_name,
                    "duration": duration,
                    "threshold": threshold.warning_value,
                },
            )

    def _check_strategy_anomalies(
        self,
        strategy_name: str,
        sharpe_ratio: float,
        max_drawdown: float,
        win_rate: float,
    ):
        """Check for strategy performance anomalies."""
        # Check Sharpe ratio
        threshold = self._anomaly_thresholds["sharpe_ratio_low"]
        if sharpe_ratio < threshold.critical_value:
            self.detect_anomaly(
                "sharpe_ratio_low",
                "critical",
                {"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
            )
        elif sharpe_ratio < threshold.warning_value:
            self.detect_anomaly(
                "sharpe_ratio_low",
                "warning",
                {"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
            )

        # Check max drawdown
        threshold = self._anomaly_thresholds["max_drawdown_high"]
        if max_drawdown > threshold.critical_value:
            self.detect_anomaly(
                "max_drawdown_high",
                "critical",
                {"strategy_name": strategy_name, "max_drawdown": max_drawdown},
            )
        elif max_drawdown > threshold.warning_value:
            self.detect_anomaly(
                "max_drawdown_high",
                "warning",
                {"strategy_name": strategy_name, "max_drawdown": max_drawdown},
            )

        # Check win rate
        threshold = self._anomaly_thresholds["win_rate_low"]
        if win_rate < threshold.critical_value:
            self.detect_anomaly(
                "win_rate_low",
                "critical",
                {"strategy_name": strategy_name, "win_rate": win_rate},
            )
        elif win_rate < threshold.warning_value:
            self.detect_anomaly(
                "win_rate_low",
                "warning",
                {"strategy_name": strategy_name, "win_rate": win_rate},
            )

    def _check_api_anomalies(
        self, provider: str, endpoint: str, status_code: int, duration: float
    ):
        """Check for API call anomalies."""
        # Check API response time
        if duration > 30.0:  # 30 second threshold
            self.detect_anomaly(
                "api_response_slow",
                "warning" if duration < 60.0 else "critical",
                {"provider": provider, "endpoint": endpoint, "duration": duration},
            )

        # Check for repeated failures
        if status_code >= 500:
            self.detect_anomaly(
                "api_server_error",
                "critical",
                {
                    "provider": provider,
                    "endpoint": endpoint,
                    "status_code": status_code,
                },
            )

    def get_metrics_text(self) -> str:
        """Get all backtesting metrics in Prometheus text format."""
        return generate_latest(BACKTESTING_REGISTRY).decode("utf-8")


# =============================================================================
# GLOBAL INSTANCES AND CONVENIENCE FUNCTIONS
# =============================================================================

# Global metrics collector instance
_metrics_collector: BacktestingMetricsCollector | None = None
_collector_lock = threading.Lock()


def get_backtesting_metrics() -> BacktestingMetricsCollector:
    """Get or create the global backtesting metrics collector."""
    global _metrics_collector
    if _metrics_collector is None:
        with _collector_lock:
            if _metrics_collector is None:
                _metrics_collector = BacktestingMetricsCollector()
    return _metrics_collector


# Convenience functions for common operations
def track_backtest_execution(
    strategy_name: str, symbol: str, timeframe: str, data_points: int = 0
):
    """Convenience function to track backtest execution."""
    return get_backtesting_metrics().track_backtest_execution(
        strategy_name, symbol, timeframe, data_points
    )


def track_strategy_performance(
    strategy_name: str,
    symbol: str,
    period: str,
    returns: float,
    sharpe_ratio: float,
    max_drawdown: float,
    win_rate: float,
    total_trades: int,
    winning_trades: int,
):
    """Convenience function to track strategy performance."""
    get_backtesting_metrics().track_strategy_performance(
        strategy_name,
        symbol,
        period,
        returns,
        sharpe_ratio,
        max_drawdown,
        win_rate,
        total_trades,
        winning_trades,
    )


def track_api_call_metrics(
    provider: str,
    endpoint: str,
    method: str,
    status_code: int,
    duration: float,
    error_type: str | None = None,
    remaining_calls: int | None = None,
    reset_time: datetime | None = None,
):
    """Convenience function to track API call metrics."""
    get_backtesting_metrics().track_api_call(
        provider,
        endpoint,
        method,
        status_code,
        duration,
        error_type,
        remaining_calls,
        reset_time,
    )


def track_anomaly_detection(anomaly_type: str, severity: str, context: dict[str, Any]):
    """Convenience function to track detected anomalies."""
    get_backtesting_metrics().detect_anomaly(anomaly_type, severity, context)


def get_metrics_for_prometheus() -> str:
    """Get backtesting metrics in Prometheus format."""
    return get_backtesting_metrics().get_metrics_text()

```
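
A minimal usage sketch for the metrics module above. The import path is an assumption (the file's path header falls on an earlier page), and `run_instrumented_backtest`, the elided backtest call, and the numeric results are placeholders.

```python
# Assumed import path; adjust to wherever this module lives in the package.
from maverick_mcp.backtesting import metrics as bt_metrics


def run_instrumented_backtest() -> str:
    collector = bt_metrics.get_backtesting_metrics()

    # Time and meter the backtest run itself.
    with collector.track_backtest_execution("sma_cross", "AAPL", "1D", data_points=2500):
        ...  # run the actual backtest here (placeholder)

    # Record the resulting performance figures (placeholder values).
    collector.track_strategy_performance(
        strategy_name="sma_cross",
        symbol="AAPL",
        period="1Y",
        returns=12.5,
        sharpe_ratio=1.1,
        max_drawdown=8.3,
        win_rate=54.0,
        total_trades=120,
        winning_trades=65,
    )

    # Prometheus text exposition, e.g. served from a /metrics endpoint.
    return bt_metrics.get_metrics_for_prometheus()
```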

--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/ml/adaptive.py:
--------------------------------------------------------------------------------

```python
"""Adaptive trading strategies with online learning and parameter adjustment."""

import logging
from typing import Any

import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler

from maverick_mcp.backtesting.strategies.base import Strategy

logger = logging.getLogger(__name__)


class AdaptiveStrategy(Strategy):
    """Base class for adaptive strategies that adjust parameters based on performance."""

    def __init__(
        self,
        base_strategy: Strategy,
        adaptation_method: str = "gradient",
        learning_rate: float = 0.01,
        lookback_period: int = 50,
        adaptation_frequency: int = 10,
        parameters: dict[str, Any] | None = None,
    ):
        """Initialize adaptive strategy.

        Args:
            base_strategy: Base strategy to adapt
            adaptation_method: Method for parameter adaptation
            learning_rate: Learning rate for parameter updates
            lookback_period: Period for performance evaluation
            adaptation_frequency: How often to adapt parameters
            parameters: Additional parameters
        """
        super().__init__(parameters)
        self.base_strategy = base_strategy
        self.adaptation_method = adaptation_method
        self.learning_rate = learning_rate
        self.lookback_period = lookback_period
        self.adaptation_frequency = adaptation_frequency

        # Performance tracking
        self.performance_history = []
        self.parameter_history = []
        self.last_adaptation = 0

        # Store original parameters for reference
        self.original_parameters = base_strategy.parameters.copy()

    @property
    def name(self) -> str:
        """Get strategy name."""
        return f"Adaptive({self.base_strategy.name})"

    @property
    def description(self) -> str:
        """Get strategy description."""
        return f"Adaptive version of {self.base_strategy.name} using {self.adaptation_method} method"

    def calculate_performance_metric(self, returns: Series) -> float:
        """Calculate performance metric for parameter adaptation.

        Args:
            returns: Strategy returns

        Returns:
            Performance score
        """
        if len(returns) == 0:
            return 0.0

        # Use Sharpe ratio as default performance metric
        if returns.std() == 0:
            return 0.0

        sharpe = returns.mean() / returns.std() * np.sqrt(252)
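        # Worked example: a 0.1% mean daily return with a 2% daily std gives
        # 0.001 / 0.02 * sqrt(252) ≈ 0.79 annualized Sharpe.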

        # Alternative metrics could be:
        # - Calmar ratio: returns.mean() / abs(max_drawdown)
        # - Sortino ratio: returns.mean() / downside_deviation
        # - Information ratio: excess_returns.mean() / tracking_error

        return sharpe

    def adapt_parameters_gradient(self, performance_gradient: float) -> None:
        """Adapt parameters using gradient-based method.

        Args:
            performance_gradient: Gradient of performance with respect to parameters
        """
        adaptable_params = self.get_adaptable_parameters()

        for param_name, param_info in adaptable_params.items():
            if param_name in self.base_strategy.parameters:
                current_value = self.base_strategy.parameters[param_name]

                # Calculate parameter update
                param_gradient = performance_gradient * self.learning_rate

                # Apply bounds and constraints
                min_val = param_info.get("min", current_value * 0.5)
                max_val = param_info.get("max", current_value * 2.0)
                step_size = param_info.get("step", abs(current_value) * 0.01)

                # Update parameter
                new_value = current_value + param_gradient * step_size
                new_value = max(min_val, min(max_val, new_value))

                self.base_strategy.parameters[param_name] = new_value

                logger.debug(
                    f"Adapted {param_name}: {current_value:.4f} -> {new_value:.4f}"
                )

    def adapt_parameters_random_search(self) -> None:
        """Adapt parameters using random search with performance feedback."""
        adaptable_params = self.get_adaptable_parameters()

        # Try random perturbations and keep improvements
        for param_name, param_info in adaptable_params.items():
            if param_name in self.base_strategy.parameters:
                current_value = self.base_strategy.parameters[param_name]

                # Generate random perturbation
                min_val = param_info.get("min", current_value * 0.5)
                max_val = param_info.get("max", current_value * 2.0)

                # Small random step
                perturbation = np.random.normal(0, abs(current_value) * 0.1)
                new_value = current_value + perturbation
                new_value = max(min_val, min(max_val, new_value))

                # Store new value for trial
                self.base_strategy.parameters[param_name] = new_value

                # Note: Performance evaluation would happen in next cycle
                # For now, keep the change and let performance tracking decide

    def get_adaptable_parameters(self) -> dict[str, dict]:
        """Get parameters that can be adapted.

        Returns:
            Dictionary of adaptable parameters with their constraints
        """
        # Default adaptable parameters - can be overridden by subclasses
        return {
            "lookback_period": {"min": 5, "max": 200, "step": 1},
            "threshold": {"min": 0.001, "max": 0.1, "step": 0.001},
            "window": {"min": 5, "max": 100, "step": 1},
            "period": {"min": 5, "max": 200, "step": 1},
        }

    def adapt_parameters(self, recent_performance: float) -> None:
        """Adapt strategy parameters based on recent performance.

        Args:
            recent_performance: Recent performance metric
        """
        try:
            if self.adaptation_method == "gradient":
                # Approximate gradient based on performance change
                if len(self.performance_history) > 1:
                    performance_gradient = (
                        recent_performance - self.performance_history[-2]
                    )
                    self.adapt_parameters_gradient(performance_gradient)

            elif self.adaptation_method == "random_search":
                # Use random search with performance feedback
                self.adapt_parameters_random_search()

            else:
                logger.warning(f"Unknown adaptation method: {self.adaptation_method}")

            # Store adapted parameters
            self.parameter_history.append(self.base_strategy.parameters.copy())

        except Exception as e:
            logger.error(f"Error adapting parameters: {e}")

    def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
        """Generate adaptive trading signals.

        Args:
            data: Price data with OHLCV columns

        Returns:
            Tuple of (entry_signals, exit_signals) as boolean Series
        """
        try:
            # Generate signals from base strategy
            entry_signals, exit_signals = self.base_strategy.generate_signals(data)

            # Calculate strategy performance for adaptation
            positions = entry_signals.astype(int) - exit_signals.astype(int)
            returns = positions.shift(1) * data["close"].pct_change()

            # Track performance over time for adaptation
            for idx in range(
                self.adaptation_frequency, len(data), self.adaptation_frequency
            ):
                if idx > self.last_adaptation + self.adaptation_frequency:
                    # Calculate recent performance
                    recent_returns = returns.iloc[
                        max(0, idx - self.lookback_period) : idx
                    ]
                    if len(recent_returns) > 0:
                        recent_performance = self.calculate_performance_metric(
                            recent_returns
                        )
                        self.performance_history.append(recent_performance)

                        # Adapt parameters based on performance
                        self.adapt_parameters(recent_performance)

                        # Re-generate signals with adapted parameters
                        entry_signals, exit_signals = (
                            self.base_strategy.generate_signals(data)
                        )

                    self.last_adaptation = idx

            return entry_signals, exit_signals

        except Exception as e:
            logger.error(f"Error generating adaptive signals: {e}")
            return pd.Series(False, index=data.index), pd.Series(
                False, index=data.index
            )

    def get_adaptation_history(self) -> dict[str, Any]:
        """Get history of parameter adaptations.

        Returns:
            Dictionary with adaptation history
        """
        return {
            "performance_history": self.performance_history,
            "parameter_history": self.parameter_history,
            "current_parameters": self.base_strategy.parameters,
            "original_parameters": self.original_parameters,
        }

    def reset_to_original(self) -> None:
        """Reset strategy parameters to original values."""
        self.base_strategy.parameters = self.original_parameters.copy()
        self.performance_history = []
        self.parameter_history = []
        self.last_adaptation = 0


class OnlineLearningStrategy(Strategy):
    """Strategy with online learning capabilities using streaming ML algorithms."""

    def __init__(
        self,
        model_type: str = "sgd",
        update_frequency: int = 10,
        feature_window: int = 20,
        confidence_threshold: float = 0.6,
        min_training_samples: int = 100,
        initial_training_period: int = 200,
        parameters: dict[str, Any] | None = None,
    ):
        """Initialize online learning strategy.

        Args:
            model_type: Type of online learning model
            update_frequency: How often to update the model
            feature_window: Window for feature calculation
            confidence_threshold: Minimum confidence for signals
            min_training_samples: Minimum samples before enabling predictions
            initial_training_period: Period for initial batch training
            parameters: Additional parameters
        """
        super().__init__(parameters)
        self.model_type = model_type
        self.update_frequency = update_frequency
        self.feature_window = feature_window
        self.confidence_threshold = confidence_threshold
        self.min_training_samples = min_training_samples
        self.initial_training_period = initial_training_period

        # Initialize online learning components
        self.model = None
        self.scaler = None
        self.is_trained = False
        self.is_initial_trained = False
        self.training_buffer = []
        self.last_update = 0
        self.training_samples_count = 0

        # Feature consistency tracking
        self.expected_feature_count = None
        self.feature_stats_buffer = []

        self._initialize_model()

    def _initialize_model(self):
        """Initialize online learning model with proper configuration."""
        if self.model_type == "sgd":
            self.model = SGDClassifier(
                loss="log_loss",
                learning_rate="adaptive",
                eta0=0.01,
                random_state=42,
                max_iter=1000,
                tol=1e-4,
                warm_start=True,  # Reuse the previous solution when fit() is called again
                alpha=0.01,  # Regularization
                fit_intercept=True,
            )
        else:
            raise ValueError(f"Unsupported model type: {self.model_type}")

        # Initialize scaler as None - will be created during first fit
        self.scaler = None

    @property
    def name(self) -> str:
        """Get strategy name."""
        return f"OnlineLearning({self.model_type.upper()})"

    @property
    def description(self) -> str:
        """Get strategy description."""
        return (
            f"Online learning strategy using {self.model_type} with streaming updates"
        )

    def extract_features(self, data: DataFrame, end_idx: int) -> np.ndarray:
        """Extract features for online learning with enhanced stability.

        Args:
            data: Price data
            end_idx: End index for feature calculation

        Returns:
            Feature array with consistent dimensionality
        """
        try:
            start_idx = max(0, end_idx - self.feature_window)
            window_data = data.iloc[start_idx : end_idx + 1]

            # Need minimum data for meaningful features
            if len(window_data) < max(5, self.feature_window // 4):
                return np.array([])

            features = []

            # Price features with error handling
            returns = window_data["close"].pct_change().dropna()
            if len(returns) == 0:
                return np.array([])

            # Basic return statistics (robust to small samples)
            mean_return = returns.mean() if len(returns) > 0 else 0.0
            std_return = returns.std() if len(returns) > 1 else 0.01  # Small default
            skew_return = returns.skew() if len(returns) > 3 else 0.0
            kurt_return = returns.kurtosis() if len(returns) > 3 else 0.0

            # Replace NaN/inf values
            features.extend(
                [
                    mean_return if np.isfinite(mean_return) else 0.0,
                    std_return if np.isfinite(std_return) else 0.01,
                    skew_return if np.isfinite(skew_return) else 0.0,
                    kurt_return if np.isfinite(kurt_return) else 0.0,
                ]
            )

            # Technical indicators with fallbacks
            current_price = window_data["close"].iloc[-1]

            # Short-term moving average ratio
            if len(window_data) >= 5:
                sma_5 = window_data["close"].rolling(5).mean().iloc[-1]
                features.append(current_price / sma_5 if sma_5 > 0 else 1.0)
            else:
                features.append(1.0)

            # Medium-term moving average ratio
            if len(window_data) >= 10:
                sma_10 = window_data["close"].rolling(10).mean().iloc[-1]
                features.append(current_price / sma_10 if sma_10 > 0 else 1.0)
            else:
                features.append(1.0)

            # Long-term moving average ratio (if enough data)
            if len(window_data) >= 20:
                sma_20 = window_data["close"].rolling(20).mean().iloc[-1]
                features.append(current_price / sma_20 if sma_20 > 0 else 1.0)
            else:
                features.append(1.0)

            # Volatility feature
            if len(returns) > 10:
                vol_ratio = std_return / returns.rolling(10).std().mean()
                features.append(vol_ratio if np.isfinite(vol_ratio) else 1.0)
            else:
                features.append(1.0)

            # Volume features (if available)
            if "volume" in window_data.columns and len(window_data) >= 5:
                current_volume = window_data["volume"].iloc[-1]
                volume_ma = window_data["volume"].rolling(5).mean().iloc[-1]
                volume_ratio = current_volume / volume_ma if volume_ma > 0 else 1.0
                features.append(volume_ratio if np.isfinite(volume_ratio) else 1.0)

                # Volume trend
                if len(window_data) >= 10:
                    volume_ma_long = window_data["volume"].rolling(10).mean().iloc[-1]
                    volume_trend = (
                        volume_ma / volume_ma_long if volume_ma_long > 0 else 1.0
                    )
                    features.append(volume_trend if np.isfinite(volume_trend) else 1.0)
                else:
                    features.append(1.0)
            else:
                features.extend([1.0, 1.0])

            feature_array = np.array(features)

            # Validate feature consistency
            if self.expected_feature_count is None:
                self.expected_feature_count = len(feature_array)
            elif len(feature_array) != self.expected_feature_count:
                logger.warning(
                    f"Feature count mismatch: expected {self.expected_feature_count}, got {len(feature_array)}"
                )
                return np.array([])

            # Check for any remaining NaN or inf values
            if not np.all(np.isfinite(feature_array)):
                logger.warning("Non-finite features detected, replacing with defaults")
                feature_array = np.nan_to_num(
                    feature_array, nan=0.0, posinf=1.0, neginf=-1.0
                )

            return feature_array

        except Exception as e:
            logger.error(f"Error extracting features: {e}")
            return np.array([])

    def create_target(self, data: DataFrame, idx: int, forward_periods: int = 3) -> int:
        """Create target variable for online learning.

        Args:
            data: Price data
            idx: Current index
            forward_periods: Periods to look forward

        Returns:
            Target class (0: sell, 1: hold, 2: buy)
        """
        if idx + forward_periods >= len(data):
            return 1  # Hold as default

        current_price = data["close"].iloc[idx]
        future_price = data["close"].iloc[idx + forward_periods]

        return_threshold = 0.02  # 2% threshold
        forward_return = (future_price - current_price) / current_price
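        # e.g. a move from 100 to 103 over the forward window is +3%,
        # which exceeds the 2% threshold and maps to class 2 (buy).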

        if forward_return > return_threshold:
            return 2  # Buy
        elif forward_return < -return_threshold:
            return 0  # Sell
        else:
            return 1  # Hold

    def _initial_training(self, data: DataFrame, current_idx: int) -> bool:
        """Perform initial batch training on historical data.

        Args:
            data: Price data
            current_idx: Current index

        Returns:
            True if initial training successful
        """
        try:
            if current_idx < self.initial_training_period:
                return False

            # Collect initial training data
            training_examples = []
            training_targets = []

            # Use a substantial portion of historical data for initial training
            start_idx = max(
                self.feature_window, current_idx - self.initial_training_period
            )

            for idx in range(
                start_idx, current_idx - 10
            ):  # Leave some data for validation
                features = self.extract_features(data, idx)
                if len(features) > 0:
                    target = self.create_target(data, idx)
                    training_examples.append(features)
                    training_targets.append(target)

            if len(training_examples) < self.min_training_samples:
                logger.debug(
                    f"Insufficient training samples: {len(training_examples)} < {self.min_training_samples}"
                )
                return False

            X = np.array(training_examples)
            y = np.array(training_targets)

            # Check for class balance
            unique_classes, class_counts = np.unique(y, return_counts=True)
            if len(unique_classes) < 2:
                logger.warning(
                    f"Insufficient class diversity for training: {unique_classes}"
                )
                return False

            # Initialize scaler with training data
            self.scaler = StandardScaler()
            X_scaled = self.scaler.fit_transform(X)

            # Train initial model
            self.model.fit(X_scaled, y)
            self.is_initial_trained = True
            self.is_trained = True
            self.training_samples_count = len(X)

            logger.info(
                f"Initial training completed with {len(X)} samples, classes: {dict(zip(unique_classes, class_counts, strict=False))}"
            )
            return True

        except Exception as e:
            logger.error(f"Error in initial training: {e}")
            return False

    def update_model(self, data: DataFrame, current_idx: int) -> None:
        """Update online learning model with new data.

        Args:
            data: Price data
            current_idx: Current index
        """
        # Perform initial training if not done yet
        if not self.is_initial_trained:
            if self._initial_training(data, current_idx):
                self.last_update = current_idx
            return

        # Check if update is needed
        if current_idx - self.last_update < self.update_frequency:
            return

        try:
            # Collect recent training examples for incremental update
            recent_examples = []
            recent_targets = []

            # Look back for recent training examples (smaller window for incremental updates)
            lookback_start = max(0, current_idx - self.update_frequency)

            for idx in range(lookback_start, current_idx):
                features = self.extract_features(data, idx)
                if len(features) > 0:
                    target = self.create_target(data, idx)
                    recent_examples.append(features)
                    recent_targets.append(target)

            if len(recent_examples) < 2:  # Need minimum samples for incremental update
                return

            X = np.array(recent_examples)
            y = np.array(recent_targets)

            # Scale features using existing scaler
            X_scaled = self.scaler.transform(X)

            # Incremental update using partial_fit
            # Ensure we have all classes that the model has seen before
            existing_classes = self.model.classes_
            self.model.partial_fit(X_scaled, y, classes=existing_classes)

            self.training_samples_count += len(X)
            self.last_update = current_idx

            logger.debug(
                f"Updated online model at index {current_idx} with {len(X)} samples (total: {self.training_samples_count})"
            )

        except Exception as e:
            logger.error(f"Error updating online model: {e}")
            # Reset training flag to attempt re-initialization
            if "partial_fit" in str(e).lower():
                logger.warning("Partial fit failed, will attempt re-initialization")
                self.is_trained = False
                self.is_initial_trained = False

    def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
        """Generate signals using online learning.

        Args:
            data: Price data with OHLCV columns

        Returns:
            Tuple of (entry_signals, exit_signals) as boolean Series
        """
        entry_signals = pd.Series(False, index=data.index)
        exit_signals = pd.Series(False, index=data.index)

        try:
            # Need minimum data for features
            start_idx = max(self.feature_window, self.initial_training_period + 10)

            if len(data) < start_idx:
                logger.warning(
                    f"Insufficient data for online learning: {len(data)} < {start_idx}"
                )
                return entry_signals, exit_signals

            for idx in range(start_idx, len(data)):
                # Update model periodically
                self.update_model(data, idx)

                if not self.is_trained or self.scaler is None:
                    continue

                # Extract features for current point
                features = self.extract_features(data, idx)
                if len(features) == 0:
                    continue

                try:
                    # Make prediction with error handling
                    X = self.scaler.transform([features])
                    prediction = self.model.predict(X)[0]

                    # Get confidence score
                    if hasattr(self.model, "predict_proba"):
                        probabilities = self.model.predict_proba(X)[0]
                        confidence = max(probabilities)
                    else:
                        # For models without predict_proba, use decision function
                        if hasattr(self.model, "decision_function"):
                            decision_values = self.model.decision_function(X)[0]
                            # Convert to pseudo-probability (sigmoid-like)
                            confidence = 1.0 / (1.0 + np.exp(-abs(decision_values)))
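                            # e.g. |decision_value| = 2.0 gives 1 / (1 + e**-2) ≈ 0.88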
                        else:
                            confidence = 0.6  # Default confidence

                    # Generate signals based on prediction and confidence
                    if confidence >= self.confidence_threshold:
                        if prediction == 2:  # Buy signal
                            entry_signals.iloc[idx] = True
                        elif prediction == 0:  # Sell signal
                            exit_signals.iloc[idx] = True

                except Exception as pred_error:
                    logger.debug(f"Prediction error at index {idx}: {pred_error}")
                    continue

            # Log summary statistics
            total_entry_signals = entry_signals.sum()
            total_exit_signals = exit_signals.sum()
            logger.info(
                f"Generated {total_entry_signals} entry and {total_exit_signals} exit signals using online learning"
            )

        except Exception as e:
            logger.error(f"Error generating online learning signals: {e}")

        return entry_signals, exit_signals

    def get_model_info(self) -> dict[str, Any]:
        """Get information about the online learning model.

        Returns:
            Dictionary with model information
        """
        info = {
            "model_type": self.model_type,
            "is_trained": self.is_trained,
            "is_initial_trained": self.is_initial_trained,
            "feature_window": self.feature_window,
            "update_frequency": self.update_frequency,
            "confidence_threshold": self.confidence_threshold,
            "min_training_samples": self.min_training_samples,
            "initial_training_period": self.initial_training_period,
            "training_samples_count": self.training_samples_count,
            "expected_feature_count": self.expected_feature_count,
        }

        if hasattr(self.model, "coef_") and self.model.coef_ is not None:
            info["model_coefficients"] = self.model.coef_.tolist()

        if hasattr(self.model, "classes_") and self.model.classes_ is not None:
            info["model_classes"] = self.model.classes_.tolist()

        if self.scaler is not None:
            info["feature_scaling"] = {
                "mean": self.scaler.mean_.tolist()
                if hasattr(self.scaler, "mean_")
                else None,
                "scale": self.scaler.scale_.tolist()
                if hasattr(self.scaler, "scale_")
                else None,
            }

        return info


class HybridAdaptiveStrategy(AdaptiveStrategy):
    """Hybrid strategy combining parameter adaptation with online learning."""

    def __init__(
        self, base_strategy: Strategy, online_learning_weight: float = 0.3, **kwargs
    ):
        """Initialize hybrid adaptive strategy.

        Args:
            base_strategy: Base strategy to adapt
            online_learning_weight: Weight for online learning component
            **kwargs: Additional parameters for AdaptiveStrategy
        """
        super().__init__(base_strategy, **kwargs)
        self.online_learning_weight = online_learning_weight
        self.online_strategy = OnlineLearningStrategy()

    @property
    def name(self) -> str:
        """Get strategy name."""
        return f"HybridAdaptive({self.base_strategy.name})"

    @property
    def description(self) -> str:
        """Get strategy description."""
        return "Hybrid adaptive strategy combining parameter adaptation with online learning"

    def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
        """Generate hybrid adaptive signals.

        Args:
            data: Price data with OHLCV columns

        Returns:
            Tuple of (entry_signals, exit_signals) as boolean Series
        """
        # Get signals from adaptive base strategy
        adaptive_entry, adaptive_exit = super().generate_signals(data)

        # Get signals from online learning component
        online_entry, online_exit = self.online_strategy.generate_signals(data)

        # Combine signals with weighting
        base_weight = 1.0 - self.online_learning_weight

        # Weighted combination for entry signals
        combined_entry = (
            adaptive_entry.astype(float) * base_weight
            + online_entry.astype(float) * self.online_learning_weight
        ) > 0.5
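        # With the default online_learning_weight of 0.3: adaptive-only gives
        # 0.7 (> 0.5, signal fires), online-only gives 0.3 (no signal); the
        # online component alone can only cross 0.5 when its weight exceeds 0.5.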

        # Weighted combination for exit signals
        combined_exit = (
            adaptive_exit.astype(float) * base_weight
            + online_exit.astype(float) * self.online_learning_weight
        ) > 0.5

        return combined_entry, combined_exit

    def get_hybrid_info(self) -> dict[str, Any]:
        """Get information about hybrid strategy components.

        Returns:
            Dictionary with hybrid strategy information
        """
        return {
            "adaptation_history": self.get_adaptation_history(),
            "online_learning_info": self.online_strategy.get_model_info(),
            "online_learning_weight": self.online_learning_weight,
            "base_weight": 1.0 - self.online_learning_weight,
        }

```
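
A short, illustrative usage sketch for the module above: running `OnlineLearningStrategy` over synthetic OHLCV data. The synthetic series and parameter values are placeholders rather than a recommended configuration; `AdaptiveStrategy` and `HybridAdaptiveStrategy` are driven the same way once a concrete base `Strategy` is supplied.

```python
import numpy as np
import pandas as pd

from maverick_mcp.backtesting.strategies.ml.adaptive import OnlineLearningStrategy

# Build a synthetic random-walk OHLCV frame (placeholder data).
rng = np.random.default_rng(42)
idx = pd.date_range("2022-01-01", periods=500, freq="B")
close = 100 * np.exp(np.cumsum(rng.normal(0.0005, 0.02, size=len(idx))))
data = pd.DataFrame(
    {
        "open": close,
        "high": close * 1.01,
        "low": close * 0.99,
        "close": close,
        "volume": rng.integers(1_000_000, 5_000_000, size=len(idx)),
    },
    index=idx,
)

strategy = OnlineLearningStrategy(update_frequency=10, confidence_threshold=0.6)
entries, exits = strategy.generate_signals(data)
print(f"entry signals: {entries.sum()}, exit signals: {exits.sum()}")
print(strategy.get_model_info()["training_samples_count"])
```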
Page 20/29