This is page 20 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/ml/feature_engineering.py:
--------------------------------------------------------------------------------
```python
"""Feature engineering for ML trading strategies."""
import logging
from typing import Any
import numpy as np
import pandas as pd
import pandas_ta as ta
from pandas import DataFrame, Series
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
logger = logging.getLogger(__name__)
class FeatureExtractor:
"""Extract technical and statistical features for ML models."""
    def __init__(self, lookback_periods: list[int] | None = None):
"""Initialize feature extractor.
Args:
lookback_periods: Lookback periods for rolling features
"""
self.lookback_periods = lookback_periods or [5, 10, 20, 50]
self.scaler = StandardScaler()
def extract_price_features(self, data: DataFrame) -> DataFrame:
"""Extract price-based features.
Args:
data: OHLCV price data
Returns:
DataFrame with price features
"""
features = pd.DataFrame(index=data.index)
# Normalize column names to handle both cases
high = data.get("high", data.get("High"))
low = data.get("low", data.get("Low"))
close = data.get("close", data.get("Close"))
open_ = data.get("open", data.get("Open"))
# Safe division helper function
def safe_divide(numerator, denominator, default=0.0):
"""Safely divide two values, handling None, NaN, and zero cases."""
if numerator is None or denominator is None:
return default
# Convert to numpy arrays to handle pandas Series
num = np.asarray(numerator)
den = np.asarray(denominator)
# Use numpy divide with where condition for safety
return np.divide(
num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
)
# Price ratios and spreads with safe division
features["high_low_ratio"] = safe_divide(high, low, 1.0)
features["close_open_ratio"] = safe_divide(close, open_, 1.0)
features["hl_spread"] = (
safe_divide(high - low, close, 0.0)
if high is not None and low is not None and close is not None
else 0.0
)
features["co_spread"] = (
safe_divide(close - open_, open_, 0.0)
if close is not None and open_ is not None
else 0.0
)
# Returns with safe calculation
if close is not None:
features["returns"] = close.pct_change().fillna(0)
# Safe log returns calculation
price_ratio = safe_divide(close, close.shift(1), 1.0)
features["log_returns"] = np.log(
np.maximum(price_ratio, 1e-8)
) # Prevent log(0)
else:
features["returns"] = 0
features["log_returns"] = 0
# Volume features with safe calculations
volume = data.get("volume", data.get("Volume"))
if volume is not None and close is not None:
volume_ma = volume.rolling(20).mean()
features["volume_ma_ratio"] = safe_divide(volume, volume_ma, 1.0)
features["price_volume"] = close * volume
features["volume_returns"] = volume.pct_change().fillna(0)
else:
features["volume_ma_ratio"] = 1.0
features["price_volume"] = 0.0
features["volume_returns"] = 0.0
return features
def extract_technical_features(self, data: DataFrame) -> DataFrame:
"""Extract technical indicator features.
Args:
data: OHLCV price data
Returns:
DataFrame with technical features
"""
features = pd.DataFrame(index=data.index)
# Normalize column names
close = data.get("close", data.get("Close"))
high = data.get("high", data.get("High"))
low = data.get("low", data.get("Low"))
# Safe division helper (reused from price features)
def safe_divide(numerator, denominator, default=0.0):
"""Safely divide two values, handling None, NaN, and zero cases."""
if numerator is None or denominator is None:
return default
# Convert to numpy arrays to handle pandas Series
num = np.asarray(numerator)
den = np.asarray(denominator)
# Use numpy divide with where condition for safety
return np.divide(
num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
)
# Moving averages with safe calculations
for period in self.lookback_periods:
if close is not None:
sma = ta.sma(close, length=period)
ema = ta.ema(close, length=period)
features[f"sma_{period}_ratio"] = safe_divide(close, sma, 1.0)
features[f"ema_{period}_ratio"] = safe_divide(close, ema, 1.0)
features[f"sma_ema_diff_{period}"] = (
safe_divide(sma - ema, close, 0.0)
if sma is not None and ema is not None
else 0.0
)
else:
features[f"sma_{period}_ratio"] = 1.0
features[f"ema_{period}_ratio"] = 1.0
features[f"sma_ema_diff_{period}"] = 0.0
# RSI
rsi = ta.rsi(close, length=14)
features["rsi"] = rsi
features["rsi_oversold"] = (rsi < 30).astype(int)
features["rsi_overbought"] = (rsi > 70).astype(int)
# MACD
macd = ta.macd(close)
if macd is not None and not macd.empty:
macd_cols = macd.columns
macd_col = [
col
for col in macd_cols
if "MACD" in col and "h" not in col and "s" not in col.lower()
]
signal_col = [
col for col in macd_cols if "signal" in col.lower() or "MACDs" in col
]
hist_col = [
col for col in macd_cols if "hist" in col.lower() or "MACDh" in col
]
if macd_col:
features["macd"] = macd[macd_col[0]]
else:
features["macd"] = 0
if signal_col:
features["macd_signal"] = macd[signal_col[0]]
else:
features["macd_signal"] = 0
if hist_col:
features["macd_histogram"] = macd[hist_col[0]]
else:
features["macd_histogram"] = 0
features["macd_bullish"] = (
features["macd"] > features["macd_signal"]
).astype(int)
else:
features["macd"] = 0
features["macd_signal"] = 0
features["macd_histogram"] = 0
features["macd_bullish"] = 0
# Bollinger Bands
bb = ta.bbands(close, length=20)
if bb is not None and not bb.empty:
# Handle different pandas_ta versions that may have different column names
bb_cols = bb.columns
upper_col = [
col for col in bb_cols if "BBU" in col or "upper" in col.lower()
]
middle_col = [
col for col in bb_cols if "BBM" in col or "middle" in col.lower()
]
lower_col = [
col for col in bb_cols if "BBL" in col or "lower" in col.lower()
]
if upper_col and middle_col and lower_col:
features["bb_upper"] = bb[upper_col[0]]
features["bb_middle"] = bb[middle_col[0]]
features["bb_lower"] = bb[lower_col[0]]
# Safe BB position calculation
bb_width = features["bb_upper"] - features["bb_lower"]
features["bb_position"] = safe_divide(
close - features["bb_lower"], bb_width, 0.5
)
features["bb_squeeze"] = safe_divide(
bb_width, features["bb_middle"], 0.1
)
else:
# Fallback to manual calculation with safe operations
if close is not None:
sma_20 = close.rolling(20).mean()
std_20 = close.rolling(20).std()
features["bb_upper"] = sma_20 + (std_20 * 2)
features["bb_middle"] = sma_20
features["bb_lower"] = sma_20 - (std_20 * 2)
# Safe BB calculations
bb_width = features["bb_upper"] - features["bb_lower"]
features["bb_position"] = safe_divide(
close - features["bb_lower"], bb_width, 0.5
)
features["bb_squeeze"] = safe_divide(
bb_width, features["bb_middle"], 0.1
)
else:
features["bb_upper"] = 0
features["bb_middle"] = 0
features["bb_lower"] = 0
features["bb_position"] = 0.5
features["bb_squeeze"] = 0.1
else:
# Manual calculation fallback with safe operations
if close is not None:
sma_20 = close.rolling(20).mean()
std_20 = close.rolling(20).std()
features["bb_upper"] = sma_20 + (std_20 * 2)
features["bb_middle"] = sma_20
features["bb_lower"] = sma_20 - (std_20 * 2)
# Safe BB calculations
bb_width = features["bb_upper"] - features["bb_lower"]
features["bb_position"] = safe_divide(
close - features["bb_lower"], bb_width, 0.5
)
features["bb_squeeze"] = safe_divide(
bb_width, features["bb_middle"], 0.1
)
else:
features["bb_upper"] = 0
features["bb_middle"] = 0
features["bb_lower"] = 0
features["bb_position"] = 0.5
features["bb_squeeze"] = 0.1
# Stochastic
stoch = ta.stoch(high, low, close)
if stoch is not None and not stoch.empty:
stoch_cols = stoch.columns
k_col = [col for col in stoch_cols if "k" in col.lower()]
d_col = [col for col in stoch_cols if "d" in col.lower()]
if k_col:
features["stoch_k"] = stoch[k_col[0]]
else:
features["stoch_k"] = 50
if d_col:
features["stoch_d"] = stoch[d_col[0]]
else:
features["stoch_d"] = 50
else:
features["stoch_k"] = 50
features["stoch_d"] = 50
# ATR (Average True Range) with safe calculation
if high is not None and low is not None and close is not None:
features["atr"] = ta.atr(high, low, close)
features["atr_ratio"] = safe_divide(
features["atr"], close, 0.02
) # Default 2% ATR ratio
else:
features["atr"] = 0
features["atr_ratio"] = 0.02
return features
def extract_statistical_features(self, data: DataFrame) -> DataFrame:
"""Extract statistical features.
Args:
data: OHLCV price data
Returns:
DataFrame with statistical features
"""
features = pd.DataFrame(index=data.index)
# Safe division helper function
def safe_divide(numerator, denominator, default=0.0):
"""Safely divide two values, handling None, NaN, and zero cases."""
if numerator is None or denominator is None:
return default
# Convert to numpy arrays to handle pandas Series
num = np.asarray(numerator)
den = np.asarray(denominator)
# Use numpy divide with where condition for safety
return np.divide(
num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
)
# Rolling statistics
for period in self.lookback_periods:
returns = data["close"].pct_change()
# Volatility with safe calculations
vol_short = returns.rolling(period).std()
vol_long = returns.rolling(period * 2).std()
features[f"volatility_{period}"] = vol_short
features[f"volatility_ratio_{period}"] = safe_divide(
vol_short, vol_long, 1.0
)
# Skewness and Kurtosis
features[f"skewness_{period}"] = returns.rolling(period).skew()
features[f"kurtosis_{period}"] = returns.rolling(period).kurt()
# Min/Max ratios with safe division
if "high" in data.columns and "low" in data.columns:
rolling_high = data["high"].rolling(period).max()
rolling_low = data["low"].rolling(period).min()
features[f"high_ratio_{period}"] = safe_divide(
data["close"], rolling_high, 1.0
)
features[f"low_ratio_{period}"] = safe_divide(
data["close"], rolling_low, 1.0
)
else:
features[f"high_ratio_{period}"] = 1.0
features[f"low_ratio_{period}"] = 1.0
# Momentum features with safe division
features[f"momentum_{period}"] = safe_divide(
data["close"], data["close"].shift(period), 1.0
)
features[f"roc_{period}"] = data["close"].pct_change(periods=period)
return features
def extract_microstructure_features(self, data: DataFrame) -> DataFrame:
"""Extract market microstructure features.
Args:
data: OHLCV price data
Returns:
DataFrame with microstructure features
"""
features = pd.DataFrame(index=data.index)
# Safe division helper function
def safe_divide(numerator, denominator, default=0.0):
"""Safely divide two values, handling None, NaN, and zero cases."""
if numerator is None or denominator is None:
return default
# Convert to numpy arrays to handle pandas Series
num = np.asarray(numerator)
den = np.asarray(denominator)
# Use numpy divide with where condition for safety
return np.divide(
num, den, out=np.full_like(num, default, dtype=float), where=(den != 0)
)
# Bid-ask spread proxy (high-low spread) with safe calculation
if "high" in data.columns and "low" in data.columns:
mid_price = (data["high"] + data["low"]) / 2
features["spread_proxy"] = safe_divide(
data["high"] - data["low"], mid_price, 0.02
)
else:
features["spread_proxy"] = 0.02
# Price impact measures with safe calculations
if "volume" in data.columns:
returns_abs = abs(data["close"].pct_change())
features["amihud_illiquidity"] = safe_divide(
returns_abs, data["volume"], 0.0
)
if "high" in data.columns and "low" in data.columns:
features["volume_weighted_price"] = (
data["high"] + data["low"] + data["close"]
) / 3
else:
features["volume_weighted_price"] = data["close"]
else:
features["amihud_illiquidity"] = 0.0
features["volume_weighted_price"] = data.get("close", 0.0)
# Intraday patterns with safe calculations
if "open" in data.columns and "close" in data.columns:
prev_close = data["close"].shift(1)
features["open_gap"] = safe_divide(
data["open"] - prev_close, prev_close, 0.0
)
else:
features["open_gap"] = 0.0
if "high" in data.columns and "low" in data.columns and "close" in data.columns:
features["close_to_high"] = safe_divide(
data["high"] - data["close"], data["close"], 0.0
)
features["close_to_low"] = safe_divide(
data["close"] - data["low"], data["close"], 0.0
)
else:
features["close_to_high"] = 0.0
features["close_to_low"] = 0.0
return features
def create_target_variable(
self, data: DataFrame, forward_periods: int = 5, threshold: float = 0.02
) -> Series:
"""Create target variable for classification.
Args:
data: Price data
forward_periods: Number of periods to look forward
threshold: Return threshold for classification
Returns:
Target variable (0: sell, 1: hold, 2: buy)
"""
close = data.get("close", data.get("Close"))
forward_returns = close.pct_change(periods=forward_periods).shift(
-forward_periods
)
target = pd.Series(1, index=data.index) # Default to hold
target[forward_returns > threshold] = 2 # Buy
target[forward_returns < -threshold] = 0 # Sell
return target
def extract_all_features(self, data: DataFrame) -> DataFrame:
"""Extract all features for ML model.
Args:
data: OHLCV price data
Returns:
DataFrame with all features
"""
try:
# Validate input data
if data is None or data.empty:
logger.warning("Empty or None data provided to extract_all_features")
return pd.DataFrame()
# Extract all feature types with individual error handling
feature_dfs = []
try:
price_features = self.extract_price_features(data)
if not price_features.empty:
feature_dfs.append(price_features)
except Exception as e:
logger.warning(f"Failed to extract price features: {e}")
# Create empty DataFrame with same index as fallback
price_features = pd.DataFrame(index=data.index)
try:
technical_features = self.extract_technical_features(data)
if not technical_features.empty:
feature_dfs.append(technical_features)
except Exception as e:
logger.warning(f"Failed to extract technical features: {e}")
try:
statistical_features = self.extract_statistical_features(data)
if not statistical_features.empty:
feature_dfs.append(statistical_features)
except Exception as e:
logger.warning(f"Failed to extract statistical features: {e}")
try:
microstructure_features = self.extract_microstructure_features(data)
if not microstructure_features.empty:
feature_dfs.append(microstructure_features)
except Exception as e:
logger.warning(f"Failed to extract microstructure features: {e}")
# Combine all successfully extracted features
if feature_dfs:
all_features = pd.concat(feature_dfs, axis=1)
else:
# Fallback: create minimal feature set
logger.warning(
"No features extracted successfully, creating minimal fallback features"
)
all_features = pd.DataFrame(
{
"returns": data.get("close", pd.Series(0, index=data.index))
.pct_change()
.fillna(0),
"close": data.get("close", pd.Series(0, index=data.index)),
},
index=data.index,
)
# Handle missing values with robust method
if not all_features.empty:
# Forward fill, then backward fill, then zero fill
all_features = all_features.ffill().bfill().fillna(0)
# Replace any infinite values
all_features = all_features.replace([np.inf, -np.inf], 0)
logger.info(
f"Extracted {len(all_features.columns)} features for {len(all_features)} data points"
)
else:
logger.warning("No features could be extracted")
return all_features
except Exception as e:
logger.error(f"Critical error extracting features: {e}")
# Return minimal fallback instead of raising
return pd.DataFrame(
{
"returns": pd.Series(
0, index=data.index if data is not None else [0]
),
"close": pd.Series(
0, index=data.index if data is not None else [0]
),
}
)
class MLPredictor:
"""Machine learning predictor for trading signals."""
def __init__(self, model_type: str = "random_forest", **model_params):
"""Initialize ML predictor.
Args:
model_type: Type of ML model to use
**model_params: Model parameters
"""
self.model_type = model_type
self.model_params = model_params
self.model = None
self.scaler = StandardScaler()
self.feature_extractor = FeatureExtractor()
self.is_trained = False
def _create_model(self):
"""Create ML model based on type."""
if self.model_type == "random_forest":
self.model = RandomForestClassifier(
n_estimators=self.model_params.get("n_estimators", 100),
max_depth=self.model_params.get("max_depth", 10),
random_state=self.model_params.get("random_state", 42),
**{
k: v
for k, v in self.model_params.items()
if k not in ["n_estimators", "max_depth", "random_state"]
},
)
else:
raise ValueError(f"Unsupported model type: {self.model_type}")
def prepare_data(
self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
) -> tuple[DataFrame, Series]:
"""Prepare features and target for training.
Args:
data: OHLCV price data
target_periods: Periods to look forward for target
return_threshold: Return threshold for classification
Returns:
Tuple of (features, target)
"""
# Extract features
features = self.feature_extractor.extract_all_features(data)
# Create target variable
target = self.feature_extractor.create_target_variable(
data, target_periods, return_threshold
)
# Align features and target (remove NaN values)
valid_idx = features.dropna().index.intersection(target.dropna().index)
features = features.loc[valid_idx]
target = target.loc[valid_idx]
return features, target
def train(
self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
) -> dict[str, Any]:
"""Train the ML model.
Args:
data: OHLCV price data
target_periods: Periods to look forward for target
return_threshold: Return threshold for classification
Returns:
Training metrics
"""
try:
# Prepare data
features, target = self.prepare_data(data, target_periods, return_threshold)
if len(features) == 0:
raise ValueError("No valid training data available")
# Create and train model
self._create_model()
# Scale features
features_scaled = self.scaler.fit_transform(features)
# Train model
self.model.fit(features_scaled, target)
self.is_trained = True
# Calculate training metrics
train_score = self.model.score(features_scaled, target)
# Convert numpy int64 to Python int for JSON serialization
target_dist = target.value_counts().to_dict()
target_dist = {int(k): int(v) for k, v in target_dist.items()}
metrics = {
"train_accuracy": float(
train_score
), # Convert numpy float to Python float
"n_samples": int(len(features)),
"n_features": int(len(features.columns)),
"target_distribution": target_dist,
}
# Feature importance (if available)
if hasattr(self.model, "feature_importances_"):
# Convert numpy floats to Python floats
feature_importance = {
str(col): float(imp)
for col, imp in zip(
features.columns, self.model.feature_importances_, strict=False
)
}
metrics["feature_importance"] = feature_importance
logger.info(f"Model trained successfully: {metrics}")
return metrics
except Exception as e:
logger.error(f"Error training model: {e}")
raise
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate trading signals using the trained model.
Alias for predict() to match the expected interface.
Args:
data: OHLCV price data
Returns:
Tuple of (entry_signals, exit_signals)
"""
return self.predict(data)
def predict(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate trading signals using the trained model.
Args:
data: OHLCV price data
Returns:
Tuple of (entry_signals, exit_signals)
"""
if not self.is_trained:
raise ValueError("Model must be trained before making predictions")
try:
# Extract features
features = self.feature_extractor.extract_all_features(data)
# Handle missing values
features = features.ffill().fillna(0)
# Scale features
features_scaled = self.scaler.transform(features)
# Make predictions
predictions = self.model.predict(features_scaled)
prediction_proba = self.model.predict_proba(features_scaled)
            # Convert to signals; map class probabilities by label rather than by
            # positional index, so a class missing from the training data cannot
            # shift the probability columns.
            predictions_series = pd.Series(predictions, index=features.index)
            proba = pd.DataFrame(
                prediction_proba, index=features.index, columns=self.model.classes_
            )
            buy_proba = proba.get(2, pd.Series(0.0, index=features.index))
            hold_proba = proba.get(1, pd.Series(0.0, index=features.index))
            # Entry signals (buy predictions with high confidence)
            entry_signals = (predictions_series == 2) & (buy_proba > 0.6)
            # Exit signals (sell predictions or low confidence holds)
            exit_signals = (predictions_series == 0) | (
                (predictions_series == 1) & (hold_proba < 0.4)
            )
return entry_signals, exit_signals
except Exception as e:
logger.error(f"Error making predictions: {e}")
raise
    def get_feature_importance(self) -> dict[str, float]:
        """Get feature importance from trained model.
        Returns:
            Dictionary of feature importance scores
        """
        if not self.is_trained or not hasattr(self.model, "feature_importances_"):
            return {}
        # Use the feature names recorded by the scaler when it was fitted on the
        # training DataFrame; re-running extract_all_features on an empty frame
        # would not reproduce them.
        feature_names = getattr(
            self.scaler,
            "feature_names_in_",
            range(len(self.model.feature_importances_)),
        )
        return {
            str(name): float(imp)
            for name, imp in zip(
                feature_names, self.model.feature_importances_, strict=False
            )
        }
def update_model(
self, data: DataFrame, target_periods: int = 5, return_threshold: float = 0.02
) -> dict[str, Any]:
"""Update model with new data (online learning simulation).
Args:
data: New OHLCV price data
target_periods: Periods to look forward for target
return_threshold: Return threshold for classification
Returns:
Update metrics
"""
try:
# For now, retrain the model with all data
# In production, this could use partial_fit for online learning
return self.train(data, target_periods, return_threshold)
except Exception as e:
logger.error(f"Error updating model: {e}")
raise
```
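A minimal usage sketch for the module above (illustrative only, not part of the repository). It assumes an OHLCV `DataFrame` named `ohlcv` with `open`/`high`/`low`/`close`/`volume` columns indexed by date; the CSV load below is just a placeholder data source:
```python
# Illustrative sketch: feature extraction and signal generation with the classes above.
import pandas as pd

from maverick_mcp.backtesting.strategies.ml.feature_engineering import (
    FeatureExtractor,
    MLPredictor,
)

# Placeholder data source; any OHLCV DataFrame indexed by date works here.
ohlcv = pd.read_csv("ohlcv.csv", index_col=0, parse_dates=True)

# Stand-alone feature extraction
extractor = FeatureExtractor(lookback_periods=[5, 20, 50])
features = extractor.extract_all_features(ohlcv)

# Train a classifier on 5-day forward returns, then generate entry/exit signals
predictor = MLPredictor(model_type="random_forest", n_estimators=200)
train_metrics = predictor.train(ohlcv, target_periods=5, return_threshold=0.02)
entry_signals, exit_signals = predictor.generate_signals(ohlcv)
print(train_metrics["train_accuracy"], int(entry_signals.sum()), int(exit_signals.sum()))
```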
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/persistence.py:
--------------------------------------------------------------------------------
```python
"""
Backtesting persistence layer for saving and retrieving backtest results.
This module provides comprehensive database operations for backtest results,
including saving VectorBT results, querying historical tests, and comparing
multiple backtests with proper error handling.
"""
import logging
from datetime import datetime, timedelta
from decimal import Decimal, InvalidOperation
from typing import Any
from uuid import UUID, uuid4
import pandas as pd
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from maverick_mcp.data.models import (
BacktestResult,
BacktestTrade,
OptimizationResult,
SessionLocal,
WalkForwardTest,
)
logger = logging.getLogger(__name__)
class BacktestPersistenceError(Exception):
"""Custom exception for backtest persistence operations."""
pass
class BacktestPersistenceManager:
"""Manages persistence of backtesting results with comprehensive error handling."""
def __init__(self, session: Session | None = None):
"""Initialize persistence manager.
Args:
session: Optional SQLAlchemy session. If None, creates a new one.
"""
self.session = session
self._owns_session = session is None
def __enter__(self):
"""Context manager entry."""
if self._owns_session:
self.session = SessionLocal()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with proper cleanup."""
if self._owns_session and self.session:
if exc_type is None:
self.session.commit()
else:
self.session.rollback()
self.session.close()
def save_backtest_result(
self,
vectorbt_results: dict[str, Any],
execution_time: float | None = None,
notes: str | None = None,
) -> str:
"""
Save VectorBT backtest results to database.
Args:
vectorbt_results: Results dictionary from VectorBTEngine
execution_time: Time taken to run the backtest in seconds
notes: Optional user notes
Returns:
UUID string of the saved backtest
Raises:
BacktestPersistenceError: If saving fails
"""
try:
# Extract basic metadata
symbol = vectorbt_results.get("symbol", "").upper()
strategy_type = vectorbt_results.get("strategy", "")
parameters = vectorbt_results.get("parameters", {})
metrics = vectorbt_results.get("metrics", {})
if not symbol or not strategy_type:
raise BacktestPersistenceError("Symbol and strategy type are required")
# Create backtest result record
backtest_result = BacktestResult(
backtest_id=uuid4(),
symbol=symbol,
strategy_type=strategy_type,
backtest_date=datetime.utcnow(),
# Date range
start_date=pd.to_datetime(vectorbt_results.get("start_date")).date(),
end_date=pd.to_datetime(vectorbt_results.get("end_date")).date(),
initial_capital=Decimal(
str(vectorbt_results.get("initial_capital", 10000))
),
# Strategy parameters
parameters=parameters,
# Performance metrics
total_return=self._safe_decimal(metrics.get("total_return")),
annualized_return=self._safe_decimal(metrics.get("annualized_return")),
sharpe_ratio=self._safe_decimal(metrics.get("sharpe_ratio")),
sortino_ratio=self._safe_decimal(metrics.get("sortino_ratio")),
calmar_ratio=self._safe_decimal(metrics.get("calmar_ratio")),
# Risk metrics
max_drawdown=self._safe_decimal(metrics.get("max_drawdown")),
max_drawdown_duration=metrics.get("max_drawdown_duration"),
volatility=self._safe_decimal(metrics.get("volatility")),
downside_volatility=self._safe_decimal(
metrics.get("downside_volatility")
),
# Trade statistics
total_trades=metrics.get("total_trades", 0),
winning_trades=metrics.get("winning_trades", 0),
losing_trades=metrics.get("losing_trades", 0),
win_rate=self._safe_decimal(metrics.get("win_rate")),
# P&L statistics
profit_factor=self._safe_decimal(metrics.get("profit_factor")),
average_win=self._safe_decimal(metrics.get("average_win")),
average_loss=self._safe_decimal(metrics.get("average_loss")),
largest_win=self._safe_decimal(metrics.get("largest_win")),
largest_loss=self._safe_decimal(metrics.get("largest_loss")),
# Portfolio values
final_portfolio_value=self._safe_decimal(metrics.get("final_value")),
peak_portfolio_value=self._safe_decimal(metrics.get("peak_value")),
# Market analysis
beta=self._safe_decimal(metrics.get("beta")),
alpha=self._safe_decimal(metrics.get("alpha")),
# Time series data
equity_curve=vectorbt_results.get("equity_curve"),
drawdown_series=vectorbt_results.get("drawdown_series"),
# Execution metadata
execution_time_seconds=Decimal(str(execution_time))
if execution_time
else None,
data_points=len(vectorbt_results.get("equity_curve", [])),
# Status
status="completed",
notes=notes,
)
self.session.add(backtest_result)
self.session.flush() # Get the ID without committing
# Save individual trades if available
trades_data = vectorbt_results.get("trades", [])
if trades_data:
self._save_trades(backtest_result.backtest_id, trades_data)
self.session.commit()
logger.info(f"Saved backtest result: {backtest_result.backtest_id}")
return str(backtest_result.backtest_id)
except SQLAlchemyError as e:
self.session.rollback()
logger.error(f"Database error saving backtest: {e}")
raise BacktestPersistenceError(f"Failed to save backtest: {e}")
except Exception as e:
self.session.rollback()
logger.error(f"Unexpected error saving backtest: {e}")
raise BacktestPersistenceError(f"Unexpected error: {e}")
def _save_trades(
self, backtest_id: UUID, trades_data: list[dict[str, Any]]
) -> None:
"""Save individual trade records."""
try:
trades = []
for i, trade in enumerate(trades_data, 1):
trade_record = BacktestTrade(
trade_id=uuid4(),
backtest_id=backtest_id,
trade_number=i,
# Entry details
entry_date=pd.to_datetime(trade.get("entry_date")).date(),
entry_price=self._safe_decimal(trade.get("entry_price")),
entry_time=pd.to_datetime(trade.get("entry_time"))
if trade.get("entry_time")
else None,
# Exit details
exit_date=pd.to_datetime(trade.get("exit_date")).date()
if trade.get("exit_date")
else None,
exit_price=self._safe_decimal(trade.get("exit_price")),
exit_time=pd.to_datetime(trade.get("exit_time"))
if trade.get("exit_time")
else None,
# Position details
position_size=self._safe_decimal(trade.get("position_size")),
direction=trade.get("direction", "long"),
# P&L
pnl=self._safe_decimal(trade.get("pnl")),
pnl_percent=self._safe_decimal(trade.get("pnl_percent")),
# Risk metrics
mae=self._safe_decimal(trade.get("mae")),
mfe=self._safe_decimal(trade.get("mfe")),
# Duration
duration_days=trade.get("duration_days"),
duration_hours=self._safe_decimal(trade.get("duration_hours")),
# Exit details
exit_reason=trade.get("exit_reason"),
fees_paid=self._safe_decimal(trade.get("fees_paid")),
slippage_cost=self._safe_decimal(trade.get("slippage_cost")),
)
trades.append(trade_record)
self.session.add_all(trades)
logger.info(f"Saved {len(trades)} trades for backtest {backtest_id}")
except Exception as e:
logger.error(f"Error saving trades: {e}")
raise
def get_backtest_by_id(self, backtest_id: str) -> BacktestResult | None:
"""
Retrieve a backtest by ID.
Args:
backtest_id: UUID string of the backtest
Returns:
BacktestResult or None if not found
"""
try:
# Convert string to UUID for database query
if isinstance(backtest_id, str):
backtest_uuid = UUID(backtest_id)
else:
backtest_uuid = backtest_id
return (
self.session.query(BacktestResult)
.filter(BacktestResult.backtest_id == backtest_uuid)
.first()
)
except SQLAlchemyError as e:
logger.error(f"Error retrieving backtest {backtest_id}: {e}")
return None
except ValueError as e:
logger.error(f"Invalid UUID format {backtest_id}: {e}")
return None
def get_backtests_by_symbol(
self, symbol: str, strategy_type: str | None = None, limit: int = 10
) -> list[BacktestResult]:
"""
Get backtests for a specific symbol.
Args:
symbol: Stock symbol
strategy_type: Optional strategy filter
limit: Maximum number of results
Returns:
List of BacktestResult objects
"""
try:
query = self.session.query(BacktestResult).filter(
BacktestResult.symbol == symbol.upper()
)
if strategy_type:
query = query.filter(BacktestResult.strategy_type == strategy_type)
return query.order_by(desc(BacktestResult.backtest_date)).limit(limit).all()
except SQLAlchemyError as e:
logger.error(f"Error retrieving backtests for {symbol}: {e}")
return []
def get_best_performing_strategies(
self, metric: str = "sharpe_ratio", min_trades: int = 10, limit: int = 20
) -> list[BacktestResult]:
"""
Get best performing backtests by specified metric.
Args:
metric: Performance metric (sharpe_ratio, total_return, profit_factor)
min_trades: Minimum number of trades required
limit: Maximum number of results
Returns:
List of top performing BacktestResult objects
"""
try:
metric_column = getattr(BacktestResult, metric, BacktestResult.sharpe_ratio)
return (
self.session.query(BacktestResult)
.filter(
BacktestResult.status == "completed",
BacktestResult.total_trades >= min_trades,
metric_column.isnot(None),
)
.order_by(desc(metric_column))
.limit(limit)
.all()
)
except SQLAlchemyError as e:
logger.error(f"Error retrieving best performing strategies: {e}")
return []
def compare_strategies(
self, backtest_ids: list[str], metrics: list[str] | None = None
) -> dict[str, Any]:
"""
Compare multiple backtests across specified metrics.
Args:
backtest_ids: List of backtest UUID strings
metrics: List of metrics to compare (default: common metrics)
Returns:
Dictionary with comparison results
"""
if not metrics:
metrics = [
"total_return",
"sharpe_ratio",
"max_drawdown",
"win_rate",
"profit_factor",
"total_trades",
]
try:
# Convert string UUIDs to UUID objects
uuid_list = []
for bt_id in backtest_ids:
if isinstance(bt_id, str):
uuid_list.append(UUID(bt_id))
else:
uuid_list.append(bt_id)
backtests = (
self.session.query(BacktestResult)
.filter(BacktestResult.backtest_id.in_(uuid_list))
.all()
)
if not backtests:
return {"error": "No backtests found"}
comparison = {"backtests": [], "summary": {}, "rankings": {}}
# Extract data for each backtest
for bt in backtests:
bt_data = {
"backtest_id": str(bt.backtest_id),
"symbol": bt.symbol,
"strategy": bt.strategy_type,
"date": bt.backtest_date.isoformat(),
"metrics": {},
}
for metric in metrics:
value = getattr(bt, metric, None)
bt_data["metrics"][metric] = float(value) if value else None
comparison["backtests"].append(bt_data)
# Calculate rankings for each metric
for metric in metrics:
metric_values = [
(bt["backtest_id"], bt["metrics"].get(metric))
for bt in comparison["backtests"]
if bt["metrics"].get(metric) is not None
]
if metric_values:
# Sort by metric value (descending for most metrics)
reverse_sort = metric != "max_drawdown" # Lower drawdown is better
sorted_values = sorted(
metric_values, key=lambda x: x[1], reverse=reverse_sort
)
comparison["rankings"][metric] = [
{"backtest_id": bt_id, "value": value, "rank": i + 1}
for i, (bt_id, value) in enumerate(sorted_values)
]
# Summary statistics
comparison["summary"] = {
"total_backtests": len(backtests),
"date_range": {
"earliest": min(bt.backtest_date for bt in backtests).isoformat(),
"latest": max(bt.backtest_date for bt in backtests).isoformat(),
},
}
return comparison
except SQLAlchemyError as e:
logger.error(f"Error comparing strategies: {e}")
return {"error": f"Database error: {e}"}
def save_optimization_results(
self,
backtest_id: str,
optimization_results: list[dict[str, Any]],
objective_function: str = "sharpe_ratio",
) -> int:
"""
Save parameter optimization results.
Args:
backtest_id: Parent backtest UUID
optimization_results: List of optimization result dictionaries
objective_function: Optimization objective (sharpe_ratio, total_return, etc.)
Returns:
Number of optimization results saved
"""
try:
# Convert string UUID to UUID object
if isinstance(backtest_id, str):
backtest_uuid = UUID(backtest_id)
else:
backtest_uuid = backtest_id
optimization_records = []
for i, result in enumerate(optimization_results, 1):
record = OptimizationResult(
optimization_id=uuid4(),
backtest_id=backtest_uuid,
parameter_set=i,
parameters=result.get("parameters", {}),
objective_function=objective_function,
objective_value=self._safe_decimal(result.get("objective_value")),
total_return=self._safe_decimal(result.get("total_return")),
sharpe_ratio=self._safe_decimal(result.get("sharpe_ratio")),
max_drawdown=self._safe_decimal(result.get("max_drawdown")),
win_rate=self._safe_decimal(result.get("win_rate")),
profit_factor=self._safe_decimal(result.get("profit_factor")),
total_trades=result.get("total_trades"),
rank=result.get("rank", i),
is_statistically_significant=result.get(
"is_statistically_significant", False
),
p_value=self._safe_decimal(result.get("p_value")),
)
optimization_records.append(record)
self.session.add_all(optimization_records)
self.session.commit()
logger.info(f"Saved {len(optimization_records)} optimization results")
return len(optimization_records)
except SQLAlchemyError as e:
self.session.rollback()
logger.error(f"Error saving optimization results: {e}")
raise BacktestPersistenceError(f"Failed to save optimization results: {e}")
def save_walk_forward_test(
self, parent_backtest_id: str, walk_forward_data: dict[str, Any]
) -> str:
"""
Save walk-forward validation test results.
Args:
parent_backtest_id: Parent backtest UUID
walk_forward_data: Walk-forward test data
Returns:
UUID string of saved walk-forward test
"""
try:
# Convert string UUID to UUID object
if isinstance(parent_backtest_id, str):
parent_uuid = UUID(parent_backtest_id)
else:
parent_uuid = parent_backtest_id
wf_test = WalkForwardTest(
walk_forward_id=uuid4(),
parent_backtest_id=parent_uuid,
window_size_months=walk_forward_data.get("window_size_months"),
step_size_months=walk_forward_data.get("step_size_months"),
# Time periods
training_start=pd.to_datetime(
walk_forward_data.get("training_start")
).date(),
training_end=pd.to_datetime(
walk_forward_data.get("training_end")
).date(),
test_period_start=pd.to_datetime(
walk_forward_data.get("test_period_start")
).date(),
test_period_end=pd.to_datetime(
walk_forward_data.get("test_period_end")
).date(),
# Results
optimal_parameters=walk_forward_data.get("optimal_parameters"),
training_performance=self._safe_decimal(
walk_forward_data.get("training_performance")
),
out_of_sample_return=self._safe_decimal(
walk_forward_data.get("out_of_sample_return")
),
out_of_sample_sharpe=self._safe_decimal(
walk_forward_data.get("out_of_sample_sharpe")
),
out_of_sample_drawdown=self._safe_decimal(
walk_forward_data.get("out_of_sample_drawdown")
),
out_of_sample_trades=walk_forward_data.get("out_of_sample_trades"),
# Performance analysis
performance_ratio=self._safe_decimal(
walk_forward_data.get("performance_ratio")
),
degradation_factor=self._safe_decimal(
walk_forward_data.get("degradation_factor")
),
is_profitable=walk_forward_data.get("is_profitable"),
is_statistically_significant=walk_forward_data.get(
"is_statistically_significant", False
),
)
self.session.add(wf_test)
self.session.commit()
logger.info(f"Saved walk-forward test: {wf_test.walk_forward_id}")
return str(wf_test.walk_forward_id)
except SQLAlchemyError as e:
self.session.rollback()
logger.error(f"Error saving walk-forward test: {e}")
raise BacktestPersistenceError(f"Failed to save walk-forward test: {e}")
def get_backtest_performance_summary(
self,
symbol: str | None = None,
strategy_type: str | None = None,
days_back: int = 30,
) -> dict[str, Any]:
"""
Get performance summary of recent backtests.
Args:
symbol: Optional symbol filter
strategy_type: Optional strategy filter
days_back: Days to look back
Returns:
Dictionary with performance summary
"""
try:
cutoff_date = datetime.utcnow() - timedelta(days=days_back)
query = self.session.query(BacktestResult).filter(
BacktestResult.backtest_date >= cutoff_date,
BacktestResult.status == "completed",
)
if symbol:
query = query.filter(BacktestResult.symbol == symbol.upper())
if strategy_type:
query = query.filter(BacktestResult.strategy_type == strategy_type)
backtests = query.all()
if not backtests:
return {"message": "No backtests found in the specified period"}
# Calculate summary statistics
returns = [float(bt.total_return) for bt in backtests if bt.total_return]
sharpe_ratios = [
float(bt.sharpe_ratio) for bt in backtests if bt.sharpe_ratio
]
win_rates = [float(bt.win_rate) for bt in backtests if bt.win_rate]
summary = {
"period": f"Last {days_back} days",
"total_backtests": len(backtests),
"performance_metrics": {
"average_return": sum(returns) / len(returns) if returns else 0,
"best_return": max(returns) if returns else 0,
"worst_return": min(returns) if returns else 0,
"average_sharpe": sum(sharpe_ratios) / len(sharpe_ratios)
if sharpe_ratios
else 0,
"average_win_rate": sum(win_rates) / len(win_rates)
if win_rates
else 0,
},
"strategy_breakdown": {},
"symbol_breakdown": {},
}
# Group by strategy
strategy_groups = {}
for bt in backtests:
strategy = bt.strategy_type
if strategy not in strategy_groups:
strategy_groups[strategy] = []
strategy_groups[strategy].append(bt)
for strategy, strategy_backtests in strategy_groups.items():
strategy_returns = [
float(bt.total_return)
for bt in strategy_backtests
if bt.total_return
]
summary["strategy_breakdown"][strategy] = {
"count": len(strategy_backtests),
"average_return": sum(strategy_returns) / len(strategy_returns)
if strategy_returns
else 0,
}
# Group by symbol
symbol_groups = {}
for bt in backtests:
symbol = bt.symbol
if symbol not in symbol_groups:
symbol_groups[symbol] = []
symbol_groups[symbol].append(bt)
for symbol, symbol_backtests in symbol_groups.items():
symbol_returns = [
float(bt.total_return) for bt in symbol_backtests if bt.total_return
]
summary["symbol_breakdown"][symbol] = {
"count": len(symbol_backtests),
"average_return": sum(symbol_returns) / len(symbol_returns)
if symbol_returns
else 0,
}
return summary
except SQLAlchemyError as e:
logger.error(f"Error generating performance summary: {e}")
return {"error": f"Database error: {e}"}
def delete_backtest(self, backtest_id: str) -> bool:
"""
Delete a backtest and all associated data.
Args:
backtest_id: UUID string of backtest to delete
Returns:
True if deleted successfully, False otherwise
"""
try:
# Convert string UUID to UUID object
if isinstance(backtest_id, str):
backtest_uuid = UUID(backtest_id)
else:
backtest_uuid = backtest_id
backtest = (
self.session.query(BacktestResult)
.filter(BacktestResult.backtest_id == backtest_uuid)
.first()
)
if not backtest:
logger.warning(f"Backtest {backtest_id} not found")
return False
# Delete associated records (cascading should handle this)
self.session.delete(backtest)
self.session.commit()
logger.info(f"Deleted backtest {backtest_id}")
return True
except SQLAlchemyError as e:
self.session.rollback()
logger.error(f"Error deleting backtest {backtest_id}: {e}")
return False
@staticmethod
def _safe_decimal(value: Any) -> Decimal | None:
"""Safely convert value to Decimal, handling None and invalid values."""
if value is None:
return None
try:
if isinstance(value, int | float):
return Decimal(str(value))
elif isinstance(value, Decimal):
return value
else:
return Decimal(str(float(value)))
except (ValueError, TypeError, InvalidOperation):
return None
def get_persistence_manager(
session: Session | None = None,
) -> BacktestPersistenceManager:
"""
Factory function to create a persistence manager.
Args:
session: Optional SQLAlchemy session
Returns:
BacktestPersistenceManager instance
"""
return BacktestPersistenceManager(session)
# Convenience functions for common operations
def save_vectorbt_results(
vectorbt_results: dict[str, Any],
execution_time: float | None = None,
notes: str | None = None,
) -> str:
"""
Convenience function to save VectorBT results.
Args:
vectorbt_results: Results from VectorBTEngine
execution_time: Execution time in seconds
notes: Optional notes
Returns:
Backtest UUID string
"""
with get_persistence_manager() as manager:
return manager.save_backtest_result(vectorbt_results, execution_time, notes)
def get_recent_backtests(symbol: str, days: int = 7) -> list[BacktestResult]:
"""
Get recent backtests for a symbol.
Args:
symbol: Stock symbol
days: Number of days to look back
Returns:
List of recent BacktestResult objects
"""
with get_persistence_manager() as manager:
cutoff_date = datetime.utcnow() - timedelta(days=days)
return (
manager.session.query(BacktestResult)
.filter(
BacktestResult.symbol == symbol.upper(),
BacktestResult.backtest_date >= cutoff_date,
)
.order_by(desc(BacktestResult.backtest_date))
.all()
)
def find_best_strategy_for_symbol(
    symbol: str, metric: str = "sharpe_ratio"
) -> BacktestResult | None:
    """
    Find the best performing strategy for a symbol.
    Args:
        symbol: Stock symbol
        metric: Performance metric to optimize
    Returns:
        Best BacktestResult or None
    """
    with get_persistence_manager() as manager:
        # Rank only this symbol's backtests by the requested metric.
        candidates = [
            bt
            for bt in manager.get_backtests_by_symbol(symbol, limit=1000)
            if getattr(bt, metric, None) is not None
        ]
        if not candidates:
            return None
        return max(candidates, key=lambda bt: getattr(bt, metric))
```
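A brief usage sketch for the persistence layer above (illustrative only, not part of the repository). The `results` dictionary below mirrors the keys `save_backtest_result` reads from VectorBTEngine output; its values and the `"sma_cross"` label are made up, and a configured database (`SessionLocal`) is assumed:
```python
# Illustrative sketch: saving, querying, and comparing backtest results.
from maverick_mcp.backtesting.persistence import (
    BacktestPersistenceManager,
    find_best_strategy_for_symbol,
    get_recent_backtests,
    save_vectorbt_results,
)

# Stand-in for a VectorBTEngine results dictionary (values are fabricated).
results = {
    "symbol": "AAPL",
    "strategy": "sma_cross",
    "parameters": {"fast": 10, "slow": 50},
    "start_date": "2023-01-01",
    "end_date": "2023-12-31",
    "initial_capital": 10000,
    "metrics": {"total_return": 0.12, "sharpe_ratio": 1.1, "total_trades": 25},
    "equity_curve": [],
    "trades": [],
}

backtest_id = save_vectorbt_results(results, execution_time=12.3, notes="demo run")

# The manager commits on clean exit and rolls back on error when it owns the session.
with BacktestPersistenceManager() as manager:
    saved = manager.get_backtest_by_id(backtest_id)
    top = manager.get_best_performing_strategies(metric="sharpe_ratio", limit=5)
    comparison = manager.compare_strategies([str(bt.backtest_id) for bt in top])

recent = get_recent_backtests("AAPL", days=7)
best = find_best_strategy_for_symbol("AAPL", metric="sharpe_ratio")
```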
--------------------------------------------------------------------------------
/tests/performance/test_profiling.py:
--------------------------------------------------------------------------------
```python
"""
Profiling Tests for Bottleneck Identification.
This test suite covers:
- Profile critical code paths with cProfile
- Identify slow database queries with timing
- Find memory allocation hotspots
- Document optimization opportunities
- Line-by-line profiling of key functions
- Call graph analysis for performance
- I/O bottleneck identification
- CPU-bound vs I/O-bound analysis
"""
import cProfile
import io
import logging
import pstats
import time
import tracemalloc
from collections.abc import Callable
from contextlib import contextmanager
from typing import Any
from unittest.mock import Mock
import numpy as np
import pandas as pd
import pytest
from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES
logger = logging.getLogger(__name__)
class PerformanceProfiler:
"""Comprehensive performance profiler for backtesting operations."""
def __init__(self):
self.profiling_data = {}
self.memory_snapshots = []
@contextmanager
def profile_cpu(self, operation_name: str):
"""Profile CPU usage of an operation."""
profiler = cProfile.Profile()
start_time = time.time()
profiler.enable()
try:
yield
finally:
profiler.disable()
execution_time = time.time() - start_time
# Capture profiling stats
stats_stream = io.StringIO()
stats = pstats.Stats(profiler, stream=stats_stream)
stats.sort_stats("cumulative")
stats.print_stats(20) # Top 20 functions
            # Merge rather than overwrite so a memory profile recorded for the
            # same operation (see profile_memory) is not lost when both context
            # managers wrap the same call.
            self.profiling_data.setdefault(operation_name, {}).update(
                {
                    "execution_time": execution_time,
                    "cpu_profile": stats_stream.getvalue(),
                    "stats_object": stats,
                }
            )
@contextmanager
def profile_memory(self, operation_name: str):
"""Profile memory usage of an operation."""
tracemalloc.start()
start_memory = tracemalloc.get_traced_memory()
try:
yield
finally:
current_memory, peak_memory = tracemalloc.get_traced_memory()
tracemalloc.stop()
memory_data = {
"start_memory_mb": start_memory[0] / 1024 / 1024,
"current_memory_mb": current_memory / 1024 / 1024,
"peak_memory_mb": peak_memory / 1024 / 1024,
"memory_growth_mb": (current_memory - start_memory[0]) / 1024 / 1024,
}
if operation_name in self.profiling_data:
self.profiling_data[operation_name]["memory_profile"] = memory_data
else:
self.profiling_data[operation_name] = {"memory_profile": memory_data}
def profile_database_query(
self, query_name: str, query_func: Callable
) -> dict[str, Any]:
"""Profile database query performance."""
start_time = time.time()
try:
result = query_func()
execution_time = time.time() - start_time
            return {
                "query_name": query_name,
                "execution_time_ms": execution_time * 1000,
                "success": True,
                "result": result,
                "result_size": len(str(result)) if result else 0,
            }
except Exception as e:
execution_time = time.time() - start_time
return {
"query_name": query_name,
"execution_time_ms": execution_time * 1000,
"success": False,
"error": str(e),
}
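    # Usage sketch: profile_database_query wraps any zero-argument callable and
    # returns a timing dict, e.g.
    #   profiler.profile_database_query("noop", lambda: None)
    #   -> {"query_name": "noop", "execution_time_ms": ..., "success": True, ...}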
def analyze_hotspots(self, operation_name: str) -> dict[str, Any]:
"""Analyze performance hotspots from profiling data."""
if operation_name not in self.profiling_data:
return {"error": f"No profiling data for {operation_name}"}
data = self.profiling_data[operation_name]
stats = data.get("stats_object")
if not stats:
return {"error": "No CPU profiling stats available"}
# Extract top functions by cumulative time
stats.sort_stats("cumulative")
top_functions = []
        for func_data in list(stats.stats.items())[:10]:
            # pstats entry layout: (primitive call count, total call count,
            # self time, cumulative time, callers mapping)
            func_name, (cc, nc, tt, ct, callers) = func_data
top_functions.append(
{
"function": f"{func_name[0]}:{func_name[1]}({func_name[2]})",
"cumulative_time": ct,
"total_time": tt,
"call_count": nc,
"time_per_call": ct / nc if nc > 0 else 0,
}
)
# Extract top functions by self time
stats.sort_stats("tottime")
self_time_functions = []
for func_data in list(stats.stats.items())[:10]:
func_name, (cc, nc, tt, ct, callers) = func_data
self_time_functions.append(
{
"function": f"{func_name[0]}:{func_name[1]}({func_name[2]})",
"self_time": tt,
"cumulative_time": ct,
"call_count": nc,
}
)
return {
"operation_name": operation_name,
"total_execution_time": data.get("execution_time", 0),
"top_functions_by_cumulative": top_functions,
"top_functions_by_self_time": self_time_functions,
"memory_profile": data.get("memory_profile", {}),
}
def generate_optimization_report(self) -> dict[str, Any]:
"""Generate comprehensive optimization report."""
optimization_opportunities = []
performance_summary = {}
for operation_name, data in self.profiling_data.items():
analysis = self.analyze_hotspots(operation_name)
performance_summary[operation_name] = {
"execution_time": data.get("execution_time", 0),
"peak_memory_mb": data.get("memory_profile", {}).get(
"peak_memory_mb", 0
),
}
# Identify optimization opportunities
if "top_functions_by_cumulative" in analysis:
for func in analysis["top_functions_by_cumulative"][
:3
]: # Top 3 functions
if func["cumulative_time"] > 0.1: # More than 100ms
optimization_opportunities.append(
{
"operation": operation_name,
"function": func["function"],
"issue": "High cumulative time",
"time": func["cumulative_time"],
"priority": "High"
if func["cumulative_time"] > 1.0
else "Medium",
}
)
# Memory optimization opportunities
memory_profile = data.get("memory_profile", {})
if memory_profile.get("peak_memory_mb", 0) > 100: # More than 100MB
optimization_opportunities.append(
{
"operation": operation_name,
"issue": "High memory usage",
"memory_mb": memory_profile["peak_memory_mb"],
"priority": "High"
if memory_profile["peak_memory_mb"] > 500
else "Medium",
}
)
return {
"performance_summary": performance_summary,
"optimization_opportunities": optimization_opportunities,
"total_operations_profiled": len(self.profiling_data),
}
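# Illustrative standalone use of PerformanceProfiler (a sketch; not invoked by
# the test suite). The toy workload is an assumption for demonstration only.
def _demo_profiler_usage() -> None:  # pragma: no cover - documentation helper
    profiler = PerformanceProfiler()
    with profiler.profile_cpu("toy_sum"), profiler.profile_memory("toy_sum"):
        total = sum(i * i for i in range(200_000))
        logger.debug("toy_sum total: %d", total)
    report = profiler.generate_optimization_report()
    logger.info("Toy profiling summary: %s", report["performance_summary"])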
class TestPerformanceProfiling:
"""Performance profiling test suite."""
@pytest.fixture
async def profiling_data_provider(self):
"""Create data provider for profiling tests."""
provider = Mock()
def generate_profiling_data(symbol: str) -> pd.DataFrame:
"""Generate data with known performance characteristics."""
# Generate larger dataset to create measurable performance impact
dates = pd.date_range(
start="2020-01-01", end="2023-12-31", freq="D"
) # 4 years
np.random.seed(hash(symbol) % 1000)
returns = np.random.normal(0.0008, 0.02, len(dates))
prices = 100 * np.cumprod(1 + returns)
# Add some computationally expensive operations
high_prices = prices * np.random.uniform(1.01, 1.05, len(dates))
low_prices = prices * np.random.uniform(0.95, 0.99, len(dates))
# Simulate expensive volume calculations
base_volume = np.random.randint(1000000, 10000000, len(dates))
volume_multiplier = np.exp(
np.random.normal(0, 0.1, len(dates))
) # Log-normal distribution
volumes = (base_volume * volume_multiplier).astype(int)
return pd.DataFrame(
{
"Open": prices * np.random.uniform(0.995, 1.005, len(dates)),
"High": high_prices,
"Low": low_prices,
"Close": prices,
"Volume": volumes,
"Adj Close": prices,
},
index=dates,
)
provider.get_stock_data.side_effect = generate_profiling_data
return provider
async def test_profile_backtest_execution(self, profiling_data_provider):
"""Profile complete backtest execution to identify bottlenecks."""
profiler = PerformanceProfiler()
engine = VectorBTEngine(data_provider=profiling_data_provider)
strategies_to_profile = ["sma_cross", "rsi", "macd", "bollinger"]
for strategy in strategies_to_profile:
with profiler.profile_cpu(f"backtest_{strategy}"):
with profiler.profile_memory(f"backtest_{strategy}"):
await engine.run_backtest(
symbol="PROFILE_TEST",
strategy_type=strategy,
parameters=STRATEGY_TEMPLATES[strategy]["parameters"],
start_date="2022-01-01",
end_date="2023-12-31",
)
# Analyze profiling results
report = profiler.generate_optimization_report()
# Log performance analysis
logger.info("Backtest Execution Profiling Results:")
for operation, summary in report["performance_summary"].items():
logger.info(
f" {operation}: {summary['execution_time']:.3f}s, "
f"{summary['peak_memory_mb']:.1f}MB peak"
)
# Log optimization opportunities
if report["optimization_opportunities"]:
logger.info("Optimization Opportunities:")
for opportunity in report["optimization_opportunities"]:
priority_symbol = "🔴" if opportunity["priority"] == "High" else "🟡"
logger.info(
f" {priority_symbol} {opportunity['operation']}: {opportunity['issue']}"
)
# Performance assertions
max_execution_time = max(
summary["execution_time"]
for summary in report["performance_summary"].values()
)
assert max_execution_time <= 5.0, (
f"Slowest backtest took too long: {max_execution_time:.2f}s"
)
high_priority_issues = [
opp
for opp in report["optimization_opportunities"]
if opp["priority"] == "High"
]
assert len(high_priority_issues) <= 2, (
f"Too many high-priority performance issues: {len(high_priority_issues)}"
)
return report
async def test_profile_data_loading_bottlenecks(self, profiling_data_provider):
"""Profile data loading operations to identify I/O bottlenecks."""
profiler = PerformanceProfiler()
engine = VectorBTEngine(data_provider=profiling_data_provider)
symbols = ["DATA_1", "DATA_2", "DATA_3", "DATA_4", "DATA_5"]
# Profile data loading operations
for symbol in symbols:
with profiler.profile_cpu(f"data_loading_{symbol}"):
with profiler.profile_memory(f"data_loading_{symbol}"):
# Profile the data fetching specifically
await engine.get_historical_data(
symbol=symbol, start_date="2020-01-01", end_date="2023-12-31"
)
# Analyze data loading performance
data_loading_times = []
data_loading_memory = []
for symbol in symbols:
operation_name = f"data_loading_{symbol}"
if operation_name in profiler.profiling_data:
data_loading_times.append(
profiler.profiling_data[operation_name]["execution_time"]
)
memory_profile = profiler.profiling_data[operation_name].get(
"memory_profile", {}
)
data_loading_memory.append(memory_profile.get("peak_memory_mb", 0))
avg_loading_time = np.mean(data_loading_times) if data_loading_times else 0
max_loading_time = max(data_loading_times) if data_loading_times else 0
avg_loading_memory = np.mean(data_loading_memory) if data_loading_memory else 0
logger.info("Data Loading Performance Analysis:")
logger.info(f" Average Loading Time: {avg_loading_time:.3f}s")
logger.info(f" Maximum Loading Time: {max_loading_time:.3f}s")
logger.info(f" Average Memory Usage: {avg_loading_memory:.1f}MB")
# Performance assertions for data loading
assert avg_loading_time <= 0.5, (
f"Average data loading too slow: {avg_loading_time:.3f}s"
)
assert max_loading_time <= 1.0, (
f"Slowest data loading too slow: {max_loading_time:.3f}s"
)
assert avg_loading_memory <= 50.0, (
f"Data loading memory usage too high: {avg_loading_memory:.1f}MB"
)
return {
"avg_loading_time": avg_loading_time,
"max_loading_time": max_loading_time,
"avg_loading_memory": avg_loading_memory,
"individual_times": data_loading_times,
}
async def test_profile_database_query_performance(
self, profiling_data_provider, db_session
):
"""Profile database queries to identify slow operations."""
profiler = PerformanceProfiler()
engine = VectorBTEngine(data_provider=profiling_data_provider)
# Generate test data for database profiling
test_results = []
for i in range(10):
result = await engine.run_backtest(
symbol=f"DB_PROFILE_{i}",
strategy_type="sma_cross",
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
test_results.append(result)
# Profile database operations
query_profiles = []
with BacktestPersistenceManager(session=db_session) as persistence:
# Profile save operations
for i, result in enumerate(test_results):
query_profile = profiler.profile_database_query(
f"save_backtest_{i}",
lambda r=result: persistence.save_backtest_result(
vectorbt_results=r,
execution_time=2.0,
notes="Database profiling test",
),
)
query_profiles.append(query_profile)
# Get saved IDs for retrieval profiling
saved_ids = [qp.get("result") for qp in query_profiles if qp.get("success")]
# Profile retrieval operations
for i, backtest_id in enumerate(
saved_ids[:5]
): # Profile first 5 retrievals
query_profile = profiler.profile_database_query(
f"retrieve_backtest_{i}",
lambda bid=backtest_id: persistence.get_backtest_by_id(bid),
)
query_profiles.append(query_profile)
# Profile bulk query operations
bulk_query_profile = profiler.profile_database_query(
"bulk_query_by_strategy",
lambda: persistence.get_backtests_by_strategy("sma_cross"),
)
query_profiles.append(bulk_query_profile)
# Analyze database query performance
save_times = [
qp["execution_time_ms"]
for qp in query_profiles
if "save_backtest" in qp["query_name"] and qp["success"]
]
retrieve_times = [
qp["execution_time_ms"]
for qp in query_profiles
if "retrieve_backtest" in qp["query_name"] and qp["success"]
]
avg_save_time = np.mean(save_times) if save_times else 0
avg_retrieve_time = np.mean(retrieve_times) if retrieve_times else 0
bulk_query_time = (
bulk_query_profile["execution_time_ms"]
if bulk_query_profile["success"]
else 0
)
logger.info("Database Query Performance Analysis:")
logger.info(f" Average Save Time: {avg_save_time:.1f}ms")
logger.info(f" Average Retrieve Time: {avg_retrieve_time:.1f}ms")
logger.info(f" Bulk Query Time: {bulk_query_time:.1f}ms")
# Identify slow queries
slow_queries = [
qp
for qp in query_profiles
if qp["execution_time_ms"] > 100 and qp["success"]
]
logger.info(f" Slow Queries (>100ms): {len(slow_queries)}")
# Performance assertions for database queries
assert avg_save_time <= 50.0, (
f"Average save time too slow: {avg_save_time:.1f}ms"
)
assert avg_retrieve_time <= 20.0, (
f"Average retrieve time too slow: {avg_retrieve_time:.1f}ms"
)
assert bulk_query_time <= 100.0, f"Bulk query too slow: {bulk_query_time:.1f}ms"
assert len(slow_queries) <= 2, f"Too many slow queries: {len(slow_queries)}"
return {
"avg_save_time": avg_save_time,
"avg_retrieve_time": avg_retrieve_time,
"bulk_query_time": bulk_query_time,
"slow_queries": len(slow_queries),
"query_profiles": query_profiles,
}
async def test_profile_memory_allocation_patterns(self, profiling_data_provider):
"""Profile memory allocation patterns to identify hotspots."""
profiler = PerformanceProfiler()
engine = VectorBTEngine(data_provider=profiling_data_provider)
# Test different memory usage patterns
memory_test_cases = [
("small_dataset", "2023-06-01", "2023-12-31"),
("medium_dataset", "2022-01-01", "2023-12-31"),
("large_dataset", "2020-01-01", "2023-12-31"),
]
memory_profiles = []
for case_name, start_date, end_date in memory_test_cases:
with profiler.profile_memory(f"memory_{case_name}"):
await engine.run_backtest(
symbol="MEMORY_TEST",
strategy_type="macd",
parameters=STRATEGY_TEMPLATES["macd"]["parameters"],
start_date=start_date,
end_date=end_date,
)
memory_data = profiler.profiling_data[f"memory_{case_name}"][
"memory_profile"
]
memory_profiles.append(
{
"case": case_name,
"peak_memory_mb": memory_data["peak_memory_mb"],
"memory_growth_mb": memory_data["memory_growth_mb"],
"data_points": len(
pd.date_range(start=start_date, end=end_date, freq="D")
),
}
)
# Analyze memory scaling
data_points = [mp["data_points"] for mp in memory_profiles]
peak_memories = [mp["peak_memory_mb"] for mp in memory_profiles]
# Calculate memory efficiency (MB per 1000 data points)
memory_efficiency = [
(peak_mem / data_pts * 1000)
for peak_mem, data_pts in zip(peak_memories, data_points, strict=False)
]
avg_memory_efficiency = np.mean(memory_efficiency)
logger.info("Memory Allocation Pattern Analysis:")
for profile in memory_profiles:
efficiency = profile["peak_memory_mb"] / profile["data_points"] * 1000
logger.info(
f" {profile['case']}: {profile['peak_memory_mb']:.1f}MB peak "
f"({efficiency:.2f} MB/1k points)"
)
logger.info(
f" Average Memory Efficiency: {avg_memory_efficiency:.2f} MB per 1000 data points"
)
# Memory efficiency assertions
assert avg_memory_efficiency <= 5.0, (
f"Memory efficiency too poor: {avg_memory_efficiency:.2f} MB/1k points"
)
assert max(peak_memories) <= 200.0, (
f"Peak memory usage too high: {max(peak_memories):.1f}MB"
)
return {
"memory_profiles": memory_profiles,
"avg_memory_efficiency": avg_memory_efficiency,
"peak_memory_usage": max(peak_memories),
}
async def test_profile_cpu_vs_io_bound_operations(self, profiling_data_provider):
"""Profile CPU-bound vs I/O-bound operations to optimize resource usage."""
profiler = PerformanceProfiler()
engine = VectorBTEngine(data_provider=profiling_data_provider)
# Profile CPU-intensive strategy
with profiler.profile_cpu("cpu_intensive_strategy"):
await engine.run_backtest(
symbol="CPU_TEST",
strategy_type="bollinger", # More calculations
parameters=STRATEGY_TEMPLATES["bollinger"]["parameters"],
start_date="2022-01-01",
end_date="2023-12-31",
)
# Profile I/O-intensive operations (multiple data fetches)
with profiler.profile_cpu("io_intensive_operations"):
io_symbols = ["IO_1", "IO_2", "IO_3", "IO_4", "IO_5"]
io_results = []
for symbol in io_symbols:
result = await engine.run_backtest(
symbol=symbol,
strategy_type="sma_cross", # Simpler calculations
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-06-01",
end_date="2023-12-31",
)
io_results.append(result)
# Analyze CPU vs I/O characteristics
cpu_analysis = profiler.analyze_hotspots("cpu_intensive_strategy")
io_analysis = profiler.analyze_hotspots("io_intensive_operations")
cpu_time = cpu_analysis.get("total_execution_time", 0)
io_time = io_analysis.get("total_execution_time", 0)
# Analyze function call patterns
cpu_top_functions = cpu_analysis.get("top_functions_by_cumulative", [])
io_top_functions = io_analysis.get("top_functions_by_cumulative", [])
# Calculate I/O vs CPU characteristics
cpu_bound_ratio = (
cpu_time / (cpu_time + io_time) if (cpu_time + io_time) > 0 else 0
)
logger.info("CPU vs I/O Bound Analysis:")
logger.info(f" CPU-Intensive Operation: {cpu_time:.3f}s")
logger.info(f" I/O-Intensive Operations: {io_time:.3f}s")
logger.info(f" CPU-Bound Ratio: {cpu_bound_ratio:.2%}")
logger.info(" Top CPU-Intensive Functions:")
for func in cpu_top_functions[:3]:
logger.info(f" {func['function']}: {func['cumulative_time']:.3f}s")
logger.info(" Top I/O-Intensive Functions:")
for func in io_top_functions[:3]:
logger.info(f" {func['function']}: {func['cumulative_time']:.3f}s")
# Performance balance assertions
assert cpu_time <= 3.0, f"CPU-intensive operation too slow: {cpu_time:.3f}s"
assert io_time <= 5.0, f"I/O-intensive operations too slow: {io_time:.3f}s"
return {
"cpu_time": cpu_time,
"io_time": io_time,
"cpu_bound_ratio": cpu_bound_ratio,
"cpu_top_functions": cpu_top_functions[:5],
"io_top_functions": io_top_functions[:5],
}
async def test_comprehensive_profiling_suite(
self, profiling_data_provider, db_session
):
"""Run comprehensive profiling suite and generate optimization report."""
logger.info("Starting Comprehensive Performance Profiling Suite...")
profiling_results = {}
# Run all profiling tests
profiling_results[
"backtest_execution"
] = await self.test_profile_backtest_execution(profiling_data_provider)
profiling_results[
"data_loading"
] = await self.test_profile_data_loading_bottlenecks(profiling_data_provider)
profiling_results[
"database_queries"
] = await self.test_profile_database_query_performance(
profiling_data_provider, db_session
)
profiling_results[
"memory_allocation"
] = await self.test_profile_memory_allocation_patterns(profiling_data_provider)
profiling_results[
"cpu_vs_io"
] = await self.test_profile_cpu_vs_io_bound_operations(profiling_data_provider)
# Generate comprehensive optimization report
optimization_report = {
"executive_summary": {
"profiling_areas": len(profiling_results),
"performance_bottlenecks": [],
"optimization_priorities": [],
},
"detailed_analysis": profiling_results,
}
# Identify key bottlenecks and priorities
bottlenecks = []
priorities = []
# Analyze backtest execution performance
backtest_report = profiling_results["backtest_execution"]
high_priority_issues = [
opp
for opp in backtest_report.get("optimization_opportunities", [])
if opp["priority"] == "High"
]
if high_priority_issues:
bottlenecks.append("High-priority performance issues in backtest execution")
priorities.append("Optimize hot functions in strategy calculations")
# Analyze data loading performance
data_loading = profiling_results["data_loading"]
if data_loading["max_loading_time"] > 0.8:
bottlenecks.append("Slow data loading operations")
priorities.append("Implement data caching or optimize data provider")
# Analyze database performance
db_performance = profiling_results["database_queries"]
if db_performance["slow_queries"] > 1:
bottlenecks.append("Multiple slow database queries detected")
priorities.append("Add database indexes or optimize query patterns")
# Analyze memory efficiency
memory_analysis = profiling_results["memory_allocation"]
if memory_analysis["avg_memory_efficiency"] > 3.0:
bottlenecks.append("High memory usage per data point")
priorities.append("Optimize memory allocation patterns")
optimization_report["executive_summary"]["performance_bottlenecks"] = (
bottlenecks
)
optimization_report["executive_summary"]["optimization_priorities"] = priorities
# Log comprehensive report
logger.info(
f"\n{'=' * 60}\n"
f"COMPREHENSIVE PROFILING REPORT\n"
f"{'=' * 60}\n"
f"Profiling Areas Analyzed: {len(profiling_results)}\n"
f"Performance Bottlenecks: {len(bottlenecks)}\n"
f"{'=' * 60}\n"
)
if bottlenecks:
logger.info("🔍 PERFORMANCE BOTTLENECKS IDENTIFIED:")
for i, bottleneck in enumerate(bottlenecks, 1):
logger.info(f" {i}. {bottleneck}")
if priorities:
logger.info("\n🎯 OPTIMIZATION PRIORITIES:")
for i, priority in enumerate(priorities, 1):
logger.info(f" {i}. {priority}")
logger.info(f"\n{'=' * 60}")
# Assert profiling success
assert len(bottlenecks) <= 3, (
f"Too many performance bottlenecks identified: {len(bottlenecks)}"
)
return optimization_report
if __name__ == "__main__":
# Run profiling tests
pytest.main(
[
__file__,
"-v",
"--tb=short",
"--asyncio-mode=auto",
"--timeout=300", # 5 minute timeout for profiling tests
]
)
```
--------------------------------------------------------------------------------
/tests/performance/test_benchmarks.py:
--------------------------------------------------------------------------------
```python
"""
Performance Benchmarks Against Target Metrics.
This test suite covers:
- Backtest execution < 2 seconds per backtest
- Memory usage < 500MB per backtest
- Cache hit rate > 80%
- API failure rate < 0.1%
- Database query performance < 100ms
- Throughput targets (requests per second)
- Response time SLA compliance
- Resource utilization efficiency
"""
import asyncio
import gc
import logging
import os
import statistics
import time
from dataclasses import dataclass
from typing import Any
from unittest.mock import Mock, patch
import numpy as np
import pandas as pd
import psutil
import pytest
from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES
logger = logging.getLogger(__name__)
@dataclass
class BenchmarkResult:
"""Data class for benchmark test results."""
test_name: str
target_value: float
actual_value: float
unit: str
passed: bool
margin: float
details: dict[str, Any]
class BenchmarkTracker:
"""Track and validate performance benchmarks."""
def __init__(self):
self.results = []
self.process = psutil.Process(os.getpid())
def add_benchmark(
self,
test_name: str,
target_value: float,
actual_value: float,
unit: str,
comparison: str = "<=",
details: dict[str, Any] | None = None,
) -> BenchmarkResult:
"""Add a benchmark result."""
if comparison == "<=":
passed = actual_value <= target_value
margin = (
(actual_value - target_value) / target_value if target_value > 0 else 0
)
elif comparison == ">=":
passed = actual_value >= target_value
margin = (
(target_value - actual_value) / target_value if target_value > 0 else 0
)
else:
raise ValueError(f"Unsupported comparison: {comparison}")
result = BenchmarkResult(
test_name=test_name,
target_value=target_value,
actual_value=actual_value,
unit=unit,
passed=passed,
margin=margin,
details=details or {},
)
self.results.append(result)
status = "✓ PASS" if passed else "✗ FAIL"
logger.info(
f"{status} {test_name}: {actual_value:.3f}{unit} (target: {target_value}{unit})"
)
return result
def get_memory_usage(self) -> float:
"""Get current memory usage in MB."""
return self.process.memory_info().rss / 1024 / 1024
def get_cpu_usage(self) -> float:
"""Get current CPU usage percentage."""
return self.process.cpu_percent()
def summary(self) -> dict[str, Any]:
"""Generate benchmark summary."""
total_tests = len(self.results)
passed_tests = sum(1 for r in self.results if r.passed)
failed_tests = total_tests - passed_tests
return {
"total_tests": total_tests,
"passed_tests": passed_tests,
"failed_tests": failed_tests,
"pass_rate": passed_tests / total_tests if total_tests > 0 else 0,
"results": self.results,
}
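# Illustrative BenchmarkTracker usage (a sketch; not exercised by the tests).
# Shows the margin convention: a positive margin means the target was missed by
# that fraction, whichever comparison direction is used.
def _demo_benchmark_tracker() -> None:  # pragma: no cover - documentation helper
    tracker = BenchmarkTracker()
    # "<=" budget: 2.4s against 2.0s -> margin = (2.4 - 2.0) / 2.0 = 0.20 (FAIL)
    tracker.add_benchmark("latency", 2.0, 2.4, "s", comparison="<=")
    # ">=" floor: 70% against 80% -> margin = (80 - 70) / 80 = 0.125 (FAIL)
    tracker.add_benchmark("cache_hit_rate", 80.0, 70.0, "%", comparison=">=")
    logger.info("demo pass rate: %.0f%%", tracker.summary()["pass_rate"] * 100)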
class TestPerformanceBenchmarks:
"""Performance benchmarks against target metrics."""
@pytest.fixture
async def benchmark_data_provider(self):
"""Create optimized data provider for benchmarks."""
provider = Mock()
def generate_benchmark_data(symbol: str) -> pd.DataFrame:
"""Generate optimized data for benchmarking."""
# Use symbol hash for deterministic but varied data
seed = hash(symbol) % 1000
np.random.seed(seed)
# Generate 1 year of data
dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D")
returns = np.random.normal(0.0008, 0.02, len(dates))
prices = 100 * np.cumprod(1 + returns)
return pd.DataFrame(
{
"Open": prices * np.random.uniform(0.995, 1.005, len(dates)),
"High": prices * np.random.uniform(1.005, 1.025, len(dates)),
"Low": prices * np.random.uniform(0.975, 0.995, len(dates)),
"Close": prices,
"Volume": np.random.randint(1000000, 5000000, len(dates)),
"Adj Close": prices,
},
index=dates,
)
provider.get_stock_data.side_effect = generate_benchmark_data
return provider
async def test_backtest_execution_time_benchmark(self, benchmark_data_provider):
"""Test: Backtest execution < 2 seconds per backtest."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
test_cases = [
("AAPL", "sma_cross"),
("GOOGL", "rsi"),
("MSFT", "macd"),
("AMZN", "bollinger"),
("TSLA", "momentum"),
]
execution_times = []
for symbol, strategy in test_cases:
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
start_time = time.time()
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2023-01-01",
end_date="2023-12-31",
)
execution_time = time.time() - start_time
execution_times.append(execution_time)
# Individual backtest benchmark
benchmark.add_benchmark(
test_name=f"backtest_time_{symbol}_{strategy}",
target_value=2.0,
actual_value=execution_time,
unit="s",
comparison="<=",
details={
"symbol": symbol,
"strategy": strategy,
"result_size": len(str(result)),
},
)
# Overall benchmark
avg_execution_time = statistics.mean(execution_times)
max_execution_time = max(execution_times)
benchmark.add_benchmark(
test_name="avg_backtest_execution_time",
target_value=2.0,
actual_value=avg_execution_time,
unit="s",
comparison="<=",
details={"individual_times": execution_times},
)
benchmark.add_benchmark(
test_name="max_backtest_execution_time",
target_value=3.0, # Allow some variance
actual_value=max_execution_time,
unit="s",
comparison="<=",
details={
"slowest_case": test_cases[execution_times.index(max_execution_time)]
},
)
logger.info(
f"Backtest Execution Time Benchmark Summary:\n"
f" • Average: {avg_execution_time:.3f}s\n"
f" • Maximum: {max_execution_time:.3f}s\n"
f" • Minimum: {min(execution_times):.3f}s\n"
f" • Standard Deviation: {statistics.stdev(execution_times):.3f}s"
)
return benchmark.summary()
async def test_memory_usage_benchmark(self, benchmark_data_provider):
"""Test: Memory usage < 500MB per backtest."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
initial_memory = benchmark.get_memory_usage()
memory_measurements = []
test_symbols = [
"MEM_TEST_1",
"MEM_TEST_2",
"MEM_TEST_3",
"MEM_TEST_4",
"MEM_TEST_5",
]
for _i, symbol in enumerate(test_symbols):
gc.collect() # Force garbage collection before measurement
pre_backtest_memory = benchmark.get_memory_usage()
# Run backtest
result = await engine.run_backtest(
symbol=symbol,
strategy_type="sma_cross",
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
post_backtest_memory = benchmark.get_memory_usage()
memory_delta = post_backtest_memory - pre_backtest_memory
memory_measurements.append(
{
"symbol": symbol,
"pre_memory": pre_backtest_memory,
"post_memory": post_backtest_memory,
"delta": memory_delta,
}
)
# Individual memory benchmark
benchmark.add_benchmark(
test_name=f"memory_usage_{symbol}",
target_value=500.0,
actual_value=memory_delta,
unit="MB",
comparison="<=",
details={
"pre_memory": pre_backtest_memory,
"post_memory": post_backtest_memory,
"result_size": len(str(result)),
},
)
# Overall memory benchmarks
total_memory_growth = benchmark.get_memory_usage() - initial_memory
avg_memory_per_backtest = (
total_memory_growth / len(test_symbols) if test_symbols else 0
)
max_memory_delta = max(m["delta"] for m in memory_measurements)
benchmark.add_benchmark(
test_name="avg_memory_per_backtest",
target_value=500.0,
actual_value=avg_memory_per_backtest,
unit="MB",
comparison="<=",
details={
"total_growth": total_memory_growth,
"measurements": memory_measurements,
},
)
benchmark.add_benchmark(
test_name="max_memory_per_backtest",
target_value=750.0, # Allow some variance
actual_value=max_memory_delta,
unit="MB",
comparison="<=",
details={
"worst_case": memory_measurements[
next(
i
for i, m in enumerate(memory_measurements)
if m["delta"] == max_memory_delta
)
]
},
)
logger.info(
f"Memory Usage Benchmark Summary:\n"
f" • Total Growth: {total_memory_growth:.1f}MB\n"
f" • Avg per Backtest: {avg_memory_per_backtest:.1f}MB\n"
f" • Max per Backtest: {max_memory_delta:.1f}MB\n"
f" • Initial Memory: {initial_memory:.1f}MB"
)
return benchmark.summary()
async def test_cache_hit_rate_benchmark(self, benchmark_data_provider):
"""Test: Cache hit rate > 80%."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
# Mock cache to track hits/misses
cache_stats = {"hits": 0, "misses": 0, "total_requests": 0}
def mock_cache_get(key):
cache_stats["total_requests"] += 1
# Simulate realistic cache behavior
if cache_stats["total_requests"] <= 5: # First few are misses
cache_stats["misses"] += 1
return None
else: # Later requests are hits
cache_stats["hits"] += 1
return "cached_result"
with patch(
"maverick_mcp.core.cache.CacheManager.get", side_effect=mock_cache_get
):
# Run multiple backtests with repeated data access
symbols = [
"CACHE_A",
"CACHE_B",
"CACHE_A",
"CACHE_B",
"CACHE_A",
"CACHE_C",
"CACHE_A",
]
for symbol in symbols:
await engine.run_backtest(
symbol=symbol,
strategy_type="sma_cross",
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
# Calculate cache hit rate
total_cache_requests = cache_stats["total_requests"]
cache_hits = cache_stats["hits"]
cache_hit_rate = (
(cache_hits / total_cache_requests * 100) if total_cache_requests > 0 else 0
)
benchmark.add_benchmark(
test_name="cache_hit_rate",
target_value=80.0,
actual_value=cache_hit_rate,
unit="%",
comparison=">=",
details={
"total_requests": total_cache_requests,
"hits": cache_hits,
"misses": cache_stats["misses"],
},
)
logger.info(
f"Cache Hit Rate Benchmark:\n"
f" • Total Cache Requests: {total_cache_requests}\n"
f" • Cache Hits: {cache_hits}\n"
f" • Cache Misses: {cache_stats['misses']}\n"
f" • Hit Rate: {cache_hit_rate:.1f}%"
)
return benchmark.summary()
async def test_api_failure_rate_benchmark(self, benchmark_data_provider):
"""Test: API failure rate < 0.1%."""
benchmark = BenchmarkTracker()
        # Mock API with occasional failures
        api_stats = {"total_calls": 0, "failures": 0}
        # Keep a reference to the unpatched data generator; the mock below must
        # not call the patched get_stock_data attribute or it would recurse.
        original_get_stock_data = benchmark_data_provider.get_stock_data
        def mock_api_call(*args, **kwargs):
            api_stats["total_calls"] += 1
            # Simulate very low failure rate
            if api_stats["total_calls"] % 2000 == 0:  # 0.05% failure rate
                api_stats["failures"] += 1
                raise ConnectionError("Simulated API failure")
            return original_get_stock_data(*args, **kwargs)
# Test with many API calls
with patch.object(
benchmark_data_provider, "get_stock_data", side_effect=mock_api_call
):
engine = VectorBTEngine(data_provider=benchmark_data_provider)
test_symbols = [
f"API_TEST_{i}" for i in range(50)
] # 50 symbols to test API reliability
successful_backtests = 0
failed_backtests = 0
for symbol in test_symbols:
try:
await engine.run_backtest(
symbol=symbol,
strategy_type="rsi",
parameters=STRATEGY_TEMPLATES["rsi"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
successful_backtests += 1
except Exception:
failed_backtests += 1
# Calculate failure rates
total_api_calls = api_stats["total_calls"]
api_failures = api_stats["failures"]
api_failure_rate = (
(api_failures / total_api_calls * 100) if total_api_calls > 0 else 0
)
total_backtests = successful_backtests + failed_backtests
backtest_failure_rate = (
(failed_backtests / total_backtests * 100) if total_backtests > 0 else 0
)
benchmark.add_benchmark(
test_name="api_failure_rate",
target_value=0.1,
actual_value=api_failure_rate,
unit="%",
comparison="<=",
details={
"total_api_calls": total_api_calls,
"api_failures": api_failures,
"successful_backtests": successful_backtests,
"failed_backtests": failed_backtests,
},
)
benchmark.add_benchmark(
test_name="backtest_success_rate",
target_value=99.5,
actual_value=100 - backtest_failure_rate,
unit="%",
comparison=">=",
details={"backtest_failure_rate": backtest_failure_rate},
)
logger.info(
f"API Reliability Benchmark:\n"
f" • Total API Calls: {total_api_calls}\n"
f" • API Failures: {api_failures}\n"
f" • API Failure Rate: {api_failure_rate:.3f}%\n"
f" • Backtest Success Rate: {100 - backtest_failure_rate:.2f}%"
)
return benchmark.summary()
async def test_database_query_performance_benchmark(
self, benchmark_data_provider, db_session
):
"""Test: Database query performance < 100ms."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
# Generate test data for database operations
test_results = []
for i in range(10):
result = await engine.run_backtest(
symbol=f"DB_PERF_{i}",
strategy_type="macd",
parameters=STRATEGY_TEMPLATES["macd"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
test_results.append(result)
# Test database save performance
save_times = []
with BacktestPersistenceManager(session=db_session) as persistence:
for result in test_results:
start_time = time.time()
backtest_id = persistence.save_backtest_result(
vectorbt_results=result,
execution_time=2.0,
notes="DB performance test",
)
save_time = (time.time() - start_time) * 1000 # Convert to ms
save_times.append((backtest_id, save_time))
# Test database query performance
query_times = []
with BacktestPersistenceManager(session=db_session) as persistence:
for backtest_id, _ in save_times:
start_time = time.time()
persistence.get_backtest_by_id(backtest_id)
query_time = (time.time() - start_time) * 1000 # Convert to ms
query_times.append(query_time)
# Test bulk query performance
start_time = time.time()
bulk_results = persistence.get_backtests_by_strategy("macd")
bulk_query_time = (time.time() - start_time) * 1000
# Calculate benchmarks
avg_save_time = statistics.mean([t for _, t in save_times])
max_save_time = max([t for _, t in save_times])
avg_query_time = statistics.mean(query_times)
max_query_time = max(query_times)
# Add benchmarks
benchmark.add_benchmark(
test_name="avg_db_save_time",
target_value=100.0,
actual_value=avg_save_time,
unit="ms",
comparison="<=",
details={"individual_times": [t for _, t in save_times]},
)
benchmark.add_benchmark(
test_name="max_db_save_time",
target_value=200.0,
actual_value=max_save_time,
unit="ms",
comparison="<=",
)
benchmark.add_benchmark(
test_name="avg_db_query_time",
target_value=50.0,
actual_value=avg_query_time,
unit="ms",
comparison="<=",
details={"individual_times": query_times},
)
benchmark.add_benchmark(
test_name="max_db_query_time",
target_value=100.0,
actual_value=max_query_time,
unit="ms",
comparison="<=",
)
benchmark.add_benchmark(
test_name="bulk_query_time",
target_value=200.0,
actual_value=bulk_query_time,
unit="ms",
comparison="<=",
details={"records_returned": len(bulk_results)},
)
logger.info(
f"Database Performance Benchmark:\n"
f" • Avg Save Time: {avg_save_time:.1f}ms\n"
f" • Max Save Time: {max_save_time:.1f}ms\n"
f" • Avg Query Time: {avg_query_time:.1f}ms\n"
f" • Max Query Time: {max_query_time:.1f}ms\n"
f" • Bulk Query Time: {bulk_query_time:.1f}ms"
)
return benchmark.summary()
async def test_throughput_benchmark(self, benchmark_data_provider):
"""Test: Throughput targets (requests per second)."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
# Test sequential throughput
symbols = ["THRU_1", "THRU_2", "THRU_3", "THRU_4", "THRU_5"]
start_time = time.time()
for symbol in symbols:
await engine.run_backtest(
symbol=symbol,
strategy_type="sma_cross",
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
sequential_time = time.time() - start_time
sequential_throughput = len(symbols) / sequential_time
# Test concurrent throughput
concurrent_symbols = ["CONC_1", "CONC_2", "CONC_3", "CONC_4", "CONC_5"]
start_time = time.time()
concurrent_tasks = []
for symbol in concurrent_symbols:
task = engine.run_backtest(
symbol=symbol,
strategy_type="sma_cross",
parameters=STRATEGY_TEMPLATES["sma_cross"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
concurrent_tasks.append(task)
await asyncio.gather(*concurrent_tasks)
concurrent_time = time.time() - start_time
concurrent_throughput = len(concurrent_symbols) / concurrent_time
# Benchmarks
benchmark.add_benchmark(
test_name="sequential_throughput",
target_value=2.0, # 2 backtests per second
actual_value=sequential_throughput,
unit="req/s",
comparison=">=",
details={"execution_time": sequential_time, "requests": len(symbols)},
)
benchmark.add_benchmark(
test_name="concurrent_throughput",
target_value=5.0, # 5 backtests per second with concurrency
actual_value=concurrent_throughput,
unit="req/s",
comparison=">=",
details={
"execution_time": concurrent_time,
"requests": len(concurrent_symbols),
},
)
# Concurrency speedup
speedup = concurrent_throughput / sequential_throughput
benchmark.add_benchmark(
test_name="concurrency_speedup",
target_value=2.0, # At least 2x speedup
actual_value=speedup,
unit="x",
comparison=">=",
details={
"sequential_throughput": sequential_throughput,
"concurrent_throughput": concurrent_throughput,
},
)
logger.info(
f"Throughput Benchmark:\n"
f" • Sequential: {sequential_throughput:.2f} req/s\n"
f" • Concurrent: {concurrent_throughput:.2f} req/s\n"
f" • Speedup: {speedup:.2f}x"
)
return benchmark.summary()
async def test_response_time_sla_benchmark(self, benchmark_data_provider):
"""Test: Response time SLA compliance."""
benchmark = BenchmarkTracker()
engine = VectorBTEngine(data_provider=benchmark_data_provider)
response_times = []
symbols = [f"SLA_{i}" for i in range(20)]
for symbol in symbols:
start_time = time.time()
await engine.run_backtest(
symbol=symbol,
strategy_type="rsi",
parameters=STRATEGY_TEMPLATES["rsi"]["parameters"],
start_date="2023-01-01",
end_date="2023-12-31",
)
response_time = (time.time() - start_time) * 1000 # Convert to ms
response_times.append(response_time)
# SLA percentile benchmarks
p50 = np.percentile(response_times, 50)
p95 = np.percentile(response_times, 95)
p99 = np.percentile(response_times, 99)
benchmark.add_benchmark(
test_name="response_time_p50",
target_value=1500.0, # 1.5 seconds for 50th percentile
actual_value=p50,
unit="ms",
comparison="<=",
details={"percentile": "50th"},
)
benchmark.add_benchmark(
test_name="response_time_p95",
target_value=3000.0, # 3 seconds for 95th percentile
actual_value=p95,
unit="ms",
comparison="<=",
details={"percentile": "95th"},
)
benchmark.add_benchmark(
test_name="response_time_p99",
target_value=5000.0, # 5 seconds for 99th percentile
actual_value=p99,
unit="ms",
comparison="<=",
details={"percentile": "99th"},
)
# SLA compliance rate (percentage of requests under target)
sla_target = 2000.0 # 2 seconds
sla_compliant = sum(1 for t in response_times if t <= sla_target)
sla_compliance_rate = sla_compliant / len(response_times) * 100
benchmark.add_benchmark(
test_name="sla_compliance_rate",
target_value=95.0, # 95% of requests should meet SLA
actual_value=sla_compliance_rate,
unit="%",
comparison=">=",
details={
"sla_target_ms": sla_target,
"compliant_requests": sla_compliant,
"total_requests": len(response_times),
},
)
logger.info(
f"Response Time SLA Benchmark:\n"
f" • 50th Percentile: {p50:.1f}ms\n"
f" • 95th Percentile: {p95:.1f}ms\n"
f" • 99th Percentile: {p99:.1f}ms\n"
f" • SLA Compliance: {sla_compliance_rate:.1f}%"
)
return benchmark.summary()
async def test_comprehensive_benchmark_suite(
self, benchmark_data_provider, db_session
):
"""Run comprehensive benchmark suite and generate report."""
logger.info("Running Comprehensive Benchmark Suite...")
# Run all individual benchmarks
benchmark_results = []
benchmark_results.append(
await self.test_backtest_execution_time_benchmark(benchmark_data_provider)
)
benchmark_results.append(
await self.test_memory_usage_benchmark(benchmark_data_provider)
)
benchmark_results.append(
await self.test_cache_hit_rate_benchmark(benchmark_data_provider)
)
benchmark_results.append(
await self.test_api_failure_rate_benchmark(benchmark_data_provider)
)
benchmark_results.append(
await self.test_database_query_performance_benchmark(
benchmark_data_provider, db_session
)
)
benchmark_results.append(
await self.test_throughput_benchmark(benchmark_data_provider)
)
benchmark_results.append(
await self.test_response_time_sla_benchmark(benchmark_data_provider)
)
# Aggregate results
total_tests = sum(r["total_tests"] for r in benchmark_results)
total_passed = sum(r["passed_tests"] for r in benchmark_results)
total_failed = sum(r["failed_tests"] for r in benchmark_results)
overall_pass_rate = total_passed / total_tests if total_tests > 0 else 0
# Generate comprehensive report
report = {
"summary": {
"total_tests": total_tests,
"passed_tests": total_passed,
"failed_tests": total_failed,
"overall_pass_rate": overall_pass_rate,
},
"benchmark_suites": benchmark_results,
"critical_failures": [
result
for suite in benchmark_results
for result in suite["results"]
if not result.passed
and result.margin > 0.2 # More than 20% over target
],
}
logger.info(
f"\n{'=' * 60}\n"
f"COMPREHENSIVE BENCHMARK REPORT\n"
f"{'=' * 60}\n"
f"Total Tests: {total_tests}\n"
f"Passed: {total_passed} ({overall_pass_rate:.1%})\n"
f"Failed: {total_failed}\n"
f"{'=' * 60}\n"
)
# Assert overall benchmark success
assert overall_pass_rate >= 0.8, (
f"Overall benchmark pass rate too low: {overall_pass_rate:.1%}"
)
assert len(report["critical_failures"]) == 0, (
f"Critical benchmark failures detected: {len(report['critical_failures'])}"
)
return report
if __name__ == "__main__":
# Run benchmark tests
pytest.main(
[
__file__,
"-v",
"--tb=short",
"--asyncio-mode=auto",
"--timeout=300", # 5 minute timeout for benchmarks
]
)
```
--------------------------------------------------------------------------------
/tests/integration/test_high_volume.py:
--------------------------------------------------------------------------------
```python
"""
High-Volume Integration Tests for Production Scenarios.
This test suite covers:
- Testing with 100+ symbols
- Testing with years of historical data
- Memory management under load
- Concurrent user scenarios
- Database performance under high load
- Cache efficiency with large datasets
- API rate limiting and throttling
"""
import asyncio
import gc
import logging
import os
import random
import time
from datetime import datetime, timedelta
from unittest.mock import Mock
import numpy as np
import pandas as pd
import psutil
import pytest
from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES
logger = logging.getLogger(__name__)
# High volume test parameters
LARGE_SYMBOL_SET = [
# Technology
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"META",
"TSLA",
"NVDA",
"ADBE",
"CRM",
"ORCL",
"NFLX",
"INTC",
"AMD",
"QCOM",
"AVGO",
"TXN",
"MU",
"AMAT",
"LRCX",
"KLAC",
# Finance
"JPM",
"BAC",
"WFC",
"C",
"GS",
"MS",
"AXP",
"BRK-B",
"BLK",
"SPGI",
"CME",
"ICE",
"MCO",
"COF",
"USB",
"TFC",
"PNC",
"SCHW",
"CB",
"AIG",
# Healthcare
"JNJ",
"PFE",
"ABT",
"MRK",
"TMO",
"DHR",
"BMY",
"ABBV",
"AMGN",
"GILD",
"BIIB",
"REGN",
"VRTX",
"ISRG",
"SYK",
"BSX",
"MDT",
"EW",
"HOLX",
"RMD",
# Consumer
"WMT",
"PG",
"KO",
"PEP",
"COST",
"HD",
"MCD",
"NKE",
"SBUX",
"TGT",
"LOW",
"DIS",
"CMCSA",
"VZ",
"T",
"TMUS",
"CVX",
"XOM",
"UNH",
"CVS",
# Industrials
"BA",
"CAT",
"DE",
"GE",
"HON",
"MMM",
"LMT",
"RTX",
"UNP",
"UPS",
"FDX",
"WM",
"EMR",
"ETN",
"PH",
"CMI",
"PCAR",
"ROK",
"DOV",
"ITW",
# Extended set for 100+ symbols
"F",
"GM",
"FORD",
"RIVN",
"LCID",
"PLTR",
"SNOW",
"ZM",
"DOCU",
"OKTA",
]
STRATEGIES_FOR_VOLUME_TEST = ["sma_cross", "rsi", "macd", "bollinger", "momentum"]
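# LARGE_SYMBOL_SET above contains 110 tickers (20 per sector plus a 10-symbol
# extended tail), so slicing the first 100 yields the "100+ symbols" workload.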
class TestHighVolumeIntegration:
"""High-volume integration tests for production scenarios."""
@pytest.fixture
async def high_volume_data_provider(self):
"""Create data provider with large dataset simulation."""
provider = Mock()
def generate_multi_year_data(symbol: str, years: int = 3) -> pd.DataFrame:
"""Generate multi-year realistic data for a symbol."""
# Generate deterministic but varied data based on symbol hash
symbol_seed = hash(symbol) % 10000
np.random.seed(symbol_seed)
            # Create `years` years of business-day data (~252 trading days/year)
start_date = datetime.now() - timedelta(days=years * 365)
dates = pd.date_range(
start=start_date, periods=years * 252, freq="B"
) # Business days
# Generate realistic price movements
base_price = 50 + (symbol_seed % 200) # Base price $50-$250
returns = np.random.normal(0.0005, 0.02, len(dates)) # Daily returns
# Add some trend and volatility clustering
trend = (
np.sin(np.arange(len(dates)) / 252 * 2 * np.pi) * 0.001
) # Annual cycle
returns += trend
# Generate prices
prices = base_price * np.cumprod(1 + returns)
# Create OHLCV data
high_mult = np.random.uniform(1.005, 1.03, len(dates))
low_mult = np.random.uniform(0.97, 0.995, len(dates))
open_mult = np.random.uniform(0.995, 1.005, len(dates))
volumes = np.random.randint(100000, 10000000, len(dates))
data = pd.DataFrame(
{
"Open": prices * open_mult,
"High": prices * high_mult,
"Low": prices * low_mult,
"Close": prices,
"Volume": volumes,
"Adj Close": prices,
},
index=dates,
)
# Ensure OHLC constraints
data["High"] = np.maximum(
data["High"], np.maximum(data["Open"], data["Close"])
)
data["Low"] = np.minimum(
data["Low"], np.minimum(data["Open"], data["Close"])
)
return data
provider.get_stock_data.side_effect = generate_multi_year_data
return provider
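    # The fixture above yields ~756 business-day rows per symbol for the default
    # three-year window, with High/Low clamped to always bracket Open/Close.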
async def test_large_symbol_set_backtesting(
self, high_volume_data_provider, benchmark_timer
):
"""Test backtesting with 100+ symbols."""
symbols = LARGE_SYMBOL_SET[:100] # Use first 100 symbols
strategy = "sma_cross"
engine = VectorBTEngine(data_provider=high_volume_data_provider)
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
results = []
failed_symbols = []
# Track memory usage
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
with benchmark_timer() as timer:
# Process symbols in batches to manage memory
batch_size = 20
for i in range(0, len(symbols), batch_size):
batch_symbols = symbols[i : i + batch_size]
# Process batch
batch_tasks = []
for symbol in batch_symbols:
task = engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2022-01-01",
end_date="2023-12-31",
)
batch_tasks.append((symbol, task))
# Execute batch concurrently
batch_results = await asyncio.gather(
*[task for _, task in batch_tasks], return_exceptions=True
)
# Process results
for _j, (symbol, result) in enumerate(
zip(batch_symbols, batch_results, strict=False)
):
if isinstance(result, Exception):
failed_symbols.append(symbol)
logger.error(f"✗ {symbol} failed: {result}")
else:
results.append(result)
if len(results) % 10 == 0:
logger.info(f"Processed {len(results)} symbols...")
# Force garbage collection after each batch
gc.collect()
# Check memory usage
current_memory = process.memory_info().rss / 1024 / 1024
memory_growth = current_memory - initial_memory
if memory_growth > 2000: # More than 2GB growth
logger.warning(f"High memory usage detected: {memory_growth:.1f}MB")
execution_time = timer.elapsed
final_memory = process.memory_info().rss / 1024 / 1024
total_memory_growth = final_memory - initial_memory
# Performance assertions
success_rate = len(results) / len(symbols)
assert success_rate >= 0.85, f"Success rate too low: {success_rate:.1%}"
assert execution_time < 1800, (
f"Execution time too long: {execution_time:.1f}s"
) # 30 minutes max
assert total_memory_growth < 3000, (
f"Memory growth too high: {total_memory_growth:.1f}MB"
) # Max 3GB growth
# Calculate performance metrics
avg_execution_time = execution_time / len(symbols)
throughput = len(results) / execution_time # Backtests per second
logger.info(
f"Large Symbol Set Test Results:\n"
f" • Total Symbols: {len(symbols)}\n"
f" • Successful: {len(results)}\n"
f" • Failed: {len(failed_symbols)}\n"
f" • Success Rate: {success_rate:.1%}\n"
f" • Total Execution Time: {execution_time:.1f}s\n"
f" • Avg Time per Symbol: {avg_execution_time:.2f}s\n"
f" • Throughput: {throughput:.2f} backtests/second\n"
f" • Memory Growth: {total_memory_growth:.1f}MB\n"
f" • Failed Symbols: {failed_symbols[:10]}{'...' if len(failed_symbols) > 10 else ''}"
)
return {
"symbols_processed": len(results),
"execution_time": execution_time,
"throughput": throughput,
"memory_growth": total_memory_growth,
"success_rate": success_rate,
}
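    # Generic form of the batch pattern used above (a sketch; the tests inline
    # the logic rather than calling this helper).
    @staticmethod
    async def _run_in_batches(items, worker, batch_size: int = 20) -> list:  # pragma: no cover
        results: list = []
        for start in range(0, len(items), batch_size):
            batch = items[start : start + batch_size]
            results.extend(
                await asyncio.gather(
                    *(worker(item) for item in batch), return_exceptions=True
                )
            )
            gc.collect()  # keep memory bounded between batches
        return results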
async def test_multi_year_historical_data(
self, high_volume_data_provider, benchmark_timer
):
"""Test with years of historical data (high data volume)."""
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
strategy = "sma_cross"
engine = VectorBTEngine(data_provider=high_volume_data_provider)
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
# Test with different time periods
time_periods = [
("1_year", "2023-01-01", "2023-12-31"),
("2_years", "2022-01-01", "2023-12-31"),
("3_years", "2021-01-01", "2023-12-31"),
("5_years", "2019-01-01", "2023-12-31"),
]
period_results = {}
for period_name, start_date, end_date in time_periods:
with benchmark_timer() as timer:
period_data = []
for symbol in symbols:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
)
period_data.append(result)
except Exception as e:
logger.error(f"Failed {symbol} for {period_name}: {e}")
execution_time = timer.elapsed
# Calculate average data points processed
avg_data_points = np.mean(
[len(r.get("equity_curve", [])) for r in period_data]
)
data_throughput = avg_data_points * len(period_data) / execution_time
period_results[period_name] = {
"execution_time": execution_time,
"symbols_processed": len(period_data),
"avg_data_points": avg_data_points,
"data_throughput": data_throughput,
}
logger.info(
f"{period_name.upper()} Period Results:\n"
f" • Execution Time: {execution_time:.1f}s\n"
f" • Avg Data Points: {avg_data_points:.0f}\n"
f" • Data Throughput: {data_throughput:.0f} points/second"
)
# Validate performance scales reasonably with data size
one_year_time = period_results["1_year"]["execution_time"]
three_year_time = period_results["3_years"]["execution_time"]
# 3 years should not take more than 5x the time of 1 year (allow for overhead)
time_scaling = three_year_time / one_year_time
assert time_scaling < 5.0, f"Time scaling too poor: {time_scaling:.1f}x"
return period_results
async def test_concurrent_user_scenarios(
self, high_volume_data_provider, benchmark_timer
):
"""Test concurrent user scenarios with multiple simultaneous backtests."""
symbols = LARGE_SYMBOL_SET[:50]
strategies = STRATEGIES_FOR_VOLUME_TEST
# Simulate different user scenarios
user_scenarios = [
{
"user_id": f"user_{i}",
"symbols": random.sample(symbols, 5),
"strategy": random.choice(strategies),
"start_date": "2022-01-01",
"end_date": "2023-12-31",
}
for i in range(20) # Simulate 20 concurrent users
]
async def simulate_user_session(scenario):
"""Simulate a single user session."""
engine = VectorBTEngine(data_provider=high_volume_data_provider)
parameters = STRATEGY_TEMPLATES[scenario["strategy"]]["parameters"]
user_results = []
session_start = time.time()
for symbol in scenario["symbols"]:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=scenario["strategy"],
parameters=parameters,
start_date=scenario["start_date"],
end_date=scenario["end_date"],
)
user_results.append(result)
except Exception as e:
logger.error(f"User {scenario['user_id']} failed on {symbol}: {e}")
session_time = time.time() - session_start
return {
"user_id": scenario["user_id"],
"results": user_results,
"session_time": session_time,
"symbols_processed": len(user_results),
"success_rate": len(user_results) / len(scenario["symbols"]),
}
# Execute all user sessions concurrently
with benchmark_timer() as timer:
# Use semaphore to control concurrency
semaphore = asyncio.Semaphore(10) # Max 10 concurrent sessions
async def run_with_semaphore(scenario):
async with semaphore:
return await simulate_user_session(scenario)
session_results = await asyncio.gather(
*[run_with_semaphore(scenario) for scenario in user_scenarios],
return_exceptions=True,
)
total_execution_time = timer.elapsed
# Analyze results
successful_sessions = [r for r in session_results if isinstance(r, dict)]
failed_sessions = len(session_results) - len(successful_sessions)
total_backtests = sum(r["symbols_processed"] for r in successful_sessions)
avg_session_time = np.mean([r["session_time"] for r in successful_sessions])
avg_success_rate = np.mean([r["success_rate"] for r in successful_sessions])
# Performance assertions
session_success_rate = len(successful_sessions) / len(session_results)
assert session_success_rate >= 0.8, (
f"Session success rate too low: {session_success_rate:.1%}"
)
assert avg_success_rate >= 0.8, (
f"Average backtest success rate too low: {avg_success_rate:.1%}"
)
assert total_execution_time < 600, (
f"Total execution time too long: {total_execution_time:.1f}s"
) # 10 minutes max
concurrent_throughput = total_backtests / total_execution_time
logger.info(
f"Concurrent User Scenarios Results:\n"
f" • Total Users: {len(user_scenarios)}\n"
f" • Successful Sessions: {len(successful_sessions)}\n"
f" • Failed Sessions: {failed_sessions}\n"
f" • Session Success Rate: {session_success_rate:.1%}\n"
f" • Total Backtests: {total_backtests}\n"
f" • Avg Session Time: {avg_session_time:.1f}s\n"
f" • Avg Backtest Success Rate: {avg_success_rate:.1%}\n"
f" • Total Execution Time: {total_execution_time:.1f}s\n"
f" • Concurrent Throughput: {concurrent_throughput:.2f} backtests/second"
)
return {
"session_success_rate": session_success_rate,
"avg_success_rate": avg_success_rate,
"concurrent_throughput": concurrent_throughput,
"total_execution_time": total_execution_time,
}
async def test_database_performance_under_load(
self, high_volume_data_provider, db_session, benchmark_timer
):
"""Test database performance under high load."""
symbols = LARGE_SYMBOL_SET[:30] # 30 symbols for DB test
strategy = "sma_cross"
engine = VectorBTEngine(data_provider=high_volume_data_provider)
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
# Run backtests and save to database
backtest_results = []
with benchmark_timer() as timer:
# Generate backtest results
for symbol in symbols:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2023-01-01",
end_date="2023-12-31",
)
backtest_results.append(result)
except Exception as e:
logger.error(f"Backtest failed for {symbol}: {e}")
backtest_generation_time = timer.elapsed
# Test database operations under load
with benchmark_timer() as db_timer:
with BacktestPersistenceManager(session=db_session) as persistence:
saved_ids = []
# Batch save results
for result in backtest_results:
try:
backtest_id = persistence.save_backtest_result(
vectorbt_results=result,
execution_time=2.0,
notes=f"High volume test - {result['symbol']}",
)
saved_ids.append(backtest_id)
except Exception as e:
logger.error(f"Save failed for {result['symbol']}: {e}")
# Test batch retrieval
retrieved_results = []
for backtest_id in saved_ids:
try:
retrieved = persistence.get_backtest_by_id(backtest_id)
if retrieved:
retrieved_results.append(retrieved)
except Exception as e:
logger.error(f"Retrieval failed for {backtest_id}: {e}")
# Test queries under load
strategy_results = persistence.get_backtests_by_strategy(strategy)
db_operation_time = db_timer.elapsed
# Performance assertions
save_success_rate = len(saved_ids) / len(backtest_results)
retrieval_success_rate = (
len(retrieved_results) / len(saved_ids) if saved_ids else 0
)
assert save_success_rate >= 0.95, (
f"Database save success rate too low: {save_success_rate:.1%}"
)
assert retrieval_success_rate >= 0.95, (
f"Database retrieval success rate too low: {retrieval_success_rate:.1%}"
)
assert db_operation_time < 300, (
f"Database operations too slow: {db_operation_time:.1f}s"
) # 5 minutes max
# Calculate database performance metrics
save_throughput = len(saved_ids) / db_operation_time
logger.info(
f"Database Performance Under Load Results:\n"
f" • Backtest Generation: {backtest_generation_time:.1f}s\n"
f" • Database Operations: {db_operation_time:.1f}s\n"
f" • Backtests Generated: {len(backtest_results)}\n"
f" • Records Saved: {len(saved_ids)}\n"
f" • Records Retrieved: {len(retrieved_results)}\n"
f" • Save Success Rate: {save_success_rate:.1%}\n"
f" • Retrieval Success Rate: {retrieval_success_rate:.1%}\n"
f" • Save Throughput: {save_throughput:.2f} saves/second\n"
f" • Query Results: {len(strategy_results)} records"
)
return {
"save_success_rate": save_success_rate,
"retrieval_success_rate": retrieval_success_rate,
"save_throughput": save_throughput,
"db_operation_time": db_operation_time,
}
async def test_memory_management_large_datasets(
self, high_volume_data_provider, benchmark_timer
):
"""Test memory management with large datasets."""
symbols = LARGE_SYMBOL_SET[:25] # 25 symbols for memory test
strategies = STRATEGIES_FOR_VOLUME_TEST
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_snapshots = []
engine = VectorBTEngine(data_provider=high_volume_data_provider)
with benchmark_timer() as timer:
for i, symbol in enumerate(symbols):
for strategy in strategies:
try:
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
# Run backtest
await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2021-01-01", # 3 years of data
end_date="2023-12-31",
)
# Take memory snapshot
current_memory = process.memory_info().rss / 1024 / 1024
memory_snapshots.append(
{
"iteration": i * len(strategies)
+ strategies.index(strategy),
"symbol": symbol,
"strategy": strategy,
"memory_mb": current_memory,
"memory_growth": current_memory - initial_memory,
}
)
# Force periodic garbage collection
if (i * len(strategies) + strategies.index(strategy)) % 10 == 0:
gc.collect()
except Exception as e:
logger.error(f"Failed {symbol} with {strategy}: {e}")
execution_time = timer.elapsed
final_memory = process.memory_info().rss / 1024 / 1024
total_memory_growth = final_memory - initial_memory
peak_memory = max(snapshot["memory_mb"] for snapshot in memory_snapshots)
# Analyze memory patterns
memory_growths = [s["memory_growth"] for s in memory_snapshots]
avg_memory_growth = np.mean(memory_growths)
max_memory_growth = max(memory_growths)
# Check for memory leaks: the per-iteration growth rate (regression slope) should stay near zero
if len(memory_snapshots) > 10:
# Linear regression to detect memory leaks
iterations = [s["iteration"] for s in memory_snapshots]
memory_values = [s["memory_growth"] for s in memory_snapshots]
# Simple linear regression
n = len(iterations)
sum_x = sum(iterations)
sum_y = sum(memory_values)
sum_xy = sum(x * y for x, y in zip(iterations, memory_values, strict=False))
sum_xx = sum(x * x for x in iterations)
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)
# Memory leak detection (slope should be small)
memory_leak_rate = slope # MB per iteration
else:
memory_leak_rate = 0
# Performance assertions
assert total_memory_growth < 2000, (
f"Total memory growth too high: {total_memory_growth:.1f}MB"
)
assert peak_memory < initial_memory + 2500, (
f"Peak memory too high: {peak_memory:.1f}MB"
)
assert abs(memory_leak_rate) < 5, (
f"Potential memory leak detected: {memory_leak_rate:.2f}MB/iteration"
)
logger.info(
f"Memory Management Large Datasets Results:\n"
f" • Initial Memory: {initial_memory:.1f}MB\n"
f" • Final Memory: {final_memory:.1f}MB\n"
f" • Total Growth: {total_memory_growth:.1f}MB\n"
f" • Peak Memory: {peak_memory:.1f}MB\n"
f" • Avg Growth: {avg_memory_growth:.1f}MB\n"
f" • Max Growth: {max_memory_growth:.1f}MB\n"
f" • Memory Leak Rate: {memory_leak_rate:.2f}MB/iteration\n"
f" • Execution Time: {execution_time:.1f}s\n"
f" • Iterations: {len(memory_snapshots)}"
)
return {
"total_memory_growth": total_memory_growth,
"peak_memory": peak_memory,
"memory_leak_rate": memory_leak_rate,
"execution_time": execution_time,
"memory_snapshots": memory_snapshots,
}
async def test_cache_efficiency_large_dataset(
self, high_volume_data_provider, benchmark_timer
):
"""Test cache efficiency with large datasets."""
# Test cache with repeated access patterns
symbols = LARGE_SYMBOL_SET[:20]
strategy = "sma_cross"
engine = VectorBTEngine(data_provider=high_volume_data_provider)
parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
# First pass - populate cache
with benchmark_timer() as timer:
first_pass_results = []
for symbol in symbols:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2023-01-01",
end_date="2023-12-31",
)
first_pass_results.append(result)
except Exception as e:
logger.error(f"First pass failed for {symbol}: {e}")
first_pass_time = timer.elapsed
# Second pass - should benefit from cache
with benchmark_timer() as timer:
second_pass_results = []
for symbol in symbols:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date="2023-01-01",
end_date="2023-12-31",
)
second_pass_results.append(result)
except Exception as e:
logger.error(f"Second pass failed for {symbol}: {e}")
second_pass_time = timer.elapsed
# Third pass - different parameters (no cache benefit)
modified_parameters = {
**parameters,
"fast_period": parameters.get("fast_period", 10) + 5,
}
with benchmark_timer() as timer:
third_pass_results = []
for symbol in symbols:
try:
result = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=modified_parameters,
start_date="2023-01-01",
end_date="2023-12-31",
)
third_pass_results.append(result)
except Exception as e:
logger.error(f"Third pass failed for {symbol}: {e}")
third_pass_time = timer.elapsed
# Calculate cache efficiency metrics
cache_speedup = (
first_pass_time / second_pass_time if second_pass_time > 0 else 1.0
)
no_cache_comparison = (
first_pass_time / third_pass_time if third_pass_time > 0 else 1.0
)
# Cache hit rate estimation (if second pass is significantly faster)
estimated_cache_hit_rate = max(
0, min(1, (first_pass_time - second_pass_time) / first_pass_time)
)
logger.info(
f"Cache Efficiency Large Dataset Results:\n"
f" • First Pass (populate): {first_pass_time:.2f}s ({len(first_pass_results)} symbols)\n"
f" • Second Pass (cached): {second_pass_time:.2f}s ({len(second_pass_results)} symbols)\n"
f" • Third Pass (no cache): {third_pass_time:.2f}s ({len(third_pass_results)} symbols)\n"
f" • Cache Speedup: {cache_speedup:.2f}x\n"
f" • No Cache Comparison: {no_cache_comparison:.2f}x\n"
f" • Estimated Cache Hit Rate: {estimated_cache_hit_rate:.1%}"
)
return {
"first_pass_time": first_pass_time,
"second_pass_time": second_pass_time,
"third_pass_time": third_pass_time,
"cache_speedup": cache_speedup,
"estimated_cache_hit_rate": estimated_cache_hit_rate,
}
if __name__ == "__main__":
# Run high-volume integration tests
pytest.main(
[
__file__,
"-v",
"--tb=short",
"--asyncio-mode=auto",
"--timeout=3600", # 1 hour timeout for high-volume tests
"--durations=20", # Show 20 slowest tests
"-x", # Stop on first failure
]
)
```
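The memory-leak check in `test_memory_management_large_datasets` above hand-rolls an ordinary least-squares slope over the growth snapshots and asserts it stays under 5 MB per iteration. As a hedged aside (synthetic numbers, not tied to the test fixtures), the sketch below shows that the closed-form slope matches what `numpy.polyfit` returns for the same points:

```python
# Illustrative only: compare the hand-rolled OLS slope from the test with
# numpy.polyfit on synthetic "memory growth" data.
import numpy as np

rng = np.random.default_rng(0)
iterations = list(range(20))
memory_growth = [5.0 + 0.3 * i + rng.normal(0, 0.5) for i in iterations]

# Closed-form slope, as computed in the test
n = len(iterations)
sum_x, sum_y = sum(iterations), sum(memory_growth)
sum_xy = sum(x * y for x, y in zip(iterations, memory_growth, strict=False))
sum_xx = sum(x * x for x in iterations)
slope_manual = (n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x)

# Equivalent degree-1 least-squares fit
slope_np, _intercept = np.polyfit(iterations, memory_growth, 1)

print(f"manual={slope_manual:.4f} MB/iter, polyfit={slope_np:.4f} MB/iter")
assert abs(slope_manual - slope_np) < 1e-8
```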
--------------------------------------------------------------------------------
/maverick_mcp/monitoring/metrics.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive Prometheus metrics for MaverickMCP backtesting system.
This module provides specialized metrics for monitoring:
- Backtesting execution performance and reliability
- Strategy performance over time
- API rate limiting and failure tracking
- Resource usage and optimization
- Anomaly detection and alerting
"""
import threading
import time
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
Summary,
generate_latest,
)
from maverick_mcp.utils.logging import get_logger
logger = get_logger(__name__)
# Custom registry for backtesting metrics to avoid conflicts
BACKTESTING_REGISTRY = CollectorRegistry()
# =============================================================================
# BACKTESTING EXECUTION METRICS
# =============================================================================
# Backtest execution counters
backtest_executions_total = Counter(
"maverick_backtest_executions_total",
"Total number of backtesting executions",
["strategy_name", "status", "symbol", "timeframe"],
registry=BACKTESTING_REGISTRY,
)
backtest_execution_duration = Histogram(
"maverick_backtest_execution_duration_seconds",
"Duration of backtesting executions in seconds",
["strategy_name", "symbol", "timeframe", "data_size"],
buckets=(
0.1,
0.5,
1.0,
2.5,
5.0,
10.0,
30.0,
60.0,
120.0,
300.0,
600.0,
float("inf"),
),
registry=BACKTESTING_REGISTRY,
)
backtest_data_points_processed = Counter(
"maverick_backtest_data_points_processed_total",
"Total number of data points processed during backtesting",
["strategy_name", "symbol", "timeframe"],
registry=BACKTESTING_REGISTRY,
)
backtest_memory_usage = Histogram(
"maverick_backtest_memory_usage_mb",
"Memory usage during backtesting in MB",
["strategy_name", "symbol", "complexity"],
buckets=(10, 25, 50, 100, 250, 500, 1000, 2500, 5000, float("inf")),
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# STRATEGY PERFORMANCE METRICS
# =============================================================================
# Strategy returns and performance
strategy_returns = Histogram(
"maverick_strategy_returns_percent",
"Strategy returns in percentage",
["strategy_name", "symbol", "period"],
buckets=(-50, -25, -10, -5, -1, 0, 1, 5, 10, 25, 50, 100, float("inf")),
registry=BACKTESTING_REGISTRY,
)
strategy_sharpe_ratio = Histogram(
"maverick_strategy_sharpe_ratio",
"Strategy Sharpe ratio",
["strategy_name", "symbol", "period"],
buckets=(-2, -1, -0.5, 0, 0.5, 1.0, 1.5, 2.0, 3.0, 4.0, float("inf")),
registry=BACKTESTING_REGISTRY,
)
strategy_max_drawdown = Histogram(
"maverick_strategy_max_drawdown_percent",
"Maximum drawdown percentage for strategy",
["strategy_name", "symbol", "period"],
buckets=(0, 5, 10, 15, 20, 30, 40, 50, 75, 100, float("inf")),
registry=BACKTESTING_REGISTRY,
)
strategy_win_rate = Histogram(
"maverick_strategy_win_rate_percent",
"Win rate percentage for strategy",
["strategy_name", "symbol", "period"],
buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100),
registry=BACKTESTING_REGISTRY,
)
strategy_trades_total = Counter(
"maverick_strategy_trades_total",
"Total number of trades executed by strategy",
["strategy_name", "symbol", "trade_type", "outcome"],
registry=BACKTESTING_REGISTRY,
)
# Strategy execution latency
strategy_execution_latency = Summary(
"maverick_strategy_execution_latency_seconds",
"Strategy execution latency for signal generation",
["strategy_name", "complexity"],
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# API RATE LIMITING AND FAILURE METRICS
# =============================================================================
# API call tracking
api_calls_total = Counter(
"maverick_api_calls_total",
"Total API calls made to external providers",
["provider", "endpoint", "method", "status_code"],
registry=BACKTESTING_REGISTRY,
)
api_call_duration = Histogram(
"maverick_api_call_duration_seconds",
"API call duration in seconds",
["provider", "endpoint"],
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
registry=BACKTESTING_REGISTRY,
)
# Rate limiting metrics
rate_limit_hits = Counter(
"maverick_rate_limit_hits_total",
"Total rate limit hits by provider",
["provider", "endpoint", "limit_type"],
registry=BACKTESTING_REGISTRY,
)
rate_limit_remaining = Gauge(
"maverick_rate_limit_remaining",
"Remaining API calls before hitting rate limit",
["provider", "endpoint", "window"],
registry=BACKTESTING_REGISTRY,
)
rate_limit_reset_time = Gauge(
"maverick_rate_limit_reset_timestamp",
"Timestamp when rate limit resets",
["provider", "endpoint"],
registry=BACKTESTING_REGISTRY,
)
# API failures and errors
api_failures_total = Counter(
"maverick_api_failures_total",
"Total API failures by error type",
["provider", "endpoint", "error_type", "error_code"],
registry=BACKTESTING_REGISTRY,
)
api_retry_attempts = Counter(
"maverick_api_retry_attempts_total",
"Total API retry attempts",
["provider", "endpoint", "retry_number"],
registry=BACKTESTING_REGISTRY,
)
# Circuit breaker metrics
circuit_breaker_state = Gauge(
"maverick_circuit_breaker_state",
"Circuit breaker state (0=closed, 1=open, 2=half-open)",
["provider", "endpoint"],
registry=BACKTESTING_REGISTRY,
)
circuit_breaker_failures = Counter(
"maverick_circuit_breaker_failures_total",
"Circuit breaker failure count",
["provider", "endpoint"],
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# RESOURCE USAGE AND PERFORMANCE METRICS
# =============================================================================
# VectorBT specific metrics
vectorbt_memory_usage = Gauge(
"maverick_vectorbt_memory_usage_mb",
"VectorBT memory usage in MB",
["operation_type"],
registry=BACKTESTING_REGISTRY,
)
vectorbt_computation_time = Histogram(
"maverick_vectorbt_computation_time_seconds",
"VectorBT computation time in seconds",
["operation_type", "data_size"],
buckets=(0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, float("inf")),
registry=BACKTESTING_REGISTRY,
)
# Database query performance
database_query_duration = Histogram(
"maverick_database_query_duration_seconds",
"Database query execution time",
["query_type", "table_name", "operation"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, float("inf")),
registry=BACKTESTING_REGISTRY,
)
database_connection_pool_usage = Gauge(
"maverick_database_connection_pool_usage",
"Database connection pool usage",
["pool_type", "status"],
registry=BACKTESTING_REGISTRY,
)
# Cache performance metrics
cache_operations_total = Counter(
"maverick_cache_operations_total",
"Total cache operations",
["cache_type", "operation", "status"],
registry=BACKTESTING_REGISTRY,
)
cache_hit_ratio = Histogram(
"maverick_cache_hit_ratio",
"Cache hit ratio percentage",
["cache_type", "key_pattern"],
buckets=(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 99, 100),
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# ANOMALY DETECTION METRICS
# =============================================================================
# Performance anomaly detection
strategy_performance_anomalies = Counter(
"maverick_strategy_performance_anomalies_total",
"Detected strategy performance anomalies",
["strategy_name", "anomaly_type", "severity"],
registry=BACKTESTING_REGISTRY,
)
data_quality_issues = Counter(
"maverick_data_quality_issues_total",
"Data quality issues detected",
["data_source", "issue_type", "symbol"],
registry=BACKTESTING_REGISTRY,
)
resource_usage_alerts = Counter(
"maverick_resource_usage_alerts_total",
"Resource usage threshold alerts",
["resource_type", "threshold_type"],
registry=BACKTESTING_REGISTRY,
)
# Threshold monitoring gauges
performance_thresholds = Gauge(
"maverick_performance_thresholds",
"Performance monitoring thresholds",
["metric_name", "threshold_type"], # threshold_type: warning, critical
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# BUSINESS METRICS FOR TRADING
# =============================================================================
# Portfolio performance metrics
portfolio_value = Gauge(
"maverick_portfolio_value_usd",
"Current portfolio value in USD",
["portfolio_id", "currency"],
registry=BACKTESTING_REGISTRY,
)
portfolio_daily_pnl = Histogram(
"maverick_portfolio_daily_pnl_usd",
"Daily PnL in USD",
["portfolio_id", "strategy"],
buckets=(
-10000,
-5000,
-1000,
-500,
-100,
0,
100,
500,
1000,
5000,
10000,
float("inf"),
),
registry=BACKTESTING_REGISTRY,
)
active_positions = Gauge(
"maverick_active_positions_count",
"Number of active positions",
["portfolio_id", "symbol", "position_type"],
registry=BACKTESTING_REGISTRY,
)
# =============================================================================
# METRICS COLLECTOR CLASS
# =============================================================================
@dataclass
class PerformanceThreshold:
"""Configuration for performance thresholds."""
metric_name: str
warning_value: float
critical_value: float
comparison_type: str = "greater_than" # greater_than, less_than, equal_to
class BacktestingMetricsCollector:
"""
Comprehensive metrics collector for backtesting operations.
Provides high-level interfaces for tracking backtesting performance,
strategy metrics, API usage, and anomaly detection.
"""
def __init__(self):
self.logger = get_logger(f"{__name__}.BacktestingMetricsCollector")
self._anomaly_thresholds = self._initialize_default_thresholds()
self._lock = threading.Lock()
# Initialize performance thresholds in Prometheus
self._setup_performance_thresholds()
self.logger.info("Backtesting metrics collector initialized")
def _initialize_default_thresholds(self) -> dict[str, PerformanceThreshold]:
"""Initialize default performance thresholds for anomaly detection."""
return {
"sharpe_ratio_low": PerformanceThreshold(
"sharpe_ratio", 0.5, 0.0, "less_than"
),
"max_drawdown_high": PerformanceThreshold(
"max_drawdown", 20.0, 30.0, "greater_than"
),
"win_rate_low": PerformanceThreshold("win_rate", 40.0, 30.0, "less_than"),
"execution_time_high": PerformanceThreshold(
"execution_time", 60.0, 120.0, "greater_than"
),
"api_failure_rate_high": PerformanceThreshold(
"api_failure_rate", 5.0, 10.0, "greater_than"
),
"memory_usage_high": PerformanceThreshold(
"memory_usage", 1000, 2000, "greater_than"
),
}
def _setup_performance_thresholds(self):
"""Setup performance threshold gauges."""
for _threshold_name, threshold in self._anomaly_thresholds.items():
performance_thresholds.labels(
metric_name=threshold.metric_name, threshold_type="warning"
).set(threshold.warning_value)
performance_thresholds.labels(
metric_name=threshold.metric_name, threshold_type="critical"
).set(threshold.critical_value)
@contextmanager
def track_backtest_execution(
self, strategy_name: str, symbol: str, timeframe: str, data_points: int = 0
):
"""
Context manager for tracking backtest execution metrics.
Args:
strategy_name: Name of the trading strategy
symbol: Trading symbol (e.g., 'AAPL')
timeframe: Data timeframe (e.g., '1D', '1H')
data_points: Number of data points being processed
"""
start_time = time.time()
start_memory = self._get_memory_usage()
# Determine data size category
data_size = self._categorize_data_size(data_points)
try:
yield
# Success metrics
duration = time.time() - start_time
memory_used = self._get_memory_usage() - start_memory
backtest_executions_total.labels(
strategy_name=strategy_name,
status="success",
symbol=symbol,
timeframe=timeframe,
).inc()
backtest_execution_duration.labels(
strategy_name=strategy_name,
symbol=symbol,
timeframe=timeframe,
data_size=data_size,
).observe(duration)
if data_points > 0:
backtest_data_points_processed.labels(
strategy_name=strategy_name, symbol=symbol, timeframe=timeframe
).inc(data_points)
if memory_used > 0:
complexity = self._categorize_complexity(data_points, duration)
backtest_memory_usage.labels(
strategy_name=strategy_name, symbol=symbol, complexity=complexity
).observe(memory_used)
# Check for performance anomalies
self._check_execution_anomalies(strategy_name, duration, memory_used)
except Exception as e:
# Error metrics
backtest_executions_total.labels(
strategy_name=strategy_name,
status="failure",
symbol=symbol,
timeframe=timeframe,
).inc()
self.logger.error(f"Backtest execution failed for {strategy_name}: {e}")
raise
def track_strategy_performance(
self,
strategy_name: str,
symbol: str,
period: str,
returns: float,
sharpe_ratio: float,
max_drawdown: float,
win_rate: float,
total_trades: int,
winning_trades: int,
):
"""
Track comprehensive strategy performance metrics.
Args:
strategy_name: Name of the trading strategy
symbol: Trading symbol
period: Performance period (e.g., '1Y', '6M', '3M')
returns: Total returns in percentage
sharpe_ratio: Sharpe ratio
max_drawdown: Maximum drawdown percentage
win_rate: Win rate percentage
total_trades: Total number of trades
winning_trades: Number of winning trades
"""
# Record performance metrics
strategy_returns.labels(
strategy_name=strategy_name, symbol=symbol, period=period
).observe(returns)
strategy_sharpe_ratio.labels(
strategy_name=strategy_name, symbol=symbol, period=period
).observe(sharpe_ratio)
strategy_max_drawdown.labels(
strategy_name=strategy_name, symbol=symbol, period=period
).observe(max_drawdown)
strategy_win_rate.labels(
strategy_name=strategy_name, symbol=symbol, period=period
).observe(win_rate)
# Record trade counts
strategy_trades_total.labels(
strategy_name=strategy_name,
symbol=symbol,
trade_type="total",
outcome="all",
).inc(total_trades)
strategy_trades_total.labels(
strategy_name=strategy_name,
symbol=symbol,
trade_type="winning",
outcome="profit",
).inc(winning_trades)
losing_trades = total_trades - winning_trades
strategy_trades_total.labels(
strategy_name=strategy_name,
symbol=symbol,
trade_type="losing",
outcome="loss",
).inc(losing_trades)
# Check for performance anomalies
self._check_strategy_anomalies(
strategy_name, sharpe_ratio, max_drawdown, win_rate
)
def track_api_call(
self,
provider: str,
endpoint: str,
method: str,
status_code: int,
duration: float,
error_type: str | None = None,
remaining_calls: int | None = None,
reset_time: datetime | None = None,
):
"""
Track API call metrics including rate limiting and failures.
Args:
provider: API provider name (e.g., 'tiingo', 'yahoo')
endpoint: API endpoint
method: HTTP method
status_code: Response status code
duration: Request duration in seconds
error_type: Type of error if request failed
remaining_calls: Remaining API calls before rate limit
reset_time: When rate limit resets
"""
# Basic API call tracking
api_calls_total.labels(
provider=provider,
endpoint=endpoint,
method=method,
status_code=str(status_code),
).inc()
api_call_duration.labels(provider=provider, endpoint=endpoint).observe(duration)
# Rate limiting metrics
if remaining_calls is not None:
rate_limit_remaining.labels(
provider=provider, endpoint=endpoint, window="current"
).set(remaining_calls)
if reset_time is not None:
rate_limit_reset_time.labels(provider=provider, endpoint=endpoint).set(
reset_time.timestamp()
)
# Failure tracking
if status_code >= 400:
error_code = self._categorize_error_code(status_code)
api_failures_total.labels(
provider=provider,
endpoint=endpoint,
error_type=error_type or "unknown",
error_code=error_code,
).inc()
# Check for rate limiting
if status_code == 429:
rate_limit_hits.labels(
provider=provider, endpoint=endpoint, limit_type="requests_per_hour"
).inc()
# Check for API anomalies
self._check_api_anomalies(provider, endpoint, status_code, duration)
def track_circuit_breaker(
self, provider: str, endpoint: str, state: str, failure_count: int
):
"""Track circuit breaker state and failures."""
state_mapping = {"closed": 0, "open": 1, "half-open": 2}
circuit_breaker_state.labels(provider=provider, endpoint=endpoint).set(
state_mapping.get(state, 0)
)
if failure_count > 0:
circuit_breaker_failures.labels(provider=provider, endpoint=endpoint).inc(
failure_count
)
def track_resource_usage(
self,
operation_type: str,
memory_mb: float,
computation_time: float,
data_size: str = "unknown",
):
"""Track resource usage for VectorBT operations."""
vectorbt_memory_usage.labels(operation_type=operation_type).set(memory_mb)
vectorbt_computation_time.labels(
operation_type=operation_type, data_size=data_size
).observe(computation_time)
# Check for resource usage anomalies
if memory_mb > self._anomaly_thresholds["memory_usage_high"].warning_value:
resource_usage_alerts.labels(
resource_type="memory",
threshold_type="warning"
if memory_mb
< self._anomaly_thresholds["memory_usage_high"].critical_value
else "critical",
).inc()
def track_database_operation(
self, query_type: str, table_name: str, operation: str, duration: float
):
"""Track database operation performance."""
database_query_duration.labels(
query_type=query_type, table_name=table_name, operation=operation
).observe(duration)
def track_cache_operation(
self, cache_type: str, operation: str, hit: bool, key_pattern: str = "general"
):
"""Track cache operation performance."""
status = "hit" if hit else "miss"
cache_operations_total.labels(
cache_type=cache_type, operation=operation, status=status
).inc()
def detect_anomaly(self, anomaly_type: str, severity: str, context: dict[str, Any]):
"""Record detected anomaly."""
strategy_name = context.get("strategy_name", "unknown")
strategy_performance_anomalies.labels(
strategy_name=strategy_name, anomaly_type=anomaly_type, severity=severity
).inc()
self.logger.warning(
f"Anomaly detected: {anomaly_type} (severity: {severity})",
extra={"context": context},
)
def update_portfolio_metrics(
self,
portfolio_id: str,
portfolio_value_usd: float,
daily_pnl_usd: float,
strategy: str,
positions: list[dict[str, Any]],
):
"""Update portfolio-related metrics."""
portfolio_value.labels(portfolio_id=portfolio_id, currency="USD").set(
portfolio_value_usd
)
portfolio_daily_pnl.labels(
portfolio_id=portfolio_id, strategy=strategy
).observe(daily_pnl_usd)
# Update position counts
for position in positions:
active_positions.labels(
portfolio_id=portfolio_id,
symbol=position.get("symbol", "unknown"),
position_type=position.get("type", "long"),
).set(position.get("quantity", 0))
def _get_memory_usage(self) -> float:
"""Get current memory usage in MB."""
try:
import psutil
process = psutil.Process()
return process.memory_info().rss / 1024 / 1024
except ImportError:
return 0.0
def _categorize_data_size(self, data_points: int) -> str:
"""Categorize data size for metrics labeling."""
if data_points < 100:
return "small"
elif data_points < 1000:
return "medium"
elif data_points < 10000:
return "large"
else:
return "xlarge"
def _categorize_complexity(self, data_points: int, duration: float) -> str:
"""Categorize operation complexity."""
if data_points < 1000 and duration < 10:
return "simple"
elif data_points < 10000 and duration < 60:
return "moderate"
else:
return "complex"
def _categorize_error_code(self, status_code: int) -> str:
"""Categorize HTTP error codes."""
if 400 <= status_code < 500:
return "client_error"
elif 500 <= status_code < 600:
return "server_error"
else:
return "other"
def _check_execution_anomalies(
self, strategy_name: str, duration: float, memory_mb: float
):
"""Check for execution performance anomalies."""
threshold = self._anomaly_thresholds["execution_time_high"]
if duration > threshold.critical_value:
self.detect_anomaly(
"execution_time_high",
"critical",
{
"strategy_name": strategy_name,
"duration": duration,
"threshold": threshold.critical_value,
},
)
elif duration > threshold.warning_value:
self.detect_anomaly(
"execution_time_high",
"warning",
{
"strategy_name": strategy_name,
"duration": duration,
"threshold": threshold.warning_value,
},
)
def _check_strategy_anomalies(
self,
strategy_name: str,
sharpe_ratio: float,
max_drawdown: float,
win_rate: float,
):
"""Check for strategy performance anomalies."""
# Check Sharpe ratio
threshold = self._anomaly_thresholds["sharpe_ratio_low"]
if sharpe_ratio < threshold.critical_value:
self.detect_anomaly(
"sharpe_ratio_low",
"critical",
{"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
)
elif sharpe_ratio < threshold.warning_value:
self.detect_anomaly(
"sharpe_ratio_low",
"warning",
{"strategy_name": strategy_name, "sharpe_ratio": sharpe_ratio},
)
# Check max drawdown
threshold = self._anomaly_thresholds["max_drawdown_high"]
if max_drawdown > threshold.critical_value:
self.detect_anomaly(
"max_drawdown_high",
"critical",
{"strategy_name": strategy_name, "max_drawdown": max_drawdown},
)
elif max_drawdown > threshold.warning_value:
self.detect_anomaly(
"max_drawdown_high",
"warning",
{"strategy_name": strategy_name, "max_drawdown": max_drawdown},
)
# Check win rate
threshold = self._anomaly_thresholds["win_rate_low"]
if win_rate < threshold.critical_value:
self.detect_anomaly(
"win_rate_low",
"critical",
{"strategy_name": strategy_name, "win_rate": win_rate},
)
elif win_rate < threshold.warning_value:
self.detect_anomaly(
"win_rate_low",
"warning",
{"strategy_name": strategy_name, "win_rate": win_rate},
)
def _check_api_anomalies(
self, provider: str, endpoint: str, status_code: int, duration: float
):
"""Check for API call anomalies."""
# Check API response time
if duration > 30.0: # 30 second threshold
self.detect_anomaly(
"api_response_slow",
"warning" if duration < 60.0 else "critical",
{"provider": provider, "endpoint": endpoint, "duration": duration},
)
# Check for repeated failures
if status_code >= 500:
self.detect_anomaly(
"api_server_error",
"critical",
{
"provider": provider,
"endpoint": endpoint,
"status_code": status_code,
},
)
def get_metrics_text(self) -> str:
"""Get all backtesting metrics in Prometheus text format."""
return generate_latest(BACKTESTING_REGISTRY).decode("utf-8")
# =============================================================================
# GLOBAL INSTANCES AND CONVENIENCE FUNCTIONS
# =============================================================================
# Global metrics collector instance
_metrics_collector: BacktestingMetricsCollector | None = None
_collector_lock = threading.Lock()
def get_backtesting_metrics() -> BacktestingMetricsCollector:
"""Get or create the global backtesting metrics collector."""
global _metrics_collector
if _metrics_collector is None:
with _collector_lock:
if _metrics_collector is None:
_metrics_collector = BacktestingMetricsCollector()
return _metrics_collector
# Convenience functions for common operations
def track_backtest_execution(
strategy_name: str, symbol: str, timeframe: str, data_points: int = 0
):
"""Convenience function to track backtest execution."""
return get_backtesting_metrics().track_backtest_execution(
strategy_name, symbol, timeframe, data_points
)
def track_strategy_performance(
strategy_name: str,
symbol: str,
period: str,
returns: float,
sharpe_ratio: float,
max_drawdown: float,
win_rate: float,
total_trades: int,
winning_trades: int,
):
"""Convenience function to track strategy performance."""
get_backtesting_metrics().track_strategy_performance(
strategy_name,
symbol,
period,
returns,
sharpe_ratio,
max_drawdown,
win_rate,
total_trades,
winning_trades,
)
def track_api_call_metrics(
provider: str,
endpoint: str,
method: str,
status_code: int,
duration: float,
error_type: str | None = None,
remaining_calls: int | None = None,
reset_time: datetime | None = None,
):
"""Convenience function to track API call metrics."""
get_backtesting_metrics().track_api_call(
provider,
endpoint,
method,
status_code,
duration,
error_type,
remaining_calls,
reset_time,
)
def track_anomaly_detection(anomaly_type: str, severity: str, context: dict[str, Any]):
"""Convenience function to track detected anomalies."""
get_backtesting_metrics().detect_anomaly(anomaly_type, severity, context)
def get_metrics_for_prometheus() -> str:
"""Get backtesting metrics in Prometheus format."""
return get_backtesting_metrics().get_metrics_text()
```
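For orientation, here is a minimal usage sketch of the collector defined above. It relies only on helpers shown in this file (`get_backtesting_metrics`, `track_backtest_execution`, `track_strategy_performance`, `get_metrics_for_prometheus`); the workload inside the context manager and the performance figures are placeholders, not real backtest output:

```python
# Hedged usage sketch: record one placeholder backtest run and dump the custom
# Prometheus registry. Assumes the maverick_mcp package is importable.
import time

from maverick_mcp.monitoring.metrics import (
    get_backtesting_metrics,
    get_metrics_for_prometheus,
)

collector = get_backtesting_metrics()

# Duration, memory delta, and data-point counters are recorded when the
# context manager exits successfully.
with collector.track_backtest_execution(
    strategy_name="sma_cross", symbol="AAPL", timeframe="1D", data_points=252
):
    time.sleep(0.1)  # stand-in for engine.run_backtest(...)

# Illustrative performance figures for the same run.
collector.track_strategy_performance(
    strategy_name="sma_cross",
    symbol="AAPL",
    period="1Y",
    returns=12.5,
    sharpe_ratio=1.1,
    max_drawdown=8.0,
    win_rate=55.0,
    total_trades=40,
    winning_trades=22,
)

# Exposition text for a /metrics-style scrape endpoint.
print(get_metrics_for_prometheus()[:500])
```

Because the module keeps its own `BACKTESTING_REGISTRY`, these samples stay separate from any default-registry metrics exported elsewhere in the API.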
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/ml/adaptive.py:
--------------------------------------------------------------------------------
```python
"""Adaptive trading strategies with online learning and parameter adjustment."""
import logging
from typing import Any
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from maverick_mcp.backtesting.strategies.base import Strategy
logger = logging.getLogger(__name__)
class AdaptiveStrategy(Strategy):
"""Base class for adaptive strategies that adjust parameters based on performance."""
def __init__(
self,
base_strategy: Strategy,
adaptation_method: str = "gradient",
learning_rate: float = 0.01,
lookback_period: int = 50,
adaptation_frequency: int = 10,
parameters: dict[str, Any] | None = None,
):
"""Initialize adaptive strategy.
Args:
base_strategy: Base strategy to adapt
adaptation_method: Method for parameter adaptation
learning_rate: Learning rate for parameter updates
lookback_period: Period for performance evaluation
adaptation_frequency: How often to adapt parameters
parameters: Additional parameters
"""
super().__init__(parameters)
self.base_strategy = base_strategy
self.adaptation_method = adaptation_method
self.learning_rate = learning_rate
self.lookback_period = lookback_period
self.adaptation_frequency = adaptation_frequency
# Performance tracking
self.performance_history = []
self.parameter_history = []
self.last_adaptation = 0
# Store original parameters for reference
self.original_parameters = base_strategy.parameters.copy()
@property
def name(self) -> str:
"""Get strategy name."""
return f"Adaptive({self.base_strategy.name})"
@property
def description(self) -> str:
"""Get strategy description."""
return f"Adaptive version of {self.base_strategy.name} using {self.adaptation_method} method"
def calculate_performance_metric(self, returns: Series) -> float:
"""Calculate performance metric for parameter adaptation.
Args:
returns: Strategy returns
Returns:
Performance score
"""
if len(returns) == 0:
return 0.0
# Use Sharpe ratio as default performance metric
if returns.std() == 0:
return 0.0
sharpe = returns.mean() / returns.std() * np.sqrt(252)
# Alternative metrics could be:
# - Calmar ratio: returns.mean() / abs(max_drawdown)
# - Sortino ratio: returns.mean() / downside_deviation
# - Information ratio: excess_returns.mean() / tracking_error
return sharpe
def adapt_parameters_gradient(self, performance_gradient: float) -> None:
"""Adapt parameters using gradient-based method.
Args:
performance_gradient: Gradient of performance with respect to parameters
"""
adaptable_params = self.get_adaptable_parameters()
for param_name, param_info in adaptable_params.items():
if param_name in self.base_strategy.parameters:
current_value = self.base_strategy.parameters[param_name]
# Calculate parameter update
param_gradient = performance_gradient * self.learning_rate
# Apply bounds and constraints
min_val = param_info.get("min", current_value * 0.5)
max_val = param_info.get("max", current_value * 2.0)
step_size = param_info.get("step", abs(current_value) * 0.01)
# Update parameter
new_value = current_value + param_gradient * step_size
new_value = max(min_val, min(max_val, new_value))
self.base_strategy.parameters[param_name] = new_value
logger.debug(
f"Adapted {param_name}: {current_value:.4f} -> {new_value:.4f}"
)
def adapt_parameters_random_search(self) -> None:
"""Adapt parameters using random search with performance feedback."""
adaptable_params = self.get_adaptable_parameters()
# Try random perturbations and keep improvements
for param_name, param_info in adaptable_params.items():
if param_name in self.base_strategy.parameters:
current_value = self.base_strategy.parameters[param_name]
# Generate random perturbation
min_val = param_info.get("min", current_value * 0.5)
max_val = param_info.get("max", current_value * 2.0)
# Small random step
perturbation = np.random.normal(0, abs(current_value) * 0.1)
new_value = current_value + perturbation
new_value = max(min_val, min(max_val, new_value))
# Store new value for trial
self.base_strategy.parameters[param_name] = new_value
# Note: Performance evaluation would happen in next cycle
# For now, keep the change and let performance tracking decide
def get_adaptable_parameters(self) -> dict[str, dict]:
"""Get parameters that can be adapted.
Returns:
Dictionary of adaptable parameters with their constraints
"""
# Default adaptable parameters - can be overridden by subclasses
return {
"lookback_period": {"min": 5, "max": 200, "step": 1},
"threshold": {"min": 0.001, "max": 0.1, "step": 0.001},
"window": {"min": 5, "max": 100, "step": 1},
"period": {"min": 5, "max": 200, "step": 1},
}
def adapt_parameters(self, recent_performance: float) -> None:
"""Adapt strategy parameters based on recent performance.
Args:
recent_performance: Recent performance metric
"""
try:
if self.adaptation_method == "gradient":
# Approximate gradient based on performance change
if len(self.performance_history) > 1:
performance_gradient = (
recent_performance - self.performance_history[-2]
)
self.adapt_parameters_gradient(performance_gradient)
elif self.adaptation_method == "random_search":
# Use random search with performance feedback
self.adapt_parameters_random_search()
else:
logger.warning(f"Unknown adaptation method: {self.adaptation_method}")
# Store adapted parameters
self.parameter_history.append(self.base_strategy.parameters.copy())
except Exception as e:
logger.error(f"Error adapting parameters: {e}")
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate adaptive trading signals.
Args:
data: Price data with OHLCV columns
Returns:
Tuple of (entry_signals, exit_signals) as boolean Series
"""
try:
# Generate signals from base strategy
entry_signals, exit_signals = self.base_strategy.generate_signals(data)
# Calculate strategy performance for adaptation
positions = entry_signals.astype(int) - exit_signals.astype(int)
returns = positions.shift(1) * data["close"].pct_change()
# Track performance over time for adaptation
for idx in range(
self.adaptation_frequency, len(data), self.adaptation_frequency
):
if idx > self.last_adaptation + self.adaptation_frequency:
# Calculate recent performance
recent_returns = returns.iloc[
max(0, idx - self.lookback_period) : idx
]
if len(recent_returns) > 0:
recent_performance = self.calculate_performance_metric(
recent_returns
)
self.performance_history.append(recent_performance)
# Adapt parameters based on performance
self.adapt_parameters(recent_performance)
# Re-generate signals with adapted parameters
entry_signals, exit_signals = (
self.base_strategy.generate_signals(data)
)
self.last_adaptation = idx
return entry_signals, exit_signals
except Exception as e:
logger.error(f"Error generating adaptive signals: {e}")
return pd.Series(False, index=data.index), pd.Series(
False, index=data.index
)
def get_adaptation_history(self) -> dict[str, Any]:
"""Get history of parameter adaptations.
Returns:
Dictionary with adaptation history
"""
return {
"performance_history": self.performance_history,
"parameter_history": self.parameter_history,
"current_parameters": self.base_strategy.parameters,
"original_parameters": self.original_parameters,
}
def reset_to_original(self) -> None:
"""Reset strategy parameters to original values."""
self.base_strategy.parameters = self.original_parameters.copy()
self.performance_history = []
self.parameter_history = []
self.last_adaptation = 0
class OnlineLearningStrategy(Strategy):
"""Strategy with online learning capabilities using streaming ML algorithms."""
def __init__(
self,
model_type: str = "sgd",
update_frequency: int = 10,
feature_window: int = 20,
confidence_threshold: float = 0.6,
min_training_samples: int = 100,
initial_training_period: int = 200,
parameters: dict[str, Any] | None = None,
):
"""Initialize online learning strategy.
Args:
model_type: Type of online learning model
update_frequency: How often to update the model
feature_window: Window for feature calculation
confidence_threshold: Minimum confidence for signals
min_training_samples: Minimum samples before enabling predictions
initial_training_period: Period for initial batch training
parameters: Additional parameters
"""
super().__init__(parameters)
self.model_type = model_type
self.update_frequency = update_frequency
self.feature_window = feature_window
self.confidence_threshold = confidence_threshold
self.min_training_samples = min_training_samples
self.initial_training_period = initial_training_period
# Initialize online learning components
self.model = None
self.scaler = None
self.is_trained = False
self.is_initial_trained = False
self.training_buffer = []
self.last_update = 0
self.training_samples_count = 0
# Feature consistency tracking
self.expected_feature_count = None
self.feature_stats_buffer = []
self._initialize_model()
def _initialize_model(self):
"""Initialize online learning model with proper configuration."""
if self.model_type == "sgd":
self.model = SGDClassifier(
loss="log_loss",
learning_rate="adaptive",
eta0=0.01,
random_state=42,
max_iter=1000,
tol=1e-4,
warm_start=True, # Enable incremental learning
alpha=0.01, # Regularization
fit_intercept=True,
)
else:
raise ValueError(f"Unsupported model type: {self.model_type}")
# Initialize scaler as None - will be created during first fit
self.scaler = None
@property
def name(self) -> str:
"""Get strategy name."""
return f"OnlineLearning({self.model_type.upper()})"
@property
def description(self) -> str:
"""Get strategy description."""
return (
f"Online learning strategy using {self.model_type} with streaming updates"
)
def extract_features(self, data: DataFrame, end_idx: int) -> np.ndarray:
"""Extract features for online learning with enhanced stability.
Args:
data: Price data
end_idx: End index for feature calculation
Returns:
Feature array with consistent dimensionality
"""
try:
start_idx = max(0, end_idx - self.feature_window)
window_data = data.iloc[start_idx : end_idx + 1]
# Need minimum data for meaningful features
if len(window_data) < max(5, self.feature_window // 4):
return np.array([])
features = []
# Price features with error handling
returns = window_data["close"].pct_change().dropna()
if len(returns) == 0:
return np.array([])
# Basic return statistics (robust to small samples)
mean_return = returns.mean() if len(returns) > 0 else 0.0
std_return = returns.std() if len(returns) > 1 else 0.01 # Small default
skew_return = returns.skew() if len(returns) > 3 else 0.0
kurt_return = returns.kurtosis() if len(returns) > 3 else 0.0
# Replace NaN/inf values
features.extend(
[
mean_return if np.isfinite(mean_return) else 0.0,
std_return if np.isfinite(std_return) else 0.01,
skew_return if np.isfinite(skew_return) else 0.0,
kurt_return if np.isfinite(kurt_return) else 0.0,
]
)
# Technical indicators with fallbacks
current_price = window_data["close"].iloc[-1]
# Short-term moving average ratio
if len(window_data) >= 5:
sma_5 = window_data["close"].rolling(5).mean().iloc[-1]
features.append(current_price / sma_5 if sma_5 > 0 else 1.0)
else:
features.append(1.0)
# Medium-term moving average ratio
if len(window_data) >= 10:
sma_10 = window_data["close"].rolling(10).mean().iloc[-1]
features.append(current_price / sma_10 if sma_10 > 0 else 1.0)
else:
features.append(1.0)
# Long-term moving average ratio (if enough data)
if len(window_data) >= 20:
sma_20 = window_data["close"].rolling(20).mean().iloc[-1]
features.append(current_price / sma_20 if sma_20 > 0 else 1.0)
else:
features.append(1.0)
# Volatility feature
if len(returns) > 10:
vol_ratio = std_return / returns.rolling(10).std().mean()
features.append(vol_ratio if np.isfinite(vol_ratio) else 1.0)
else:
features.append(1.0)
# Volume features (if available)
if "volume" in window_data.columns and len(window_data) >= 5:
current_volume = window_data["volume"].iloc[-1]
volume_ma = window_data["volume"].rolling(5).mean().iloc[-1]
volume_ratio = current_volume / volume_ma if volume_ma > 0 else 1.0
features.append(volume_ratio if np.isfinite(volume_ratio) else 1.0)
# Volume trend
if len(window_data) >= 10:
volume_ma_long = window_data["volume"].rolling(10).mean().iloc[-1]
volume_trend = (
volume_ma / volume_ma_long if volume_ma_long > 0 else 1.0
)
features.append(volume_trend if np.isfinite(volume_trend) else 1.0)
else:
features.append(1.0)
else:
features.extend([1.0, 1.0])
feature_array = np.array(features)
# Validate feature consistency
if self.expected_feature_count is None:
self.expected_feature_count = len(feature_array)
elif len(feature_array) != self.expected_feature_count:
logger.warning(
f"Feature count mismatch: expected {self.expected_feature_count}, got {len(feature_array)}"
)
return np.array([])
# Check for any remaining NaN or inf values
if not np.all(np.isfinite(feature_array)):
logger.warning("Non-finite features detected, replacing with defaults")
feature_array = np.nan_to_num(
feature_array, nan=0.0, posinf=1.0, neginf=-1.0
)
return feature_array
except Exception as e:
logger.error(f"Error extracting features: {e}")
return np.array([])
def create_target(self, data: DataFrame, idx: int, forward_periods: int = 3) -> int:
"""Create target variable for online learning.
Args:
data: Price data
idx: Current index
forward_periods: Periods to look forward
Returns:
Target class (0: sell, 1: hold, 2: buy)
"""
if idx + forward_periods >= len(data):
return 1 # Hold as default
current_price = data["close"].iloc[idx]
future_price = data["close"].iloc[idx + forward_periods]
return_threshold = 0.02 # 2% threshold
forward_return = (future_price - current_price) / current_price
if forward_return > return_threshold:
return 2 # Buy
elif forward_return < -return_threshold:
return 0 # Sell
else:
return 1 # Hold
def _initial_training(self, data: DataFrame, current_idx: int) -> bool:
"""Perform initial batch training on historical data.
Args:
data: Price data
current_idx: Current index
Returns:
True if initial training successful
"""
try:
if current_idx < self.initial_training_period:
return False
# Collect initial training data
training_examples = []
training_targets = []
# Use a substantial portion of historical data for initial training
start_idx = max(
self.feature_window, current_idx - self.initial_training_period
)
for idx in range(
start_idx, current_idx - 10
): # Leave some data for validation
features = self.extract_features(data, idx)
if len(features) > 0:
target = self.create_target(data, idx)
training_examples.append(features)
training_targets.append(target)
if len(training_examples) < self.min_training_samples:
logger.debug(
f"Insufficient training samples: {len(training_examples)} < {self.min_training_samples}"
)
return False
X = np.array(training_examples)
y = np.array(training_targets)
# Check for class balance
unique_classes, class_counts = np.unique(y, return_counts=True)
if len(unique_classes) < 2:
logger.warning(
f"Insufficient class diversity for training: {unique_classes}"
)
return False
# Initialize scaler with training data
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
# Train initial model
self.model.fit(X_scaled, y)
self.is_initial_trained = True
self.is_trained = True
self.training_samples_count = len(X)
logger.info(
f"Initial training completed with {len(X)} samples, classes: {dict(zip(unique_classes, class_counts, strict=False))}"
)
return True
except Exception as e:
logger.error(f"Error in initial training: {e}")
return False
def update_model(self, data: DataFrame, current_idx: int) -> None:
"""Update online learning model with new data.
Args:
data: Price data
current_idx: Current index
"""
# Perform initial training if not done yet
if not self.is_initial_trained:
if self._initial_training(data, current_idx):
self.last_update = current_idx
return
# Check if update is needed
if current_idx - self.last_update < self.update_frequency:
return
try:
# Collect recent training examples for incremental update
recent_examples = []
recent_targets = []
# Look back for recent training examples (smaller window for incremental updates)
lookback_start = max(0, current_idx - self.update_frequency)
for idx in range(lookback_start, current_idx):
features = self.extract_features(data, idx)
if len(features) > 0:
target = self.create_target(data, idx)
recent_examples.append(features)
recent_targets.append(target)
if len(recent_examples) < 2: # Need minimum samples for incremental update
return
X = np.array(recent_examples)
y = np.array(recent_targets)
# Scale features using existing scaler
X_scaled = self.scaler.transform(X)
# Incremental update using partial_fit
# Ensure we have all classes that the model has seen before
existing_classes = self.model.classes_
self.model.partial_fit(X_scaled, y, classes=existing_classes)
self.training_samples_count += len(X)
self.last_update = current_idx
logger.debug(
f"Updated online model at index {current_idx} with {len(X)} samples (total: {self.training_samples_count})"
)
except Exception as e:
logger.error(f"Error updating online model: {e}")
# Reset training flag to attempt re-initialization
if "partial_fit" in str(e).lower():
logger.warning("Partial fit failed, will attempt re-initialization")
self.is_trained = False
self.is_initial_trained = False
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate signals using online learning.
Args:
data: Price data with OHLCV columns
Returns:
Tuple of (entry_signals, exit_signals) as boolean Series
"""
entry_signals = pd.Series(False, index=data.index)
exit_signals = pd.Series(False, index=data.index)
try:
# Need minimum data for features
start_idx = max(self.feature_window, self.initial_training_period + 10)
if len(data) < start_idx:
logger.warning(
f"Insufficient data for online learning: {len(data)} < {start_idx}"
)
return entry_signals, exit_signals
for idx in range(start_idx, len(data)):
# Update model periodically
self.update_model(data, idx)
if not self.is_trained or self.scaler is None:
continue
# Extract features for current point
features = self.extract_features(data, idx)
if len(features) == 0:
continue
try:
# Make prediction with error handling
X = self.scaler.transform([features])
prediction = self.model.predict(X)[0]
# Get confidence score
if hasattr(self.model, "predict_proba"):
probabilities = self.model.predict_proba(X)[0]
confidence = max(probabilities)
else:
# For models without predict_proba, use decision function
if hasattr(self.model, "decision_function"):
decision_values = self.model.decision_function(X)[0]
# Convert to pseudo-probability (sigmoid-like)
confidence = 1.0 / (1.0 + np.exp(-abs(decision_values)))
else:
confidence = 0.6 # Default confidence
# Generate signals based on prediction and confidence
if confidence >= self.confidence_threshold:
if prediction == 2: # Buy signal
entry_signals.iloc[idx] = True
elif prediction == 0: # Sell signal
exit_signals.iloc[idx] = True
except Exception as pred_error:
logger.debug(f"Prediction error at index {idx}: {pred_error}")
continue
# Log summary statistics
total_entry_signals = entry_signals.sum()
total_exit_signals = exit_signals.sum()
logger.info(
f"Generated {total_entry_signals} entry and {total_exit_signals} exit signals using online learning"
)
except Exception as e:
logger.error(f"Error generating online learning signals: {e}")
return entry_signals, exit_signals
def get_model_info(self) -> dict[str, Any]:
"""Get information about the online learning model.
Returns:
Dictionary with model information
"""
info = {
"model_type": self.model_type,
"is_trained": self.is_trained,
"is_initial_trained": self.is_initial_trained,
"feature_window": self.feature_window,
"update_frequency": self.update_frequency,
"confidence_threshold": self.confidence_threshold,
"min_training_samples": self.min_training_samples,
"initial_training_period": self.initial_training_period,
"training_samples_count": self.training_samples_count,
"expected_feature_count": self.expected_feature_count,
}
if hasattr(self.model, "coef_") and self.model.coef_ is not None:
info["model_coefficients"] = self.model.coef_.tolist()
if hasattr(self.model, "classes_") and self.model.classes_ is not None:
info["model_classes"] = self.model.classes_.tolist()
if self.scaler is not None:
info["feature_scaling"] = {
"mean": self.scaler.mean_.tolist()
if hasattr(self.scaler, "mean_")
else None,
"scale": self.scaler.scale_.tolist()
if hasattr(self.scaler, "scale_")
else None,
}
return info
class HybridAdaptiveStrategy(AdaptiveStrategy):
"""Hybrid strategy combining parameter adaptation with online learning."""
def __init__(
self, base_strategy: Strategy, online_learning_weight: float = 0.3, **kwargs
):
"""Initialize hybrid adaptive strategy.
Args:
base_strategy: Base strategy to adapt
online_learning_weight: Weight for online learning component
**kwargs: Additional parameters for AdaptiveStrategy
"""
super().__init__(base_strategy, **kwargs)
self.online_learning_weight = online_learning_weight
self.online_strategy = OnlineLearningStrategy()
@property
def name(self) -> str:
"""Get strategy name."""
return f"HybridAdaptive({self.base_strategy.name})"
@property
def description(self) -> str:
"""Get strategy description."""
return "Hybrid adaptive strategy combining parameter adaptation with online learning"
def generate_signals(self, data: DataFrame) -> tuple[Series, Series]:
"""Generate hybrid adaptive signals.
Args:
data: Price data with OHLCV columns
Returns:
Tuple of (entry_signals, exit_signals) as boolean Series
"""
# Get signals from adaptive base strategy
adaptive_entry, adaptive_exit = super().generate_signals(data)
# Get signals from online learning component
online_entry, online_exit = self.online_strategy.generate_signals(data)
# Combine signals with weighting
base_weight = 1.0 - self.online_learning_weight
# Weighted combination for entry signals
combined_entry = (
adaptive_entry.astype(float) * base_weight
+ online_entry.astype(float) * self.online_learning_weight
) > 0.5
# Weighted combination for exit signals
combined_exit = (
adaptive_exit.astype(float) * base_weight
+ online_exit.astype(float) * self.online_learning_weight
) > 0.5
return combined_entry, combined_exit
def get_hybrid_info(self) -> dict[str, Any]:
"""Get information about hybrid strategy components.
Returns:
Dictionary with hybrid strategy information
"""
return {
"adaptation_history": self.get_adaptation_history(),
"online_learning_info": self.online_strategy.get_model_info(),
"online_learning_weight": self.online_learning_weight,
"base_weight": 1.0 - self.online_learning_weight,
}
```
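To make the online-learning path concrete, here is a hedged sketch that runs `OnlineLearningStrategy` on a synthetic random-walk series. It assumes the import path implied by the file location above and that the base `Strategy` class can be constructed as the subclasses do here; roughly 400 daily bars clear the default `initial_training_period` of 200, and the data is fabricated, so the resulting signals mean nothing:

```python
# Illustrative sketch: fit and query OnlineLearningStrategy on synthetic data.
import numpy as np
import pandas as pd

from maverick_mcp.backtesting.strategies.ml.adaptive import OnlineLearningStrategy

rng = np.random.default_rng(42)
dates = pd.date_range("2022-01-03", periods=400, freq="B")
close = pd.Series(
    100.0 * np.exp(np.cumsum(rng.normal(0.0005, 0.01, len(dates)))), index=dates
)
data = pd.DataFrame(
    {
        "close": close,
        "volume": rng.integers(1_000_000, 5_000_000, len(dates)),
    },
    index=dates,
)

strategy = OnlineLearningStrategy(confidence_threshold=0.55)
entries, exits = strategy.generate_signals(data)
print(strategy.name, "->", int(entries.sum()), "entries,", int(exits.sum()), "exits")
print("samples seen:", strategy.get_model_info()["training_samples_count"])
```

One design note on `HybridAdaptiveStrategy`: with the default `online_learning_weight` of 0.3, the weighted vote reduces to the adaptive base signals (0.7 alone clears the 0.5 threshold, while 0.3 alone never does); at equal weights both components must agree, and only with an online weight above 0.5 can the online model drive signals on its own.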