This is page 21 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/providers/macro_data.py:
--------------------------------------------------------------------------------
```python
"""
Macroeconomic data providers and utilities for Maverick-MCP.
Provides GDP, inflation rate, unemployment rate, and other macroeconomic indicators.
"""
import logging
import os
from datetime import UTC, datetime, timedelta
import pandas as pd
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler
from maverick_mcp.utils.circuit_breaker_decorators import (
with_economic_data_circuit_breaker,
)
# Load environment variables
load_dotenv()

# Configure logging
# NOTE(review): basicConfig() at import time configures the root logger, which
# can override the host application's logging setup — confirm this is intended
# for a library module.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.macro_data")

# Configuration
# FRED API key read from the environment; empty string when unset.
FRED_API_KEY = os.getenv("FRED_API_KEY", "")
class MacroDataProvider:
"""Provider for macroeconomic data using FRED API."""
MAX_WINDOW_DAYS = 365
def __init__(self, window_days: int = MAX_WINDOW_DAYS):
    """Initialize the provider and prime normalization bounds.

    Args:
        window_days: Size (in days) of the historical window used by
            ``update_historical_bounds()`` when computing normalization
            bounds.

    Raises:
        ImportError: If the optional ``fredapi`` dependency is not installed.

    Note:
        Construction performs network calls via ``update_historical_bounds()``.
    """
    try:
        from fredapi import Fred

        self.fred = Fred(api_key=FRED_API_KEY)
        self.scaler = MinMaxScaler()
        self.window_days = window_days
        # Per-indicator {"min": ..., "max": ...} bounds consumed by
        # normalize_indicators(); populated by update_historical_bounds().
        self.historical_data_bounds: dict[str, dict[str, str]] = {}
        self.update_historical_bounds()
        # For momentum swings, shorter lookbacks
        self.lookback_days = 30
        # Weights for macro sentiment
        self.weights = {
            # Short-term signals (60% total)
            "vix": 0.20,
            "sp500_momentum": 0.20,
            "nasdaq_momentum": 0.15,
            "usd_momentum": 0.05,
            # Macro signals (40% total)
            "inflation_rate": 0.15,
            "gdp_growth_rate": 0.15,
            "unemployment_rate": 0.10,
        }
        # Last smoothed sentiment score, used for exponential smoothing
        # across successive get_macro_statistics() calls.
        self.previous_sentiment_score = None
    except ImportError:
        logger.error(
            "fredapi not installed. Please install with 'pip install fredapi'"
        )
        raise
@with_economic_data_circuit_breaker(
    use_fallback=False
)  # Fallback handled at method level
def _get_fred_series(
    self, series_id: str, start_date: str, end_date: str
) -> pd.Series:
    """
    Get FRED series data with circuit breaker protection.

    Callers (e.g. get_gdp_growth_rate) wrap this in their own try/except and
    supply fallback values, hence use_fallback=False on the decorator.

    Args:
        series_id: FRED series identifier
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format

    Returns:
        Pandas Series with the data
    """
    return self.fred.get_series(series_id, start_date, end_date)
def _calculate_weighted_rolling_performance(
    self, series_id: str, lookbacks: list[int], weights: list[float]
) -> float:
    """Weighted sum of percent performances over several rolling windows.

    For each lookback window the FRED series is fetched, lightly smoothed
    with a 2-point rolling mean to dampen single-day spikes, and its percent
    change from first to last observation is multiplied by the matching
    weight and accumulated. E.g. lookbacks=[30, 90, 180] with
    weights=[0.5, 0.3, 0.2].
    """
    if len(lookbacks) != len(weights):
        logger.error("Lookbacks and weights must have the same length.")
        return 0.0

    now = datetime.now(UTC)
    end_str = now.strftime("%Y-%m-%d")
    weighted_total = 0.0

    for days, weight in zip(lookbacks, weights, strict=False):
        window_start = now - timedelta(days=days)
        raw = self._get_fred_series(
            series_id, window_start.strftime("%Y-%m-%d"), end_str
        )
        if not isinstance(raw, pd.Series):
            logger.warning(
                f"Unexpected data type from FRED API for {series_id}: {type(raw)}"
            )
            continue

        cleaned = raw.dropna()
        if cleaned.empty:
            logger.warning(
                f"No FRED data for {series_id} over last {days} days."
            )
            continue

        # Small rolling mean to reduce single-day spikes.
        smoothed = cleaned.rolling(window=2).mean().dropna()
        if smoothed.empty:
            continue

        first = smoothed.iloc[0]
        last = smoothed.iloc[-1]
        weighted_total += ((last - first) / first) * 100 * weight

    return weighted_total
def get_sp500_performance(self) -> float:
    """Multi-timeframe rolling performance for the S&P 500 (FRED ``SP500``).

    Blends 1-month, 3-month and 6-month windows with 50/30/20 weights;
    returns 0.0 on any failure.
    """
    try:
        return self._calculate_weighted_rolling_performance(
            "SP500", [30, 90, 180], [0.5, 0.3, 0.2]
        )
    except Exception as e:
        logger.error(f"Error fetching S&P 500 rolling performance: {e}")
        return 0.0
def get_nasdaq_performance(self) -> float:
    """Multi-timeframe rolling performance for the NASDAQ-100 (FRED ``NASDAQ100``).

    Blends 1-month, 3-month and 6-month windows with 50/30/20 weights;
    returns 0.0 on any failure.
    """
    try:
        return self._calculate_weighted_rolling_performance(
            "NASDAQ100", [30, 90, 180], [0.5, 0.3, 0.2]
        )
    except Exception as e:
        logger.error(f"Error fetching NASDAQ rolling performance: {e}")
        return 0.0
def get_gdp_growth_rate(self):
    """
    Fetch the latest GDP growth rate (FRED series A191RL1Q225SBEA).

    Returns:
        dict with "current" and "previous" quarterly growth rates as floats;
        both are 0.0 when data is unavailable or an error occurs. With only
        one observation, it is reported as both current and previous.
    """
    try:
        # Get last 2 quarters of data to ensure we have the latest
        end_date = datetime.now(UTC)
        start_date = end_date - timedelta(days=180)

        data = self._get_fred_series(
            "A191RL1Q225SBEA",
            start_date.strftime("%Y-%m-%d"),
            end_date.strftime("%Y-%m-%d"),
        )
        # Drop missing observations so NaN cannot leak through float() below.
        # (Sibling methods such as get_unemployment_rate clean their series
        # the same way; this one previously did not.)
        data = data.dropna()
        if data.empty:
            logger.warning("No GDP data available from FRED")
            return {"current": 0.0, "previous": 0.0}

        # Get last two values
        last_two = data.tail(2)
        if len(last_two) >= 2:
            return {
                "current": float(last_two.iloc[-1]),
                "previous": float(last_two.iloc[-2]),
            }
        # Single observation: reuse it for both fields.
        return {
            "current": float(last_two.iloc[-1]),
            "previous": float(last_two.iloc[-1]),
        }
    except Exception as e:
        logger.error(f"Error fetching GDP growth rate: {e}")
        return {"current": 0.0, "previous": 0.0}
def get_unemployment_rate(self):
    """
    Fetch the unemployment rate (FRED series UNRATE).

    Returns:
        dict with "current" and "previous" rates as floats; both are None
        when the data cannot be retrieved.
    """
    try:
        # Get recent unemployment data
        end_date = datetime.now(UTC)
        start_date = end_date - timedelta(days=90)

        series_data = self._get_fred_series(
            "UNRATE", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")
        )

        if not isinstance(series_data, pd.Series):
            logger.error(
                f"Expected pandas Series from FRED API, got {type(series_data)}"
            )
            return {"current": 0.0, "previous": 0.0}

        data = series_data.dropna()
        if data.empty:
            # Previously an empty series raised IndexError below and was only
            # caught by the generic handler; make the case explicit.
            logger.warning("No unemployment data available from FRED")
            return {"current": None, "previous": None}

        if len(data) >= 2:
            return {
                "current": float(data.iloc[-1]),
                "previous": float(data.iloc[-2]),
            }
        # Single observation: reuse it for both fields.
        return {"current": float(data.iloc[-1]), "previous": float(data.iloc[-1])}
    except Exception as e:
        logger.error(f"Error fetching Unemployment rate: {e}")
        return {"current": None, "previous": None}
def get_inflation_rate(self):
    """
    Fetch the annual core inflation rate based on CPI data from FRED.
    Uses CPILFESL (Core CPI: All Items Less Food and Energy).

    Returns:
        dict with:
            "current": latest year-over-year core-CPI change (%, 2 dp)
            "previous": prior month's year-over-year change (%, 2 dp)
            "bounds": (min, max) of all YoY changes in the fetched window
        All values are None (bounds (None, None)) on failure.
    """
    try:
        end_date = datetime.now(UTC)
        # Get ~5 years of data to ensure we have enough
        start_date = end_date - timedelta(days=5 * 365)

        # 1) Fetch monthly CPILFESL data from FRED
        # NOTE(review): bypasses the circuit-breaker wrapper _get_fred_series
        # used elsewhere — confirm whether this is intentional.
        series_data = self.fred.get_series(
            "CPILFESL",
            observation_start=start_date.strftime("%Y-%m-%d"),
            observation_end=end_date.strftime("%Y-%m-%d"),
        )

        # 2) Ensure it's a pandas Series and clean it
        if not isinstance(series_data, pd.Series):
            logger.error(
                f"Expected pandas Series from FRED API, got {type(series_data)}"
            )
            return {"current": None, "previous": None, "bounds": (None, None)}

        data = series_data.dropna().sort_index()
        # Optional: Force a monthly freq alignment in case data is stored daily
        data = data.asfreq("MS").dropna()
        if data.empty:
            logger.error("No inflation data available from FRED")
            return {"current": None, "previous": None, "bounds": (None, None)}

        # 3) The latest monthly index is now guaranteed to be the first of the month
        latest_idx = data.index[-1]
        latest_value = data.iloc[-1]

        # 4) Get data for exactly one year prior (the matching month)
        # Because we forced MS freq, this is typically just `iloc[-13]` (12 steps back),
        # but let's keep the logic explicit:
        if isinstance(latest_idx, pd.Timestamp):
            year_ago_idx = latest_idx - pd.DateOffset(years=1)
        else:
            # Fallback for unexpected index types
            year_ago_idx = pd.Timestamp(latest_idx) - pd.DateOffset(years=1)

        # If your data is strictly monthly, you can do:
        # year_ago_value = data.loc[year_ago_idx]  # might fail if missing data
        # Or fallback to "on or before" logic:
        year_ago_series = data[data.index <= year_ago_idx]
        if year_ago_series.empty:
            logger.warning(
                "Not enough data to get year-ago CPI. Using 0 as fallback."
            )
            current_inflation = 0.0
        else:
            year_ago_value = year_ago_series.iloc[-1]
            current_inflation = (
                (latest_value - year_ago_value) / year_ago_value
            ) * 100

        # 5) Compute previous month's YoY
        if isinstance(latest_idx, pd.Timestamp):
            prev_month_idx = latest_idx - pd.DateOffset(months=1)
        else:
            prev_month_idx = pd.Timestamp(latest_idx) - pd.DateOffset(months=1)

        prev_month_series = data[data.index <= prev_month_idx]
        if prev_month_series.empty:
            logger.warning("No data for previous month. Using 0 as fallback.")
            previous_inflation = 0.0
        else:
            prev_month_value = prev_month_series.iloc[-1]
            if isinstance(prev_month_idx, pd.Timestamp) and not pd.isna(
                prev_month_idx
            ):
                prev_year_ago_idx = prev_month_idx - pd.DateOffset(years=1)
            else:
                # Handle NaT or other types
                prev_year_ago_idx = pd.Timestamp(prev_month_idx) - pd.DateOffset(
                    years=1
                )
            prev_year_ago_series = data[data.index <= prev_year_ago_idx]
            if prev_year_ago_series.empty:
                logger.warning(
                    "No data for previous year's month. Using 0 as fallback."
                )
                previous_inflation = 0.0
            else:
                prev_year_ago_value = prev_year_ago_series.iloc[-1]
                previous_inflation = (
                    (prev_month_value - prev_year_ago_value) / prev_year_ago_value
                ) * 100

        # 6) Optionally round
        current_inflation = round(current_inflation, 2)
        previous_inflation = round(previous_inflation, 2)

        # 7) Compute bounds
        # 12-period percent change on monthly data == year-over-year change.
        yoy_changes = data.pct_change(periods=12) * 100
        yoy_changes = yoy_changes.dropna()
        if yoy_changes.empty:
            inflation_min, inflation_max = 0.0, 0.0
        else:
            inflation_min = yoy_changes.min()
            inflation_max = yoy_changes.max()
        bounds = (round(inflation_min, 2), round(inflation_max, 2))

        logger.info(
            f"Core CPI (YoY): current={current_inflation}%, previous={previous_inflation}%"
        )

        return {
            "current": current_inflation,
            "previous": previous_inflation,
            "bounds": bounds,
        }
    except Exception as e:
        logger.error(f"Error fetching core inflation rate: {e}", exc_info=True)
        return {"current": None, "previous": None, "bounds": (None, None)}
def get_vix(self) -> float | None:
    """Get the latest VIX reading.

    Tries Yahoo Finance (^VIX) first; falls back to the FRED VIXCLS series
    over the last 7 days. Returns None if both sources fail.
    (The original docstring said "from FRED", but Yahoo is the primary source.)
    """
    try:
        import yfinance as yf

        # Try Yahoo Finance first
        ticker = yf.Ticker("^VIX")
        data = ticker.history(period="1d")
        if not data.empty:
            return float(data["Close"].iloc[-1])

        # fallback to FRED
        end_date = datetime.now(UTC)
        start_date = end_date - timedelta(days=7)
        # NOTE(review): bypasses the circuit-breaker wrapper _get_fred_series;
        # confirm whether that is intentional for the fallback path.
        series_data = self.fred.get_series(
            "VIXCLS", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")
        )
        if isinstance(series_data, pd.Series):
            df = series_data.dropna()
            if not df.empty:
                return float(df.iloc[-1])
        return None
    except Exception as e:
        logger.error(f"Error fetching VIX: {e}")
        return None
def get_sp500_momentum(self) -> float:
    """
    Short-term S&P 500 momentum blended across 3-, 7- and 14-day windows.

    Each window is smoothed with a 2-point rolling mean to reduce spikiness.
    Per-window percent changes are combined 50/30/20 when all three windows
    are available, otherwise averaged equally; 0.0 when nothing is available.
    """
    try:
        now = datetime.now(UTC)
        end_str = now.strftime("%Y-%m-%d")
        momentums: list[float] = []

        for days in (3, 7, 14):
            window_start = now - timedelta(days=days)
            raw = self.fred.get_series(
                "SP500",
                window_start.strftime("%Y-%m-%d"),
                end_str,
            )
            if not isinstance(raw, pd.Series):
                continue
            smoothed = raw.dropna().rolling(window=2).mean().dropna()
            if len(smoothed) >= 2:
                change = (
                    (smoothed.iloc[-1] - smoothed.iloc[0]) / smoothed.iloc[0]
                ) * 100
                momentums.append(change)

        if not momentums:
            return 0.0
        if len(momentums) == 3:
            return 0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]
        return sum(momentums) / len(momentums)
    except Exception as e:
        logger.error(f"Error fetching S&P 500 momentum: {e}")
        return 0.0
def get_nasdaq_momentum(self) -> float:
    """
    Short-term NASDAQ-100 momentum blended across 3-, 7- and 14-day windows.

    Each window is padded by 5 calendar days and smoothed with a 2-point
    rolling mean; a window with too few observations contributes 0.0 with a
    warning. The three results are combined 50/30/20.
    """
    try:
        now = datetime.now(UTC)
        end_str = now.strftime("%Y-%m-%d")
        momentums: list[float] = []

        for days in (3, 7, 14):
            window_start = now - timedelta(days=days + 5)
            raw = self.fred.get_series(
                "NASDAQ100",
                window_start.strftime("%Y-%m-%d"),
                end_str,
            )
            if isinstance(raw, pd.Series):
                smoothed = raw.dropna().rolling(window=2).mean().dropna()
                if len(smoothed) >= 2:
                    change = (
                        (smoothed.iloc[-1] - smoothed.iloc[0]) / smoothed.iloc[0]
                    ) * 100
                    momentums.append(change)
                else:
                    logger.warning(f"Insufficient NASDAQ data for {days}-day lookback")
                    momentums.append(0.0)

        if len(momentums) == 3:
            return 0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]

        logger.warning("Insufficient data for NASDAQ momentum calculation")
        return sum(momentums) / len(momentums) if momentums else 0.0
    except Exception as e:
        logger.error(f"Error fetching NASDAQ momentum: {e}")
        return 0.0
def get_usd_momentum(self) -> float:
    """
    Short-term USD momentum from FRED's broad dollar index (DTWEXBGS).

    Blends 3-, 7- and 14-day windows (each padded by 5 calendar days and
    smoothed with a 2-point rolling mean) using 50/30/20 weights; windows
    with too little data contribute 0.0 with a warning.
    """
    try:
        now = datetime.now(UTC)
        end_str = now.strftime("%Y-%m-%d")
        momentums: list[float] = []

        for days in (3, 7, 14):
            window_start = now - timedelta(days=days + 5)
            series = self.fred.get_series(
                "DTWEXBGS",
                window_start.strftime("%Y-%m-%d"),
                end_str,
            )
            smoothed = series.dropna().rolling(window=2).mean().dropna()
            if len(smoothed) >= 2:
                start_val = smoothed.iloc[0]
                end_val = smoothed.iloc[-1]
                momentums.append(((end_val - start_val) / start_val) * 100)
            else:
                logger.warning(f"Insufficient USD data for {days}-day lookback")
                momentums.append(0.0)

        if len(momentums) == 3:
            return 0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]

        logger.warning("Insufficient data for USD momentum calculation")
        return sum(momentums) / len(momentums) if momentums else 0.0
    except Exception as e:
        logger.error(f"Error fetching USD momentum: {e}")
        return 0.0
def update_historical_bounds(self):
    """
    Refresh per-indicator min/max bounds from the last ``window_days`` of data.

    The bounds feed ``normalize_indicators()``. Indicators whose fetch fails,
    returns no data, or yields a degenerate range (min == max — previously
    this forced the normalizer to divide by ~0 and clamp every value to 0
    or 1) fall back to ``default_bounds()``.
    """
    end_date = datetime.now(UTC)
    start_date = end_date - timedelta(days=self.window_days)
    start_date_str = start_date.strftime("%Y-%m-%d")
    end_date_str = end_date.strftime("%Y-%m-%d")

    indicators = {
        "gdp_growth_rate": "A191RL1Q225SBEA",
        "unemployment_rate": "UNRATE",
        "inflation_rate": "CPILFESL",
        "sp500_momentum": "SP500",
        "nasdaq_momentum": "NASDAQCOM",
        "vix": "VIXCLS",
    }

    for key, series_id in indicators.items():
        try:
            if key == "gdp_growth_rate":
                data = self.fred.get_series(series_id, start_date_str, end_date_str)
            elif key == "inflation_rate":
                # For inflation bounds, get a wider historical range
                wider_start = (end_date - timedelta(days=5 * 365)).strftime(
                    "%Y-%m-%d"
                )
                cpi = self.fred.get_series(series_id, wider_start, end_date_str)
                cpi = cpi.dropna()
                if len(cpi) > 13:
                    # Calculate year-over-year inflation rates
                    inflation_rates = [
                        ((cpi.iloc[i] - cpi.iloc[i - 12]) / cpi.iloc[i - 12]) * 100
                        for i in range(12, len(cpi))
                    ]
                    data = pd.Series(inflation_rates)
                else:
                    # Not enough data for YoY calculation
                    data = pd.Series([], dtype=float)
            elif key in ["sp500_momentum", "nasdaq_momentum"]:
                df = self.fred.get_series(series_id, start_date_str, end_date_str)
                df = df.dropna()
                df = df.rolling(window=2).mean().dropna()
                if not df.empty:
                    start_price = df.iloc[0]
                    end_price = df.iloc[-1]
                    performance = ((end_price - start_price) / start_price) * 100
                    # A single aggregate point gives min == max; the
                    # degenerate-range guard below then applies defaults.
                    data = pd.Series([performance], index=[df.index[-1]])
                else:
                    data = pd.Series([], dtype=float)
            else:
                data = self.fred.get_series(series_id, start_date_str, end_date_str)

            if not data.empty:
                # Store plain floats (not numpy scalars) to match the
                # shape of default_bounds().
                min_val = float(data.min())
                max_val = float(data.max())
                if min_val == max_val:
                    self.historical_data_bounds[key] = self.default_bounds(key)
                    logger.warning(
                        f"Degenerate bounds for {key}. Using default bounds."
                    )
                else:
                    self.historical_data_bounds[key] = {
                        "min": min_val,
                        "max": max_val,
                    }
            else:
                self.historical_data_bounds[key] = self.default_bounds(key)
                logger.warning(f"No data fetched for {key}. Using default bounds.")
        except Exception as e:
            logger.error(f"Error updating historical bounds for {key}: {e}")
            self.historical_data_bounds[key] = self.default_bounds(key)
def default_bounds(self, key: str):
    """
    Fixed fallback normalization bounds for a given indicator.

    Kept static (rather than recomputed daily) so the normalization scale
    cannot swing between runs; unknown keys get a unit range.
    """
    fallback = {
        "vix": {"min": 10.0, "max": 50.0},
        "sp500_momentum": {"min": -15.0, "max": 15.0},
        "nasdaq_momentum": {"min": -20.0, "max": 20.0},
        "usd_momentum": {"min": -5.0, "max": 5.0},
        "inflation_rate": {"min": 0.0, "max": 10.0},
        "gdp_growth_rate": {"min": -2.0, "max": 6.0},
        "unemployment_rate": {"min": 2.0, "max": 10.0},
    }
    return fallback.get(key, {"min": 0.0, "max": 1.0})
def normalize_indicators(self, indicators: dict) -> dict:
    """
    Map raw indicator values into [0, 1].

    Risk-off indicators (vix, unemployment_rate, inflation_rate) are
    inverted so that higher normalized values always mean "better".
    Missing (None) values map to a neutral 0.5.
    """
    normalized = {}
    for name, raw in indicators.items():
        if raw is None:
            normalized[name] = 0.5
            continue

        bounds = self.historical_data_bounds.get(name, self.default_bounds(name))
        low = float(bounds["min"])
        high = float(bounds["max"])
        # Tiny epsilon span guards against division by zero on a
        # degenerate range.
        span = high - low if high != low else 1e-9
        score = (raw - low) / span
        if name in ("vix", "unemployment_rate", "inflation_rate"):
            score = 1.0 - score
        normalized[name] = min(1.0, max(0.0, score))
    return normalized
def get_historical_data(self) -> dict:
    """Fetch raw indicator history over the last ``lookback_days``.

    Returns a dict of plain lists: percent performance (relative to the
    first observation) for SP500 and NASDAQ-100, raw VIX/GDP/unemployment
    values, and year-over-year core-CPI inflation. Every list is empty on
    error.
    """
    end_date = datetime.now(UTC)
    start_date = end_date - timedelta(days=self.lookback_days)
    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d")

    def pct_from_first(series) -> list:
        # Percent change of every point relative to the first observation.
        if series.empty:
            return []
        base = series.iloc[0]
        return [(value - base) / base * 100 for value in series]

    try:
        sp500_performance = pct_from_first(
            self.fred.get_series("SP500", start_str, end_str)
        )
        nasdaq_performance = pct_from_first(
            self.fred.get_series("NASDAQ100", start_str, end_str)
        )

        vix_data = self.fred.get_series("VIXCLS", start_str, end_str)
        gdp_data = self.fred.get_series("A191RL1Q225SBEA", start_str, end_str)
        unemployment_data = self.fred.get_series("UNRATE", start_str, end_str)

        cpi_data = self.fred.get_series("CPILFESL", start_str, end_str)
        inflation_values: list = []
        if not cpi_data.empty and len(cpi_data) > 12:
            # YoY change needs 12 prior monthly observations.
            inflation_values = [
                ((cpi_data.iloc[i] - cpi_data.iloc[i - 12]) / cpi_data.iloc[i - 12])
                * 100
                for i in range(12, len(cpi_data))
            ]

        return {
            "sp500_performance": sp500_performance,
            "nasdaq_performance": nasdaq_performance,
            "vix": vix_data.tolist() if not vix_data.empty else [],
            "gdp_growth_rate": gdp_data.tolist() if not gdp_data.empty else [],
            "unemployment_rate": (
                unemployment_data.tolist() if not unemployment_data.empty else []
            ),
            "inflation_rate": inflation_values,
        }
    except Exception as e:
        logger.error(f"Error fetching historical data: {str(e)}")
        return {
            "sp500_performance": [],
            "nasdaq_performance": [],
            "vix": [],
            "gdp_growth_rate": [],
            "unemployment_rate": [],
            "inflation_rate": [],
        }
def get_macro_statistics(self):
    """
    Main method to aggregate macro stats with better error handling and smoothing.

    Returns a dict of raw indicator values (GDP growth, unemployment,
    inflation, plus current/previous pairs), index performance, VIX, a
    composite ``sentiment_score`` clamped to [1, 100] and exponentially
    smoothed across calls (80% previous / 20% new), and the
    ``historical_data`` payload from ``get_historical_data()``. All
    numeric fields are guaranteed to be floats; on any failure a neutral
    default payload (sentiment 50.0) is returned instead of raising.
    """
    try:
        # Refresh min/max normalization bounds before reading current values.
        self.update_historical_bounds()
        # Get inflation rate and bounds
        inflation_data = self.get_inflation_rate()
        gdp_data = self.get_gdp_growth_rate()
        unemployment_data = self.get_unemployment_rate()
        # Pull raw indicator values with safe defaults
        # (``or 0.0`` maps missing/None readings to a neutral 0.0).
        indicators = {
            "gdp_growth_rate": gdp_data["current"] or 0.0,
            "gdp_growth_rate_previous": gdp_data["previous"] or 0.0,
            "unemployment_rate": unemployment_data["current"] or 0.0,
            "unemployment_rate_previous": unemployment_data["previous"] or 0.0,
            "inflation_rate": inflation_data["current"] or 0.0,
            "inflation_rate_previous": inflation_data["previous"] or 0.0,
            "vix": self.get_vix() or 0.0,
            "sp500_momentum": self.get_sp500_momentum() or 0.0,
            "nasdaq_momentum": self.get_nasdaq_momentum() or 0.0,
            "usd_momentum": self.get_usd_momentum() or 0.0,
        }
        # Normalize
        normalized = self.normalize_indicators(indicators)
        # Weighted average of the normalized indicators, rescaled to 1-100.
        sentiment_score = sum(normalized[k] * self.weights[k] for k in self.weights)
        sentiment_score = (sentiment_score / sum(self.weights.values())) * 100
        sentiment_score = max(1, min(100, sentiment_score))
        # Increase smoothing factor to reduce big overnight moves
        if self.previous_sentiment_score is not None:
            smoothing_factor = 0.8  # keep 80% old, 20% new
            sentiment_score = (
                smoothing_factor * self.previous_sentiment_score
                + (1 - smoothing_factor) * sentiment_score
            )
        # Persist for the next call's smoothing step.
        self.previous_sentiment_score = sentiment_score
        historical_data = self.get_historical_data()
        # Return dictionary with all values guaranteed to be numeric
        return {
            "gdp_growth_rate": float(indicators["gdp_growth_rate"]),
            "gdp_growth_rate_previous": float(
                indicators["gdp_growth_rate_previous"]
            ),
            "unemployment_rate": float(indicators["unemployment_rate"]),
            "unemployment_rate_previous": float(
                indicators["unemployment_rate_previous"]
            ),
            "inflation_rate": float(indicators["inflation_rate"]),
            "inflation_rate_previous": float(indicators["inflation_rate_previous"]),
            "sp500_performance": float(self.get_sp500_performance() or 0.0),
            "nasdaq_performance": float(self.get_nasdaq_performance() or 0.0),
            "vix": float(indicators["vix"]),
            "sentiment_score": float(sentiment_score),
            "historical_data": historical_data,
        }
    except Exception as e:
        logger.error(f"Error in get_macro_statistics: {e}")
        # Return safe defaults if everything fails
        # (sentiment 50.0 = neutral midpoint of the 1-100 scale).
        return {
            "gdp_growth_rate": 0.0,
            "gdp_growth_rate_previous": 0.0,
            "unemployment_rate": 0.0,
            "unemployment_rate_previous": 0.0,
            "inflation_rate": 0.0,
            "inflation_rate_previous": 0.0,
            "sp500_performance": 0.0,
            "nasdaq_performance": 0.0,
            "vix": 0.0,
            "sentiment_score": 50.0,
            "historical_data": {},
        }
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/structured_logger.py:
--------------------------------------------------------------------------------
```python
"""
Enhanced structured logging infrastructure for backtesting system.
This module provides comprehensive structured logging capabilities with:
- Correlation ID generation and tracking across async boundaries
- Request context propagation
- JSON formatting for log aggregation
- Performance metrics logging
- Resource usage tracking
- Debug mode with verbose logging
- Async logging to avoid blocking operations
- Log rotation and compression
- Multiple output handlers (console, file, remote)
"""
import asyncio
import gc
import json
import logging
import logging.handlers
import os
import sys
import threading
import time
import traceback
import uuid
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar
from datetime import UTC, datetime
from functools import wraps
from pathlib import Path
from typing import Any
import psutil
# Context variables for request tracking across async boundaries.
# ContextVar values automatically flow into tasks/coroutines spawned from the
# current context, so these survive async hops without being threaded through
# call signatures.
correlation_id_var: ContextVar[str | None] = ContextVar("correlation_id", default=None)
request_start_var: ContextVar[float | None] = ContextVar("request_start", default=None)
user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None)
tool_name_var: ContextVar[str | None] = ContextVar("tool_name", default=None)
operation_context_var: ContextVar[dict[str, Any] | None] = ContextVar(
    "operation_context", default=None
)

# Global logger registry for performance metrics aggregation.
# Populated by PerformanceMetricsLogger.__init__; read by
# StructuredLoggerManager.create_dashboard_metrics().
_performance_logger_registry: dict[str, "PerformanceMetricsLogger"] = {}

# Running per-level record counts, incremented by LogLevelCounterFilter.
_log_level_counts: dict[str, int] = {
    "DEBUG": 0,
    "INFO": 0,
    "WARNING": 0,
    "ERROR": 0,
    "CRITICAL": 0,
}

# Thread pool for async logging operations
_async_log_executor: ThreadPoolExecutor | None = None
_async_log_lock = threading.Lock()
class CorrelationIDGenerator:
"""Enhanced correlation ID generation with backtesting context."""
@staticmethod
def generate_correlation_id(prefix: str = "bt") -> str:
"""Generate a unique correlation ID with backtesting prefix."""
timestamp = int(time.time() * 1000) % 1000000 # Last 6 digits of timestamp
random_part = uuid.uuid4().hex[:8]
return f"{prefix}-{timestamp}-{random_part}"
@staticmethod
def set_correlation_id(
correlation_id: str | None = None, prefix: str = "bt"
) -> str:
"""Set correlation ID in context with automatic generation."""
if not correlation_id:
correlation_id = CorrelationIDGenerator.generate_correlation_id(prefix)
correlation_id_var.set(correlation_id)
return correlation_id
@staticmethod
def get_correlation_id() -> str | None:
"""Get current correlation ID from context."""
return correlation_id_var.get()
@staticmethod
def propagate_context(target_context: dict[str, Any]) -> dict[str, Any]:
"""Propagate correlation context to target dict."""
target_context.update(
{
"correlation_id": correlation_id_var.get(),
"user_id": user_id_var.get(),
"tool_name": tool_name_var.get(),
"operation_context": operation_context_var.get(),
}
)
return target_context
class EnhancedStructuredFormatter(logging.Formatter):
"""Enhanced JSON formatter with performance metrics and resource tracking."""
def __init__(
self, include_performance: bool = True, include_resources: bool = True
):
super().__init__()
self.include_performance = include_performance
self.include_resources = include_resources
self._process = psutil.Process()
def format(self, record: logging.LogRecord) -> str:
"""Format log record with comprehensive structured data."""
# Base structured log data
log_data = {
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
"thread": record.thread,
"process_id": record.process,
}
# Add correlation context
CorrelationIDGenerator.propagate_context(log_data)
# Add performance metrics if enabled
if self.include_performance:
request_start = request_start_var.get()
if request_start:
log_data["duration_ms"] = int((time.time() - request_start) * 1000)
# Add resource usage if enabled
if self.include_resources:
try:
memory_info = self._process.memory_info()
log_data["memory_rss_mb"] = round(memory_info.rss / 1024 / 1024, 2)
log_data["memory_vms_mb"] = round(memory_info.vms / 1024 / 1024, 2)
log_data["cpu_percent"] = self._process.cpu_percent(interval=None)
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Process might have ended or access denied
pass
# Add exception information
if record.exc_info:
log_data["exception"] = {
"type": record.exc_info[0].__name__
if record.exc_info[0]
else "Unknown",
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info),
}
# Add extra fields from the record
extra_fields = {}
for key, value in record.__dict__.items():
if key not in {
"name",
"msg",
"args",
"created",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"pathname",
"process",
"processName",
"relativeCreated",
"thread",
"threadName",
"exc_info",
"exc_text",
"stack_info",
"getMessage",
"message",
}:
# Sanitize sensitive data
if self._is_sensitive_field(key):
extra_fields[key] = "***MASKED***"
else:
extra_fields[key] = self._serialize_value(value)
if extra_fields:
log_data["extra"] = extra_fields
return json.dumps(log_data, default=str, ensure_ascii=False)
def _is_sensitive_field(self, field_name: str) -> bool:
"""Check if field contains sensitive information."""
sensitive_keywords = {
"password",
"token",
"key",
"secret",
"auth",
"credential",
"bearer",
"session",
"cookie",
"api_key",
"access_token",
"refresh_token",
"private",
"confidential",
}
return any(keyword in field_name.lower() for keyword in sensitive_keywords)
def _serialize_value(self, value: Any) -> Any:
"""Safely serialize complex values for JSON output."""
if isinstance(value, str | int | float | bool) or value is None:
return value
elif isinstance(value, dict):
return {k: self._serialize_value(v) for k, v in value.items()}
elif isinstance(value, list | tuple):
return [self._serialize_value(item) for item in value]
else:
return str(value)
class AsyncLogHandler(logging.Handler):
    """Non-blocking handler that forwards records to ``target_handler`` from a worker thread.

    Records are queued on ``emit`` and drained by a daemon thread so the
    calling thread never blocks on slow targets. When the queue is full the
    oldest record is dropped to bound memory. On ``close()`` the queue is
    flushed, so records emitted just before shutdown are not lost (the
    previous implementation dropped anything queued after the worker's last
    drain cycle).
    """

    def __init__(self, target_handler: logging.Handler, max_queue_size: int = 10000):
        super().__init__()
        self.target_handler = target_handler
        self.max_queue_size = max_queue_size
        self._queue: list[logging.LogRecord] = []
        self._queue_lock = threading.Lock()
        self._shutdown = False
        # Daemon thread so a forgotten handler never blocks interpreter exit.
        self._worker_thread = threading.Thread(target=self._process_logs, daemon=True)
        self._worker_thread.start()

    def emit(self, record: logging.LogRecord):
        """Queue a record for async processing; drops the oldest when full."""
        if self._shutdown:
            return
        with self._queue_lock:
            if len(self._queue) < self.max_queue_size:
                self._queue.append(record)
            # If queue is full, drop oldest records
            elif self._queue:
                self._queue.pop(0)
                self._queue.append(record)

    def _drain_queue(self):
        """Emit every currently queued record to the target handler."""
        with self._queue_lock:
            pending = self._queue[:]
            self._queue.clear()
        for record in pending:
            try:
                self.target_handler.emit(record)
            except Exception:
                # Never raise from the logging path (would recurse into logging).
                pass

    def _process_logs(self):
        """Worker loop: drain the queue until shutdown, then flush leftovers."""
        while not self._shutdown:
            self._drain_queue()
            # Brief sleep to prevent busy waiting
            time.sleep(0.01)
        # Flush anything queued between the last drain and shutdown.
        self._drain_queue()

    def close(self):
        """Stop the worker, flush remaining records, and close the target handler."""
        self._shutdown = True
        self._worker_thread.join(timeout=5.0)
        # Safety net in case the worker timed out before its final drain.
        self._drain_queue()
        self.target_handler.close()
        super().close()
class PerformanceMetricsLogger:
"""Comprehensive performance metrics logging for backtesting operations."""
def __init__(self, logger_name: str = "maverick_mcp.performance"):
self.logger = logging.getLogger(logger_name)
self.metrics: dict[str, list[float]] = {
"execution_times": [],
"memory_usage": [],
"cpu_usage": [],
"operation_counts": [],
}
self._start_times: dict[str, float] = {}
self._lock = threading.Lock()
# Register for global aggregation
_performance_logger_registry[logger_name] = self
def start_operation(self, operation_id: str, operation_type: str, **context):
"""Start tracking a performance-critical operation."""
start_time = time.time()
with self._lock:
self._start_times[operation_id] = start_time
# Set request context
request_start_var.set(start_time)
if "tool_name" in context:
tool_name_var.set(context["tool_name"])
self.logger.info(
f"Started {operation_type} operation",
extra={
"operation_id": operation_id,
"operation_type": operation_type,
"start_time": start_time,
**context,
},
)
def end_operation(self, operation_id: str, success: bool = True, **metrics):
"""End tracking of a performance-critical operation."""
end_time = time.time()
with self._lock:
start_time = self._start_times.pop(operation_id, end_time)
duration_ms = (end_time - start_time) * 1000
# Collect system metrics
try:
process = psutil.Process()
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent(interval=None)
except (psutil.NoSuchProcess, psutil.AccessDenied):
memory_mb = 0
cpu_percent = 0
# Update internal metrics
with self._lock:
self.metrics["execution_times"].append(duration_ms)
self.metrics["memory_usage"].append(memory_mb)
self.metrics["cpu_usage"].append(cpu_percent)
self.metrics["operation_counts"].append(1)
log_level = logging.INFO if success else logging.ERROR
self.logger.log(
log_level,
f"{'Completed' if success else 'Failed'} operation in {duration_ms:.2f}ms",
extra={
"operation_id": operation_id,
"duration_ms": duration_ms,
"memory_mb": memory_mb,
"cpu_percent": cpu_percent,
"success": success,
**metrics,
},
)
def log_business_metric(self, metric_name: str, value: int | float, **context):
"""Log business-specific metrics like strategies processed, success rates."""
self.logger.info(
f"Business metric: {metric_name} = {value}",
extra={
"metric_name": metric_name,
"metric_value": value,
"metric_type": "business",
**context,
},
)
def get_performance_summary(self) -> dict[str, Any]:
"""Get aggregated performance metrics summary."""
with self._lock:
if not self.metrics["execution_times"]:
return {"message": "No performance data available"}
execution_times = self.metrics["execution_times"]
memory_usage = self.metrics["memory_usage"]
cpu_usage = self.metrics["cpu_usage"]
return {
"operations_count": len(execution_times),
"execution_time_stats": {
"avg_ms": sum(execution_times) / len(execution_times),
"min_ms": min(execution_times),
"max_ms": max(execution_times),
"total_ms": sum(execution_times),
},
"memory_stats": {
"avg_mb": sum(memory_usage) / len(memory_usage)
if memory_usage
else 0,
"peak_mb": max(memory_usage) if memory_usage else 0,
},
"cpu_stats": {
"avg_percent": sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0,
"peak_percent": max(cpu_usage) if cpu_usage else 0,
},
"timestamp": datetime.now(UTC).isoformat(),
}
class DebugModeManager:
    """Tracks debug-mode state (MAVERICK_DEBUG) and per-module verbosity filters."""

    def __init__(self):
        # Debug is opt-in via the environment: "true", "1", or "on".
        flag = os.getenv("MAVERICK_DEBUG", "false").lower()
        self._debug_enabled = flag in ("true", "1", "on")
        self._verbose_modules: set = set()
        self._debug_filters: dict[str, Any] = {}

    def is_debug_enabled(self, module_name: str = "") -> bool:
        """True when debug is on globally, or for ``module_name`` specifically."""
        if not self._debug_enabled:
            return False
        if not module_name:
            return True
        # An empty verbose set means "all modules"; otherwise require a match.
        return not self._verbose_modules or module_name in self._verbose_modules

    def enable_verbose_logging(self, module_pattern: str):
        """Restrict verbose debug logging to modules matching ``module_pattern``."""
        self._verbose_modules.add(module_pattern)

    def add_debug_filter(self, filter_name: str, filter_config: dict[str, Any]):
        """Register a named debug filter configuration."""
        self._debug_filters[filter_name] = filter_config

    def should_log_request_response(self, operation_name: str) -> bool:
        """Whether request/response payloads should be logged for ``operation_name``."""
        if not self._debug_enabled:
            return False
        for config in self._debug_filters.values():
            wants_payloads = config.get("log_request_response")
            if wants_payloads and operation_name in config.get("operations", []):
                return True
        # In debug mode, default to logging payloads even without a match.
        return True
class StructuredLoggerManager:
    """Central manager for structured logging configuration.

    Owns the process-wide logging setup (console/file/remote handlers, JSON
    formatting, async wrapping), a per-component cache of performance
    loggers, and the debug-mode state. ``setup_structured_logging`` is
    idempotent: the first call configures the root logger and later calls
    return immediately.
    """

    def __init__(self):
        self.debug_manager = DebugModeManager()
        # Per-component PerformanceMetricsLogger cache (see get_performance_logger).
        self.performance_loggers: dict[str, PerformanceMetricsLogger] = {}
        self._configured = False

    def setup_structured_logging(
        self,
        log_level: str = "INFO",
        log_format: str = "json",
        log_file: str | None = None,
        enable_async: bool = True,
        enable_rotation: bool = True,
        max_log_size: int = 10 * 1024 * 1024,  # 10MB
        backup_count: int = 5,
        console_output: str = "stdout",  # stdout, stderr
        remote_handler_config: dict[str, Any] | None = None,
    ):
        """Setup comprehensive structured logging infrastructure.

        Args:
            log_level: Root logger level name (e.g. "INFO", "DEBUG").
            log_format: "json" for structured output; anything else plain text.
            log_file: Optional log path; parent directories are created.
            enable_async: Wrap every handler in a non-blocking AsyncLogHandler.
            enable_rotation: Use RotatingFileHandler for the file target.
            max_log_size: Rotation threshold in bytes.
            backup_count: Number of rotated files to keep.
            console_output: "stdout" or "stderr" for the console stream.
            remote_handler_config: Reserved; remote shipping is not implemented.
        """
        if self._configured:
            return
        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(getattr(logging, log_level.upper()))
        # Clear existing handlers so repeated setups don't duplicate output.
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)
        handlers = []
        # Console handler
        console_stream = sys.stdout if console_output == "stdout" else sys.stderr
        console_handler = logging.StreamHandler(console_stream)
        if log_format == "json":
            console_formatter = EnhancedStructuredFormatter(
                include_performance=True, include_resources=True
            )
        else:
            console_formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
        console_handler.setFormatter(console_formatter)
        handlers.append(console_handler)
        # File handler with rotation if specified
        if log_file:
            log_path = Path(log_file)
            log_path.parent.mkdir(parents=True, exist_ok=True)
            if enable_rotation:
                file_handler = logging.handlers.RotatingFileHandler(
                    log_file, maxBytes=max_log_size, backupCount=backup_count
                )
            else:
                file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(EnhancedStructuredFormatter())
            handlers.append(file_handler)
        # Remote handler if configured (for log aggregation)
        if remote_handler_config:
            remote_handler = self._create_remote_handler(remote_handler_config)
            if remote_handler:
                handlers.append(remote_handler)
        # Wrap handlers with async processing if enabled
        if enable_async:
            handlers = [AsyncLogHandler(handler) for handler in handlers]
        # Add all handlers to root logger
        for handler in handlers:
            root_logger.addHandler(handler)
        # Set specific logger levels to reduce noise
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("httpx").setLevel(logging.WARNING)
        logging.getLogger("asyncio").setLevel(logging.WARNING)
        # Enable debug mode loggers if configured
        if self.debug_manager.is_debug_enabled():
            self._setup_debug_loggers()
        self._configured = True

    def _create_remote_handler(self, config: dict[str, Any]) -> logging.Handler | None:
        """Create remote handler for log aggregation (placeholder for future implementation)."""
        # This would implement remote logging to services like ELK, Splunk, etc.
        # For now, return None as it's not implemented
        return None

    def _setup_debug_loggers(self):
        """Setup additional loggers for debug mode."""
        debug_logger = logging.getLogger("maverick_mcp.debug")
        debug_logger.setLevel(logging.DEBUG)
        request_logger = logging.getLogger("maverick_mcp.requests")
        request_logger.setLevel(logging.DEBUG)

    def get_performance_logger(self, logger_name: str) -> PerformanceMetricsLogger:
        """Get or create performance logger for specific component."""
        if logger_name not in self.performance_loggers:
            self.performance_loggers[logger_name] = PerformanceMetricsLogger(
                logger_name
            )
        return self.performance_loggers[logger_name]

    def get_logger(self, name: str) -> logging.Logger:
        """Get structured logger with correlation support."""
        return logging.getLogger(name)

    def create_dashboard_metrics(self) -> dict[str, Any]:
        """Create comprehensive metrics for performance dashboard.

        Aggregates per-level log counts, per-component performance summaries
        from the module-level registry, and current process memory/CPU/GC
        statistics.
        """
        global _log_level_counts
        dashboard_data = {
            "system_metrics": {
                "timestamp": datetime.now(UTC).isoformat(),
                "log_level_counts": _log_level_counts.copy(),
                "active_correlation_ids": len(
                    [cid for cid in [correlation_id_var.get()] if cid]
                ),
            },
            "performance_metrics": {},
            "memory_stats": {},
        }
        # Aggregate performance metrics from all loggers
        for logger_name, perf_logger in _performance_logger_registry.items():
            dashboard_data["performance_metrics"][logger_name] = (
                perf_logger.get_performance_summary()
            )
        # System memory stats
        try:
            process = psutil.Process()
            memory_info = process.memory_info()
            dashboard_data["memory_stats"] = {
                "rss_mb": round(memory_info.rss / 1024 / 1024, 2),
                "vms_mb": round(memory_info.vms / 1024 / 1024, 2),
                "cpu_percent": process.cpu_percent(interval=None),
                "gc_stats": {
                    "generation_0": gc.get_count()[0],
                    "generation_1": gc.get_count()[1],
                    "generation_2": gc.get_count()[2],
                },
            }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            dashboard_data["memory_stats"] = {"error": "Unable to collect memory stats"}
        return dashboard_data
# Global instance (lazily created by get_logger_manager).
_logger_manager: StructuredLoggerManager | None = None


def get_logger_manager() -> StructuredLoggerManager:
    """Get global logger manager instance.

    Lazily constructs the singleton on first call; all later calls return
    the same ``StructuredLoggerManager``.
    """
    global _logger_manager
    if _logger_manager is None:
        _logger_manager = StructuredLoggerManager()
    return _logger_manager
def with_structured_logging(
operation_name: str,
include_performance: bool = True,
log_params: bool = True,
log_result: bool = False,
):
"""Decorator for automatic structured logging of operations."""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs):
# Generate correlation ID if not present
correlation_id = CorrelationIDGenerator.get_correlation_id()
if not correlation_id:
correlation_id = CorrelationIDGenerator.set_correlation_id()
# Setup operation context
operation_id = f"{operation_name}_{int(time.time() * 1000)}"
tool_name_var.set(operation_name)
logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}")
perf_logger = None
if include_performance:
perf_logger = get_logger_manager().get_performance_logger(
f"performance.{operation_name}"
)
perf_logger.start_operation(
operation_id=operation_id,
operation_type=operation_name,
tool_name=operation_name,
)
# Log operation start
extra_data = {
"operation_id": operation_id,
"correlation_id": correlation_id,
}
if log_params:
# Sanitize parameters
safe_kwargs = {
k: "***MASKED***"
if "password" in k.lower() or "token" in k.lower()
else v
for k, v in kwargs.items()
}
extra_data["parameters"] = safe_kwargs
logger.info(f"Starting {operation_name}", extra=extra_data)
try:
# Execute the function
result = await func(*args, **kwargs)
# Log success
success_data = {"operation_id": operation_id, "success": True}
if log_result and result is not None:
# Limit result size for logging
result_str = str(result)
success_data["result"] = (
result_str[:1000] + "..."
if len(result_str) > 1000
else result_str
)
logger.info(f"Completed {operation_name}", extra=success_data)
if perf_logger:
perf_logger.end_operation(operation_id, success=True)
return result
except Exception as e:
# Log error
logger.error(
f"Failed {operation_name}: {str(e)}",
exc_info=True,
extra={
"operation_id": operation_id,
"error_type": type(e).__name__,
"success": False,
},
)
if perf_logger:
perf_logger.end_operation(operation_id, success=False, error=str(e))
raise
@wraps(func)
def sync_wrapper(*args, **kwargs):
# Similar logic for sync functions
correlation_id = CorrelationIDGenerator.get_correlation_id()
if not correlation_id:
correlation_id = CorrelationIDGenerator.set_correlation_id()
operation_id = f"{operation_name}_{int(time.time() * 1000)}"
tool_name_var.set(operation_name)
logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}")
perf_logger = None
if include_performance:
perf_logger = get_logger_manager().get_performance_logger(
f"performance.{operation_name}"
)
perf_logger.start_operation(
operation_id=operation_id,
operation_type=operation_name,
tool_name=operation_name,
)
extra_data = {
"operation_id": operation_id,
"correlation_id": correlation_id,
}
if log_params:
safe_kwargs = {
k: "***MASKED***"
if any(
sensitive in k.lower()
for sensitive in ["password", "token", "key", "secret"]
)
else v
for k, v in kwargs.items()
}
extra_data["parameters"] = safe_kwargs
logger.info(f"Starting {operation_name}", extra=extra_data)
try:
result = func(*args, **kwargs)
success_data = {"operation_id": operation_id, "success": True}
if log_result and result is not None:
result_str = str(result)
success_data["result"] = (
result_str[:1000] + "..."
if len(result_str) > 1000
else result_str
)
logger.info(f"Completed {operation_name}", extra=success_data)
if perf_logger:
perf_logger.end_operation(operation_id, success=True)
return result
except Exception as e:
logger.error(
f"Failed {operation_name}: {str(e)}",
exc_info=True,
extra={
"operation_id": operation_id,
"error_type": type(e).__name__,
"success": False,
},
)
if perf_logger:
perf_logger.end_operation(operation_id, success=False, error=str(e))
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
# Convenience functions
def get_structured_logger(name: str) -> logging.Logger:
    """Get structured logger instance.

    Thin wrapper over the global manager's ``get_logger``.
    """
    return get_logger_manager().get_logger(name)


def get_performance_logger(component: str) -> PerformanceMetricsLogger:
    """Get performance logger for specific component.

    Returns the shared ``PerformanceMetricsLogger`` cached under ``component``.
    """
    return get_logger_manager().get_performance_logger(component)
def setup_backtesting_logging(
    log_level: str = "INFO", enable_debug: bool = False, log_file: str | None = None
):
    """Setup logging specifically configured for backtesting operations.

    Args:
        log_level: Root log level name (e.g. "INFO", "DEBUG").
        enable_debug: When True, exports MAVERICK_DEBUG=true (read by
            DebugModeManager instances constructed afterwards) and registers
            request/response debug filters for backtesting operations.
        log_file: Optional log path; defaults to "logs/backtesting.log".
    """
    # Set debug environment if requested
    if enable_debug:
        os.environ["MAVERICK_DEBUG"] = "true"
    # Setup structured logging
    manager = get_logger_manager()
    manager.setup_structured_logging(
        log_level=log_level,
        log_format="json",
        log_file=log_file or "logs/backtesting.log",
        enable_async=True,
        enable_rotation=True,
        console_output="stderr",  # Use stderr for MCP compatibility
    )
    # Configure debug filters for backtesting
    if enable_debug:
        manager.debug_manager.add_debug_filter(
            "backtesting",
            {
                "log_request_response": True,
                "operations": [
                    "run_backtest",
                    "optimize_parameters",
                    "get_historical_data",
                ],
            },
        )
# Update log level counts (for dashboard metrics)
class LogLevelCounterFilter(logging.Filter):
    """Filter to count log levels for dashboard metrics."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Tally every record by level name; the module-level dict is read by
        # StructuredLoggerManager.create_dashboard_metrics().
        global _log_level_counts
        _log_level_counts[record.levelname] = (
            _log_level_counts.get(record.levelname, 0) + 1
        )
        return True  # never suppress records; counting only


# Add the counter filter to root logger
logging.getLogger().addFilter(LogLevelCounterFilter())
```
--------------------------------------------------------------------------------
/tests/test_backtest_persistence.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive tests for backtest persistence layer.
Tests cover:
- PostgreSQL persistence layer with comprehensive database operations
- BacktestResult, BacktestTrade, OptimizationResult, and WalkForwardTest models
- Database CRUD operations with proper error handling
- Performance comparison and ranking functionality
- Backtest result caching and retrieval optimization
- Database constraint validation and data integrity
- Concurrent access and transaction handling
"""
from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any
from unittest.mock import Mock, patch
from uuid import UUID, uuid4
import numpy as np
import pandas as pd
import pytest
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from maverick_mcp.backtesting.persistence import (
BacktestPersistenceError,
BacktestPersistenceManager,
find_best_strategy_for_symbol,
get_recent_backtests,
save_vectorbt_results,
)
from maverick_mcp.data.models import (
BacktestResult,
BacktestTrade,
OptimizationResult,
WalkForwardTest,
)
class TestBacktestPersistenceManager:
"""Test suite for BacktestPersistenceManager class."""
@pytest.fixture
def sample_vectorbt_results(self) -> dict[str, Any]:
    """Create sample VectorBT results for testing.

    Returns a dict shaped like a VectorBT backtest payload: identifying
    fields, a parameters/metrics mapping, equity and drawdown series over
    calendar 2023, and two example trades (one winner, one loser).
    The random series is seeded so the fixture is reproducible across test
    sessions (the previous unseeded ``np.random.normal`` produced different
    data on every run).
    """
    dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D")
    rng = np.random.default_rng(42)  # fixed seed -> deterministic fixture
    equity_curve = np.cumsum(rng.normal(0.001, 0.02, len(dates)))
    # Drawdown is the gap below the running peak, clipped at zero.
    drawdown_series = np.minimum(
        0, equity_curve - np.maximum.accumulate(equity_curve)
    )
    return {
        "symbol": "AAPL",
        "strategy": "momentum_crossover",
        "parameters": {
            "fast_window": 10,
            "slow_window": 20,
            "signal_threshold": 0.02,
        },
        "start_date": "2023-01-01",
        "end_date": "2023-12-31",
        "initial_capital": 10000.0,
        "metrics": {
            "total_return": 0.15,
            "annualized_return": 0.18,
            "sharpe_ratio": 1.25,
            "sortino_ratio": 1.45,
            "calmar_ratio": 1.10,
            "max_drawdown": -0.08,
            "max_drawdown_duration": 45,
            "volatility": 0.16,
            "downside_volatility": 0.12,
            "total_trades": 24,
            "winning_trades": 14,
            "losing_trades": 10,
            "win_rate": 0.583,
            "profit_factor": 1.35,
            "average_win": 0.045,
            "average_loss": -0.025,
            "largest_win": 0.12,
            "largest_loss": -0.08,
            "final_value": 11500.0,
            "peak_value": 12100.0,
            "beta": 1.05,
            "alpha": 0.03,
        },
        "equity_curve": equity_curve.tolist(),
        "drawdown_series": drawdown_series.tolist(),
        "trades": [
            {
                "entry_date": "2023-01-15",
                "entry_price": 150.0,
                "entry_time": "2023-01-15T09:30:00",
                "exit_date": "2023-01-25",
                "exit_price": 155.0,
                "exit_time": "2023-01-25T16:00:00",
                "position_size": 100,
                "direction": "long",
                "pnl": 500.0,
                "pnl_percent": 0.033,
                "mae": -150.0,
                "mfe": 600.0,
                "duration_days": 10,
                "duration_hours": 6.5,
                "exit_reason": "take_profit",
                "fees_paid": 2.0,
                "slippage_cost": 1.0,
            },
            {
                "entry_date": "2023-02-01",
                "entry_price": 160.0,
                "entry_time": "2023-02-01T10:00:00",
                "exit_date": "2023-02-10",
                "exit_price": 156.0,
                "exit_time": "2023-02-10T15:30:00",
                "position_size": 100,
                "direction": "long",
                "pnl": -400.0,
                "pnl_percent": -0.025,
                "mae": -500.0,
                "mfe": 200.0,
                "duration_days": 9,
                "duration_hours": 5.5,
                "exit_reason": "stop_loss",
                "fees_paid": 2.0,
                "slippage_cost": 1.0,
            },
        ],
    }
@pytest.fixture
def persistence_manager(self, db_session: Session):
    """Create a persistence manager with test database session.

    Passing the session explicitly keeps transaction control with the test
    harness (the manager will not own or close it).
    """
    return BacktestPersistenceManager(session=db_session)
def test_persistence_manager_context_manager(self, db_session: Session):
    """Test persistence manager as context manager.

    Covers both ownership modes: an externally supplied session (the
    manager must not own it) and auto-created sessions (the manager
    commits and closes on context exit).
    """
    with BacktestPersistenceManager(session=db_session) as manager:
        assert manager.session == db_session
        assert not manager._owns_session
    # Test with auto-session creation (mocked)
    with patch(
        "maverick_mcp.backtesting.persistence.SessionLocal"
    ) as mock_session_local:
        mock_session = Mock(spec=Session)
        mock_session_local.return_value = mock_session
        with BacktestPersistenceManager() as manager:
            assert manager.session == mock_session
            assert manager._owns_session
        # Owned sessions must be committed and closed exactly once on exit.
        mock_session.commit.assert_called_once()
        mock_session.close.assert_called_once()
def test_save_backtest_result_success(
    self, persistence_manager, sample_vectorbt_results
):
    """Test successful backtest result saving.

    Verifies that the returned ID is a valid UUID string, the persisted
    BacktestResult row stores headline metrics as Decimals, and both
    sample trades are stored against the new backtest ID.
    """
    backtest_id = persistence_manager.save_backtest_result(
        vectorbt_results=sample_vectorbt_results,
        execution_time=2.5,
        notes="Test backtest run",
    )
    # Test return value
    assert isinstance(backtest_id, str)
    assert UUID(backtest_id)  # Valid UUID
    # Test database record
    result = (
        persistence_manager.session.query(BacktestResult)
        .filter(BacktestResult.backtest_id == UUID(backtest_id))
        .first()
    )
    assert result is not None
    assert result.symbol == "AAPL"
    assert result.strategy_type == "momentum_crossover"
    assert result.total_return == Decimal("0.15")
    assert result.sharpe_ratio == Decimal("1.25")
    assert result.total_trades == 24
    assert result.execution_time_seconds == Decimal("2.5")
    assert result.notes == "Test backtest run"
    # Test trades were saved
    trades = (
        persistence_manager.session.query(BacktestTrade)
        .filter(BacktestTrade.backtest_id == UUID(backtest_id))
        .all()
    )
    assert len(trades) == 2
    assert trades[0].symbol == "AAPL"
    assert trades[0].pnl == Decimal("500.0")
    assert trades[1].pnl == Decimal("-400.0")
def test_save_backtest_result_validation_error(self, persistence_manager):
    """Test backtest saving with validation errors.

    An empty symbol/strategy must be rejected with a descriptive
    BacktestPersistenceError before any database write happens.
    """
    # Missing required fields
    invalid_results = {"symbol": "", "strategy": ""}
    with pytest.raises(BacktestPersistenceError) as exc_info:
        persistence_manager.save_backtest_result(invalid_results)
    assert "Symbol and strategy type are required" in str(exc_info.value)
def test_save_backtest_result_database_error(
    self, persistence_manager, sample_vectorbt_results
):
    """Test backtest saving with database errors.

    SQLAlchemy failures must surface as BacktestPersistenceError rather
    than leaking the raw driver exception to callers.
    """
    with patch.object(
        persistence_manager.session, "add", side_effect=SQLAlchemyError("DB Error")
    ):
        with pytest.raises(BacktestPersistenceError) as exc_info:
            persistence_manager.save_backtest_result(sample_vectorbt_results)
        assert "Failed to save backtest" in str(exc_info.value)
def test_get_backtest_by_id(self, persistence_manager, sample_vectorbt_results):
"""Test retrieval of backtest by ID."""
# Save a backtest first
backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)
# Retrieve it
result = persistence_manager.get_backtest_by_id(backtest_id)
assert result is not None
assert str(result.backtest_id) == backtest_id
assert result.symbol == "AAPL"
assert result.strategy_type == "momentum_crossover"
# Test non-existent ID
fake_id = str(uuid4())
result = persistence_manager.get_backtest_by_id(fake_id)
assert result is None
# Test invalid UUID format
result = persistence_manager.get_backtest_by_id("invalid-uuid")
assert result is None
def test_get_backtests_by_symbol(
self, persistence_manager, sample_vectorbt_results
):
"""Test retrieval of backtests by symbol."""
# Save multiple backtests for same symbol
sample_vectorbt_results["strategy"] = "momentum_v1"
backtest_id1 = persistence_manager.save_backtest_result(sample_vectorbt_results)
sample_vectorbt_results["strategy"] = "momentum_v2"
backtest_id2 = persistence_manager.save_backtest_result(sample_vectorbt_results)
# Save backtest for different symbol
sample_vectorbt_results["symbol"] = "GOOGL"
sample_vectorbt_results["strategy"] = "momentum_v1"
backtest_id3 = persistence_manager.save_backtest_result(sample_vectorbt_results)
# Test retrieval by symbol
aapl_results = persistence_manager.get_backtests_by_symbol("AAPL")
assert len(aapl_results) == 2
assert all(result.symbol == "AAPL" for result in aapl_results)
assert backtest_id1 != backtest_id2
assert backtest_id3 not in {backtest_id1, backtest_id2}
retrieved_ids = {str(result.backtest_id) for result in aapl_results}
assert {backtest_id1, backtest_id2}.issubset(retrieved_ids)
# Test with strategy filter
aapl_v1_results = persistence_manager.get_backtests_by_symbol(
"AAPL", "momentum_v1"
)
assert len(aapl_v1_results) == 1
assert aapl_v1_results[0].strategy_type == "momentum_v1"
# Test with limit
limited_results = persistence_manager.get_backtests_by_symbol("AAPL", limit=1)
assert len(limited_results) == 1
# Test non-existent symbol
empty_results = persistence_manager.get_backtests_by_symbol("NONEXISTENT")
assert len(empty_results) == 0
    def test_get_best_performing_strategies(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test retrieval of best performing strategies.

        Seeds four strategies with distinct Sharpe ratios, total returns, and
        trade counts, then verifies ranking by metric, the result limit, and
        the ``min_trades`` filter.
        """
        # Create multiple backtests with different performance
        # (strategy name, metric overrides merged into the shared fixture).
        strategies_performance = [
            (
                "momentum",
                {"sharpe_ratio": 1.5, "total_return": 0.2, "total_trades": 15},
            ),
            (
                "mean_reversion",
                {"sharpe_ratio": 1.8, "total_return": 0.15, "total_trades": 20},
            ),
            (
                "breakout",
                {"sharpe_ratio": 0.8, "total_return": 0.25, "total_trades": 10},
            ),
            (
                "momentum_v2",
                {"sharpe_ratio": 2.0, "total_return": 0.3, "total_trades": 25},
            ),
        ]
        backtest_ids = []
        # Mutate the shared fixture per strategy and persist each variant.
        for strategy, metrics in strategies_performance:
            sample_vectorbt_results["strategy"] = strategy
            sample_vectorbt_results["metrics"].update(metrics)
            backtest_id = persistence_manager.save_backtest_result(
                sample_vectorbt_results
            )
            backtest_ids.append(backtest_id)
        # Test best by Sharpe ratio (default)
        best_sharpe = persistence_manager.get_best_performing_strategies(
            "sharpe_ratio", limit=3
        )
        assert len(best_sharpe) == 3
        assert best_sharpe[0].strategy_type == "momentum_v2"  # Highest Sharpe
        assert best_sharpe[1].strategy_type == "mean_reversion"  # Second highest
        assert best_sharpe[0].sharpe_ratio > best_sharpe[1].sharpe_ratio
        # Test best by total return
        best_return = persistence_manager.get_best_performing_strategies(
            "total_return", limit=2
        )
        assert len(best_return) == 2
        assert best_return[0].strategy_type == "momentum_v2"  # Highest return
        # Test minimum trades filter
        high_volume = persistence_manager.get_best_performing_strategies(
            "sharpe_ratio", min_trades=20
        )
        assert len(high_volume) == 2  # Only momentum_v2 and mean_reversion
        assert all(result.total_trades >= 20 for result in high_volume)
    def test_compare_strategies(self, persistence_manager, sample_vectorbt_results):
        """Test strategy comparison functionality.

        Persists three strategies with monotonically increasing Sharpe/return
        and progressively deeper drawdown, then checks the comparison payload's
        structure, its per-metric rankings, and its summary section.
        """
        # Create backtests to compare
        strategies = ["momentum", "mean_reversion", "breakout"]
        backtest_ids = []
        for i, strategy in enumerate(strategies):
            sample_vectorbt_results["strategy"] = strategy
            # Each successive strategy gets a better Sharpe/return and a
            # deeper (more negative) max drawdown.
            sample_vectorbt_results["metrics"]["sharpe_ratio"] = 1.0 + i * 0.5
            sample_vectorbt_results["metrics"]["total_return"] = 0.1 + i * 0.05
            sample_vectorbt_results["metrics"]["max_drawdown"] = -0.05 - i * 0.02
            backtest_id = persistence_manager.save_backtest_result(
                sample_vectorbt_results
            )
            backtest_ids.append(backtest_id)
        # Test comparison
        comparison = persistence_manager.compare_strategies(backtest_ids)
        assert "backtests" in comparison
        assert "rankings" in comparison
        assert "summary" in comparison
        assert len(comparison["backtests"]) == 3
        # Test rankings
        assert "sharpe_ratio" in comparison["rankings"]
        sharpe_rankings = comparison["rankings"]["sharpe_ratio"]
        assert len(sharpe_rankings) == 3
        assert sharpe_rankings[0]["rank"] == 1  # Best rank
        assert sharpe_rankings[0]["value"] > sharpe_rankings[1]["value"]
        # Test max_drawdown ranking (lower is better)
        assert "max_drawdown" in comparison["rankings"]
        dd_rankings = comparison["rankings"]["max_drawdown"]
        assert (
            dd_rankings[0]["value"] > dd_rankings[-1]["value"]
        )  # Less negative is better
        # Test summary
        summary = comparison["summary"]
        assert summary["total_backtests"] == 3
        assert "date_range" in summary
    def test_save_optimization_results(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test saving parameter optimization results.

        Saves a parent backtest, attaches two parameter-grid rows to it, and
        verifies the returned row count plus the persisted fields of the first
        row.
        """
        # Save parent backtest first
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Create optimization results
        optimization_results = [
            {
                "parameters": {"window": 10, "threshold": 0.01},
                "objective_value": 1.2,
                "total_return": 0.15,
                "sharpe_ratio": 1.2,
                "max_drawdown": -0.08,
                "win_rate": 0.6,
                "profit_factor": 1.3,
                "total_trades": 20,
                "rank": 1,
            },
            {
                "parameters": {"window": 20, "threshold": 0.02},
                "objective_value": 1.5,
                "total_return": 0.18,
                "sharpe_ratio": 1.5,
                "max_drawdown": -0.06,
                "win_rate": 0.65,
                "profit_factor": 1.4,
                "total_trades": 18,
                "rank": 2,
            },
        ]
        # Save optimization results
        count = persistence_manager.save_optimization_results(
            backtest_id=backtest_id,
            optimization_results=optimization_results,
            objective_function="sharpe_ratio",
        )
        # The manager reports how many rows it wrote.
        assert count == 2
        # Verify saved results
        opt_results = (
            persistence_manager.session.query(OptimizationResult)
            .filter(OptimizationResult.backtest_id == UUID(backtest_id))
            .all()
        )
        assert len(opt_results) == 2
        assert opt_results[0].objective_function == "sharpe_ratio"
        assert opt_results[0].parameters == {"window": 10, "threshold": 0.01}
        assert opt_results[0].objective_value == Decimal("1.2")
    def test_save_walk_forward_test(self, persistence_manager, sample_vectorbt_results):
        """Test saving walk-forward validation results.

        Persists one walk-forward window keyed to a parent backtest, then
        checks that the stored row links back to the parent and carries the
        expected window/performance fields.
        """
        # Save parent backtest first
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Create walk-forward test data
        walk_forward_data = {
            "window_size_months": 6,
            "step_size_months": 1,
            "training_start": "2023-01-01",
            "training_end": "2023-06-30",
            "test_period_start": "2023-07-01",
            "test_period_end": "2023-07-31",
            "optimal_parameters": {"window": 15, "threshold": 0.015},
            "training_performance": 1.3,
            "out_of_sample_return": 0.12,
            "out_of_sample_sharpe": 1.1,
            "out_of_sample_drawdown": -0.05,
            "out_of_sample_trades": 8,
            "performance_ratio": 0.85,
            "degradation_factor": 0.15,
            "is_profitable": True,
            "is_statistically_significant": True,
        }
        # Save walk-forward test
        wf_id = persistence_manager.save_walk_forward_test(
            backtest_id, walk_forward_data
        )
        # The manager returns the new row's UUID as a string.
        assert isinstance(wf_id, str)
        assert UUID(wf_id)
        # Verify saved result
        wf_test = (
            persistence_manager.session.query(WalkForwardTest)
            .filter(WalkForwardTest.walk_forward_id == UUID(wf_id))
            .first()
        )
        assert wf_test is not None
        assert wf_test.parent_backtest_id == UUID(backtest_id)
        assert wf_test.window_size_months == 6
        assert wf_test.out_of_sample_sharpe == Decimal("1.1")
        assert wf_test.is_profitable is True
    def test_get_backtest_performance_summary(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test performance summary generation.

        Saves three recent backtests and one 45-day-old run, then requests a
        30-day summary and checks only the recent runs are aggregated.

        NOTE(review): patching the ``BacktestResult.backtest_date`` class
        attribute assumes the model reads that attribute at save time; if the
        date comes from a DB/ORM column default instead, the patch may not
        affect the stored values — confirm against the model definition.
        """
        # Create backtests with different dates and performance
        base_date = datetime.utcnow()
        # Recent backtests (within 30 days)
        for i in range(3):
            sample_vectorbt_results["strategy"] = f"momentum_v{i + 1}"
            sample_vectorbt_results["metrics"]["total_return"] = 0.1 + i * 0.05
            sample_vectorbt_results["metrics"]["sharpe_ratio"] = 1.0 + i * 0.3
            sample_vectorbt_results["metrics"]["win_rate"] = 0.5 + i * 0.1
            with patch(
                "maverick_mcp.data.models.BacktestResult.backtest_date",
                base_date - timedelta(days=i * 10),
            ):
                persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Old backtest (outside 30 days)
        sample_vectorbt_results["strategy"] = "old_strategy"
        with patch(
            "maverick_mcp.data.models.BacktestResult.backtest_date",
            base_date - timedelta(days=45),
        ):
            persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Get summary
        summary = persistence_manager.get_backtest_performance_summary(days_back=30)
        assert "period" in summary
        assert summary["total_backtests"] == 3  # Only recent ones
        assert "performance_metrics" in summary
        metrics = summary["performance_metrics"]
        assert "average_return" in metrics
        assert "best_return" in metrics
        assert "worst_return" in metrics
        assert "average_sharpe" in metrics
        # Test strategy and symbol breakdowns
        assert "strategy_breakdown" in summary
        assert len(summary["strategy_breakdown"]) == 3
        assert "symbol_breakdown" in summary
        assert "AAPL" in summary["symbol_breakdown"]
def test_delete_backtest(self, persistence_manager, sample_vectorbt_results):
"""Test backtest deletion with cascading."""
# Save backtest with trades
backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)
# Verify it exists
result = persistence_manager.get_backtest_by_id(backtest_id)
assert result is not None
trades = (
persistence_manager.session.query(BacktestTrade)
.filter(BacktestTrade.backtest_id == UUID(backtest_id))
.all()
)
assert len(trades) > 0
# Delete backtest
deleted = persistence_manager.delete_backtest(backtest_id)
assert deleted is True
# Verify deletion
result = persistence_manager.get_backtest_by_id(backtest_id)
assert result is None
# Test non-existent deletion
fake_id = str(uuid4())
deleted = persistence_manager.delete_backtest(fake_id)
assert deleted is False
def test_safe_decimal_conversion(self):
"""Test safe decimal conversion utility."""
from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
# Test valid conversions
assert BacktestPersistenceManager._safe_decimal(123) == Decimal("123")
assert BacktestPersistenceManager._safe_decimal(123.45) == Decimal("123.45")
assert BacktestPersistenceManager._safe_decimal("456.78") == Decimal("456.78")
assert BacktestPersistenceManager._safe_decimal(Decimal("789.01")) == Decimal(
"789.01"
)
# Test None and invalid values
assert BacktestPersistenceManager._safe_decimal(None) is None
assert BacktestPersistenceManager._safe_decimal("invalid") is None
assert BacktestPersistenceManager._safe_decimal([1, 2, 3]) is None
class TestConvenienceFunctions:
    """Test suite for convenience functions.

    Each test patches ``get_persistence_manager`` so the module-level helper
    functions are exercised against a mocked manager, without touching a real
    database.
    """

    def test_save_vectorbt_results_function(
        self, db_session: Session, sample_vectorbt_results
    ):
        """Test save_vectorbt_results convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            # The helper uses the manager as a context manager, so the mock
            # must provide __enter__/__exit__ explicitly.
            mock_manager = Mock(spec=BacktestPersistenceManager)
            mock_manager.save_backtest_result.return_value = "test-uuid-123"
            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager
            result = save_vectorbt_results(
                vectorbt_results=sample_vectorbt_results,
                execution_time=2.5,
                notes="Test run",
            )
            # The helper must forward its arguments positionally and return
            # the manager's ID untouched.
            assert result == "test-uuid-123"
            mock_manager.save_backtest_result.assert_called_once_with(
                sample_vectorbt_results, 2.5, "Test run"
            )

    def test_get_recent_backtests_function(self, db_session: Session):
        """Test get_recent_backtests convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            mock_manager = Mock(spec=BacktestPersistenceManager)
            # Build a chainable query mock: query().filter().order_by().all().
            mock_session = Mock(spec=Session)
            mock_query = Mock()
            mock_manager.session = mock_session
            mock_session.query.return_value = mock_query
            mock_query.filter.return_value = mock_query
            mock_query.order_by.return_value = mock_query
            mock_query.all.return_value = ["result1", "result2"]
            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager
            results = get_recent_backtests("AAPL", days=7)
            assert results == ["result1", "result2"]
            mock_session.query.assert_called_once_with(BacktestResult)

    def test_find_best_strategy_for_symbol_function(self, db_session: Session):
        """Test find_best_strategy_for_symbol convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            mock_manager = Mock(spec=BacktestPersistenceManager)
            mock_best_result = Mock(spec=BacktestResult)
            # Stub both lookup paths the helper might use.
            mock_manager.get_best_performing_strategies.return_value = [
                mock_best_result
            ]
            mock_manager.get_backtests_by_symbol.return_value = [mock_best_result]
            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager
            result = find_best_strategy_for_symbol("AAPL", "sharpe_ratio")
            assert result == mock_best_result
            mock_manager.get_backtests_by_symbol.assert_called_once_with(
                "AAPL", limit=1000
            )
class TestPersistenceStressTests:
    """Stress tests for persistence layer performance and reliability.

    Covers bulk inserts, multi-threaded saves, very large payloads, unique
    constraint handling, and process memory growth.
    """

    def test_bulk_insert_performance(
        self, persistence_manager, sample_vectorbt_results, benchmark_timer
    ):
        """Test bulk insert performance with many backtests."""
        backtest_count = 50
        with benchmark_timer() as timer:
            for i in range(backtest_count):
                # Vary the symbol and cycle through 5 strategy names so each
                # saved row is distinct.
                sample_vectorbt_results["symbol"] = f"STOCK{i:03d}"
                sample_vectorbt_results["strategy"] = (
                    f"strategy_{i % 5}"  # 5 different strategies
                )
                persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Should complete within reasonable time
        assert timer.elapsed < 30.0  # < 30 seconds for 50 backtests
        # Verify all were saved
        all_results = persistence_manager.session.query(BacktestResult).count()
        assert all_results == backtest_count

    def test_concurrent_access_handling(
        self, db_session: Session, sample_vectorbt_results
    ):
        """Test handling of concurrent database access."""
        import queue
        import threading

        results_queue = queue.Queue()
        error_queue = queue.Queue()

        def save_backtest(thread_id):
            # Worker: save one backtest under a thread-local manager and
            # report either the new ID or the failure message.
            try:
                # Each thread gets its own session
                with BacktestPersistenceManager() as manager:
                    modified_results = sample_vectorbt_results.copy()
                    modified_results["symbol"] = f"THREAD{thread_id}"
                    backtest_id = manager.save_backtest_result(modified_results)
                    results_queue.put(backtest_id)
            except Exception as e:
                error_queue.put(f"Thread {thread_id}: {e}")

        # Create multiple threads
        threads = []
        thread_count = 5
        for i in range(thread_count):
            thread = threading.Thread(target=save_backtest, args=(i,))
            threads.append(thread)
            thread.start()
        # Wait for all threads to complete
        for thread in threads:
            thread.join(timeout=10)  # 10 second timeout per thread
        # Check results
        assert error_queue.empty(), f"Errors occurred: {list(error_queue.queue)}"
        assert results_queue.qsize() == thread_count
        # Verify all backtests were saved with unique IDs
        saved_ids = []
        while not results_queue.empty():
            saved_ids.append(results_queue.get())
        assert len(saved_ids) == thread_count
        assert len(set(saved_ids)) == thread_count  # All unique

    def test_large_result_handling(self, persistence_manager, sample_vectorbt_results):
        """Test handling of large backtest results."""
        # Create large equity curve and drawdown series (1 year of minute data)
        large_data_size = 365 * 24 * 60  # ~525k data points
        sample_vectorbt_results["equity_curve"] = list(range(large_data_size))
        sample_vectorbt_results["drawdown_series"] = [
            -i / 1000 for i in range(large_data_size)
        ]
        # Also add many trades
        sample_vectorbt_results["trades"] = []
        for i in range(1000):  # 1000 trades
            # Synthesize a plausible trade; modular arithmetic keeps the
            # month/day components within valid ranges.
            trade = {
                "entry_date": f"2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
                "entry_price": 100 + (i % 100),
                "exit_date": f"2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
                "exit_price": 101 + (i % 100),
                "position_size": 100,
                "direction": "long",
                "pnl": i % 100 - 50,
                "pnl_percent": (i % 100 - 50) / 1000,
                "duration_days": i % 30 + 1,
                "exit_reason": "time_exit",
            }
            sample_vectorbt_results["trades"].append(trade)
        # Should handle large data without issues
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)
        assert backtest_id is not None
        # Verify retrieval works
        result = persistence_manager.get_backtest_by_id(backtest_id)
        assert result is not None
        assert result.data_points == large_data_size
        # Verify trades were saved
        trades = (
            persistence_manager.session.query(BacktestTrade)
            .filter(BacktestTrade.backtest_id == UUID(backtest_id))
            .count()
        )
        assert trades == 1000

    def test_database_constraint_validation(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test database constraint validation and error handling."""
        # Save first backtest
        backtest_id1 = persistence_manager.save_backtest_result(sample_vectorbt_results)
        # Try to save with same UUID (should be prevented by unique constraint)
        # NOTE(review): patching "uuid.uuid4" only intercepts calls spelled
        # uuid.uuid4(); if the persistence module uses `from uuid import
        # uuid4`, this patch has no effect — verify the import style there.
        with patch("uuid.uuid4", return_value=UUID(backtest_id1)):
            # This should handle the constraint violation gracefully
            try:
                backtest_id2 = persistence_manager.save_backtest_result(
                    sample_vectorbt_results
                )
                # If it succeeds, it should have generated a different UUID
                assert backtest_id2 != backtest_id1
            except BacktestPersistenceError:
                # Or it should raise a proper persistence error
                pass

    def test_memory_usage_with_large_datasets(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test memory usage doesn't grow excessively with large datasets."""
        # psutil (third-party) reports the process's resident set size.
        import os
        import psutil

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss
        # Create and save multiple large backtests
        for i in range(10):
            large_results = sample_vectorbt_results.copy()
            large_results["symbol"] = f"LARGE{i}"
            large_results["equity_curve"] = list(range(10000))  # 10k data points each
            large_results["drawdown_series"] = [-j / 1000 for j in range(10000)]
            persistence_manager.save_backtest_result(large_results)
        final_memory = process.memory_info().rss
        memory_growth = (final_memory - initial_memory) / 1024 / 1024  # MB
        # Memory growth should be reasonable (< 100MB for 10 large backtests)
        assert memory_growth < 100
if __name__ == "__main__":
    # Run the suite directly: verbose output, short tracebacks, fail fast.
    cli_args = [__file__, "-v", "--tb=short", "-x"]
    pytest.main(cli_args)
```
--------------------------------------------------------------------------------
/docs/api/backtesting.md:
--------------------------------------------------------------------------------
```markdown
# Backtesting API Documentation
## Overview
The MaverickMCP backtesting system provides comprehensive strategy backtesting capabilities powered by VectorBT. It offers both traditional technical analysis strategies and advanced ML-enhanced approaches, with extensive optimization, validation, and analysis tools.
### Key Features
- **35+ Pre-built Strategies**: From simple moving averages to advanced ML ensembles
- **Strategy Optimization**: Grid search with coarse/medium/fine granularity
- **Walk-Forward Analysis**: Out-of-sample validation for strategy robustness
- **Monte Carlo Simulation**: Risk assessment with confidence intervals
- **Portfolio Backtesting**: Multi-symbol strategy application
- **Market Regime Analysis**: Intelligent strategy selection based on market conditions
- **ML-Enhanced Strategies**: Adaptive, ensemble, and regime-aware approaches
- **Comprehensive Visualization**: Charts, heatmaps, and performance dashboards
## Core Backtesting Tools
### run_backtest
Run a comprehensive backtest with specified strategy and parameters.
**Function**: `run_backtest`
**Parameters**:
- `symbol` (str, required): Stock symbol to backtest (e.g., "AAPL", "TSLA")
- `strategy` (str, default: "sma_cross"): Strategy type to use
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today
- `initial_capital` (float, default: 10000.0): Starting capital for backtest
**Strategy-Specific Parameters**:
- `fast_period` (int, optional): Fast moving average period
- `slow_period` (int, optional): Slow moving average period
- `period` (int, optional): General period parameter (RSI, etc.)
- `oversold` (float, optional): RSI oversold threshold (default: 30)
- `overbought` (float, optional): RSI overbought threshold (default: 70)
- `signal_period` (int, optional): MACD signal line period
- `std_dev` (float, optional): Bollinger Bands standard deviation
- `lookback` (int, optional): Lookback period for momentum/breakout
- `threshold` (float, optional): Threshold for momentum strategies
- `z_score_threshold` (float, optional): Z-score threshold for mean reversion
- `breakout_factor` (float, optional): Breakout factor for channel strategies
**Returns**:
```json
{
"symbol": "AAPL",
"strategy": "sma_cross",
"period": "2023-01-01 to 2024-01-01",
"metrics": {
"total_return": 0.15,
"sharpe_ratio": 1.2,
"max_drawdown": -0.08,
"total_trades": 24,
"win_rate": 0.58,
"profit_factor": 1.45,
"calmar_ratio": 1.85,
"volatility": 0.18
},
"trades": [
{
"entry_date": "2023-01-15",
"exit_date": "2023-02-10",
"entry_price": 150.0,
"exit_price": 158.5,
"return": 0.057,
"holding_period": 26
}
],
"equity_curve": [10000, 10150, 10200, ...],
"drawdown_series": [0, -0.01, -0.02, ...],
"analysis": {
"risk_metrics": {...},
"performance_analysis": {...}
}
}
```
**Examples**:
```python
# Simple SMA crossover
run_backtest("AAPL", "sma_cross", fast_period=10, slow_period=20)
# RSI mean reversion
run_backtest("TSLA", "rsi", period=14, oversold=30, overbought=70)
# MACD strategy with custom parameters
run_backtest("MSFT", "macd", fast_period=12, slow_period=26, signal_period=9)
# Bollinger Bands strategy
run_backtest("GOOGL", "bollinger", period=20, std_dev=2.0)
```
### optimize_strategy
Optimize strategy parameters using grid search to find the best-performing configuration.
**Function**: `optimize_strategy`
**Parameters**:
- `symbol` (str, required): Stock symbol to optimize
- `strategy` (str, default: "sma_cross"): Strategy type to optimize
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `optimization_metric` (str, default: "sharpe_ratio"): Metric to optimize ("sharpe_ratio", "total_return", "win_rate", "calmar_ratio")
- `optimization_level` (str, default: "medium"): Level of optimization ("coarse", "medium", "fine")
- `top_n` (int, default: 10): Number of top results to return
**Returns**:
```json
{
"symbol": "AAPL",
"strategy": "sma_cross",
"optimization_metric": "sharpe_ratio",
"optimization_level": "medium",
"total_combinations": 64,
"execution_time": 45.2,
"best_parameters": {
"fast_period": 8,
"slow_period": 21,
"sharpe_ratio": 1.85,
"total_return": 0.28,
"max_drawdown": -0.06
},
"top_results": [
{
"parameters": {"fast_period": 8, "slow_period": 21},
"sharpe_ratio": 1.85,
"total_return": 0.28,
"max_drawdown": -0.06,
"total_trades": 18
}
],
"parameter_sensitivity": {
"fast_period": {"min": 5, "max": 20, "best": 8},
"slow_period": {"min": 20, "max": 50, "best": 21}
}
}
```
**Examples**:
```python
# Optimize SMA crossover for Sharpe ratio
optimize_strategy("AAPL", "sma_cross", optimization_metric="sharpe_ratio")
# Fine-tune RSI parameters for total return
optimize_strategy("TSLA", "rsi", optimization_metric="total_return", optimization_level="fine")
# Quick coarse optimization for multiple strategies
optimize_strategy("MSFT", "macd", optimization_level="coarse", top_n=5)
```
### walk_forward_analysis
Perform walk-forward analysis to test strategy robustness and out-of-sample performance.
**Function**: `walk_forward_analysis`
**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `window_size` (int, default: 252): Test window size in trading days (default: 1 year)
- `step_size` (int, default: 63): Step size for rolling window (default: 1 quarter)
**Returns**:
```json
{
"symbol": "AAPL",
"strategy": "sma_cross",
"total_windows": 8,
"window_size": 252,
"step_size": 63,
"out_of_sample_performance": {
"average_return": 0.12,
"average_sharpe": 0.95,
"consistency_score": 0.75,
"best_window": {"period": "2023-Q2", "return": 0.28},
"worst_window": {"period": "2023-Q4", "return": -0.05}
},
"window_results": [
{
"window_id": 1,
"optimization_period": "2022-01-01 to 2022-12-31",
"test_period": "2023-01-01 to 2023-03-31",
"best_parameters": {"fast_period": 10, "slow_period": 25},
"out_of_sample_return": 0.08,
"out_of_sample_sharpe": 1.1
}
],
"stability_metrics": {
"parameter_stability": 0.85,
"performance_stability": 0.72,
"overfitting_risk": "low"
}
}
```
### monte_carlo_simulation
Run Monte Carlo simulation on backtest results to assess risk and confidence intervals.
**Function**: `monte_carlo_simulation`
**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `num_simulations` (int, default: 1000): Number of Monte Carlo simulations
- Strategy-specific parameters (same as `run_backtest`)
**Returns**:
```json
{
"symbol": "AAPL",
"strategy": "sma_cross",
"num_simulations": 1000,
"confidence_intervals": {
"95%": {"lower": 0.05, "upper": 0.32},
"90%": {"lower": 0.08, "upper": 0.28},
"68%": {"lower": 0.12, "upper": 0.22}
},
"risk_metrics": {
"probability_of_loss": 0.15,
"expected_return": 0.17,
"value_at_risk_5%": -0.12,
"expected_shortfall": -0.18,
"maximum_drawdown_95%": -0.15
},
"simulation_statistics": {
"mean_return": 0.168,
"std_return": 0.089,
"skewness": -0.23,
"kurtosis": 2.85,
"best_simulation": 0.45,
"worst_simulation": -0.28
}
}
```
### compare_strategies
Compare multiple strategies on the same symbol to identify the best performer.
**Function**: `compare_strategies`
**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategies` (list[str], optional): List of strategy types to compare (defaults to top 5)
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
**Returns**:
```json
{
"symbol": "AAPL",
"comparison_period": "2023-01-01 to 2024-01-01",
"strategies_compared": ["sma_cross", "rsi", "macd", "bollinger", "momentum"],
"rankings": {
"by_sharpe_ratio": [
{"strategy": "macd", "sharpe_ratio": 1.45},
{"strategy": "sma_cross", "sharpe_ratio": 1.22},
{"strategy": "momentum", "sharpe_ratio": 0.98}
],
"by_total_return": [
{"strategy": "momentum", "total_return": 0.32},
{"strategy": "macd", "total_return": 0.28},
{"strategy": "sma_cross", "total_return": 0.18}
]
},
"detailed_comparison": {
"sma_cross": {
"total_return": 0.18,
"sharpe_ratio": 1.22,
"max_drawdown": -0.08,
"total_trades": 24,
"win_rate": 0.58
}
},
"best_overall": "macd",
"recommendation": "MACD strategy provides best risk-adjusted returns"
}
```
### backtest_portfolio
Backtest a strategy across multiple symbols to create a diversified portfolio.
**Function**: `backtest_portfolio`
**Parameters**:
- `symbols` (list[str], required): List of stock symbols
- `strategy` (str, default: "sma_cross"): Strategy type to apply
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `initial_capital` (float, default: 10000.0): Starting capital
- `position_size` (float, default: 0.1): Position size per symbol (0.1 = 10%)
- Strategy-specific parameters (same as `run_backtest`)
**Returns**:
```json
{
"portfolio_metrics": {
"symbols_tested": 5,
"total_return": 0.22,
"average_sharpe": 1.15,
"max_drawdown": -0.12,
"total_trades": 120,
"diversification_benefit": 0.85
},
"individual_results": [
{
"symbol": "AAPL",
"total_return": 0.18,
"sharpe_ratio": 1.22,
"max_drawdown": -0.08,
"contribution_to_portfolio": 0.24
}
],
"correlation_matrix": {
"AAPL": {"MSFT": 0.72, "GOOGL": 0.68},
"MSFT": {"GOOGL": 0.75}
},
"summary": "Portfolio backtest of 5 symbols with sma_cross strategy"
}
```
## Strategy Management
### list_strategies
List all available backtesting strategies with descriptions and parameters.
**Function**: `list_strategies`
**Parameters**: None
**Returns**:
```json
{
"available_strategies": {
"sma_cross": {
"type": "sma_cross",
"name": "SMA Crossover",
"description": "Buy when fast SMA crosses above slow SMA, sell when it crosses below",
"default_parameters": {"fast_period": 10, "slow_period": 20},
"optimization_ranges": {
"fast_period": [5, 10, 15, 20],
"slow_period": [20, 30, 50, 100]
}
}
},
"total_count": 9,
"categories": {
"trend_following": ["sma_cross", "ema_cross", "macd", "breakout"],
"mean_reversion": ["rsi", "bollinger", "mean_reversion"],
"momentum": ["momentum", "volume_momentum"]
}
}
```
### parse_strategy
Parse natural language strategy description into VectorBT parameters.
**Function**: `parse_strategy`
**Parameters**:
- `description` (str, required): Natural language description of trading strategy
**Returns**:
```json
{
"success": true,
"strategy": {
"strategy_type": "rsi",
"parameters": {
"period": 14,
"oversold": 30,
"overbought": 70
}
},
"message": "Successfully parsed as rsi strategy"
}
```
**Examples**:
```python
# Parse natural language descriptions
parse_strategy("Buy when RSI is below 30 and sell when above 70")
parse_strategy("Use 10-day and 20-day moving average crossover")
parse_strategy("MACD strategy with standard parameters")
```
## Visualization Tools
### generate_backtest_charts
Generate comprehensive charts for a backtest including equity curve, trades, and performance dashboard.
**Function**: `generate_backtest_charts`
**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `theme` (str, default: "light"): Chart theme ("light" or "dark")
**Returns**:
```json
{
"equity_curve": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"trade_scatter": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"performance_dashboard": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA..."
}
```
### generate_optimization_charts
Generate heatmap charts for strategy parameter optimization results.
**Function**: `generate_optimization_charts`
**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `theme` (str, default: "light"): Chart theme ("light" or "dark")
**Returns**:
```json
{
"optimization_heatmap": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA..."
}
```
## ML-Enhanced Strategies
### run_ml_strategy_backtest
Run backtest using machine learning-enhanced strategies with adaptive capabilities.
**Function**: `run_ml_strategy_backtest`
**Parameters**:
- `symbol` (str, required): Stock symbol to backtest
- `strategy_type` (str, default: "ml_predictor"): ML strategy type ("ml_predictor", "adaptive", "ensemble", "regime_aware")
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `initial_capital` (float, default: 10000.0): Initial capital amount
- `train_ratio` (float, default: 0.8): Ratio of data for training (0.0-1.0)
- `model_type` (str, default: "random_forest"): ML model type
- `n_estimators` (int, default: 100): Number of estimators for ensemble models
- `max_depth` (int, optional): Maximum tree depth
- `learning_rate` (float, default: 0.01): Learning rate for adaptive strategies
- `adaptation_method` (str, default: "gradient"): Adaptation method ("gradient", "momentum")
**Returns**:
```json
{
"symbol": "AAPL",
"strategy_type": "ml_predictor",
"metrics": {
"total_return": 0.24,
"sharpe_ratio": 1.35,
"max_drawdown": -0.09
},
"ml_metrics": {
"training_period": 400,
"testing_period": 100,
"train_test_split": 0.8,
"feature_importance": {
"rsi": 0.25,
"macd": 0.22,
"volume_ratio": 0.18,
"price_momentum": 0.16
},
"model_accuracy": 0.68,
"prediction_confidence": 0.72
}
}
```
### train_ml_predictor
Train a machine learning predictor model for generating trading signals.
**Function**: `train_ml_predictor`
**Parameters**:
- `symbol` (str, required): Stock symbol to train on
- `start_date` (str, optional): Start date for training data
- `end_date` (str, optional): End date for training data
- `model_type` (str, default: "random_forest"): ML model type
- `target_periods` (int, default: 5): Forward periods for target variable
- `return_threshold` (float, default: 0.02): Return threshold for signal classification
- `n_estimators` (int, default: 100): Number of estimators
- `max_depth` (int, optional): Maximum tree depth
- `min_samples_split` (int, default: 2): Minimum samples to split
**Returns**:
```json
{
"symbol": "AAPL",
"model_type": "random_forest",
"training_period": "2022-01-01 to 2024-01-01",
"data_points": 500,
"target_periods": 5,
"return_threshold": 0.02,
"model_parameters": {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 2
},
"training_metrics": {
"accuracy": 0.68,
"precision": 0.72,
"recall": 0.65,
"f1_score": 0.68,
"feature_importance": {
"rsi_14": 0.25,
"macd_signal": 0.22,
"volume_sma_ratio": 0.18
}
}
}
```
### analyze_market_regimes
Analyze market regimes using machine learning to identify different market conditions.
**Function**: `analyze_market_regimes`
**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `start_date` (str, optional): Start date for analysis
- `end_date` (str, optional): End date for analysis
- `method` (str, default: "hmm"): Detection method ("hmm", "kmeans", "threshold")
- `n_regimes` (int, default: 3): Number of regimes to detect
- `lookback_period` (int, default: 50): Lookback period for regime detection
**Returns**:
```json
{
"symbol": "AAPL",
"analysis_period": "2023-01-01 to 2024-01-01",
"method": "hmm",
"n_regimes": 3,
"regime_names": {
"0": "Bear/Declining",
"1": "Sideways/Uncertain",
"2": "Bull/Trending"
},
"current_regime": 2,
"regime_counts": {"0": 45, "1": 89, "2": 118},
"regime_percentages": {"0": 17.9, "1": 35.3, "2": 46.8},
"average_regime_durations": {"0": 15.2, "1": 22.3, "2": 28.7},
"recent_regime_history": [
{
"date": "2024-01-15",
"regime": 2,
"probabilities": [0.05, 0.15, 0.80]
}
],
"total_regime_switches": 18
}
```
### create_strategy_ensemble
Create and backtest a strategy ensemble that combines multiple base strategies.
**Function**: `create_strategy_ensemble`
**Parameters**:
- `symbols` (list[str], required): List of stock symbols
- `base_strategies` (list[str], optional): List of base strategy names (defaults to ["sma_cross", "rsi", "macd"])
- `weighting_method` (str, default: "performance"): Weighting method ("performance", "equal", "volatility")
- `start_date` (str, optional): Start date for backtesting
- `end_date` (str, optional): End date for backtesting
- `initial_capital` (float, default: 10000.0): Initial capital per symbol
**Returns**:
```json
{
"ensemble_summary": {
"symbols_tested": 5,
"base_strategies": ["sma_cross", "rsi", "macd"],
"weighting_method": "performance",
"average_return": 0.19,
"total_trades": 87,
"average_trades_per_symbol": 17.4
},
"individual_results": [
{
"symbol": "AAPL",
"results": {
"total_return": 0.21,
"sharpe_ratio": 1.18
},
"ensemble_metrics": {
"strategy_weights": {"sma_cross": 0.4, "rsi": 0.3, "macd": 0.3},
"strategy_performance": {"sma_cross": 0.15, "rsi": 0.12, "macd": 0.18}
}
}
],
"final_strategy_weights": {"sma_cross": 0.42, "rsi": 0.28, "macd": 0.30}
}
```
## Intelligent Backtesting Workflow
### run_intelligent_backtest
Run comprehensive intelligent backtesting workflow with automatic market regime analysis and strategy optimization.
**Function**: `run_intelligent_backtest`
**Parameters**:
- `symbol` (str, required): Stock symbol to analyze (e.g., 'AAPL', 'TSLA')
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today
- `initial_capital` (float, default: 10000.0): Starting capital for backtest
- `requested_strategy` (str, optional): User-preferred strategy (e.g., 'sma_cross', 'rsi', 'macd')
**Returns**:
```json
{
"symbol": "AAPL",
"analysis_period": "2023-01-01 to 2024-01-01",
"execution_metadata": {
"total_execution_time": 45.2,
"steps_completed": 6,
"confidence_score": 0.87
},
"market_regime_analysis": {
"current_regime": "trending",
"regime_confidence": 0.85,
"market_characteristics": {
"volatility_percentile": 35,
"trend_strength": 0.72,
"volume_profile": "above_average"
}
},
"strategy_recommendations": [
{
"strategy": "macd",
"fitness_score": 0.92,
"recommended_parameters": {"fast_period": 12, "slow_period": 26, "signal_period": 9},
"expected_performance": {"sharpe_ratio": 1.45, "total_return": 0.28}
},
{
"strategy": "sma_cross",
"fitness_score": 0.88,
"recommended_parameters": {"fast_period": 8, "slow_period": 21},
"expected_performance": {"sharpe_ratio": 1.32, "total_return": 0.24}
}
],
"optimization_results": {
"best_strategy": "macd",
"optimized_parameters": {"fast_period": 12, "slow_period": 26, "signal_period": 9},
"optimization_method": "grid_search",
"combinations_tested": 48
},
"validation_results": {
"walk_forward_analysis": {
"out_of_sample_sharpe": 1.28,
"consistency_score": 0.82,
"overfitting_risk": "low"
},
"monte_carlo_simulation": {
"probability_of_loss": 0.12,
"95_percent_confidence_interval": {"lower": 0.08, "upper": 0.35}
}
},
"final_recommendation": {
"recommended_strategy": "macd",
"confidence_level": "high",
"expected_annual_return": 0.28,
"expected_sharpe_ratio": 1.45,
"maximum_expected_drawdown": -0.09,
"risk_assessment": "moderate",
"implementation_notes": [
"Strategy performs well in trending markets",
"Consider position sizing based on volatility",
"Monitor for regime changes"
]
}
}
```
### quick_market_regime_analysis
Perform fast market regime analysis and basic strategy recommendations without full optimization.
**Function**: `quick_market_regime_analysis`
**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today
**Returns**:
```json
{
"symbol": "AAPL",
"analysis_type": "quick_analysis",
"execution_time": 8.5,
"market_regime": {
"classification": "trending",
"confidence": 0.78,
"characteristics": {
"trend_direction": "bullish",
"volatility_level": "moderate",
"volume_profile": "above_average"
}
},
"strategy_recommendations": [
{
"strategy": "sma_cross",
"fitness_score": 0.85,
"reasoning": "Strong trend favors moving average strategies"
},
{
"strategy": "macd",
"fitness_score": 0.82,
"reasoning": "MACD works well in trending environments"
},
{
"strategy": "momentum",
"fitness_score": 0.79,
"reasoning": "Momentum strategies benefit from clear trends"
}
],
"market_conditions_summary": {
"overall_assessment": "favorable_for_trend_following",
"risk_level": "moderate",
"recommended_position_sizing": "standard"
}
}
```
### explain_market_regime
Get detailed explanation of market regime characteristics and suitable strategies.
**Function**: `explain_market_regime`
**Parameters**:
- `regime` (str, required): Market regime to explain ("trending", "ranging", "volatile", "volatile_trending", "low_volume")
**Returns**:
```json
{
"regime": "trending",
"explanation": {
"description": "A market in a clear directional movement (up or down trend)",
"characteristics": [
"Strong directional price movement",
"Higher highs and higher lows (uptrend) or lower highs and lower lows (downtrend)",
"Good momentum indicators",
"Volume supporting the trend direction"
],
"best_strategies": ["sma_cross", "ema_cross", "macd", "breakout", "momentum"],
"avoid_strategies": ["rsi", "mean_reversion", "bollinger"],
"risk_factors": [
"Trend reversals can be sudden",
"False breakouts in weak trends",
"Momentum strategies can give late signals"
]
},
"trading_tips": [
"Focus on sma_cross, ema_cross, macd, breakout, momentum strategies",
"Avoid rsi, mean_reversion, bollinger strategies",
"Always use proper risk management",
"Consider the broader market context"
]
}
```
## Available Strategies
### Traditional Technical Analysis Strategies
#### 1. SMA Crossover (`sma_cross`)
- **Description**: Buy when fast SMA crosses above slow SMA, sell when crosses below
- **Default Parameters**: `fast_period=10, slow_period=20`
- **Best For**: Trending markets
- **Optimization Ranges**: fast_period [5-20], slow_period [20-100]
#### 2. EMA Crossover (`ema_cross`)
- **Description**: Exponential moving average crossover with faster response than SMA
- **Default Parameters**: `fast_period=12, slow_period=26`
- **Best For**: Trending markets with more responsiveness
- **Optimization Ranges**: fast_period [8-20], slow_period [20-50]
#### 3. RSI Mean Reversion (`rsi`)
- **Description**: Buy oversold (RSI < 30), sell overbought (RSI > 70)
- **Default Parameters**: `period=14, oversold=30, overbought=70`
- **Best For**: Ranging/sideways markets
- **Optimization Ranges**: period [7-21], oversold [20-35], overbought [65-80]
#### 4. MACD Signal (`macd`)
- **Description**: Buy when MACD crosses above signal line, sell when crosses below
- **Default Parameters**: `fast_period=12, slow_period=26, signal_period=9`
- **Best For**: Trending markets with momentum confirmation
- **Optimization Ranges**: fast_period [8-14], slow_period [21-30], signal_period [7-11]
#### 5. Bollinger Bands (`bollinger`)
- **Description**: Buy at lower band (oversold), sell at upper band (overbought)
- **Default Parameters**: `period=20, std_dev=2.0`
- **Best For**: Mean-reverting/ranging markets
- **Optimization Ranges**: period [10-25], std_dev [1.5-3.0]
#### 6. Momentum (`momentum`)
- **Description**: Buy strong momentum, sell weak momentum based on returns threshold
- **Default Parameters**: `lookback=20, threshold=0.05`
- **Best For**: Trending markets with clear momentum
- **Optimization Ranges**: lookback [10-30], threshold [0.02-0.10]
#### 7. Mean Reversion (`mean_reversion`)
- **Description**: Buy when price is below moving average by threshold
- **Default Parameters**: `ma_period=20, entry_threshold=0.02, exit_threshold=0.01`
- **Best For**: Sideways/ranging markets
- **Optimization Ranges**: ma_period [15-50], entry_threshold [0.01-0.05]
#### 8. Channel Breakout (`breakout`)
- **Description**: Buy on breakout above rolling high, sell on breakdown below rolling low
- **Default Parameters**: `lookback=20, exit_lookback=10`
- **Best For**: Volatile trending markets
- **Optimization Ranges**: lookback [10-50], exit_lookback [5-20]
#### 9. Volume-Weighted Momentum (`volume_momentum`)
- **Description**: Momentum strategy filtered by volume surge
- **Default Parameters**: `momentum_period=20, volume_period=20, momentum_threshold=0.05, volume_multiplier=1.5`
- **Best For**: Markets with significant volume participation
- **Optimization Ranges**: momentum_period [10-30], volume_multiplier [1.2-2.0]
### ML-Enhanced Strategies
#### 1. ML Predictor (`ml_predictor`)
- Uses machine learning models (Random Forest, etc.) to predict future price movements
- Features: Technical indicators, price patterns, volume analysis
- Training/testing split with out-of-sample validation
#### 2. Adaptive Strategy (`adaptive`)
- Adapts base strategy parameters based on recent performance
- Uses gradient-based or momentum-based adaptation methods
- Continuously learns from market feedback
#### 3. Strategy Ensemble (`ensemble`)
- Combines multiple base strategies with dynamic weighting
- Weighting methods: performance-based, equal-weight, volatility-adjusted
- Provides diversification benefits
#### 4. Regime-Aware Strategy (`regime_aware`)
- Automatically switches between different strategies based on detected market regime
- Uses Hidden Markov Models or clustering for regime detection
- Optimizes strategy selection for current market conditions
## Performance Considerations
### Execution Times
- **Simple Backtest**: 2-5 seconds
- **Strategy Optimization**: 30-120 seconds (depending on level)
- **Walk-Forward Analysis**: 60-300 seconds
- **Monte Carlo Simulation**: 45-90 seconds
- **ML Strategy Training**: 60-180 seconds
- **Intelligent Backtest**: 120-300 seconds (full workflow)
### Memory Usage
- **Single Symbol**: 50-200 MB
- **Portfolio (5 symbols)**: 200-500 MB
- **ML Training**: 100-1000 MB (depending on data size)
### Optimization Levels
- **Coarse**: 16-36 parameter combinations, fastest
- **Medium**: 36-100 combinations, balanced speed/accuracy
- **Fine**: 100-500+ combinations, most thorough
## Error Handling
### Common Errors
#### Insufficient Data
```json
{
"error": "Insufficient data for backtest (minimum 100 data points)",
"symbol": "PENNY_STOCK",
"message": "Please use a longer time period or different symbol"
}
```
#### Invalid Strategy
```json
{
"error": "Unknown strategy type: invalid_strategy",
"available_strategies": ["sma_cross", "rsi", "macd", ...],
"message": "Please use one of the available strategy types"
}
```
#### Parameter Validation
```json
{
"error": "Invalid parameter value",
"parameter": "fast_period",
"value": -5,
"message": "fast_period must be positive integer"
}
```
#### ML Training Errors
```json
{
"error": "ML training failed: Insufficient data for training (minimum 200 data points)",
"symbol": "LOW_DATA_STOCK",
"message": "ML strategies require more historical data"
}
```
### Troubleshooting
1. **Data Issues**: Ensure sufficient historical data (minimum 100 points, 200+ for ML)
2. **Parameter Validation**: Check parameter types and ranges
3. **Memory Issues**: Reduce number of symbols in portfolio backtests
4. **Timeout Issues**: Use coarse optimization for faster results
5. **Strategy Parsing**: Use exact strategy names from `list_strategies`
## Integration Examples
### Claude Desktop Usage
```
# Basic backtest
"Run a backtest for AAPL using RSI strategy with 14-day period"
# Strategy comparison
"Compare SMA crossover, RSI, and MACD strategies on Tesla stock"
# Intelligent analysis
"Run intelligent backtest on Microsoft stock and recommend the best strategy"
# Portfolio backtest
"Backtest momentum strategy on AAPL, MSFT, GOOGL, AMZN, and TSLA"
# Optimization
"Optimize MACD parameters for Netflix stock over the last 2 years"
# ML strategies
"Train an ML predictor on Amazon stock and test its performance"
```
### API Integration
```python
# Using MCP client
import mcp
client = mcp.Client("maverick-mcp")
# Run backtest
result = await client.call_tool("run_backtest", {
"symbol": "AAPL",
"strategy": "sma_cross",
"fast_period": 10,
"slow_period": 20,
"initial_capital": 50000
})
# Optimize strategy
optimization = await client.call_tool("optimize_strategy", {
"symbol": "TSLA",
"strategy": "rsi",
"optimization_level": "medium",
"optimization_metric": "sharpe_ratio"
})
# Intelligent backtest
intelligent_result = await client.call_tool("run_intelligent_backtest", {
"symbol": "MSFT",
"start_date": "2022-01-01",
"end_date": "2023-12-31"
})
```
### Workflow Integration
```python
# Complete backtesting workflow
symbols = ["AAPL", "MSFT", "GOOGL"]
strategies = ["sma_cross", "rsi", "macd"]
for symbol in symbols:
# 1. Quick regime analysis
regime = await client.call_tool("quick_market_regime_analysis", {
"symbol": symbol
})
# 2. Strategy comparison
comparison = await client.call_tool("compare_strategies", {
"symbol": symbol,
"strategies": strategies
})
# 3. Optimize best strategy
best_strategy = comparison["best_overall"]
optimization = await client.call_tool("optimize_strategy", {
"symbol": symbol,
"strategy": best_strategy
})
# 4. Validate with walk-forward
validation = await client.call_tool("walk_forward_analysis", {
"symbol": symbol,
"strategy": best_strategy
})
```
## Best Practices
### Strategy Selection
1. **Trending Markets**: Use sma_cross, ema_cross, macd, breakout, momentum
2. **Ranging Markets**: Use rsi, bollinger, mean_reversion
3. **Volatile Markets**: Use breakout, volatility_breakout with wider stops
4. **Unknown Conditions**: Use intelligent_backtest for automatic selection
### Parameter Optimization
1. **Start with Default**: Test default parameters first
2. **Use Medium Level**: Good balance of thoroughness and speed
3. **Validate Results**: Always use walk-forward analysis for final validation
4. **Avoid Overfitting**: Check for consistent out-of-sample performance
### Risk Management
1. **Position Sizing**: Never risk more than 1-2% per trade
2. **Diversification**: Test strategies across multiple symbols
3. **Regime Awareness**: Monitor market regime changes
4. **Drawdown Limits**: Set maximum acceptable drawdown levels
### Performance Optimization
1. **Parallel Processing**: Use portfolio backtests for batch analysis
2. **Caching**: Results are cached for faster repeated analysis
3. **Data Efficiency**: Use appropriate date ranges to balance data needs and speed
4. **ML Considerations**: Ensure sufficient training data for ML strategies
This comprehensive API documentation provides everything needed to effectively use the MaverickMCP backtesting system. Each tool is designed to work independently or as part of larger workflows, with extensive error handling and performance optimization built-in.
```
--------------------------------------------------------------------------------
/maverick_mcp/agents/optimized_research.py:
--------------------------------------------------------------------------------
```python
"""
Optimized Deep Research Agent with LLM-side optimizations to prevent timeouts.
This module integrates the comprehensive LLM optimization strategies including:
- Adaptive model selection based on time constraints
- Progressive token budgeting with confidence tracking
- Parallel LLM processing with intelligent load balancing
- Optimized prompt engineering for speed
- Early termination based on confidence thresholds
- Content filtering to reduce processing overhead
"""
import asyncio
import logging
import time
from datetime import datetime
from typing import Any
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver
from maverick_mcp.agents.deep_research import (
PERSONA_RESEARCH_FOCUS,
RESEARCH_DEPTH_LEVELS,
ContentAnalyzer,
DeepResearchAgent,
)
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider, TaskType
from maverick_mcp.utils.llm_optimization import (
AdaptiveModelSelector,
ConfidenceTracker,
IntelligentContentFilter,
OptimizedPromptEngine,
ParallelLLMProcessor,
ProgressiveTokenBudgeter,
)
from maverick_mcp.utils.orchestration_logging import (
get_orchestration_logger,
log_method_call,
log_performance_metrics,
)
# Import moved to avoid circular dependency
# Module-level logger; inherits handlers/level from the package logging config.
logger = logging.getLogger(__name__)
class OptimizedContentAnalyzer(ContentAnalyzer):
    """Enhanced ContentAnalyzer with LLM optimizations.

    Replaces the parent's single-LLM workflow with adaptive per-request model
    selection, speed-optimized prompting, and parallel batch analysis, all
    driven by an OpenRouter provider.

    NOTE(review): __init__ intentionally does not call super().__init__();
    confirm that no inherited ContentAnalyzer method relies on attributes the
    parent constructor would have set.
    """

    def __init__(self, openrouter_provider: OpenRouterProvider):
        # Initialize with OpenRouter provider instead of single LLM
        self.openrouter_provider = openrouter_provider
        # Selects a model sized to the time budget and task complexity.
        self.model_selector = AdaptiveModelSelector(openrouter_provider)
        # Produces compact prompts tuned to remaining time and confidence.
        self.prompt_engine = OptimizedPromptEngine()
        # Fans multi-source analysis out across concurrent LLM calls.
        self.parallel_processor = ParallelLLMProcessor(openrouter_provider)

    async def analyze_content_optimized(
        self,
        content: str,
        persona: str,
        analysis_focus: str = "general",
        time_budget_seconds: float = 30.0,
        current_confidence: float = 0.0,
    ) -> dict[str, Any]:
        """Analyze content with time-optimized LLM selection and prompting.

        Args:
            content: Raw text to analyze; blank input short-circuits to an
                empty analysis.
            persona: Investor persona the analysis is tailored for.
            analysis_focus: Focus-area hint fed to complexity scoring and the
                prompt template.
            time_budget_seconds: Wall-clock budget for this single analysis.
            current_confidence: Confidence accumulated so far (0.0-1.0).

        Returns:
            Structured analysis dict (insights, sentiment, scores, timing
            metadata). On timeout or LLM failure, returns the result of
            ``self._fallback_analysis`` instead of raising.
        """
        if not content or not content.strip():
            return self._create_empty_analysis()
        # Calculate content complexity
        complexity_score = self.model_selector.calculate_task_complexity(
            content, TaskType.SENTIMENT_ANALYSIS, [analysis_focus]
        )
        # Select optimal model for time budget
        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.SENTIMENT_ANALYSIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=complexity_score,
            content_size_tokens=len(content) // 4,  # Rough token estimate
            current_confidence=current_confidence,
        )
        # Create optimized prompt
        optimized_prompt = self.prompt_engine.get_optimized_prompt(
            prompt_type="content_analysis",
            time_remaining=time_budget_seconds,
            confidence_level=current_confidence,
            content=content[: model_config.max_tokens * 3],  # Limit content size
            persona=persona,
            focus_areas=analysis_focus,
        )
        # Execute with optimized LLM
        try:
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )
            start_time = time.time()
            # Hard-bound the LLM call so a slow model cannot exceed the
            # per-model timeout chosen by the selector.
            response = await asyncio.wait_for(
                llm.ainvoke(
                    [
                        SystemMessage(
                            content="You are a financial content analyst. Return structured JSON analysis."
                        ),
                        HumanMessage(content=optimized_prompt),
                    ]
                ),
                timeout=model_config.timeout_seconds,
            )
            execution_time = time.time() - start_time
            # Parse response
            analysis = self._parse_optimized_response(response.content, persona)
            analysis["execution_time"] = execution_time
            analysis["model_used"] = model_config.model_id
            analysis["optimization_applied"] = True
            return analysis
        except TimeoutError:
            logger.warning(
                f"Content analysis timed out after {model_config.timeout_seconds}s"
            )
            return self._fallback_analysis(content, persona)
        except Exception as e:
            logger.warning(f"Optimized content analysis failed: {e}")
            return self._fallback_analysis(content, persona)

    async def batch_analyze_content(
        self,
        sources: list[dict],
        persona: str,
        analysis_type: str,
        time_budget_seconds: float,
        current_confidence: float = 0.0,
    ) -> list[dict]:
        """Analyze multiple sources using parallel processing.

        Delegates entirely to ``ParallelLLMProcessor.parallel_content_analysis``;
        returns one analysis dict per source.
        """
        return await self.parallel_processor.parallel_content_analysis(
            sources=sources,
            analysis_type=analysis_type,
            persona=persona,
            time_budget_seconds=time_budget_seconds,
            current_confidence=current_confidence,
        )

    def _parse_optimized_response(
        self, response_content: str, persona: str
    ) -> dict[str, Any]:
        """Parse LLM response with fallback handling.

        Tries strict JSON first (only when the response visibly starts with
        '{'), then regex-based structured-text parsing, and finally
        ``self._fallback_analysis``.
        """
        try:
            # Try to parse as JSON first
            import json

            if response_content.strip().startswith("{"):
                return json.loads(response_content.strip())
        except Exception:
            pass
        # Try structured text parsing
        try:
            return self._parse_structured_response(response_content, persona)
        except Exception:
            # Final fallback
            return self._fallback_analysis(response_content, persona)

    def _parse_structured_response(self, response: str, persona: str) -> dict[str, Any]:
        """Parse structured text response.

        Extracts sentiment (with optional confidence), insights, risks,
        opportunities, and relevance/credibility scores via tolerant regexes.
        Missing fields get neutral defaults (neutral/0.5 sentiment, 0.6
        relevance, 0.7 credibility).
        """
        import re

        # Extract sentiment
        sentiment_match = re.search(
            r"sentiment:?\s*(\w+)[,\s]*(?:confidence:?\s*([\d.]+))?", response.lower()
        )
        if sentiment_match:
            direction = sentiment_match.group(1).lower()
            # Confidence defaults to 0.6 when the model states a sentiment
            # without a numeric confidence.
            confidence = float(sentiment_match.group(2) or 0.6)
            # Normalize sentiment terms
            if direction in ["bull", "bullish", "positive", "buy"]:
                direction = "bullish"
            elif direction in ["bear", "bearish", "negative", "sell"]:
                direction = "bearish"
            else:
                direction = "neutral"
        else:
            direction = "neutral"
            confidence = 0.5
        # Extract insights
        insights = []
        insight_patterns = [
            r"insight:?\s*([^\n.]+)",
            r"key point:?\s*([^\n.]+)",
            r"finding:?\s*([^\n.]+)",
        ]
        for pattern in insight_patterns:
            matches = re.findall(pattern, response, re.IGNORECASE)
            insights.extend([m.strip() for m in matches if m.strip()])
        # Extract risks and opportunities
        risks = re.findall(r"risk:?\s*([^\n.]+)", response, re.IGNORECASE)
        opportunities = re.findall(
            r"opportunit(?:y|ies):?\s*([^\n.]+)", response, re.IGNORECASE
        )
        # Extract scores
        relevance_match = re.search(r"relevance:?\s*([\d.]+)", response.lower())
        relevance_score = float(relevance_match.group(1)) if relevance_match else 0.6
        credibility_match = re.search(r"credibility:?\s*([\d.]+)", response.lower())
        credibility_score = (
            float(credibility_match.group(1)) if credibility_match else 0.7
        )
        # Caps (5 insights, 3 risks/opportunities) keep downstream payloads small.
        return {
            "insights": insights[:5],
            "sentiment": {"direction": direction, "confidence": confidence},
            "risk_factors": [r.strip() for r in risks[:3]],
            "opportunities": [o.strip() for o in opportunities[:3]],
            "credibility_score": credibility_score,
            "relevance_score": relevance_score,
            "summary": f"Analysis for {persona} investor using optimized processing",
            "analysis_timestamp": datetime.now(),
            "structured_parsing": True,
        }

    def _create_empty_analysis(self) -> dict[str, Any]:
        """Create empty analysis for invalid content.

        Returned when input content is missing or whitespace-only; all scores
        are zeroed and ``empty_content`` flags the condition for callers.
        """
        return {
            "insights": [],
            "sentiment": {"direction": "neutral", "confidence": 0.0},
            "risk_factors": [],
            "opportunities": [],
            "credibility_score": 0.0,
            "relevance_score": 0.0,
            "summary": "No content to analyze",
            "analysis_timestamp": datetime.now(),
            "empty_content": True,
        }
class OptimizedDeepResearchAgent(DeepResearchAgent):
    """
    Deep research agent with comprehensive LLM-side optimizations to prevent timeouts.

    Integrates all optimization strategies:
    - Adaptive model selection (model sized to remaining time budget)
    - Progressive token budgeting (per-request ProgressiveTokenBudgeter)
    - Parallel LLM processing (ParallelLLMProcessor)
    - Optimized prompting (OptimizedPromptEngine)
    - Early termination (ConfidenceTracker against a target confidence)
    - Content filtering (IntelligentContentFilter)

    When constructed with ``optimization_enabled=False``, all public entry
    points fall back to the parent ``DeepResearchAgent`` behavior.
    """
def __init__(
    self,
    openrouter_provider: OpenRouterProvider,
    persona: str = "moderate",
    checkpointer: MemorySaver | None = None,
    ttl_hours: int = 24,
    exa_api_key: str | None = None,
    default_depth: str = "standard",
    max_sources: int | None = None,
    research_depth: str | None = None,
    enable_parallel_execution: bool = True,
    parallel_config=None,  # Type: ParallelResearchConfig | None
    optimization_enabled: bool = True,
):
    """Initialize optimized deep research agent.

    Args:
        openrouter_provider: Provider used for all LLM calls (replaces the
            parent's single-LLM design).
        persona: Investor persona name passed through to the base agent.
        checkpointer: Optional LangGraph checkpointer for session state.
        ttl_hours: Session/cache time-to-live forwarded to the base class.
        exa_api_key: Optional Exa search API key.
        default_depth: Default research depth level.
        max_sources: Optional cap on sources considered.
        research_depth: Optional depth override forwarded to the base class.
        enable_parallel_execution: Whether the base agent runs sub-research
            in parallel.
        parallel_config: Optional ParallelResearchConfig instance.
        optimization_enabled: When False, optimization components are NOT
            created and entry points defer to the parent implementation.

    NOTE(review): optimization attributes are only set when
    ``optimization_enabled`` is True; callers must check
    ``self.optimization_enabled`` before touching them (as
    ``research_comprehensive`` does). Attribute setup intentionally happens
    before ``super().__init__()`` — confirm the base constructor does not
    overwrite any of these names.
    """
    # Import here to avoid circular dependency
    self.openrouter_provider = openrouter_provider
    self.optimization_enabled = optimization_enabled
    # Initialize optimization components
    if optimization_enabled:
        self.model_selector = AdaptiveModelSelector(openrouter_provider)
        self.token_budgeter = None  # Will be created per request
        self.prompt_engine = OptimizedPromptEngine()
        self.confidence_tracker = None  # Will be created per request
        self.content_filter = IntelligentContentFilter()
        self.parallel_processor = ParallelLLMProcessor(openrouter_provider)
        # Replace content analyzer with optimized version
        self.optimized_analyzer = OptimizedContentAnalyzer(openrouter_provider)
    # Initialize base class with dummy LLM (we'll use OpenRouter provider instead)
    dummy_llm = openrouter_provider.get_llm(TaskType.GENERAL)
    super().__init__(
        llm=dummy_llm,
        persona=persona,
        checkpointer=checkpointer,
        ttl_hours=ttl_hours,
        exa_api_key=exa_api_key,
        default_depth=default_depth,
        max_sources=max_sources,
        research_depth=research_depth,
        enable_parallel_execution=enable_parallel_execution,
        parallel_config=parallel_config,
    )
    logger.info("OptimizedDeepResearchAgent initialized")
@log_method_call(component="OptimizedDeepResearchAgent", include_timing=True)
async def research_comprehensive(
    self,
    topic: str,
    session_id: str,
    depth: str | None = None,
    focus_areas: list[str] | None = None,
    timeframe: str = "30d",
    time_budget_seconds: float = 120.0,  # Default 2 minutes
    target_confidence: float = 0.75,
    **kwargs,
) -> dict[str, Any]:
    """
    Comprehensive research with LLM optimizations to prevent timeouts.

    Runs three budgeted phases — search/filtering (~20% of budget, max 30s),
    parallel content analysis (70% of the remainder), and synthesis (the
    rest) — degrading gracefully when time runs short rather than raising.

    Args:
        topic: Research topic or company/symbol
        session_id: Session identifier
        depth: Research depth (basic/standard/comprehensive/exhaustive)
        focus_areas: Specific areas to focus on
        timeframe: Time range for research
        time_budget_seconds: Maximum time allowed for research
        target_confidence: Target confidence level for early termination
        **kwargs: Additional parameters

    Returns:
        Comprehensive research results with optimization metrics. On failure
        an error dict (``status: "error"``) is returned; exceptions are not
        propagated.
    """
    if not self.optimization_enabled:
        # Fall back to parent implementation
        return await super().research_comprehensive(
            topic, session_id, depth, focus_areas, timeframe, **kwargs
        )
    # Check if search providers are available
    if not self.search_providers:
        return {
            "error": "Research functionality unavailable - no search providers configured",
            "details": "Please configure EXA_API_KEY or TAVILY_API_KEY environment variables",
            "topic": topic,
            "optimization_enabled": self.optimization_enabled,
        }
    start_time = time.time()
    depth = depth or self.default_depth
    # Initialize optimization components for this request
    self.token_budgeter = ProgressiveTokenBudgeter(
        total_time_budget_seconds=time_budget_seconds,
        confidence_target=target_confidence,
    )
    self.confidence_tracker = ConfidenceTracker(
        target_confidence=target_confidence,
        min_sources=3,
        max_sources=RESEARCH_DEPTH_LEVELS[depth]["max_sources"],
    )
    orchestration_logger = get_orchestration_logger("OptimizedDeepResearchAgent")
    orchestration_logger.set_request_context(
        session_id=session_id,
        topic=topic[:50],
        time_budget=time_budget_seconds,
        target_confidence=target_confidence,
    )
    orchestration_logger.info(
        "🚀 OPTIMIZED_RESEARCH_START",
        depth=depth,
        focus_areas=focus_areas,
    )
    try:
        # Phase 1: Search and Content Filtering
        orchestration_logger.info("📋 PHASE_1_SEARCH_START")
        search_time_budget = min(
            time_budget_seconds * 0.2, 30
        )  # 20% of budget, max 30s
        search_results = await self._optimized_search_phase(
            topic, depth, focus_areas, search_time_budget
        )
        orchestration_logger.info(
            "✅ PHASE_1_COMPLETE",
            sources_found=len(search_results.get("filtered_sources", [])),
        )
        # Phase 2: Content Analysis with Parallel Processing
        remaining_time = time_budget_seconds - (time.time() - start_time)
        if remaining_time < 10:
            # Under 10s left: skip analysis/synthesis entirely and return a
            # minimal emergency response built from the raw search results.
            orchestration_logger.warning(
                "⚠️ TIME_CONSTRAINT_CRITICAL", remaining=f"{remaining_time:.1f}s"
            )
            return self._create_emergency_response(
                topic, search_results, start_time
            )
        orchestration_logger.info("🔬 PHASE_2_ANALYSIS_START")
        analysis_time_budget = remaining_time * 0.7  # 70% of remaining time
        analysis_results = await self._optimized_analysis_phase(
            search_results["filtered_sources"],
            topic,
            focus_areas,
            analysis_time_budget,
        )
        orchestration_logger.info(
            "✅ PHASE_2_COMPLETE",
            sources_analyzed=len(analysis_results["analyzed_sources"]),
            confidence=f"{analysis_results['final_confidence']:.2f}",
        )
        # Phase 3: Synthesis with Remaining Time
        remaining_time = time_budget_seconds - (time.time() - start_time)
        if remaining_time < 5:
            # Skip synthesis if very little time left
            synthesis_results = {
                "synthesis": "Time constraints prevented full synthesis"
            }
        else:
            orchestration_logger.info("🧠 PHASE_3_SYNTHESIS_START")
            synthesis_results = await self._optimized_synthesis_phase(
                analysis_results["analyzed_sources"], topic, remaining_time
            )
            orchestration_logger.info("✅ PHASE_3_COMPLETE")
        # Compile final results
        execution_time = time.time() - start_time
        final_results = self._compile_optimized_results(
            topic=topic,
            session_id=session_id,
            depth=depth,
            search_results=search_results,
            analysis_results=analysis_results,
            synthesis_results=synthesis_results,
            execution_time=execution_time,
            time_budget=time_budget_seconds,
        )
        # Log performance metrics
        log_performance_metrics(
            "OptimizedDeepResearchAgent",
            {
                "total_execution_time": execution_time,
                "time_budget_used_pct": (execution_time / time_budget_seconds)
                * 100,
                "sources_processed": len(analysis_results["analyzed_sources"]),
                "final_confidence": analysis_results["final_confidence"],
                "optimization_enabled": True,
                "phases_completed": 3,
            },
        )
        orchestration_logger.info(
            "🎉 OPTIMIZED_RESEARCH_COMPLETE",
            duration=f"{execution_time:.2f}s",
            confidence=f"{analysis_results['final_confidence']:.2f}",
        )
        return final_results
    except Exception as e:
        # Broad catch is deliberate: this is the tool's top-level boundary and
        # callers receive a structured error payload instead of an exception.
        execution_time = time.time() - start_time
        orchestration_logger.error(
            "💥 OPTIMIZED_RESEARCH_FAILED",
            error=str(e),
            execution_time=f"{execution_time:.2f}s",
        )
        return {
            "status": "error",
            "error": str(e),
            "execution_time_ms": execution_time * 1000,
            "agent_type": "optimized_deep_research",
            "optimization_enabled": True,
            "topic": topic,
        }
async def _optimized_search_phase(
self, topic: str, depth: str, focus_areas: list[str], time_budget_seconds: float
) -> dict[str, Any]:
"""Execute search phase with content filtering."""
# Generate search queries (reuse parent logic)
persona_focus = PERSONA_RESEARCH_FOCUS[self.persona.name.lower()]
search_queries = await self._generate_search_queries(
topic, persona_focus, RESEARCH_DEPTH_LEVELS[depth]
)
# Execute searches (reuse parent logic but with time limits)
all_results = []
max_searches = min(len(search_queries), 4) # Limit searches for speed
search_tasks = []
for query in search_queries[:max_searches]:
for provider in self.search_providers[
:1
]: # Use only first provider for speed
task = self._search_with_timeout(
provider, query, time_budget_seconds / max_searches
)
search_tasks.append(task)
search_results = await asyncio.gather(*search_tasks, return_exceptions=True)
# Collect valid results
for result in search_results:
if isinstance(result, list):
all_results.extend(result)
# Apply intelligent content filtering
current_confidence = 0.0 # Starting confidence
research_focus = focus_areas[0] if focus_areas else "fundamental"
filtered_sources = await self.content_filter.filter_and_prioritize_sources(
sources=all_results,
research_focus=research_focus,
time_budget=time_budget_seconds,
current_confidence=current_confidence,
)
return {
"raw_results": all_results,
"filtered_sources": filtered_sources,
"search_queries": search_queries[:max_searches],
"filtering_applied": True,
}
async def _search_with_timeout(
self, provider, query: str, timeout: float
) -> list[dict]:
"""Execute search with timeout."""
try:
return await asyncio.wait_for(
provider.search(query, num_results=5), timeout=timeout
)
except TimeoutError:
logger.warning(f"Search timeout for query: {query}")
return []
except Exception as e:
logger.warning(f"Search failed for {query}: {e}")
return []
    async def _optimized_analysis_phase(
        self,
        sources: list[dict],
        topic: str,
        focus_areas: list[str],
        time_budget_seconds: float,
    ) -> dict[str, Any]:
        """Execute content analysis with optimizations and early termination.

        Chooses between two modes: parallel batch analysis when there are
        more than 3 sources and under ~8s per source, otherwise sequential
        analysis with confidence-driven early termination.

        Args:
            sources: Filtered sources to analyze (each a dict; "content" is read).
            topic: Research topic (not used directly in this phase).
            focus_areas: Optional focus areas; the first drives analysis type.
            time_budget_seconds: Wall-clock budget for this phase.

        Returns:
            Dict with analyzed sources, final confidence, early-termination
            flags, and the processing mode used.
        """
        if not sources:
            # Nothing to analyze — empty, zero-confidence result.
            return {
                "analyzed_sources": [],
                "final_confidence": 0.0,
                "early_terminated": False,
                "termination_reason": "no_sources",
            }
        analyzed_sources = []
        current_confidence = 0.0
        sources_to_process = sources.copy()
        # Calculate time per source
        time_per_source = time_budget_seconds / len(sources_to_process)
        # Use batch processing if time allows
        if len(sources_to_process) > 3 and time_per_source < 8:
            # Use parallel batch processing
            analyzed_sources = await self.optimized_analyzer.batch_analyze_content(
                sources=sources_to_process,
                persona=self.persona.name.lower(),
                analysis_type=focus_areas[0] if focus_areas else "general",
                time_budget_seconds=time_budget_seconds,
                current_confidence=current_confidence,
            )
            # Calculate final confidence from batch results:
            # mean of per-source sentiment confidence weighted by credibility.
            confidence_sum = 0
            for source in analyzed_sources:
                analysis = source.get("analysis", {})
                sentiment = analysis.get("sentiment", {})
                source_confidence = sentiment.get("confidence", 0.5)
                credibility = analysis.get("credibility_score", 0.5)
                confidence_sum += source_confidence * credibility
            final_confidence = (
                confidence_sum / len(analyzed_sources) if analyzed_sources else 0.0
            )
            return {
                "analyzed_sources": analyzed_sources,
                "final_confidence": final_confidence,
                "early_terminated": False,
                "termination_reason": "batch_processing_complete",
                "processing_mode": "parallel_batch",
            }
        else:
            # Use sequential processing with early termination
            for _, source in enumerate(sources_to_process):
                # NOTE(review): remaining time is estimated from the count of
                # processed sources, not actual elapsed wall-clock time —
                # confirm this approximation is intended.
                remaining_time = time_budget_seconds - (
                    len(analyzed_sources) * time_per_source
                )
                if remaining_time < 5:  # Reserve minimum time
                    break
                # Analyze source with optimizations
                analysis_result = (
                    await self.optimized_analyzer.analyze_content_optimized(
                        content=source.get("content", ""),
                        persona=self.persona.name.lower(),
                        analysis_focus=focus_areas[0] if focus_areas else "general",
                        time_budget_seconds=min(
                            remaining_time / 2, 15
                        ),  # Max 15s per source
                        current_confidence=current_confidence,
                    )
                )
                # Add analysis to source (mutates the caller-supplied dict).
                source["analysis"] = analysis_result
                analyzed_sources.append(source)
                # Update confidence tracker
                credibility_score = analysis_result.get("credibility_score", 0.5)
                confidence_update = self.confidence_tracker.update_confidence(
                    analysis_result, credibility_score
                )
                current_confidence = confidence_update["current_confidence"]
                # Check for early termination
                if not confidence_update["should_continue"]:
                    logger.info(
                        f"Early termination after {len(analyzed_sources)} sources: {confidence_update['early_termination_reason']}"
                    )
                    return {
                        "analyzed_sources": analyzed_sources,
                        "final_confidence": current_confidence,
                        "early_terminated": True,
                        "termination_reason": confidence_update[
                            "early_termination_reason"
                        ],
                        "processing_mode": "sequential_early_termination",
                    }
            return {
                "analyzed_sources": analyzed_sources,
                "final_confidence": current_confidence,
                "early_terminated": False,
                "termination_reason": "all_sources_processed",
                "processing_mode": "sequential_complete",
            }
    async def _optimized_synthesis_phase(
        self, analyzed_sources: list[dict], topic: str, time_budget_seconds: float
    ) -> dict[str, Any]:
        """Execute synthesis with optimized model selection.

        Picks an LLM sized to the remaining time budget and task complexity,
        builds a time-optimized prompt from the analyzed sources, and invokes
        the model under a hard timeout. On any failure the method degrades to
        a canned summary string instead of raising.
        """
        if not analyzed_sources:
            return {"synthesis": "No sources available for synthesis"}
        # Select optimal model for synthesis.
        # Only the first 5 sources' analyses feed the complexity estimate.
        combined_content = "\n".join(
            [str(source.get("analysis", {})) for source in analyzed_sources[:5]]
        )
        complexity_score = self.model_selector.calculate_task_complexity(
            combined_content, TaskType.RESULT_SYNTHESIS
        )
        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.RESULT_SYNTHESIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=complexity_score,
            content_size_tokens=len(combined_content) // 4,  # ~4 chars/token heuristic
        )
        # Create optimized synthesis prompt
        synthesis_prompt = self.prompt_engine.create_time_optimized_synthesis_prompt(
            sources=analyzed_sources,
            persona=self.persona.name,
            time_remaining=time_budget_seconds,
            current_confidence=0.8,  # Assume good confidence at synthesis stage
        )
        # Execute synthesis
        try:
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )
            response = await asyncio.wait_for(
                llm.ainvoke(
                    [
                        SystemMessage(
                            content="You are a financial research synthesizer."
                        ),
                        HumanMessage(content=synthesis_prompt),
                    ]
                ),
                timeout=model_config.timeout_seconds,
            )
            return {
                "synthesis": response.content,
                "model_used": model_config.model_id,
                "synthesis_optimized": True,
            }
        except Exception as e:
            # Any failure (including timeout) falls back to a stub synthesis.
            logger.warning(f"Optimized synthesis failed: {e}")
            return {
                "synthesis": f"Synthesis of {len(analyzed_sources)} sources completed with basic processing due to constraints.",
                "fallback_used": True,
            }
def _create_emergency_response(
self, topic: str, search_results: dict, start_time: float
) -> dict[str, Any]:
"""Create emergency response when time is critically low."""
execution_time = time.time() - start_time
source_count = len(search_results.get("filtered_sources", []))
return {
"status": "partial_success",
"agent_type": "optimized_deep_research",
"emergency_mode": True,
"topic": topic,
"sources_found": source_count,
"execution_time_ms": execution_time * 1000,
"findings": {
"synthesis": f"Emergency mode: Found {source_count} relevant sources for {topic}. "
"Full analysis was prevented by time constraints.",
"confidence_score": 0.3,
"sources_analyzed": source_count,
},
"optimization_metrics": {
"time_budget_exceeded": True,
"phases_completed": 1,
"emergency_fallback": True,
},
}
def _compile_optimized_results(
self,
topic: str,
session_id: str,
depth: str,
search_results: dict,
analysis_results: dict,
synthesis_results: dict,
execution_time: float,
time_budget: float,
) -> dict[str, Any]:
"""Compile final optimized research results."""
analyzed_sources = analysis_results["analyzed_sources"]
# Create citations
citations = []
for i, source in enumerate(analyzed_sources, 1):
analysis = source.get("analysis", {})
citation = {
"id": i,
"title": source.get("title", f"Source {i}"),
"url": source.get("url", ""),
"published_date": source.get("published_date"),
"credibility_score": analysis.get("credibility_score", 0.5),
"relevance_score": analysis.get("relevance_score", 0.5),
"optimized_analysis": analysis.get("optimization_applied", False),
}
citations.append(citation)
return {
"status": "success",
"agent_type": "optimized_deep_research",
"optimization_enabled": True,
"persona": self.persona.name,
"research_topic": topic,
"research_depth": depth,
"findings": {
"synthesis": synthesis_results.get(
"synthesis", "No synthesis available"
),
"confidence_score": analysis_results["final_confidence"],
"early_terminated": analysis_results.get("early_terminated", False),
"termination_reason": analysis_results.get("termination_reason"),
"processing_mode": analysis_results.get("processing_mode", "unknown"),
},
"sources_analyzed": len(analyzed_sources),
"citations": citations,
"execution_time_ms": execution_time * 1000,
"optimization_metrics": {
"time_budget_seconds": time_budget,
"time_used_seconds": execution_time,
"time_utilization_pct": (execution_time / time_budget) * 100,
"sources_found": len(search_results.get("raw_results", [])),
"sources_filtered": len(search_results.get("filtered_sources", [])),
"sources_processed": len(analyzed_sources),
"content_filtering_applied": search_results.get(
"filtering_applied", False
),
"parallel_processing_used": "batch"
in analysis_results.get("processing_mode", ""),
"synthesis_optimized": synthesis_results.get(
"synthesis_optimized", False
),
"optimization_features_used": [
"adaptive_model_selection",
"progressive_token_budgeting",
"content_filtering",
"optimized_prompts",
]
+ (
["parallel_processing"]
if "batch" in analysis_results.get("processing_mode", "")
else []
)
+ (
["early_termination"]
if analysis_results.get("early_terminated")
else []
),
},
"search_queries_used": search_results.get("search_queries", []),
"session_id": session_id,
}
# Factory function for easy integration
def create_optimized_research_agent(
    openrouter_api_key: str,
    persona: str = "moderate",
    time_budget_seconds: float = 120.0,
    target_confidence: float = 0.75,
    **kwargs,
) -> OptimizedDeepResearchAgent:
    """Create an optimized deep research agent with recommended settings.

    Args:
        openrouter_api_key: API key used to build the OpenRouterProvider.
        persona: Investor persona name (e.g. "moderate").
        time_budget_seconds: Intended default research time budget.
        target_confidence: Intended early-termination confidence target.
        **kwargs: Extra keyword arguments forwarded to the agent constructor.

    Returns:
        An OptimizedDeepResearchAgent with optimization enabled.
    """
    # NOTE(review): time_budget_seconds and target_confidence are accepted
    # here but never forwarded to the constructor below — callers' values
    # are silently ignored unless passed again via **kwargs. TODO confirm
    # whether they should be forwarded.
    openrouter_provider = OpenRouterProvider(openrouter_api_key)
    return OptimizedDeepResearchAgent(
        openrouter_provider=openrouter_provider,
        persona=persona,
        optimization_enabled=True,
        **kwargs,
    )
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/circuit_breaker.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive circuit breaker implementation for all external API calls.
This module provides circuit breakers for:
- yfinance (Yahoo Finance)
- Tiingo API
- FRED API
- OpenRouter AI API
- Exa Search API
- Any other external services
Circuit breakers help prevent cascade failures and provide graceful degradation.
"""
import asyncio
import functools
import logging
import threading
import time
from collections import deque
from collections.abc import Callable
from enum import Enum
from typing import Any, ParamSpec, TypeVar, cast
from maverick_mcp.config.settings import get_settings
from maverick_mcp.exceptions import CircuitBreakerError, ExternalServiceError
logger = logging.getLogger(__name__)
settings = get_settings()
P = ParamSpec("P")
T = TypeVar("T")
F = TypeVar("F", bound=Callable[..., Any])
class CircuitState(Enum):
    """States of the circuit breaker lifecycle (CLOSED -> OPEN -> HALF_OPEN)."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if service recovered
class FailureDetectionStrategy(Enum):
    """Strategies for deciding when a breaker should open."""

    CONSECUTIVE_FAILURES = "consecutive"  # N failures in a row
    FAILURE_RATE = "failure_rate"  # % of failures in time window
    TIMEOUT_RATE = "timeout_rate"  # % of timeouts in time window
    COMBINED = "combined"  # Any of the above
class CircuitBreakerConfig:
    """Settings bundle describing one circuit breaker's behavior."""

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        failure_rate_threshold: float = 0.5,
        timeout_threshold: float = 10.0,
        recovery_timeout: int = 60,
        success_threshold: int = 3,
        window_size: int = 60,
        detection_strategy: FailureDetectionStrategy = FailureDetectionStrategy.COMBINED,
        expected_exceptions: tuple[type[Exception], ...] = (Exception,),
    ):
        """Store the supplied thresholds and strategy on the instance.

        Args:
            name: Identifier for the breaker (usually the service name).
            failure_threshold: Consecutive failures that open the circuit.
            failure_rate_threshold: Failure ratio (0-1) that opens the circuit.
            timeout_threshold: Per-call timeout in seconds.
            recovery_timeout: Seconds to wait before probing recovery.
            success_threshold: Half-open successes required to close again.
            window_size: Seconds of history used for rate calculations.
            detection_strategy: How failures are detected.
            expected_exceptions: Exception types counted as failures.
        """
        # Identity and failure-detection configuration.
        self.name = name
        self.detection_strategy = detection_strategy
        self.expected_exceptions = expected_exceptions
        # Opening thresholds.
        self.failure_threshold = failure_threshold
        self.failure_rate_threshold = failure_rate_threshold
        self.timeout_threshold = timeout_threshold
        # Recovery behaviour and metrics window.
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.window_size = window_size
class CircuitBreakerMetrics:
    """Sliding-window metrics collection for circuit breakers.

    Records per-call outcomes as (timestamp, success, duration) and state
    changes, evicting entries older than the window (state changes are kept
    10x longer). All public methods are thread-safe via an RLock.
    """

    def __init__(self, window_size: int = 300, timeout_threshold: float = 10.0):
        """Initialize metrics with a time window.

        Args:
            window_size: Seconds of call history retained for rate stats.
            timeout_threshold: Failures lasting at least this many seconds
                are classified as timeouts in get_stats(). Previously a
                hard-coded 10.0; parameterized (default preserves behavior).
        """
        self.window_size = window_size
        self.timeout_threshold = timeout_threshold
        self.calls: deque[tuple[float, bool, float]] = (
            deque()
        )  # (timestamp, success, duration)
        self.state_changes: deque[tuple[float, CircuitState]] = deque()
        self._lock = threading.RLock()

    def record_call(self, success: bool, duration: float):
        """Record a call result."""
        with self._lock:
            now = time.time()
            self.calls.append((now, success, duration))
            self._cleanup_old_data(now)

    def record_state_change(self, new_state: CircuitState):
        """Record a state change."""
        with self._lock:
            now = time.time()
            self.state_changes.append((now, new_state))
            self._cleanup_old_data(now)

    def get_stats(self) -> dict[str, Any]:
        """Get current statistics over the retained window.

        Returns:
            Dict with total_calls, success/failure/timeout rates, and
            min/avg/max durations. With no recorded calls, reports a
            perfect (1.0) success rate.
        """
        with self._lock:
            now = time.time()
            self._cleanup_old_data(now)
            if not self.calls:
                return {
                    "total_calls": 0,
                    "success_rate": 1.0,
                    "failure_rate": 0.0,
                    "avg_duration": 0.0,
                    "timeout_rate": 0.0,
                }
            total = len(self.calls)
            successes = sum(1 for _, success, _ in self.calls if success)
            failures = total - successes
            durations = [duration for _, _, duration in self.calls]
            # A failure whose duration reached timeout_threshold is counted
            # as a timeout.
            timeouts = sum(
                1
                for _, success, duration in self.calls
                if not success and duration >= self.timeout_threshold
            )
            return {
                "total_calls": total,
                "success_rate": successes / total if total > 0 else 1.0,
                "failure_rate": failures / total if total > 0 else 0.0,
                "avg_duration": sum(durations) / len(durations) if durations else 0.0,
                "timeout_rate": timeouts / total if total > 0 else 0.0,
                "min_duration": min(durations) if durations else 0.0,
                "max_duration": max(durations) if durations else 0.0,
            }

    def get_total_calls(self) -> int:
        """Get total number of calls in the window."""
        with self._lock:
            now = time.time()
            self._cleanup_old_data(now)
            return len(self.calls)

    def get_success_rate(self) -> float:
        """Get success rate in the window."""
        stats = self.get_stats()
        return stats["success_rate"]

    def get_failure_rate(self) -> float:
        """Get failure rate in the window."""
        stats = self.get_stats()
        return stats["failure_rate"]

    def get_average_response_time(self) -> float:
        """Get average response time in the window."""
        stats = self.get_stats()
        return stats["avg_duration"]

    def get_last_failure_time(self) -> float | None:
        """Get timestamp of the most recent failure, or None if none recorded."""
        with self._lock:
            for timestamp, success, _ in reversed(self.calls):
                if not success:
                    return timestamp
            return None

    def get_uptime_percentage(self) -> float:
        """Get uptime percentage based on state changes.

        Time spent in CLOSED counts as "up"; OPEN and HALF_OPEN count as
        down. With no recorded state changes, reports 100%.
        """
        with self._lock:
            if not self.state_changes:
                return 100.0
            now = time.time()
            window_start = now - self.window_size
            uptime = 0.0
            last_time = window_start
            last_state = CircuitState.CLOSED
            for timestamp, state in self.state_changes:
                if timestamp < window_start:
                    # Before the window: only remember the state we entered it in.
                    last_state = state
                    continue
                if last_state == CircuitState.CLOSED:
                    uptime += timestamp - last_time
                last_time = timestamp
                last_state = state
            if last_state == CircuitState.CLOSED:
                uptime += now - last_time
            total_time = now - window_start
            return (uptime / total_time * 100) if total_time > 0 else 100.0

    def _cleanup_old_data(self, now: float):
        """Remove data outside the window."""
        cutoff = now - self.window_size
        # Clean up calls
        while self.calls and self.calls[0][0] < cutoff:
            self.calls.popleft()
        # Clean up state changes (keep longer history)
        state_cutoff = now - (self.window_size * 10)
        while self.state_changes and self.state_changes[0][0] < state_cutoff:
            self.state_changes.popleft()
class EnhancedCircuitBreaker:
    """
    Enhanced circuit breaker with failure rate tracking, timeouts, and metrics.
    Thread-safe and supports both sync and async operations.

    State machine: CLOSED (normal) -> OPEN (rejecting calls) when the
    configured detection strategy trips; OPEN -> HALF_OPEN after
    recovery_timeout seconds; HALF_OPEN -> CLOSED after success_threshold
    consecutive successes, or back to OPEN on the first failure.
    """

    def __init__(self, config: CircuitBreakerConfig):
        """Initialize enhanced circuit breaker."""
        self.config = config
        self._state = CircuitState.CLOSED
        self._consecutive_failures = 0
        self._half_open_successes = 0
        self._last_failure_time: float | None = None
        self._metrics = CircuitBreakerMetrics(config.window_size)
        # Thread-safe locks
        # NOTE(review): sync paths serialize on _lock while call_async's
        # state check uses _async_lock only; mixed sync/async callers can
        # race on state transitions — confirm this is acceptable.
        self._lock = threading.RLock()
        self._async_lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        """Get current circuit state."""
        with self._lock:
            return self._state

    @property
    def consecutive_failures(self) -> int:
        """Get consecutive failures count."""
        with self._lock:
            return self._consecutive_failures

    @property
    def is_open(self) -> bool:
        """Check if circuit is open."""
        return self.state == CircuitState.OPEN

    @property
    def is_closed(self) -> bool:
        """Check if circuit is closed."""
        return self.state == CircuitState.CLOSED

    def get_metrics(self) -> CircuitBreakerMetrics:
        """Get circuit breaker metrics."""
        return self._metrics

    def time_until_retry(self) -> float | None:
        """Seconds until the next recovery probe; None unless the circuit is OPEN."""
        with self._lock:
            if self._state == CircuitState.OPEN and self._last_failure_time:
                return max(
                    0,
                    self.config.recovery_timeout
                    - (time.time() - self._last_failure_time),
                )
            return None

    def _should_open(self) -> bool:
        """Determine if circuit should open based on detection strategy."""
        stats = self._metrics.get_stats()
        if (
            self.config.detection_strategy
            == FailureDetectionStrategy.CONSECUTIVE_FAILURES
        ):
            return self._consecutive_failures >= self.config.failure_threshold
        elif self.config.detection_strategy == FailureDetectionStrategy.FAILURE_RATE:
            return (
                stats["total_calls"] >= 5  # Minimum calls for rate calculation
                and stats["failure_rate"] >= self.config.failure_rate_threshold
            )
        elif self.config.detection_strategy == FailureDetectionStrategy.TIMEOUT_RATE:
            # Note: timeout rate is compared against failure_rate_threshold.
            return (
                stats["total_calls"] >= 5
                and stats["timeout_rate"] >= self.config.failure_rate_threshold
            )
        else:  # COMBINED — opens if ANY of the three conditions trips.
            return (
                self._consecutive_failures >= self.config.failure_threshold
                or (
                    stats["total_calls"] >= 5
                    and stats["failure_rate"] >= self.config.failure_rate_threshold
                )
                or (
                    stats["total_calls"] >= 5
                    and stats["timeout_rate"] >= self.config.failure_rate_threshold
                )
            )

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        if self._last_failure_time is None:
            return True
        return (time.time() - self._last_failure_time) >= self.config.recovery_timeout

    def _transition_state(self, new_state: CircuitState):
        """Transition to a new state (no-op if already there); records the change."""
        if self._state != new_state:
            logger.info(
                f"Circuit breaker '{self.config.name}' transitioning from {self._state.value} to {new_state.value}"
            )
            self._state = new_state
            self._metrics.record_state_change(new_state)

    def _on_success(self, duration: float):
        """Handle successful call: reset failure streak, advance half-open probe."""
        with self._lock:
            self._metrics.record_call(True, duration)
            self._consecutive_failures = 0
            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                # Enough consecutive probe successes: close the circuit.
                if self._half_open_successes >= self.config.success_threshold:
                    self._transition_state(CircuitState.CLOSED)
                    self._half_open_successes = 0

    def _on_failure(self, duration: float):
        """Handle failed call: record it and open the circuit if warranted."""
        with self._lock:
            self._metrics.record_call(False, duration)
            self._consecutive_failures += 1
            self._last_failure_time = time.time()
            if self._state == CircuitState.HALF_OPEN:
                # Any failure during a recovery probe re-opens immediately.
                self._transition_state(CircuitState.OPEN)
                self._half_open_successes = 0
            elif self._state == CircuitState.CLOSED and self._should_open():
                self._transition_state(CircuitState.OPEN)

    def call(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """Call function through circuit breaker (sync version)."""
        return self.call_sync(func, *args, **kwargs)

    async def call_async(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Call async function through circuit breaker with timeout support.

        The call itself is wrapped in asyncio.wait_for using the configured
        timeout_threshold; a timeout counts as a failure and is re-raised as
        ExternalServiceError.

        Args:
            func: Async function to call
            *args: Function arguments
            **kwargs: Function keyword arguments
        Returns:
            Function result
        Raises:
            CircuitBreakerError: If circuit is open
            ExternalServiceError: If the call times out
            Exception: If function fails
        """
        # Check if we should attempt reset
        async with self._async_lock:
            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._transition_state(CircuitState.HALF_OPEN)
                    self._half_open_successes = 0
                else:
                    # Still cooling down: reject fast with retry timing info.
                    time_until_retry = self.config.recovery_timeout
                    if self._last_failure_time:
                        time_until_retry = max(
                            0,
                            self.config.recovery_timeout
                            - (time.time() - self._last_failure_time),
                        )
                    raise CircuitBreakerError(
                        service=self.config.name,
                        failure_count=self._consecutive_failures,
                        threshold=self.config.failure_threshold,
                        context={
                            "state": self._state.value,
                            "time_until_retry": round(time_until_retry, 1),
                        },
                    )
        start_time = time.time()
        try:
            # Execute with timeout
            result = await asyncio.wait_for(
                func(*args, **kwargs), timeout=self.config.timeout_threshold
            )
            duration = time.time() - start_time
            self._on_success(duration)
            return result
        except TimeoutError as e:
            duration = time.time() - start_time
            self._on_failure(duration)
            logger.warning(
                f"Circuit breaker '{self.config.name}' timeout after {duration:.2f}s"
            )
            raise ExternalServiceError(
                service=self.config.name,
                message=f"Service timed out after {self.config.timeout_threshold}s",
                context={
                    "timeout": self.config.timeout_threshold,
                },
            ) from e
        except self.config.expected_exceptions:
            duration = time.time() - start_time
            self._on_failure(duration)
            raise

    def call_sync(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Call sync function through circuit breaker.

        For sync functions, timeout is enforced differently depending on the function type.
        HTTP requests should use their own timeout parameters.

        Raises:
            CircuitBreakerError: If circuit is open
            Exception: If function fails
        """
        # Check if we should attempt reset
        with self._lock:
            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._transition_state(CircuitState.HALF_OPEN)
                    self._half_open_successes = 0
                else:
                    # Still cooling down: reject fast with retry timing info.
                    time_until_retry = self.config.recovery_timeout
                    if self._last_failure_time:
                        time_until_retry = max(
                            0,
                            self.config.recovery_timeout
                            - (time.time() - self._last_failure_time),
                        )
                    raise CircuitBreakerError(
                        service=self.config.name,
                        failure_count=self._consecutive_failures,
                        threshold=self.config.failure_threshold,
                        context={
                            "state": self._state.value,
                            "time_until_retry": round(time_until_retry, 1),
                        },
                    )
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            self._on_success(duration)
            return result
        except self.config.expected_exceptions:
            duration = time.time() - start_time
            self._on_failure(duration)
            raise

    def reset(self):
        """Manually reset the circuit breaker to CLOSED and clear counters."""
        with self._lock:
            self._transition_state(CircuitState.CLOSED)
            self._consecutive_failures = 0
            self._half_open_successes = 0
            self._last_failure_time = None
            logger.info(f"Circuit breaker '{self.config.name}' manually reset")

    def get_status(self) -> dict[str, Any]:
        """Get detailed circuit breaker status (state, metrics, config)."""
        with self._lock:
            stats = self._metrics.get_stats()
            time_until_retry = None
            if self._state == CircuitState.OPEN and self._last_failure_time:
                time_until_retry = max(
                    0,
                    self.config.recovery_timeout
                    - (time.time() - self._last_failure_time),
                )
            return {
                "name": self.config.name,
                "state": self._state.value,
                "consecutive_failures": self._consecutive_failures,
                # NOTE(review): a value of exactly 0 reports as None here
                # (falsy check) — confirm that is intended.
                "time_until_retry": round(time_until_retry, 1)
                if time_until_retry
                else None,
                "metrics": stats,
                "config": {
                    "failure_threshold": self.config.failure_threshold,
                    "failure_rate_threshold": self.config.failure_rate_threshold,
                    "timeout_threshold": self.config.timeout_threshold,
                    "recovery_timeout": self.config.recovery_timeout,
                    "detection_strategy": self.config.detection_strategy.value,
                },
            }
# Global registry of circuit breakers
# Maps breaker name -> breaker instance; writes are guarded by _breakers_lock.
_breakers: dict[str, EnhancedCircuitBreaker] = {}
_breakers_lock = threading.Lock()
def _get_or_create_breaker(config: CircuitBreakerConfig) -> EnhancedCircuitBreaker:
    """Return the registered breaker for config.name, creating it if absent."""
    with _breakers_lock:
        breaker = _breakers.get(config.name)
        if breaker is None:
            breaker = EnhancedCircuitBreaker(config)
            _breakers[config.name] = breaker
        return breaker
def register_circuit_breaker(name: str, breaker: EnhancedCircuitBreaker):
    """Register (or replace) a circuit breaker in the global registry."""
    with _breakers_lock:
        _breakers[name] = breaker
        logger.debug(f"Registered circuit breaker: {name}")
def get_circuit_breaker(name: str) -> EnhancedCircuitBreaker | None:
    """Look up a circuit breaker by name; returns None if not registered.

    Read is done without the registry lock — a single dict.get is
    effectively atomic under CPython.
    """
    return _breakers.get(name)
def get_all_circuit_breakers() -> dict[str, EnhancedCircuitBreaker]:
    """Return a shallow copy of the registry (safe for callers to iterate/mutate)."""
    return _breakers.copy()
def reset_all_circuit_breakers():
    """Reset every registered circuit breaker to CLOSED.

    Takes a snapshot of the registry under _breakers_lock before iterating,
    so a concurrent registration (registry writes are lock-protected
    elsewhere in this module) cannot raise "dictionary changed size during
    iteration".
    """
    with _breakers_lock:
        breakers = list(_breakers.values())
    for breaker in breakers:
        breaker.reset()
def get_circuit_breaker_status() -> dict[str, dict[str, Any]]:
    """Collect the status dict of every registered circuit breaker, keyed by name."""
    status: dict[str, dict[str, Any]] = {}
    for name, breaker in _breakers.items():
        status[name] = breaker.get_status()
    return status
def circuit_breaker(
    name: str | None = None,
    failure_threshold: int | None = None,
    failure_rate_threshold: float | None = None,
    timeout_threshold: float | None = None,
    recovery_timeout: int | None = None,
    expected_exceptions: tuple[type[Exception], ...] | None = None,
) -> Callable:
    """
    Decorator to apply circuit breaker to a function.

    Works with both sync and async functions; the wrapper type is chosen
    automatically. Unspecified overrides fall back to application settings
    or module defaults.

    Args:
        name: Circuit breaker name (defaults to module-qualified function name)
        failure_threshold: Override default failure threshold
        failure_rate_threshold: Override default failure rate threshold
        timeout_threshold: Override default timeout threshold
        recovery_timeout: Override default recovery timeout
        expected_exceptions: Exceptions to catch (defaults to Exception)
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Build the breaker config, preferring explicit overrides.
        default_name = f"{func.__module__}.{getattr(func, '__name__', 'unknown')}"
        config = CircuitBreakerConfig(
            name=name or default_name,
            failure_threshold=(
                failure_threshold or settings.agent.circuit_breaker_failure_threshold
            ),
            failure_rate_threshold=failure_rate_threshold or 0.5,
            timeout_threshold=timeout_threshold or 30.0,
            recovery_timeout=(
                recovery_timeout or settings.agent.circuit_breaker_recovery_timeout
            ),
            expected_exceptions=expected_exceptions or (Exception,),
        )
        # Breakers are shared per name via the global registry.
        breaker = _get_or_create_breaker(config)

        if not asyncio.iscoroutinefunction(func):

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                return breaker.call_sync(func, *args, **kwargs)

            return cast(Callable[..., T], sync_wrapper)

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            return await breaker.call_async(func, *args, **kwargs)

        return cast(Callable[..., T], async_wrapper)

    return decorator
# Circuit breaker configurations for different services
# Each entry tunes the breaker to its upstream: lower failure_threshold and
# longer recovery_timeout for flakier services, longer timeout_threshold for
# slower APIs. All use the COMBINED detection strategy.
CIRCUIT_BREAKER_CONFIGS = {
    "yfinance": CircuitBreakerConfig(
        name="yfinance",
        failure_threshold=3,
        failure_rate_threshold=0.6,
        timeout_threshold=30.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "tiingo": CircuitBreakerConfig(
        name="tiingo",
        failure_threshold=5,
        failure_rate_threshold=0.7,
        timeout_threshold=15.0,
        recovery_timeout=60,
        success_threshold=3,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "fred_api": CircuitBreakerConfig(
        name="fred_api",
        failure_threshold=3,
        failure_rate_threshold=0.5,
        timeout_threshold=20.0,
        recovery_timeout=180,
        success_threshold=2,
        window_size=600,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "openrouter": CircuitBreakerConfig(
        name="openrouter",
        failure_threshold=5,
        failure_rate_threshold=0.6,
        timeout_threshold=60.0,  # AI APIs can be slower
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "exa": CircuitBreakerConfig(
        name="exa",
        failure_threshold=4,
        failure_rate_threshold=0.6,
        timeout_threshold=30.0,
        recovery_timeout=90,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "news_api": CircuitBreakerConfig(
        name="news_api",
        failure_threshold=3,
        failure_rate_threshold=0.5,
        timeout_threshold=25.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "finviz": CircuitBreakerConfig(
        name="finviz",
        failure_threshold=3,
        failure_rate_threshold=0.6,
        timeout_threshold=20.0,
        recovery_timeout=150,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    # Presumably the generic profile for services without a dedicated
    # config — confirm against callers.
    "external_api": CircuitBreakerConfig(
        name="external_api",
        failure_threshold=4,
        failure_rate_threshold=0.6,
        timeout_threshold=25.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
}
def initialize_circuit_breakers() -> dict[str, EnhancedCircuitBreaker]:
    """Build and register a breaker for every predefined service config.

    Returns:
        Mapping of service name to its freshly created breaker; services
        whose breaker failed to initialize are logged and omitted.
    """
    created: dict[str, EnhancedCircuitBreaker] = {}
    for svc, cfg in CIRCUIT_BREAKER_CONFIGS.items():
        try:
            new_breaker = EnhancedCircuitBreaker(cfg)
            register_circuit_breaker(svc, new_breaker)
            created[svc] = new_breaker
            logger.info(f"Initialized circuit breaker for {svc}")
        except Exception as e:
            logger.error(f"Failed to initialize circuit breaker for {svc}: {e}")
    logger.info(f"Initialized {len(created)} circuit breakers")
    return created
def with_circuit_breaker(service_name: str):
    """Decorator to wrap sync functions with a named circuit breaker.

    If the named breaker is not registered, the call proceeds without
    protection and a warning is logged.

    Args:
        service_name: Name of the service/circuit breaker to use
    Usage:
        @with_circuit_breaker("yfinance")
        def fetch_stock_data(symbol: str):
            ...
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            breaker = get_circuit_breaker(service_name)
            if breaker is None:
                logger.warning(
                    f"Circuit breaker '{service_name}' not found, executing without protection"
                )
                return func(*args, **kwargs)
            return breaker.call(func, *args, **kwargs)

        return wrapper

    return decorator
def with_async_circuit_breaker(service_name: str):
    """Decorator to wrap async functions with a named circuit breaker.

    If the named breaker is not registered, the call proceeds without
    protection and a warning is logged.

    Args:
        service_name: Name of the service/circuit breaker to use
    Usage:
        @with_async_circuit_breaker("tiingo")
        async def fetch_real_time_data(symbol: str):
            ...
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            breaker = get_circuit_breaker(service_name)
            if breaker is None:
                logger.warning(
                    f"Circuit breaker '{service_name}' not found, executing without protection"
                )
                return await func(*args, **kwargs)
            return await breaker.call_async(func, *args, **kwargs)

        return wrapper

    return decorator
class CircuitBreakerManager:
"""Manager for all circuit breakers in the application."""
    def __init__(self):
        # Lazily-populated mapping of service name -> breaker.
        self._breakers = {}
        # Guard so initialize() performs setup only once.
        self._initialized = False
def initialize(self) -> bool:
"""Initialize all circuit breakers."""
if self._initialized:
return True
try:
self._breakers = initialize_circuit_breakers()
self._initialized = True
logger.info("Circuit breaker manager initialized successfully")
return True
except Exception as e:
logger.error(f"Failed to initialize circuit breaker manager: {e}")
return False
def get_breaker(self, service_name: str) -> EnhancedCircuitBreaker | None:
"""Get a circuit breaker by service name."""
if not self._initialized:
self.initialize()
return self._breakers.get(service_name)
def get_all_breakers(self) -> dict[str, EnhancedCircuitBreaker]:
"""Get all circuit breakers."""
if not self._initialized:
self.initialize()
return self._breakers.copy()
def reset_breaker(self, service_name: str) -> bool:
"""Reset a specific circuit breaker."""
breaker = self.get_breaker(service_name)
if breaker:
breaker.reset()
logger.info(f"Reset circuit breaker for {service_name}")
return True
return False
def reset_all_breakers(self) -> int:
"""Reset all circuit breakers."""
reset_count = 0
for service_name, breaker in self._breakers.items():
try:
breaker.reset()
reset_count += 1
logger.info(f"Reset circuit breaker for {service_name}")
except Exception as e:
logger.error(f"Failed to reset circuit breaker for {service_name}: {e}")
logger.info(f"Reset {reset_count} circuit breakers")
return reset_count
def get_health_status(self) -> dict[str, dict[str, Any]]:
"""Get health status of all circuit breakers."""
if not self._initialized:
self.initialize()
status = {}
for service_name, breaker in self._breakers.items():
try:
metrics = breaker.get_metrics()
status[service_name] = {
"name": service_name,
"state": breaker.state.value,
"consecutive_failures": breaker.consecutive_failures,
"time_until_retry": breaker.time_until_retry(),
"metrics": {
"total_calls": metrics.get_total_calls(),
"success_rate": metrics.get_success_rate(),
"failure_rate": metrics.get_failure_rate(),
"avg_response_time": metrics.get_average_response_time(),
"last_failure_time": metrics.get_last_failure_time(),
"uptime_percentage": metrics.get_uptime_percentage(),
},
}
except Exception as e:
status[service_name] = {
"name": service_name,
"state": "error",
"error": str(e),
}
return status
# Module-level singleton manager: shared by the convenience functions in this
# module so the whole process sees a single set of circuit breakers.
_circuit_breaker_manager = CircuitBreakerManager()
def get_circuit_breaker_manager() -> CircuitBreakerManager:
    """Return the process-wide :class:`CircuitBreakerManager` singleton."""
    return _circuit_breaker_manager
def initialize_all_circuit_breakers() -> bool:
    """Initialize every circuit breaker via the global manager (convenience)."""
    return _circuit_breaker_manager.initialize()
def get_all_circuit_breaker_status() -> dict[str, dict[str, Any]]:
    """Return health status of every circuit breaker via the global manager."""
    return _circuit_breaker_manager.get_health_status()
# Specific circuit breaker decorators for common services
def with_yfinance_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard a yfinance API call with the "yfinance" circuit breaker."""
    protected = with_circuit_breaker("yfinance")(func)
    return cast(F, protected)
def with_tiingo_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard a Tiingo API call with the "tiingo" circuit breaker."""
    protected = with_circuit_breaker("tiingo")(func)
    return cast(F, protected)
def with_fred_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard a FRED API call with the "fred_api" circuit breaker."""
    protected = with_circuit_breaker("fred_api")(func)
    return cast(F, protected)
def with_openrouter_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an OpenRouter API call with the "openrouter" circuit breaker."""
    protected = with_circuit_breaker("openrouter")(func)
    return cast(F, protected)
def with_exa_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an Exa API call with the "exa" circuit breaker."""
    protected = with_circuit_breaker("exa")(func)
    return cast(F, protected)
# Async versions
def with_async_yfinance_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an async yfinance API call with the "yfinance" circuit breaker."""
    wrapped = with_async_circuit_breaker("yfinance")(func)
    return cast(F, wrapped)
def with_async_tiingo_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an async Tiingo API call with the "tiingo" circuit breaker."""
    wrapped = with_async_circuit_breaker("tiingo")(func)
    return cast(F, wrapped)
def with_async_fred_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an async FRED API call with the "fred_api" circuit breaker."""
    wrapped = with_async_circuit_breaker("fred_api")(func)
    return cast(F, wrapped)
def with_async_openrouter_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an async OpenRouter API call with the "openrouter" circuit breaker."""
    wrapped = with_async_circuit_breaker("openrouter")(func)
    return cast(F, wrapped)
def with_async_exa_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Guard an async Exa API call with the "exa" circuit breaker."""
    wrapped = with_async_circuit_breaker("exa")(func)
    return cast(F, wrapped)