This is page 21 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/maverick_mcp/providers/macro_data.py:
--------------------------------------------------------------------------------

```python
"""
Macroeconomic data providers and utilities for Maverick-MCP.
Provides GDP, inflation rate, unemployment rate, and other macroeconomic indicators.
"""

import logging
import os
from datetime import UTC, datetime, timedelta

import pandas as pd
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler

from maverick_mcp.utils.circuit_breaker_decorators import (
    with_economic_data_circuit_breaker,
)

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.macro_data")

# Configuration
FRED_API_KEY = os.getenv("FRED_API_KEY", "")


class MacroDataProvider:
    """Provider for macroeconomic data using FRED API."""

    MAX_WINDOW_DAYS = 365

    def __init__(self, window_days: int = MAX_WINDOW_DAYS):
        try:
            from fredapi import Fred

            self.fred = Fred(api_key=FRED_API_KEY)
            self.scaler = MinMaxScaler()
            self.window_days = window_days
            self.historical_data_bounds: dict[str, dict[str, float]] = {}
            self.update_historical_bounds()

            # For momentum swings, shorter lookbacks
            self.lookback_days = 30

            # Weights for macro sentiment
            self.weights = {
                # Short-term signals (60% total)
                "vix": 0.20,
                "sp500_momentum": 0.20,
                "nasdaq_momentum": 0.15,
                "usd_momentum": 0.05,
                # Macro signals (40% total)
                "inflation_rate": 0.15,
                "gdp_growth_rate": 0.15,
                "unemployment_rate": 0.10,
            }

            self.previous_sentiment_score = None
        except ImportError:
            logger.error(
                "fredapi not installed. Please install with 'pip install fredapi'"
            )
            raise

    @with_economic_data_circuit_breaker(
        use_fallback=False
    )  # Fallback handled at method level
    def _get_fred_series(
        self, series_id: str, start_date: str, end_date: str
    ) -> pd.Series:
        """
        Get FRED series data with circuit breaker protection.

        Args:
            series_id: FRED series identifier
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format

        Returns:
            Pandas Series with the data
        """
        return self.fred.get_series(series_id, start_date, end_date)

    def _calculate_weighted_rolling_performance(
        self, series_id: str, lookbacks: list[int], weights: list[float]
    ) -> float:
        """
        Utility method to compute a weighted performance measure over multiple rolling windows.
        For example, if lookbacks = [30, 90, 180] and weights = [0.5, 0.3, 0.2],
        it calculates performance for each window and returns the sum of the weighted values.
        """
        if len(lookbacks) != len(weights):
            logger.error("Lookbacks and weights must have the same length.")
            return 0.0

        end_date = datetime.now(UTC)
        total_performance = 0.0

        for days, w in zip(lookbacks, weights, strict=False):
            start_date = end_date - timedelta(days=days)
            series_data = self._get_fred_series(
                series_id,
                start_date.strftime("%Y-%m-%d"),
                end_date.strftime("%Y-%m-%d"),
            )
            # Ensure we have a pandas Series, then clean it
            if isinstance(series_data, pd.Series):
                df = series_data.dropna()
                if not df.empty:
                    # Simple rolling mean to reduce single-day spikes
                    df = df.rolling(window=2).mean().dropna()
                    if not df.empty:
                        start_price = df.iloc[0]
                        end_price = df.iloc[-1]
                        performance = ((end_price - start_price) / start_price) * 100
                        total_performance += performance * w
                else:
                    logger.warning(
                        f"No FRED data for {series_id} over last {days} days."
                    )
            else:
                logger.warning(
                    f"Unexpected data type from FRED API for {series_id}: {type(series_data)}"
                )
        return total_performance

    def get_sp500_performance(self) -> float:
        """
        Calculate a multi-timeframe rolling performance for S&P 500 (similar to SPY).
        Example using 1-month, 3-month, and 6-month lookbacks with custom weights.
        """
        try:
            lookbacks = [30, 90, 180]
            weights = [0.5, 0.3, 0.2]
            return self._calculate_weighted_rolling_performance(
                "SP500", lookbacks, weights
            )
        except Exception as e:
            logger.error(f"Error fetching S&P 500 rolling performance: {e}")
            return 0.0

    def get_nasdaq_performance(self) -> float:
        """
        Calculate a multi-timeframe rolling performance for NASDAQ-100 (similar to QQQ).
        Example using 1-month, 3-month, and 6-month lookbacks with custom weights.
        """
        try:
            lookbacks = [30, 90, 180]
            weights = [0.5, 0.3, 0.2]
            return self._calculate_weighted_rolling_performance(
                "NASDAQ100", lookbacks, weights
            )
        except Exception as e:
            logger.error(f"Error fetching NASDAQ rolling performance: {e}")
            return 0.0

    def get_gdp_growth_rate(self):
        """
        Fetch the GDP growth rate (FRED series A191RL1Q225SBEA) with circuit breaker protection and safe defaults.
        """
        try:
            # Get last 2 quarters of data to ensure we have the latest
            end_date = datetime.now(UTC)
            start_date = end_date - timedelta(days=180)

            data = self._get_fred_series(
                "A191RL1Q225SBEA",
                start_date.strftime("%Y-%m-%d"),
                end_date.strftime("%Y-%m-%d"),
            )

            if data.empty:
                logger.warning("No GDP data available from FRED")
                return {"current": 0.0, "previous": 0.0}

            # Get last two values
            last_two = data.tail(2)
            if len(last_two) >= 2:
                return {
                    "current": float(last_two.iloc[-1]),
                    "previous": float(last_two.iloc[-2]),
                }
            return {
                "current": float(last_two.iloc[-1]),
                "previous": float(last_two.iloc[-1]),
            }

        except Exception as e:
            logger.error(f"Error fetching GDP growth rate: {e}")
            return {"current": 0.0, "previous": 0.0}

    def get_unemployment_rate(self):
        """Fetch the latest unemployment rate (FRED series UNRATE)."""
        try:
            # Get recent unemployment data
            end_date = datetime.now(UTC)
            start_date = end_date - timedelta(days=90)
            series_data = self._get_fred_series(
                "UNRATE", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")
            )
            if not isinstance(series_data, pd.Series):
                logger.error(
                    f"Expected pandas Series from FRED API, got {type(series_data)}"
                )
                return {"current": 0.0, "previous": 0.0}

            data = series_data.dropna()
            if len(data) >= 2:
                return {
                    "current": float(data.iloc[-1]),
                    "previous": float(data.iloc[-2]),
                }
            return {"current": float(data.iloc[-1]), "previous": float(data.iloc[-1])}
        except Exception as e:
            logger.error(f"Error fetching Unemployment rate: {e}")
            return {"current": None, "previous": None}

    def get_inflation_rate(self):
        """
        Fetch the annual core inflation rate based on CPI data from FRED.
        Uses CPILFESL (Core CPI: All Items Less Food and Energy).
        """
        try:
            end_date = datetime.now(UTC)
            # Get ~5 years of data to ensure we have enough
            start_date = end_date - timedelta(days=5 * 365)

            # 1) Fetch monthly CPILFESL data from FRED
            series_data = self.fred.get_series(
                "CPILFESL",
                observation_start=start_date.strftime("%Y-%m-%d"),
                observation_end=end_date.strftime("%Y-%m-%d"),
            )

            # 2) Ensure it's a pandas Series and clean it
            if not isinstance(series_data, pd.Series):
                logger.error(
                    f"Expected pandas Series from FRED API, got {type(series_data)}"
                )
                return {"current": None, "previous": None, "bounds": (None, None)}

            data = series_data.dropna().sort_index()

            # Optional: Force a monthly freq alignment in case data is stored daily
            data = data.asfreq("MS").dropna()

            if data.empty:
                logger.error("No inflation data available from FRED")
                return {"current": None, "previous": None, "bounds": (None, None)}

            # 3) The latest monthly index is now guaranteed to be the first of the month
            latest_idx = data.index[-1]
            latest_value = data.iloc[-1]

            # 4) Get data for exactly one year prior (the matching month)
            #    Because we forced MS freq, this is typically just `iloc[-13]` (12 steps back),
            #    but let's keep the logic explicit:
            if isinstance(latest_idx, pd.Timestamp):
                year_ago_idx = latest_idx - pd.DateOffset(years=1)
            else:
                # Fallback for unexpected index types
                year_ago_idx = pd.Timestamp(latest_idx) - pd.DateOffset(years=1)
            # If your data is strictly monthly, you can do:
            # year_ago_value = data.loc[year_ago_idx]  # might fail if missing data
            # Or fallback to "on or before" logic:
            year_ago_series = data[data.index <= year_ago_idx]
            if year_ago_series.empty:
                logger.warning(
                    "Not enough data to get year-ago CPI. Using 0 as fallback."
                )
                current_inflation = 0.0
            else:
                year_ago_value = year_ago_series.iloc[-1]
                current_inflation = (
                    (latest_value - year_ago_value) / year_ago_value
                ) * 100

            # 5) Compute previous month's YoY
            if isinstance(latest_idx, pd.Timestamp):
                prev_month_idx = latest_idx - pd.DateOffset(months=1)
            else:
                prev_month_idx = pd.Timestamp(latest_idx) - pd.DateOffset(months=1)
            prev_month_series = data[data.index <= prev_month_idx]
            if prev_month_series.empty:
                logger.warning("No data for previous month. Using 0 as fallback.")
                previous_inflation = 0.0
            else:
                prev_month_value = prev_month_series.iloc[-1]
                if isinstance(prev_month_idx, pd.Timestamp) and not pd.isna(
                    prev_month_idx
                ):
                    prev_year_ago_idx = prev_month_idx - pd.DateOffset(years=1)
                else:
                    # Handle NaT or other types
                    prev_year_ago_idx = pd.Timestamp(prev_month_idx) - pd.DateOffset(
                        years=1
                    )
                prev_year_ago_series = data[data.index <= prev_year_ago_idx]
                if prev_year_ago_series.empty:
                    logger.warning(
                        "No data for previous year's month. Using 0 as fallback."
                    )
                    previous_inflation = 0.0
                else:
                    prev_year_ago_value = prev_year_ago_series.iloc[-1]
                    previous_inflation = (
                        (prev_month_value - prev_year_ago_value) / prev_year_ago_value
                    ) * 100

            # 6) Optionally round
            current_inflation = round(current_inflation, 2)
            previous_inflation = round(previous_inflation, 2)

            # 7) Compute bounds
            yoy_changes = data.pct_change(periods=12) * 100
            yoy_changes = yoy_changes.dropna()
            if yoy_changes.empty:
                inflation_min, inflation_max = 0.0, 0.0
            else:
                inflation_min = yoy_changes.min()
                inflation_max = yoy_changes.max()

            bounds = (round(inflation_min, 2), round(inflation_max, 2))

            logger.info(
                f"Core CPI (YoY): current={current_inflation}%, previous={previous_inflation}%"
            )
            return {
                "current": current_inflation,
                "previous": previous_inflation,
                "bounds": bounds,
            }

        except Exception as e:
            logger.error(f"Error fetching core inflation rate: {e}", exc_info=True)
            return {"current": None, "previous": None, "bounds": (None, None)}

    def get_vix(self) -> float | None:
        """Get VIX data from FRED."""
        try:
            import yfinance as yf

            # Try Yahoo Finance first
            ticker = yf.Ticker("^VIX")
            data = ticker.history(period="1d")
            if not data.empty:
                return float(data["Close"].iloc[-1])

            # fallback to FRED
            end_date = datetime.now(UTC)
            start_date = end_date - timedelta(days=7)
            series_data = self.fred.get_series(
                "VIXCLS", start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")
            )
            if isinstance(series_data, pd.Series):
                df = series_data.dropna()
                if not df.empty:
                    return float(df.iloc[-1])

            return None
        except Exception as e:
            logger.error(f"Error fetching VIX: {e}")
            return None

    def get_sp500_momentum(self) -> float:
        """
        Calculate short-term momentum of the S&P 500 over multiple short lookbacks
        (3-day, 7-day, 14-day), applying a small rolling mean (window=2) to damp single-day spikes.
        """
        try:
            end_date = datetime.now(UTC)
            lookbacks = [3, 7, 14]
            momentums = []

            for days in lookbacks:
                start_date = end_date - timedelta(days=days)
                series_data = self.fred.get_series(
                    "SP500",
                    start_date.strftime("%Y-%m-%d"),
                    end_date.strftime("%Y-%m-%d"),
                )
                if isinstance(series_data, pd.Series):
                    df = series_data.dropna()
                    df = df.rolling(window=2).mean().dropna()

                    if len(df) >= 2:
                        momentum = ((df.iloc[-1] - df.iloc[0]) / df.iloc[0]) * 100
                        momentums.append(momentum)

            if momentums:
                weighted: float = (
                    0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]
                    if len(momentums) == 3
                    else sum(momentums) / len(momentums)
                )
                return weighted
            return 0.0
        except Exception as e:
            logger.error(f"Error fetching S&P 500 momentum: {e}")
            return 0.0

    def get_nasdaq_momentum(self) -> float:
        """
        Calculate short-term momentum of the NASDAQ-100 over multiple timeframes
        (3-day, 7-day, 14-day).
        """
        try:
            end_date = datetime.now(UTC)
            lookbacks = [3, 7, 14]
            momentums = []

            for days in lookbacks:
                start_date = end_date - timedelta(days=days + 5)
                series_data = self.fred.get_series(
                    "NASDAQ100",
                    start_date.strftime("%Y-%m-%d"),
                    end_date.strftime("%Y-%m-%d"),
                )
                if isinstance(series_data, pd.Series):
                    df = series_data.dropna()
                    df = df.rolling(window=2).mean().dropna()

                    if len(df) >= 2:
                        momentum = ((df.iloc[-1] - df.iloc[0]) / df.iloc[0]) * 100
                        momentums.append(momentum)
                else:
                    logger.warning(f"Insufficient NASDAQ data for {days}-day lookback")
                    momentums.append(0.0)

            if len(momentums) == 3:
                result: float = (
                    0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]
                )
                return result

            logger.warning("Insufficient data for NASDAQ momentum calculation")
            return sum(momentums) / len(momentums) if momentums else 0.0

        except Exception as e:
            logger.error(f"Error fetching NASDAQ momentum: {e}")
            return 0.0

    def get_usd_momentum(self) -> float:
        """
        Calculate USD momentum using DTWEXBGS (Broad USD Index) from FRED
        over multiple short-term lookbacks (3-day, 7-day, 14-day).
        """
        try:
            end_date = datetime.now(UTC)
            lookbacks = [3, 7, 14]
            momentums = []

            for days in lookbacks:
                start_date = end_date - timedelta(days=days + 5)
                df = self.fred.get_series(
                    "DTWEXBGS",
                    start_date.strftime("%Y-%m-%d"),
                    end_date.strftime("%Y-%m-%d"),
                )
                df = df.dropna()
                df = df.rolling(window=2).mean().dropna()

                if len(df) >= 2:
                    first_valid = df.iloc[0]
                    last_valid = df.iloc[-1]
                    momentum = ((last_valid - first_valid) / first_valid) * 100
                    momentums.append(momentum)
                else:
                    logger.warning(f"Insufficient USD data for {days}-day lookback")
                    momentums.append(0.0)

            if len(momentums) == 3:
                result: float = (
                    0.5 * momentums[0] + 0.3 * momentums[1] + 0.2 * momentums[2]
                )
                return result

            logger.warning("Insufficient data for USD momentum calculation")
            return sum(momentums) / len(momentums) if momentums else 0.0

        except Exception as e:
            logger.error(f"Error fetching USD momentum: {e}")
            return 0.0

    def update_historical_bounds(self):
        """
        Update historical bounds based on the last `window_days` of data.
        These bounds are used for normalization in `normalize_indicators()`.
        """
        end_date = datetime.now(UTC)
        start_date = end_date - timedelta(days=self.window_days)
        start_date_str = start_date.strftime("%Y-%m-%d")
        end_date_str = end_date.strftime("%Y-%m-%d")

        indicators = {
            "gdp_growth_rate": "A191RL1Q225SBEA",
            "unemployment_rate": "UNRATE",
            "inflation_rate": "CPILFESL",
            "sp500_momentum": "SP500",
            "nasdaq_momentum": "NASDAQCOM",
            "vix": "VIXCLS",
        }

        for key, series_id in indicators.items():
            try:
                if key == "gdp_growth_rate":
                    data = self.fred.get_series(series_id, start_date_str, end_date_str)
                elif key == "inflation_rate":
                    # For inflation bounds, get a wider historical range
                    wider_start = (end_date - timedelta(days=5 * 365)).strftime(
                        "%Y-%m-%d"
                    )
                    cpi = self.fred.get_series(series_id, wider_start, end_date_str)
                    cpi = cpi.dropna()

                    if len(cpi) > 13:
                        # Calculate year-over-year inflation rates
                        inflation_rates = []
                        for i in range(12, len(cpi)):
                            yoy_inflation = (
                                (cpi.iloc[i] - cpi.iloc[i - 12]) / cpi.iloc[i - 12]
                            ) * 100
                            inflation_rates.append(yoy_inflation)

                        if inflation_rates:
                            data = pd.Series(inflation_rates)
                        else:
                            data = pd.Series([], dtype=float)
                    else:
                        # Not enough data for YoY calculation
                        data = pd.Series([], dtype=float)
                elif key in ["sp500_momentum", "nasdaq_momentum"]:
                    df = self.fred.get_series(series_id, start_date_str, end_date_str)
                    df = df.dropna()
                    df = df.rolling(window=2).mean().dropna()
                    if not df.empty:
                        start_price = df.iloc[0]
                        end_price = df.iloc[-1]
                        performance = ((end_price - start_price) / start_price) * 100
                        data = pd.Series([performance], index=[df.index[-1]])
                    else:
                        data = pd.Series([], dtype=float)
                else:
                    data = self.fred.get_series(series_id, start_date_str, end_date_str)

                if not data.empty:
                    min_val = float(data.min())
                    max_val = float(data.max())
                    self.historical_data_bounds[key] = {"min": min_val, "max": max_val}
                else:
                    self.historical_data_bounds[key] = self.default_bounds(key)
                    logger.warning(f"No data fetched for {key}. Using default bounds.")
            except Exception as e:
                logger.error(f"Error updating historical bounds for {key}: {e}")
                self.historical_data_bounds[key] = self.default_bounds(key)

    def default_bounds(self, key: str):
        """
        Fixed default bounds tuned for short-term indicators; keeping them static
        avoids daily recalculation that would swing the normalization scale.
        """
        default_bounds = {
            "vix": {"min": 10.0, "max": 50.0},
            "sp500_momentum": {"min": -15.0, "max": 15.0},
            "nasdaq_momentum": {"min": -20.0, "max": 20.0},
            "usd_momentum": {"min": -5.0, "max": 5.0},
            "inflation_rate": {"min": 0.0, "max": 10.0},
            "gdp_growth_rate": {"min": -2.0, "max": 6.0},
            "unemployment_rate": {"min": 2.0, "max": 10.0},
        }
        return default_bounds.get(key, {"min": 0.0, "max": 1.0})

    def normalize_indicators(self, indicators: dict) -> dict:
        """
        Convert raw indicators to [0,1], with risk-off indicators inverted (lower is better).
        """
        normalized = {}
        for key, value in indicators.items():
            if value is None:
                normalized[key] = 0.5
                continue

            bounds = self.historical_data_bounds.get(key, self.default_bounds(key))
            min_val = float(bounds["min"])
            max_val = float(bounds["max"])
            denom = max_val - min_val if (max_val != min_val) else 1e-9

            norm_val = (value - min_val) / denom

            if key in ["vix", "unemployment_rate", "inflation_rate"]:
                norm_val = 1.0 - norm_val

            norm_val = max(0.0, min(1.0, norm_val))
            normalized[key] = norm_val

        return normalized

    def get_historical_data(self) -> dict:
        """Get historical data for all indicators over self.lookback_days."""
        end_date = datetime.now(UTC)
        start_date = end_date - timedelta(days=self.lookback_days)
        start_date_str = start_date.strftime("%Y-%m-%d")
        end_date_str = end_date.strftime("%Y-%m-%d")

        try:
            sp500_data = self.fred.get_series("SP500", start_date_str, end_date_str)
            sp500_performance = []
            if not sp500_data.empty:
                first_value = sp500_data.iloc[0]
                sp500_performance = [
                    (x - first_value) / first_value * 100 for x in sp500_data
                ]

            nasdaq_data = self.fred.get_series(
                "NASDAQ100", start_date_str, end_date_str
            )
            nasdaq_performance = []
            if not nasdaq_data.empty:
                first_value = nasdaq_data.iloc[0]
                nasdaq_performance = [
                    (x - first_value) / first_value * 100 for x in nasdaq_data
                ]

            vix_data = self.fred.get_series("VIXCLS", start_date_str, end_date_str)
            vix_values = vix_data.tolist() if not vix_data.empty else []

            gdp_data = self.fred.get_series(
                "A191RL1Q225SBEA", start_date_str, end_date_str
            )
            gdp_values = gdp_data.tolist() if not gdp_data.empty else []

            unemployment_data = self.fred.get_series(
                "UNRATE", start_date_str, end_date_str
            )
            unemployment_values = (
                unemployment_data.tolist() if not unemployment_data.empty else []
            )

            cpi_data = self.fred.get_series("CPILFESL", start_date_str, end_date_str)
            inflation_values = []
            if not cpi_data.empty and len(cpi_data) > 12:
                inflation_values = [
                    ((cpi_data.iloc[i] - cpi_data.iloc[i - 12]) / cpi_data.iloc[i - 12])
                    * 100
                    for i in range(12, len(cpi_data))
                ]

            return {
                "sp500_performance": sp500_performance,
                "nasdaq_performance": nasdaq_performance,
                "vix": vix_values,
                "gdp_growth_rate": gdp_values,
                "unemployment_rate": unemployment_values,
                "inflation_rate": inflation_values,
            }
        except Exception as e:
            logger.error(f"Error fetching historical data: {str(e)}")
            return {
                "sp500_performance": [],
                "nasdaq_performance": [],
                "vix": [],
                "gdp_growth_rate": [],
                "unemployment_rate": [],
                "inflation_rate": [],
            }

    def get_macro_statistics(self):
        """
        Aggregate all macro indicators into a single statistics snapshot, with
        error handling and smoothing of the sentiment score.
        """
        try:
            self.update_historical_bounds()

            # Get inflation rate and bounds
            inflation_data = self.get_inflation_rate()
            gdp_data = self.get_gdp_growth_rate()
            unemployment_data = self.get_unemployment_rate()

            # Pull raw indicator values with safe defaults
            indicators = {
                "gdp_growth_rate": gdp_data["current"] or 0.0,
                "gdp_growth_rate_previous": gdp_data["previous"] or 0.0,
                "unemployment_rate": unemployment_data["current"] or 0.0,
                "unemployment_rate_previous": unemployment_data["previous"] or 0.0,
                "inflation_rate": inflation_data["current"] or 0.0,
                "inflation_rate_previous": inflation_data["previous"] or 0.0,
                "vix": self.get_vix() or 0.0,
                "sp500_momentum": self.get_sp500_momentum() or 0.0,
                "nasdaq_momentum": self.get_nasdaq_momentum() or 0.0,
                "usd_momentum": self.get_usd_momentum() or 0.0,
            }

            # Normalize
            normalized = self.normalize_indicators(indicators)
            sentiment_score = sum(normalized[k] * self.weights[k] for k in self.weights)
            sentiment_score = (sentiment_score / sum(self.weights.values())) * 100
            sentiment_score = max(1, min(100, sentiment_score))
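            # Worked example (illustrative): if every normalized indicator were 0.6,
            # the weighted sum is 0.6 (the weights sum to 1.0), giving a raw score
            # of 60 before smoothing.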

            # Increase smoothing factor to reduce big overnight moves
            if self.previous_sentiment_score is not None:
                smoothing_factor = 0.8  # keep 80% old, 20% new
                sentiment_score = (
                    smoothing_factor * self.previous_sentiment_score
                    + (1 - smoothing_factor) * sentiment_score
                )

            self.previous_sentiment_score = sentiment_score

            historical_data = self.get_historical_data()

            # Return dictionary with all values guaranteed to be numeric
            return {
                "gdp_growth_rate": float(indicators["gdp_growth_rate"]),
                "gdp_growth_rate_previous": float(
                    indicators["gdp_growth_rate_previous"]
                ),
                "unemployment_rate": float(indicators["unemployment_rate"]),
                "unemployment_rate_previous": float(
                    indicators["unemployment_rate_previous"]
                ),
                "inflation_rate": float(indicators["inflation_rate"]),
                "inflation_rate_previous": float(indicators["inflation_rate_previous"]),
                "sp500_performance": float(self.get_sp500_performance() or 0.0),
                "nasdaq_performance": float(self.get_nasdaq_performance() or 0.0),
                "vix": float(indicators["vix"]),
                "sentiment_score": float(sentiment_score),
                "historical_data": historical_data,
            }

        except Exception as e:
            logger.error(f"Error in get_macro_statistics: {e}")
            # Return safe defaults if everything fails
            return {
                "gdp_growth_rate": 0.0,
                "gdp_growth_rate_previous": 0.0,
                "unemployment_rate": 0.0,
                "unemployment_rate_previous": 0.0,
                "inflation_rate": 0.0,
                "inflation_rate_previous": 0.0,
                "sp500_performance": 0.0,
                "nasdaq_performance": 0.0,
                "vix": 0.0,
                "sentiment_score": 50.0,
                "historical_data": {},
            }

```
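
A minimal usage sketch for `MacroDataProvider` (illustrative; assumes `fredapi` is installed and a valid `FRED_API_KEY` is set in the environment, as the module requires):

```python
from maverick_mcp.providers.macro_data import MacroDataProvider

provider = MacroDataProvider(window_days=365)
stats = provider.get_macro_statistics()

# These keys are those returned by get_macro_statistics() above.
print(f"Sentiment score: {stats['sentiment_score']:.1f}/100")
print(f"GDP growth: {stats['gdp_growth_rate']}% (prev: {stats['gdp_growth_rate_previous']}%)")
print(f"VIX: {stats['vix']}")
```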

--------------------------------------------------------------------------------
/maverick_mcp/utils/structured_logger.py:
--------------------------------------------------------------------------------

```python
"""
Enhanced structured logging infrastructure for backtesting system.

This module provides comprehensive structured logging capabilities with:
- Correlation ID generation and tracking across async boundaries
- Request context propagation
- JSON formatting for log aggregation
- Performance metrics logging
- Resource usage tracking
- Debug mode with verbose logging
- Async logging to avoid blocking operations
- Log rotation and compression
- Multiple output handlers (console, file, remote)
"""

import asyncio
import gc
import json
import logging
import logging.handlers
import os
import sys
import threading
import time
import traceback
import uuid
from collections.abc import Callable
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar
from datetime import UTC, datetime
from functools import wraps
from pathlib import Path
from typing import Any

import psutil

# Context variables for request tracking across async boundaries
correlation_id_var: ContextVar[str | None] = ContextVar("correlation_id", default=None)
request_start_var: ContextVar[float | None] = ContextVar("request_start", default=None)
user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None)
tool_name_var: ContextVar[str | None] = ContextVar("tool_name", default=None)
operation_context_var: ContextVar[dict[str, Any] | None] = ContextVar(
    "operation_context", default=None
)

# Global logger registry for performance metrics aggregation
_performance_logger_registry: dict[str, "PerformanceMetricsLogger"] = {}
_log_level_counts: dict[str, int] = {
    "DEBUG": 0,
    "INFO": 0,
    "WARNING": 0,
    "ERROR": 0,
    "CRITICAL": 0,
}

# Thread pool for async logging operations
_async_log_executor: ThreadPoolExecutor | None = None
_async_log_lock = threading.Lock()


class CorrelationIDGenerator:
    """Enhanced correlation ID generation with backtesting context."""

    @staticmethod
    def generate_correlation_id(prefix: str = "bt") -> str:
        """Generate a unique correlation ID with backtesting prefix."""
        timestamp = int(time.time() * 1000) % 1000000  # Last 6 digits of the ms timestamp
        random_part = uuid.uuid4().hex[:8]
        return f"{prefix}-{timestamp}-{random_part}"

    @staticmethod
    def set_correlation_id(
        correlation_id: str | None = None, prefix: str = "bt"
    ) -> str:
        """Set correlation ID in context with automatic generation."""
        if not correlation_id:
            correlation_id = CorrelationIDGenerator.generate_correlation_id(prefix)
        correlation_id_var.set(correlation_id)
        return correlation_id

    @staticmethod
    def get_correlation_id() -> str | None:
        """Get current correlation ID from context."""
        return correlation_id_var.get()

    @staticmethod
    def propagate_context(target_context: dict[str, Any]) -> dict[str, Any]:
        """Propagate correlation context to target dict."""
        target_context.update(
            {
                "correlation_id": correlation_id_var.get(),
                "user_id": user_id_var.get(),
                "tool_name": tool_name_var.get(),
                "operation_context": operation_context_var.get(),
            }
        )
        return target_context


class EnhancedStructuredFormatter(logging.Formatter):
    """Enhanced JSON formatter with performance metrics and resource tracking."""

    def __init__(
        self, include_performance: bool = True, include_resources: bool = True
    ):
        super().__init__()
        self.include_performance = include_performance
        self.include_resources = include_resources
        self._process = psutil.Process()

    def format(self, record: logging.LogRecord) -> str:
        """Format log record with comprehensive structured data."""
        # Base structured log data
        log_data = {
            "timestamp": datetime.now(UTC).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "thread": record.thread,
            "process_id": record.process,
        }

        # Add correlation context
        CorrelationIDGenerator.propagate_context(log_data)

        # Add performance metrics if enabled
        if self.include_performance:
            request_start = request_start_var.get()
            if request_start:
                log_data["duration_ms"] = int((time.time() - request_start) * 1000)

        # Add resource usage if enabled
        if self.include_resources:
            try:
                memory_info = self._process.memory_info()
                log_data["memory_rss_mb"] = round(memory_info.rss / 1024 / 1024, 2)
                log_data["memory_vms_mb"] = round(memory_info.vms / 1024 / 1024, 2)
                log_data["cpu_percent"] = self._process.cpu_percent(interval=None)
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                # Process might have ended or access denied
                pass

        # Add exception information
        if record.exc_info:
            log_data["exception"] = {
                "type": record.exc_info[0].__name__
                if record.exc_info[0]
                else "Unknown",
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info),
            }

        # Add extra fields from the record
        extra_fields = {}
        for key, value in record.__dict__.items():
            if key not in {
                "name",
                "msg",
                "args",
                "created",
                "filename",
                "funcName",
                "levelname",
                "levelno",
                "lineno",
                "module",
                "msecs",
                "pathname",
                "process",
                "processName",
                "relativeCreated",
                "thread",
                "threadName",
                "exc_info",
                "exc_text",
                "stack_info",
                "getMessage",
                "message",
            }:
                # Sanitize sensitive data
                if self._is_sensitive_field(key):
                    extra_fields[key] = "***MASKED***"
                else:
                    extra_fields[key] = self._serialize_value(value)

        if extra_fields:
            log_data["extra"] = extra_fields

        return json.dumps(log_data, default=str, ensure_ascii=False)

    def _is_sensitive_field(self, field_name: str) -> bool:
        """Check if field contains sensitive information."""
        sensitive_keywords = {
            "password",
            "token",
            "key",
            "secret",
            "auth",
            "credential",
            "bearer",
            "session",
            "cookie",
            "api_key",
            "access_token",
            "refresh_token",
            "private",
            "confidential",
        }
        return any(keyword in field_name.lower() for keyword in sensitive_keywords)

    def _serialize_value(self, value: Any) -> Any:
        """Safely serialize complex values for JSON output."""
        if isinstance(value, str | int | float | bool) or value is None:
            return value
        elif isinstance(value, dict):
            return {k: self._serialize_value(v) for k, v in value.items()}
        elif isinstance(value, list | tuple):
            return [self._serialize_value(item) for item in value]
        else:
            return str(value)


class AsyncLogHandler(logging.Handler):
    """Non-blocking async log handler to prevent performance impact."""

    def __init__(self, target_handler: logging.Handler, max_queue_size: int = 10000):
        super().__init__()
        self.target_handler = target_handler
        self.max_queue_size = max_queue_size
        self._queue: list[logging.LogRecord] = []
        self._queue_lock = threading.Lock()
        self._shutdown = False

        # Start background thread for processing logs
        self._worker_thread = threading.Thread(target=self._process_logs, daemon=True)
        self._worker_thread.start()

    def emit(self, record: logging.LogRecord):
        """Queue log record for async processing."""
        if self._shutdown:
            return

        with self._queue_lock:
            if len(self._queue) < self.max_queue_size:
                self._queue.append(record)
            # If queue is full, drop oldest records
            elif self._queue:
                self._queue.pop(0)
                self._queue.append(record)

    def _process_logs(self):
        """Background thread to process queued log records."""
        while not self._shutdown:
            records_to_process = []

            with self._queue_lock:
                if self._queue:
                    records_to_process = self._queue[:]
                    self._queue.clear()

            for record in records_to_process:
                try:
                    self.target_handler.emit(record)
                except Exception:
                    # Silently ignore errors to prevent infinite recursion
                    pass

            # Brief sleep to prevent busy waiting
            time.sleep(0.01)

    def close(self):
        """Close the handler and wait for queue to flush."""
        self._shutdown = True
        self._worker_thread.join(timeout=5.0)
        self.target_handler.close()
        super().close()
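

# Illustrative wiring (assumes the classes defined in this module): the formatter
# goes on the inner handler, since AsyncLogHandler delegates emit() to it.
#     inner = logging.StreamHandler(sys.stdout)
#     inner.setFormatter(EnhancedStructuredFormatter())
#     logging.getLogger("maverick_mcp").addHandler(AsyncLogHandler(inner))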


class PerformanceMetricsLogger:
    """Comprehensive performance metrics logging for backtesting operations."""

    def __init__(self, logger_name: str = "maverick_mcp.performance"):
        self.logger = logging.getLogger(logger_name)
        self.metrics: dict[str, list[float]] = {
            "execution_times": [],
            "memory_usage": [],
            "cpu_usage": [],
            "operation_counts": [],
        }
        self._start_times: dict[str, float] = {}
        self._lock = threading.Lock()

        # Register for global aggregation
        _performance_logger_registry[logger_name] = self

    def start_operation(self, operation_id: str, operation_type: str, **context):
        """Start tracking a performance-critical operation."""
        start_time = time.time()

        with self._lock:
            self._start_times[operation_id] = start_time

        # Set request context
        request_start_var.set(start_time)
        if "tool_name" in context:
            tool_name_var.set(context["tool_name"])

        self.logger.info(
            f"Started {operation_type} operation",
            extra={
                "operation_id": operation_id,
                "operation_type": operation_type,
                "start_time": start_time,
                **context,
            },
        )

    def end_operation(self, operation_id: str, success: bool = True, **metrics):
        """End tracking of a performance-critical operation."""
        end_time = time.time()

        with self._lock:
            start_time = self._start_times.pop(operation_id, end_time)

        duration_ms = (end_time - start_time) * 1000

        # Collect system metrics
        try:
            process = psutil.Process()
            memory_mb = process.memory_info().rss / 1024 / 1024
            cpu_percent = process.cpu_percent(interval=None)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            memory_mb = 0
            cpu_percent = 0

        # Update internal metrics
        with self._lock:
            self.metrics["execution_times"].append(duration_ms)
            self.metrics["memory_usage"].append(memory_mb)
            self.metrics["cpu_usage"].append(cpu_percent)
            self.metrics["operation_counts"].append(1)

        log_level = logging.INFO if success else logging.ERROR
        self.logger.log(
            log_level,
            f"{'Completed' if success else 'Failed'} operation in {duration_ms:.2f}ms",
            extra={
                "operation_id": operation_id,
                "duration_ms": duration_ms,
                "memory_mb": memory_mb,
                "cpu_percent": cpu_percent,
                "success": success,
                **metrics,
            },
        )

    def log_business_metric(self, metric_name: str, value: int | float, **context):
        """Log business-specific metrics like strategies processed, success rates."""
        self.logger.info(
            f"Business metric: {metric_name} = {value}",
            extra={
                "metric_name": metric_name,
                "metric_value": value,
                "metric_type": "business",
                **context,
            },
        )

    def get_performance_summary(self) -> dict[str, Any]:
        """Get aggregated performance metrics summary."""
        with self._lock:
            if not self.metrics["execution_times"]:
                return {"message": "No performance data available"}

            execution_times = self.metrics["execution_times"]
            memory_usage = self.metrics["memory_usage"]
            cpu_usage = self.metrics["cpu_usage"]

            return {
                "operations_count": len(execution_times),
                "execution_time_stats": {
                    "avg_ms": sum(execution_times) / len(execution_times),
                    "min_ms": min(execution_times),
                    "max_ms": max(execution_times),
                    "total_ms": sum(execution_times),
                },
                "memory_stats": {
                    "avg_mb": sum(memory_usage) / len(memory_usage)
                    if memory_usage
                    else 0,
                    "peak_mb": max(memory_usage) if memory_usage else 0,
                },
                "cpu_stats": {
                    "avg_percent": sum(cpu_usage) / len(cpu_usage) if cpu_usage else 0,
                    "peak_percent": max(cpu_usage) if cpu_usage else 0,
                },
                "timestamp": datetime.now(UTC).isoformat(),
            }
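
# Usage sketch (illustrative): bracket a critical section with
# start_operation()/end_operation(), then read the aggregated summary:
#
#   perf = PerformanceMetricsLogger("maverick_mcp.performance.backtest")
#   perf.start_operation("bt_001", "run_backtest", tool_name="run_backtest")
#   ...  # run the backtest
#   perf.end_operation("bt_001", success=True, symbols_processed=5)
#   perf.get_performance_summary()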


class DebugModeManager:
    """Manages debug mode configuration and verbose logging."""

    def __init__(self):
        self._debug_enabled = os.getenv("MAVERICK_DEBUG", "false").lower() in (
            "true",
            "1",
            "on",
        )
        self._verbose_modules: set = set()
        self._debug_filters: dict[str, Any] = {}

    def is_debug_enabled(self, module_name: str = "") -> bool:
        """Check if debug mode is enabled globally or for specific module."""
        if not self._debug_enabled:
            return False

        if not module_name:
            return True

        # With no module patterns registered, debug applies to every module;
        # otherwise only the registered patterns are verbose.
        return module_name in self._verbose_modules or not self._verbose_modules

    def enable_verbose_logging(self, module_pattern: str):
        """Enable verbose logging for specific module pattern."""
        self._verbose_modules.add(module_pattern)

    def add_debug_filter(self, filter_name: str, filter_config: dict[str, Any]):
        """Add custom debug filter configuration."""
        self._debug_filters[filter_name] = filter_config

    def should_log_request_response(self, operation_name: str) -> bool:
        """Check if request/response should be logged for operation."""
        if not self._debug_enabled:
            return False

        # Check specific filters
        for _filter_name, config in self._debug_filters.items():
            if config.get("log_request_response") and operation_name in config.get(
                "operations", []
            ):
                return True

        return True  # Default to true in debug mode
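
# Usage sketch (illustrative): debug mode is driven by the MAVERICK_DEBUG
# environment variable; verbosity can then be narrowed to module patterns:
#
#   debug = DebugModeManager()  # reads MAVERICK_DEBUG at construction
#   debug.enable_verbose_logging("maverick_mcp.backtesting")
#   debug.is_debug_enabled("maverick_mcp.backtesting")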


class StructuredLoggerManager:
    """Central manager for structured logging configuration."""

    def __init__(self):
        self.debug_manager = DebugModeManager()
        self.performance_loggers: dict[str, PerformanceMetricsLogger] = {}
        self._configured = False

    def setup_structured_logging(
        self,
        log_level: str = "INFO",
        log_format: str = "json",
        log_file: str | None = None,
        enable_async: bool = True,
        enable_rotation: bool = True,
        max_log_size: int = 10 * 1024 * 1024,  # 10MB
        backup_count: int = 5,
        console_output: str = "stdout",  # stdout, stderr
        remote_handler_config: dict[str, Any] | None = None,
    ):
        """Setup comprehensive structured logging infrastructure."""

        if self._configured:
            return

        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(getattr(logging, log_level.upper()))

        # Clear existing handlers
        for handler in root_logger.handlers[:]:
            root_logger.removeHandler(handler)

        handlers = []

        # Console handler
        console_stream = sys.stdout if console_output == "stdout" else sys.stderr
        console_handler = logging.StreamHandler(console_stream)

        if log_format == "json":
            console_formatter = EnhancedStructuredFormatter(
                include_performance=True, include_resources=True
            )
        else:
            console_formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )

        console_handler.setFormatter(console_formatter)
        handlers.append(console_handler)

        # File handler with rotation if specified
        if log_file:
            log_path = Path(log_file)
            log_path.parent.mkdir(parents=True, exist_ok=True)

            if enable_rotation:
                file_handler = logging.handlers.RotatingFileHandler(
                    log_file, maxBytes=max_log_size, backupCount=backup_count
                )
            else:
                file_handler = logging.FileHandler(log_file)

            file_handler.setFormatter(EnhancedStructuredFormatter())
            handlers.append(file_handler)

        # Remote handler if configured (for log aggregation)
        if remote_handler_config:
            remote_handler = self._create_remote_handler(remote_handler_config)
            if remote_handler:
                handlers.append(remote_handler)

        # Wrap handlers with async processing if enabled
        if enable_async:
            handlers = [AsyncLogHandler(handler) for handler in handlers]

        # Add all handlers to root logger
        for handler in handlers:
            root_logger.addHandler(handler)

        # Set specific logger levels to reduce noise
        logging.getLogger("urllib3").setLevel(logging.WARNING)
        logging.getLogger("requests").setLevel(logging.WARNING)
        logging.getLogger("httpx").setLevel(logging.WARNING)
        logging.getLogger("asyncio").setLevel(logging.WARNING)

        # Enable debug mode loggers if configured
        if self.debug_manager.is_debug_enabled():
            self._setup_debug_loggers()

        self._configured = True

    def _create_remote_handler(self, config: dict[str, Any]) -> logging.Handler | None:
        """Create remote handler for log aggregation (placeholder for future implementation)."""
        # This would implement remote logging to services like ELK, Splunk, etc.
        # For now, return None as it's not implemented
        return None

    def _setup_debug_loggers(self):
        """Setup additional loggers for debug mode."""
        debug_logger = logging.getLogger("maverick_mcp.debug")
        debug_logger.setLevel(logging.DEBUG)

        request_logger = logging.getLogger("maverick_mcp.requests")
        request_logger.setLevel(logging.DEBUG)

    def get_performance_logger(self, logger_name: str) -> PerformanceMetricsLogger:
        """Get or create performance logger for specific component."""
        if logger_name not in self.performance_loggers:
            self.performance_loggers[logger_name] = PerformanceMetricsLogger(
                logger_name
            )
        return self.performance_loggers[logger_name]

    def get_logger(self, name: str) -> logging.Logger:
        """Get structured logger with correlation support."""
        return logging.getLogger(name)

    def create_dashboard_metrics(self) -> dict[str, Any]:
        """Create comprehensive metrics for performance dashboard."""
        global _log_level_counts

        dashboard_data = {
            "system_metrics": {
                "timestamp": datetime.now(UTC).isoformat(),
                "log_level_counts": _log_level_counts.copy(),
                "active_correlation_ids": len(
                    [cid for cid in [correlation_id_var.get()] if cid]
                ),
            },
            "performance_metrics": {},
            "memory_stats": {},
        }

        # Aggregate performance metrics from all loggers
        for logger_name, perf_logger in _performance_logger_registry.items():
            dashboard_data["performance_metrics"][logger_name] = (
                perf_logger.get_performance_summary()
            )

        # System memory stats
        try:
            process = psutil.Process()
            memory_info = process.memory_info()
            dashboard_data["memory_stats"] = {
                "rss_mb": round(memory_info.rss / 1024 / 1024, 2),
                "vms_mb": round(memory_info.vms / 1024 / 1024, 2),
                "cpu_percent": process.cpu_percent(interval=None),
                "gc_stats": {
                    "generation_0": gc.get_count()[0],
                    "generation_1": gc.get_count()[1],
                    "generation_2": gc.get_count()[2],
                },
            }
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            dashboard_data["memory_stats"] = {"error": "Unable to collect memory stats"}

        return dashboard_data


# Global instance
_logger_manager: StructuredLoggerManager | None = None


def get_logger_manager() -> StructuredLoggerManager:
    """Get global logger manager instance."""
    global _logger_manager
    if _logger_manager is None:
        _logger_manager = StructuredLoggerManager()
    return _logger_manager


def with_structured_logging(
    operation_name: str,
    include_performance: bool = True,
    log_params: bool = True,
    log_result: bool = False,
):
    """Decorator for automatic structured logging of operations."""

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            # Generate correlation ID if not present
            correlation_id = CorrelationIDGenerator.get_correlation_id()
            if not correlation_id:
                correlation_id = CorrelationIDGenerator.set_correlation_id()

            # Setup operation context
            operation_id = f"{operation_name}_{int(time.time() * 1000)}"
            tool_name_var.set(operation_name)

            logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}")
            perf_logger = None

            if include_performance:
                perf_logger = get_logger_manager().get_performance_logger(
                    f"performance.{operation_name}"
                )
                perf_logger.start_operation(
                    operation_id=operation_id,
                    operation_type=operation_name,
                    tool_name=operation_name,
                )

            # Log operation start
            extra_data = {
                "operation_id": operation_id,
                "correlation_id": correlation_id,
            }
            if log_params:
                # Sanitize parameters
                safe_kwargs = {
                    k: "***MASKED***"
                    if any(
                        sensitive in k.lower()
                        for sensitive in ["password", "token", "key", "secret"]
                    )
                    else v
                    for k, v in kwargs.items()
                }
                extra_data["parameters"] = safe_kwargs

            logger.info(f"Starting {operation_name}", extra=extra_data)

            try:
                # Execute the function
                result = await func(*args, **kwargs)

                # Log success
                success_data = {"operation_id": operation_id, "success": True}
                if log_result and result is not None:
                    # Limit result size for logging
                    result_str = str(result)
                    success_data["result"] = (
                        result_str[:1000] + "..."
                        if len(result_str) > 1000
                        else result_str
                    )

                logger.info(f"Completed {operation_name}", extra=success_data)

                if perf_logger:
                    perf_logger.end_operation(operation_id, success=True)

                return result

            except Exception as e:
                # Log error
                logger.error(
                    f"Failed {operation_name}: {str(e)}",
                    exc_info=True,
                    extra={
                        "operation_id": operation_id,
                        "error_type": type(e).__name__,
                        "success": False,
                    },
                )

                if perf_logger:
                    perf_logger.end_operation(operation_id, success=False, error=str(e))

                raise

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            # Similar logic for sync functions
            correlation_id = CorrelationIDGenerator.get_correlation_id()
            if not correlation_id:
                correlation_id = CorrelationIDGenerator.set_correlation_id()

            operation_id = f"{operation_name}_{int(time.time() * 1000)}"
            tool_name_var.set(operation_name)

            logger = get_logger_manager().get_logger(f"maverick_mcp.{operation_name}")
            perf_logger = None

            if include_performance:
                perf_logger = get_logger_manager().get_performance_logger(
                    f"performance.{operation_name}"
                )
                perf_logger.start_operation(
                    operation_id=operation_id,
                    operation_type=operation_name,
                    tool_name=operation_name,
                )

            extra_data = {
                "operation_id": operation_id,
                "correlation_id": correlation_id,
            }
            if log_params:
                safe_kwargs = {
                    k: "***MASKED***"
                    if any(
                        sensitive in k.lower()
                        for sensitive in ["password", "token", "key", "secret"]
                    )
                    else v
                    for k, v in kwargs.items()
                }
                extra_data["parameters"] = safe_kwargs

            logger.info(f"Starting {operation_name}", extra=extra_data)

            try:
                result = func(*args, **kwargs)

                success_data = {"operation_id": operation_id, "success": True}
                if log_result and result is not None:
                    result_str = str(result)
                    success_data["result"] = (
                        result_str[:1000] + "..."
                        if len(result_str) > 1000
                        else result_str
                    )

                logger.info(f"Completed {operation_name}", extra=success_data)

                if perf_logger:
                    perf_logger.end_operation(operation_id, success=True)

                return result

            except Exception as e:
                logger.error(
                    f"Failed {operation_name}: {str(e)}",
                    exc_info=True,
                    extra={
                        "operation_id": operation_id,
                        "error_type": type(e).__name__,
                        "success": False,
                    },
                )

                if perf_logger:
                    perf_logger.end_operation(operation_id, success=False, error=str(e))

                raise

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
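
# Usage sketch (illustrative): the decorator inspects the wrapped callable and
# applies the async or sync wrapper automatically:
#
#   @with_structured_logging("run_backtest", log_params=True)
#   async def run_backtest(symbol: str, **params):
#       ...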


# Convenience functions
def get_structured_logger(name: str) -> logging.Logger:
    """Get structured logger instance."""
    return get_logger_manager().get_logger(name)


def get_performance_logger(component: str) -> PerformanceMetricsLogger:
    """Get performance logger for specific component."""
    return get_logger_manager().get_performance_logger(component)


def setup_backtesting_logging(
    log_level: str = "INFO", enable_debug: bool = False, log_file: str | None = None
):
    """Setup logging specifically configured for backtesting operations."""

    # Set debug environment if requested
    if enable_debug:
        os.environ["MAVERICK_DEBUG"] = "true"

    # Setup structured logging
    manager = get_logger_manager()
    manager.setup_structured_logging(
        log_level=log_level,
        log_format="json",
        log_file=log_file or "logs/backtesting.log",
        enable_async=True,
        enable_rotation=True,
        console_output="stderr",  # Use stderr for MCP compatibility
    )

    # Configure debug filters for backtesting
    if enable_debug:
        manager.debug_manager.add_debug_filter(
            "backtesting",
            {
                "log_request_response": True,
                "operations": [
                    "run_backtest",
                    "optimize_parameters",
                    "get_historical_data",
                ],
            },
        )
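
# Usage sketch (illustrative): call once at process startup so subsequent log
# records from backtesting tools are structured and correlated:
#
#   setup_backtesting_logging(log_level="DEBUG", enable_debug=True)
#   logger = get_structured_logger("maverick_mcp.backtesting")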


# Update log level counts (for dashboard metrics)
class LogLevelCounterFilter(logging.Filter):
    """Filter to count log levels for dashboard metrics."""

    def filter(self, record: logging.LogRecord) -> bool:
        global _log_level_counts
        _log_level_counts[record.levelname] = (
            _log_level_counts.get(record.levelname, 0) + 1
        )
        return True


# Add the counter filter to root logger
logging.getLogger().addFilter(LogLevelCounterFilter())

```

--------------------------------------------------------------------------------
/tests/test_backtest_persistence.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive tests for backtest persistence layer.

Tests cover:
- PostgreSQL persistence layer with comprehensive database operations
- BacktestResult, BacktestTrade, OptimizationResult, and WalkForwardTest models
- Database CRUD operations with proper error handling
- Performance comparison and ranking functionality
- Backtest result caching and retrieval optimization
- Database constraint validation and data integrity
- Concurrent access and transaction handling
"""

from datetime import datetime, timedelta
from decimal import Decimal
from typing import Any
from unittest.mock import Mock, patch
from uuid import UUID, uuid4

import numpy as np
import pandas as pd
import pytest
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from maverick_mcp.backtesting.persistence import (
    BacktestPersistenceError,
    BacktestPersistenceManager,
    find_best_strategy_for_symbol,
    get_recent_backtests,
    save_vectorbt_results,
)
from maverick_mcp.data.models import (
    BacktestResult,
    BacktestTrade,
    OptimizationResult,
    WalkForwardTest,
)


@pytest.fixture
def sample_vectorbt_results() -> dict[str, Any]:
    """Create sample VectorBT results for testing.

    Defined at module level so every test class in this file can request it.
    """
    dates = pd.date_range(start="2023-01-01", end="2023-12-31", freq="D")
    equity_curve = np.cumsum(np.random.normal(0.001, 0.02, len(dates)))
    drawdown_series = np.minimum(
        0, equity_curve - np.maximum.accumulate(equity_curve)
    )

    return {
        "symbol": "AAPL",
        "strategy": "momentum_crossover",
        "parameters": {
            "fast_window": 10,
            "slow_window": 20,
            "signal_threshold": 0.02,
        },
        "start_date": "2023-01-01",
        "end_date": "2023-12-31",
        "initial_capital": 10000.0,
        "metrics": {
            "total_return": 0.15,
            "annualized_return": 0.18,
            "sharpe_ratio": 1.25,
            "sortino_ratio": 1.45,
            "calmar_ratio": 1.10,
            "max_drawdown": -0.08,
            "max_drawdown_duration": 45,
            "volatility": 0.16,
            "downside_volatility": 0.12,
            "total_trades": 24,
            "winning_trades": 14,
            "losing_trades": 10,
            "win_rate": 0.583,
            "profit_factor": 1.35,
            "average_win": 0.045,
            "average_loss": -0.025,
            "largest_win": 0.12,
            "largest_loss": -0.08,
            "final_value": 11500.0,
            "peak_value": 12100.0,
            "beta": 1.05,
            "alpha": 0.03,
        },
        "equity_curve": equity_curve.tolist(),
        "drawdown_series": drawdown_series.tolist(),
        "trades": [
            {
                "entry_date": "2023-01-15",
                "entry_price": 150.0,
                "entry_time": "2023-01-15T09:30:00",
                "exit_date": "2023-01-25",
                "exit_price": 155.0,
                "exit_time": "2023-01-25T16:00:00",
                "position_size": 100,
                "direction": "long",
                "pnl": 500.0,
                "pnl_percent": 0.033,
                "mae": -150.0,
                "mfe": 600.0,
                "duration_days": 10,
                "duration_hours": 6.5,
                "exit_reason": "take_profit",
                "fees_paid": 2.0,
                "slippage_cost": 1.0,
            },
            {
                "entry_date": "2023-02-01",
                "entry_price": 160.0,
                "entry_time": "2023-02-01T10:00:00",
                "exit_date": "2023-02-10",
                "exit_price": 156.0,
                "exit_time": "2023-02-10T15:30:00",
                "position_size": 100,
                "direction": "long",
                "pnl": -400.0,
                "pnl_percent": -0.025,
                "mae": -500.0,
                "mfe": 200.0,
                "duration_days": 9,
                "duration_hours": 5.5,
                "exit_reason": "stop_loss",
                "fees_paid": 2.0,
                "slippage_cost": 1.0,
            },
        ],
    }


@pytest.fixture
def persistence_manager(db_session: Session):
    """Create a persistence manager with test database session."""
    return BacktestPersistenceManager(session=db_session)


class TestBacktestPersistenceManager:
    """Test suite for BacktestPersistenceManager class."""

    def test_persistence_manager_context_manager(self, db_session: Session):
        """Test persistence manager as context manager."""
        with BacktestPersistenceManager(session=db_session) as manager:
            assert manager.session == db_session
            assert not manager._owns_session

        # Test with auto-session creation (mocked)
        with patch(
            "maverick_mcp.backtesting.persistence.SessionLocal"
        ) as mock_session_local:
            mock_session = Mock(spec=Session)
            mock_session_local.return_value = mock_session

            with BacktestPersistenceManager() as manager:
                assert manager.session == mock_session
                assert manager._owns_session
                mock_session.commit.assert_called_once()
                mock_session.close.assert_called_once()

    def test_save_backtest_result_success(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test successful backtest result saving."""
        backtest_id = persistence_manager.save_backtest_result(
            vectorbt_results=sample_vectorbt_results,
            execution_time=2.5,
            notes="Test backtest run",
        )

        # Test return value
        assert isinstance(backtest_id, str)
        assert UUID(backtest_id)  # Valid UUID

        # Test database record
        result = (
            persistence_manager.session.query(BacktestResult)
            .filter(BacktestResult.backtest_id == UUID(backtest_id))
            .first()
        )

        assert result is not None
        assert result.symbol == "AAPL"
        assert result.strategy_type == "momentum_crossover"
        assert result.total_return == Decimal("0.15")
        assert result.sharpe_ratio == Decimal("1.25")
        assert result.total_trades == 24
        assert result.execution_time_seconds == Decimal("2.5")
        assert result.notes == "Test backtest run"

        # Test trades were saved
        trades = (
            persistence_manager.session.query(BacktestTrade)
            .filter(BacktestTrade.backtest_id == UUID(backtest_id))
            .all()
        )

        assert len(trades) == 2
        assert trades[0].symbol == "AAPL"
        assert trades[0].pnl == Decimal("500.0")
        assert trades[1].pnl == Decimal("-400.0")

    def test_save_backtest_result_validation_error(self, persistence_manager):
        """Test backtest saving with validation errors."""
        # Missing required fields
        invalid_results = {"symbol": "", "strategy": ""}

        with pytest.raises(BacktestPersistenceError) as exc_info:
            persistence_manager.save_backtest_result(invalid_results)

        assert "Symbol and strategy type are required" in str(exc_info.value)

    def test_save_backtest_result_database_error(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test backtest saving with database errors."""
        with patch.object(
            persistence_manager.session, "add", side_effect=SQLAlchemyError("DB Error")
        ):
            with pytest.raises(BacktestPersistenceError) as exc_info:
                persistence_manager.save_backtest_result(sample_vectorbt_results)

            assert "Failed to save backtest" in str(exc_info.value)

    def test_get_backtest_by_id(self, persistence_manager, sample_vectorbt_results):
        """Test retrieval of backtest by ID."""
        # Save a backtest first
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Retrieve it
        result = persistence_manager.get_backtest_by_id(backtest_id)

        assert result is not None
        assert str(result.backtest_id) == backtest_id
        assert result.symbol == "AAPL"
        assert result.strategy_type == "momentum_crossover"

        # Test non-existent ID
        fake_id = str(uuid4())
        result = persistence_manager.get_backtest_by_id(fake_id)
        assert result is None

        # Test invalid UUID format
        result = persistence_manager.get_backtest_by_id("invalid-uuid")
        assert result is None

    def test_get_backtests_by_symbol(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test retrieval of backtests by symbol."""
        # Save multiple backtests for same symbol
        sample_vectorbt_results["strategy"] = "momentum_v1"
        backtest_id1 = persistence_manager.save_backtest_result(sample_vectorbt_results)

        sample_vectorbt_results["strategy"] = "momentum_v2"
        backtest_id2 = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Save backtest for different symbol
        sample_vectorbt_results["symbol"] = "GOOGL"
        sample_vectorbt_results["strategy"] = "momentum_v1"
        backtest_id3 = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Test retrieval by symbol
        aapl_results = persistence_manager.get_backtests_by_symbol("AAPL")
        assert len(aapl_results) == 2
        assert all(result.symbol == "AAPL" for result in aapl_results)
        assert backtest_id1 != backtest_id2
        assert backtest_id3 not in {backtest_id1, backtest_id2}
        retrieved_ids = {str(result.backtest_id) for result in aapl_results}
        assert {backtest_id1, backtest_id2}.issubset(retrieved_ids)

        # Test with strategy filter
        aapl_v1_results = persistence_manager.get_backtests_by_symbol(
            "AAPL", "momentum_v1"
        )
        assert len(aapl_v1_results) == 1
        assert aapl_v1_results[0].strategy_type == "momentum_v1"

        # Test with limit
        limited_results = persistence_manager.get_backtests_by_symbol("AAPL", limit=1)
        assert len(limited_results) == 1

        # Test non-existent symbol
        empty_results = persistence_manager.get_backtests_by_symbol("NONEXISTENT")
        assert len(empty_results) == 0

    def test_get_best_performing_strategies(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test retrieval of best performing strategies."""
        # Create multiple backtests with different performance
        strategies_performance = [
            (
                "momentum",
                {"sharpe_ratio": 1.5, "total_return": 0.2, "total_trades": 15},
            ),
            (
                "mean_reversion",
                {"sharpe_ratio": 1.8, "total_return": 0.15, "total_trades": 20},
            ),
            (
                "breakout",
                {"sharpe_ratio": 0.8, "total_return": 0.25, "total_trades": 10},
            ),
            (
                "momentum_v2",
                {"sharpe_ratio": 2.0, "total_return": 0.3, "total_trades": 25},
            ),
        ]

        backtest_ids = []
        for strategy, metrics in strategies_performance:
            sample_vectorbt_results["strategy"] = strategy
            sample_vectorbt_results["metrics"].update(metrics)
            backtest_id = persistence_manager.save_backtest_result(
                sample_vectorbt_results
            )
            backtest_ids.append(backtest_id)

        # Test best by Sharpe ratio (default)
        best_sharpe = persistence_manager.get_best_performing_strategies(
            "sharpe_ratio", limit=3
        )
        assert len(best_sharpe) == 3
        assert best_sharpe[0].strategy_type == "momentum_v2"  # Highest Sharpe
        assert best_sharpe[1].strategy_type == "mean_reversion"  # Second highest
        assert best_sharpe[0].sharpe_ratio > best_sharpe[1].sharpe_ratio

        # Test best by total return
        best_return = persistence_manager.get_best_performing_strategies(
            "total_return", limit=2
        )
        assert len(best_return) == 2
        assert best_return[0].strategy_type == "momentum_v2"  # Highest return

        # Test minimum trades filter
        high_volume = persistence_manager.get_best_performing_strategies(
            "sharpe_ratio", min_trades=20
        )
        assert len(high_volume) == 2  # Only momentum_v2 and mean_reversion
        assert all(result.total_trades >= 20 for result in high_volume)

    def test_compare_strategies(self, persistence_manager, sample_vectorbt_results):
        """Test strategy comparison functionality."""
        # Create backtests to compare
        strategies = ["momentum", "mean_reversion", "breakout"]
        backtest_ids = []

        for i, strategy in enumerate(strategies):
            sample_vectorbt_results["strategy"] = strategy
            sample_vectorbt_results["metrics"]["sharpe_ratio"] = 1.0 + i * 0.5
            sample_vectorbt_results["metrics"]["total_return"] = 0.1 + i * 0.05
            sample_vectorbt_results["metrics"]["max_drawdown"] = -0.05 - i * 0.02
            backtest_id = persistence_manager.save_backtest_result(
                sample_vectorbt_results
            )
            backtest_ids.append(backtest_id)

        # Test comparison
        comparison = persistence_manager.compare_strategies(backtest_ids)

        assert "backtests" in comparison
        assert "rankings" in comparison
        assert "summary" in comparison
        assert len(comparison["backtests"]) == 3

        # Test rankings
        assert "sharpe_ratio" in comparison["rankings"]
        sharpe_rankings = comparison["rankings"]["sharpe_ratio"]
        assert len(sharpe_rankings) == 3
        assert sharpe_rankings[0]["rank"] == 1  # Best rank
        assert sharpe_rankings[0]["value"] > sharpe_rankings[1]["value"]

        # Test max_drawdown ranking (lower is better)
        assert "max_drawdown" in comparison["rankings"]
        dd_rankings = comparison["rankings"]["max_drawdown"]
        assert (
            dd_rankings[0]["value"] > dd_rankings[-1]["value"]
        )  # Less negative is better

        # Test summary
        summary = comparison["summary"]
        assert summary["total_backtests"] == 3
        assert "date_range" in summary

    def test_save_optimization_results(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test saving parameter optimization results."""
        # Save parent backtest first
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Create optimization results
        optimization_results = [
            {
                "parameters": {"window": 10, "threshold": 0.01},
                "objective_value": 1.2,
                "total_return": 0.15,
                "sharpe_ratio": 1.2,
                "max_drawdown": -0.08,
                "win_rate": 0.6,
                "profit_factor": 1.3,
                "total_trades": 20,
                "rank": 1,
            },
            {
                "parameters": {"window": 20, "threshold": 0.02},
                "objective_value": 1.5,
                "total_return": 0.18,
                "sharpe_ratio": 1.5,
                "max_drawdown": -0.06,
                "win_rate": 0.65,
                "profit_factor": 1.4,
                "total_trades": 18,
                "rank": 2,
            },
        ]

        # Save optimization results
        count = persistence_manager.save_optimization_results(
            backtest_id=backtest_id,
            optimization_results=optimization_results,
            objective_function="sharpe_ratio",
        )

        assert count == 2

        # Verify saved results
        opt_results = (
            persistence_manager.session.query(OptimizationResult)
            .filter(OptimizationResult.backtest_id == UUID(backtest_id))
            .all()
        )

        assert len(opt_results) == 2
        assert opt_results[0].objective_function == "sharpe_ratio"
        assert opt_results[0].parameters == {"window": 10, "threshold": 0.01}
        assert opt_results[0].objective_value == Decimal("1.2")

    def test_save_walk_forward_test(self, persistence_manager, sample_vectorbt_results):
        """Test saving walk-forward validation results."""
        # Save parent backtest first
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Create walk-forward test data
        walk_forward_data = {
            "window_size_months": 6,
            "step_size_months": 1,
            "training_start": "2023-01-01",
            "training_end": "2023-06-30",
            "test_period_start": "2023-07-01",
            "test_period_end": "2023-07-31",
            "optimal_parameters": {"window": 15, "threshold": 0.015},
            "training_performance": 1.3,
            "out_of_sample_return": 0.12,
            "out_of_sample_sharpe": 1.1,
            "out_of_sample_drawdown": -0.05,
            "out_of_sample_trades": 8,
            "performance_ratio": 0.85,
            "degradation_factor": 0.15,
            "is_profitable": True,
            "is_statistically_significant": True,
        }

        # Save walk-forward test
        wf_id = persistence_manager.save_walk_forward_test(
            backtest_id, walk_forward_data
        )

        assert isinstance(wf_id, str)
        assert UUID(wf_id)

        # Verify saved result
        wf_test = (
            persistence_manager.session.query(WalkForwardTest)
            .filter(WalkForwardTest.walk_forward_id == UUID(wf_id))
            .first()
        )

        assert wf_test is not None
        assert wf_test.parent_backtest_id == UUID(backtest_id)
        assert wf_test.window_size_months == 6
        assert wf_test.out_of_sample_sharpe == Decimal("1.1")
        assert wf_test.is_profitable is True

    def test_get_backtest_performance_summary(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test performance summary generation."""
        # Create backtests with different dates and performance
        base_date = datetime.utcnow()

        # Recent backtests (within 30 days)
        for i in range(3):
            sample_vectorbt_results["strategy"] = f"momentum_v{i + 1}"
            sample_vectorbt_results["metrics"]["total_return"] = 0.1 + i * 0.05
            sample_vectorbt_results["metrics"]["sharpe_ratio"] = 1.0 + i * 0.3
            sample_vectorbt_results["metrics"]["win_rate"] = 0.5 + i * 0.1

            with patch(
                "maverick_mcp.data.models.BacktestResult.backtest_date",
                base_date - timedelta(days=i * 10),
            ):
                persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Old backtest (outside 30 days)
        sample_vectorbt_results["strategy"] = "old_strategy"
        with patch(
            "maverick_mcp.data.models.BacktestResult.backtest_date",
            base_date - timedelta(days=45),
        ):
            persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Get summary
        summary = persistence_manager.get_backtest_performance_summary(days_back=30)

        assert "period" in summary
        assert summary["total_backtests"] == 3  # Only recent ones
        assert "performance_metrics" in summary

        metrics = summary["performance_metrics"]
        assert "average_return" in metrics
        assert "best_return" in metrics
        assert "worst_return" in metrics
        assert "average_sharpe" in metrics

        # Test strategy and symbol breakdowns
        assert "strategy_breakdown" in summary
        assert len(summary["strategy_breakdown"]) == 3
        assert "symbol_breakdown" in summary
        assert "AAPL" in summary["symbol_breakdown"]

    def test_delete_backtest(self, persistence_manager, sample_vectorbt_results):
        """Test backtest deletion with cascading."""
        # Save backtest with trades
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Verify it exists
        result = persistence_manager.get_backtest_by_id(backtest_id)
        assert result is not None

        trades = (
            persistence_manager.session.query(BacktestTrade)
            .filter(BacktestTrade.backtest_id == UUID(backtest_id))
            .all()
        )
        assert len(trades) > 0

        # Delete backtest
        deleted = persistence_manager.delete_backtest(backtest_id)
        assert deleted is True

        # Verify deletion
        result = persistence_manager.get_backtest_by_id(backtest_id)
        assert result is None

        # Test non-existent deletion
        fake_id = str(uuid4())
        deleted = persistence_manager.delete_backtest(fake_id)
        assert deleted is False

    def test_safe_decimal_conversion(self):
        """Test safe decimal conversion utility."""
        from maverick_mcp.backtesting.persistence import BacktestPersistenceManager

        # Test valid conversions
        assert BacktestPersistenceManager._safe_decimal(123) == Decimal("123")
        assert BacktestPersistenceManager._safe_decimal(123.45) == Decimal("123.45")
        assert BacktestPersistenceManager._safe_decimal("456.78") == Decimal("456.78")
        assert BacktestPersistenceManager._safe_decimal(Decimal("789.01")) == Decimal(
            "789.01"
        )

        # Test None and invalid values
        assert BacktestPersistenceManager._safe_decimal(None) is None
        assert BacktestPersistenceManager._safe_decimal("invalid") is None
        assert BacktestPersistenceManager._safe_decimal([1, 2, 3]) is None


class TestConvenienceFunctions:
    """Test suite for convenience functions."""

    def test_save_vectorbt_results_function(
        self, db_session: Session, sample_vectorbt_results
    ):
        """Test save_vectorbt_results convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            mock_manager = Mock(spec=BacktestPersistenceManager)
            mock_manager.save_backtest_result.return_value = "test-uuid-123"
            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager

            result = save_vectorbt_results(
                vectorbt_results=sample_vectorbt_results,
                execution_time=2.5,
                notes="Test run",
            )

            assert result == "test-uuid-123"
            mock_manager.save_backtest_result.assert_called_once_with(
                sample_vectorbt_results, 2.5, "Test run"
            )

    def test_get_recent_backtests_function(self, db_session: Session):
        """Test get_recent_backtests convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            mock_manager = Mock(spec=BacktestPersistenceManager)
            mock_session = Mock(spec=Session)
            mock_query = Mock()

            mock_manager.session = mock_session
            mock_session.query.return_value = mock_query
            mock_query.filter.return_value = mock_query
            mock_query.order_by.return_value = mock_query
            mock_query.all.return_value = ["result1", "result2"]

            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager

            results = get_recent_backtests("AAPL", days=7)

            assert results == ["result1", "result2"]
            mock_session.query.assert_called_once_with(BacktestResult)

    def test_find_best_strategy_for_symbol_function(self, db_session: Session):
        """Test find_best_strategy_for_symbol convenience function."""
        with patch(
            "maverick_mcp.backtesting.persistence.get_persistence_manager"
        ) as mock_factory:
            mock_manager = Mock(spec=BacktestPersistenceManager)
            mock_best_result = Mock(spec=BacktestResult)

            mock_manager.get_best_performing_strategies.return_value = [
                mock_best_result
            ]
            mock_manager.get_backtests_by_symbol.return_value = [mock_best_result]
            mock_manager.__enter__ = Mock(return_value=mock_manager)
            mock_manager.__exit__ = Mock(return_value=None)
            mock_factory.return_value = mock_manager

            result = find_best_strategy_for_symbol("AAPL", "sharpe_ratio")

            assert result == mock_best_result
            mock_manager.get_backtests_by_symbol.assert_called_once_with(
                "AAPL", limit=1000
            )


class TestPersistenceStressTests:
    """Stress tests for persistence layer performance and reliability."""

    def test_bulk_insert_performance(
        self, persistence_manager, sample_vectorbt_results, benchmark_timer
    ):
        """Test bulk insert performance with many backtests."""
        backtest_count = 50

        with benchmark_timer() as timer:
            for i in range(backtest_count):
                sample_vectorbt_results["symbol"] = f"STOCK{i:03d}"
                sample_vectorbt_results["strategy"] = (
                    f"strategy_{i % 5}"  # 5 different strategies
                )
                persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Should complete within reasonable time
        assert timer.elapsed < 30.0  # < 30 seconds for 50 backtests

        # Verify all were saved
        all_results = persistence_manager.session.query(BacktestResult).count()
        assert all_results == backtest_count

    def test_concurrent_access_handling(
        self, db_session: Session, sample_vectorbt_results
    ):
        """Test handling of concurrent database access."""
        import queue
        import threading

        results_queue = queue.Queue()
        error_queue = queue.Queue()

        def save_backtest(thread_id):
            try:
                # Each thread gets its own session
                with BacktestPersistenceManager() as manager:
                    modified_results = sample_vectorbt_results.copy()
                    modified_results["symbol"] = f"THREAD{thread_id}"
                    backtest_id = manager.save_backtest_result(modified_results)
                    results_queue.put(backtest_id)
            except Exception as e:
                error_queue.put(f"Thread {thread_id}: {e}")

        # Create multiple threads
        threads = []
        thread_count = 5

        for i in range(thread_count):
            thread = threading.Thread(target=save_backtest, args=(i,))
            threads.append(thread)
            thread.start()

        # Wait for all threads to complete
        for thread in threads:
            thread.join(timeout=10)  # 10 second timeout per thread

        # Check results
        assert error_queue.empty(), f"Errors occurred: {list(error_queue.queue)}"
        assert results_queue.qsize() == thread_count

        # Verify all backtests were saved with unique IDs
        saved_ids = []
        while not results_queue.empty():
            saved_ids.append(results_queue.get())

        assert len(saved_ids) == thread_count
        assert len(set(saved_ids)) == thread_count  # All unique

    def test_large_result_handling(self, persistence_manager, sample_vectorbt_results):
        """Test handling of large backtest results."""
        # Create large equity curve and drawdown series (1 year of minute data)
        large_data_size = 365 * 24 * 60  # ~525k data points

        sample_vectorbt_results["equity_curve"] = list(range(large_data_size))
        sample_vectorbt_results["drawdown_series"] = [
            -i / 1000 for i in range(large_data_size)
        ]

        # Also add many trades
        sample_vectorbt_results["trades"] = []
        for i in range(1000):  # 1000 trades
            trade = {
                "entry_date": f"2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
                "entry_price": 100 + (i % 100),
                "exit_date": f"2023-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}",
                "exit_price": 101 + (i % 100),
                "position_size": 100,
                "direction": "long",
                "pnl": i % 100 - 50,
                "pnl_percent": (i % 100 - 50) / 1000,
                "duration_days": i % 30 + 1,
                "exit_reason": "time_exit",
            }
            sample_vectorbt_results["trades"].append(trade)

        # Should handle large data without issues
        backtest_id = persistence_manager.save_backtest_result(sample_vectorbt_results)

        assert backtest_id is not None

        # Verify retrieval works
        result = persistence_manager.get_backtest_by_id(backtest_id)
        assert result is not None
        assert result.data_points == large_data_size

        # Verify trades were saved
        trades = (
            persistence_manager.session.query(BacktestTrade)
            .filter(BacktestTrade.backtest_id == UUID(backtest_id))
            .count()
        )
        assert trades == 1000

    def test_database_constraint_validation(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test database constraint validation and error handling."""
        # Save first backtest
        backtest_id1 = persistence_manager.save_backtest_result(sample_vectorbt_results)

        # Try to save with same UUID (should be prevented by unique constraint)
        with patch("uuid.uuid4", return_value=UUID(backtest_id1)):
            # This should handle the constraint violation gracefully
            try:
                backtest_id2 = persistence_manager.save_backtest_result(
                    sample_vectorbt_results
                )
                # If it succeeds, it should have generated a different UUID
                assert backtest_id2 != backtest_id1
            except BacktestPersistenceError:
                # Or it should raise a proper persistence error
                pass

    def test_memory_usage_with_large_datasets(
        self, persistence_manager, sample_vectorbt_results
    ):
        """Test memory usage doesn't grow excessively with large datasets."""
        import os

        import psutil

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss

        # Create and save multiple large backtests
        for i in range(10):
            large_results = sample_vectorbt_results.copy()
            large_results["symbol"] = f"LARGE{i}"
            large_results["equity_curve"] = list(range(10000))  # 10k data points each
            large_results["drawdown_series"] = [-j / 1000 for j in range(10000)]

            persistence_manager.save_backtest_result(large_results)

        final_memory = process.memory_info().rss
        memory_growth = (final_memory - initial_memory) / 1024 / 1024  # MB

        # Memory growth should be reasonable (< 100MB for 10 large backtests)
        assert memory_growth < 100


if __name__ == "__main__":
    # Run tests with detailed output
    pytest.main([__file__, "-v", "--tb=short", "-x"])

```

--------------------------------------------------------------------------------
/docs/api/backtesting.md:
--------------------------------------------------------------------------------

```markdown
# Backtesting API Documentation

## Overview

The MaverickMCP backtesting system provides comprehensive strategy backtesting capabilities powered by VectorBT. It offers both traditional technical analysis strategies and advanced ML-enhanced approaches, with extensive optimization, validation, and analysis tools.

### Key Features

- **35+ Pre-built Strategies**: From simple moving averages to advanced ML ensembles
- **Strategy Optimization**: Grid search with coarse/medium/fine granularity
- **Walk-Forward Analysis**: Out-of-sample validation for strategy robustness
- **Monte Carlo Simulation**: Risk assessment with confidence intervals
- **Portfolio Backtesting**: Multi-symbol strategy application
- **Market Regime Analysis**: Intelligent strategy selection based on market conditions
- **ML-Enhanced Strategies**: Adaptive, ensemble, and regime-aware approaches
- **Comprehensive Visualization**: Charts, heatmaps, and performance dashboards

## Core Backtesting Tools

### run_backtest

Run a comprehensive backtest with specified strategy and parameters.

**Function**: `run_backtest`

**Parameters**:
- `symbol` (str, required): Stock symbol to backtest (e.g., "AAPL", "TSLA")
- `strategy` (str, default: "sma_cross"): Strategy type to use
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today
- `initial_capital` (float, default: 10000.0): Starting capital for backtest

**Strategy-Specific Parameters**:
- `fast_period` (int, optional): Fast moving average period
- `slow_period` (int, optional): Slow moving average period
- `period` (int, optional): General period parameter (RSI, etc.)
- `oversold` (float, optional): RSI oversold threshold (default: 30)
- `overbought` (float, optional): RSI overbought threshold (default: 70)
- `signal_period` (int, optional): MACD signal line period
- `std_dev` (float, optional): Bollinger Bands standard deviation
- `lookback` (int, optional): Lookback period for momentum/breakout
- `threshold` (float, optional): Threshold for momentum strategies
- `z_score_threshold` (float, optional): Z-score threshold for mean reversion
- `breakout_factor` (float, optional): Breakout factor for channel strategies

**Returns**:
```json
{
  "symbol": "AAPL",
  "strategy": "sma_cross",
  "period": "2023-01-01 to 2024-01-01",
  "metrics": {
    "total_return": 0.15,
    "sharpe_ratio": 1.2,
    "max_drawdown": -0.08,
    "total_trades": 24,
    "win_rate": 0.58,
    "profit_factor": 1.45,
    "calmar_ratio": 1.85,
    "volatility": 0.18
  },
  "trades": [
    {
      "entry_date": "2023-01-15",
      "exit_date": "2023-02-10",
      "entry_price": 150.0,
      "exit_price": 158.5,
      "return": 0.057,
      "holding_period": 26
    }
  ],
  "equity_curve": [10000, 10150, 10200, ...],
  "drawdown_series": [0, -0.01, -0.02, ...],
  "analysis": {
    "risk_metrics": {...},
    "performance_analysis": {...}
  }
}
```

**Examples**:
```python
# Simple SMA crossover
run_backtest("AAPL", "sma_cross", fast_period=10, slow_period=20)

# RSI mean reversion
run_backtest("TSLA", "rsi", period=14, oversold=30, overbought=70)

# MACD strategy with custom parameters
run_backtest("MSFT", "macd", fast_period=12, slow_period=26, signal_period=9)

# Bollinger Bands strategy
run_backtest("GOOGL", "bollinger", period=20, std_dev=2.0)
```

### optimize_strategy

Optimize strategy parameters using grid search to find the best-performing configuration.

**Function**: `optimize_strategy`

**Parameters**:
- `symbol` (str, required): Stock symbol to optimize
- `strategy` (str, default: "sma_cross"): Strategy type to optimize
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `optimization_metric` (str, default: "sharpe_ratio"): Metric to optimize ("sharpe_ratio", "total_return", "win_rate", "calmar_ratio")
- `optimization_level` (str, default: "medium"): Level of optimization ("coarse", "medium", "fine")
- `top_n` (int, default: 10): Number of top results to return

**Returns**:
```json
{
  "symbol": "AAPL",
  "strategy": "sma_cross",
  "optimization_metric": "sharpe_ratio",
  "optimization_level": "medium",
  "total_combinations": 64,
  "execution_time": 45.2,
  "best_parameters": {
    "fast_period": 8,
    "slow_period": 21,
    "sharpe_ratio": 1.85,
    "total_return": 0.28,
    "max_drawdown": -0.06
  },
  "top_results": [
    {
      "parameters": {"fast_period": 8, "slow_period": 21},
      "sharpe_ratio": 1.85,
      "total_return": 0.28,
      "max_drawdown": -0.06,
      "total_trades": 18
    }
  ],
  "parameter_sensitivity": {
    "fast_period": {"min": 5, "max": 20, "best": 8},
    "slow_period": {"min": 20, "max": 50, "best": 21}
  }
}
```

**Examples**:
```python
# Optimize SMA crossover for Sharpe ratio
optimize_strategy("AAPL", "sma_cross", optimization_metric="sharpe_ratio")

# Fine-tune RSI parameters for total return
optimize_strategy("TSLA", "rsi", optimization_metric="total_return", optimization_level="fine")

# Quick coarse optimization for multiple strategies
optimize_strategy("MSFT", "macd", optimization_level="coarse", top_n=5)
```

### walk_forward_analysis

Perform walk-forward analysis to test strategy robustness and out-of-sample performance.

**Function**: `walk_forward_analysis`

**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `window_size` (int, default: 252): Test window size in trading days (roughly one year)
- `step_size` (int, default: 63): Rolling-window step size in trading days (roughly one quarter)

**Returns**:
```json
{
  "symbol": "AAPL",
  "strategy": "sma_cross",
  "total_windows": 8,
  "window_size": 252,
  "step_size": 63,
  "out_of_sample_performance": {
    "average_return": 0.12,
    "average_sharpe": 0.95,
    "consistency_score": 0.75,
    "best_window": {"period": "2023-Q2", "return": 0.28},
    "worst_window": {"period": "2023-Q4", "return": -0.05}
  },
  "window_results": [
    {
      "window_id": 1,
      "optimization_period": "2022-01-01 to 2022-12-31",
      "test_period": "2023-01-01 to 2023-03-31",
      "best_parameters": {"fast_period": 10, "slow_period": 25},
      "out_of_sample_return": 0.08,
      "out_of_sample_sharpe": 1.1
    }
  ],
  "stability_metrics": {
    "parameter_stability": 0.85,
    "performance_stability": 0.72,
    "overfitting_risk": "low"
  }
}
```
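
**Examples** (illustrative values):
```python
# Annual test window with quarterly steps (the defaults)
walk_forward_analysis("AAPL", "sma_cross")

# Shorter half-year windows stepped monthly for a finer robustness check
walk_forward_analysis("TSLA", "rsi", window_size=126, step_size=21)
```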

### monte_carlo_simulation

Run Monte Carlo simulation on backtest results to assess risk and confidence intervals.

**Function**: `monte_carlo_simulation`

**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `num_simulations` (int, default: 1000): Number of Monte Carlo simulations
- Strategy-specific parameters (same as `run_backtest`)

**Returns**:
```json
{
  "symbol": "AAPL",
  "strategy": "sma_cross",
  "num_simulations": 1000,
  "confidence_intervals": {
    "95%": {"lower": 0.05, "upper": 0.32},
    "90%": {"lower": 0.08, "upper": 0.28},
    "68%": {"lower": 0.12, "upper": 0.22}
  },
  "risk_metrics": {
    "probability_of_loss": 0.15,
    "expected_return": 0.17,
    "value_at_risk_5%": -0.12,
    "expected_shortfall": -0.18,
    "maximum_drawdown_95%": -0.15
  },
  "simulation_statistics": {
    "mean_return": 0.168,
    "std_return": 0.089,
    "skewness": -0.23,
    "kurtosis": 2.85,
    "best_simulation": 0.45,
    "worst_simulation": -0.28
  }
}
```
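
**Examples** (illustrative values):
```python
# Default 1000 simulations on an SMA crossover backtest
monte_carlo_simulation("AAPL", "sma_cross")

# More simulations, with strategy parameters passed through as in run_backtest
monte_carlo_simulation("MSFT", "macd", num_simulations=5000, fast_period=12, slow_period=26)
```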

### compare_strategies

Compare multiple strategies on the same symbol to identify the best performer.

**Function**: `compare_strategies`

**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategies` (list[str], optional): List of strategy types to compare (defaults to top 5)
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)

**Returns**:
```json
{
  "symbol": "AAPL",
  "comparison_period": "2023-01-01 to 2024-01-01",
  "strategies_compared": ["sma_cross", "rsi", "macd", "bollinger", "momentum"],
  "rankings": {
    "by_sharpe_ratio": [
      {"strategy": "macd", "sharpe_ratio": 1.45},
      {"strategy": "sma_cross", "sharpe_ratio": 1.22},
      {"strategy": "momentum", "sharpe_ratio": 0.98}
    ],
    "by_total_return": [
      {"strategy": "momentum", "total_return": 0.32},
      {"strategy": "macd", "total_return": 0.28},
      {"strategy": "sma_cross", "total_return": 0.18}
    ]
  },
  "detailed_comparison": {
    "sma_cross": {
      "total_return": 0.18,
      "sharpe_ratio": 1.22,
      "max_drawdown": -0.08,
      "total_trades": 24,
      "win_rate": 0.58
    }
  },
  "best_overall": "macd",
  "recommendation": "MACD strategy provides best risk-adjusted returns"
}
```
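
**Examples** (illustrative values):
```python
# Compare the default top strategies
compare_strategies("AAPL")

# Compare a specific set over a fixed period
compare_strategies("TSLA", strategies=["sma_cross", "rsi", "macd"], start_date="2023-01-01", end_date="2024-01-01")
```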

### backtest_portfolio

Backtest a strategy across multiple symbols to create a diversified portfolio.

**Function**: `backtest_portfolio`

**Parameters**:
- `symbols` (list[str], required): List of stock symbols
- `strategy` (str, default: "sma_cross"): Strategy type to apply
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `initial_capital` (float, default: 10000.0): Starting capital
- `position_size` (float, default: 0.1): Position size per symbol (0.1 = 10%)
- Strategy-specific parameters (same as `run_backtest`)

**Returns**:
```json
{
  "portfolio_metrics": {
    "symbols_tested": 5,
    "total_return": 0.22,
    "average_sharpe": 1.15,
    "max_drawdown": -0.12,
    "total_trades": 120,
    "diversification_benefit": 0.85
  },
  "individual_results": [
    {
      "symbol": "AAPL",
      "total_return": 0.18,
      "sharpe_ratio": 1.22,
      "max_drawdown": -0.08,
      "contribution_to_portfolio": 0.24
    }
  ],
  "correlation_matrix": {
    "AAPL": {"MSFT": 0.72, "GOOGL": 0.68},
    "MSFT": {"GOOGL": 0.75}
  },
  "summary": "Portfolio backtest of 5 symbols with sma_cross strategy"
}
```
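
**Examples** (illustrative values):
```python
# Equal 10% positions across five symbols (the defaults)
backtest_portfolio(["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"], strategy="momentum")

# Larger capital with smaller per-symbol exposure
backtest_portfolio(["AAPL", "MSFT"], strategy="sma_cross", initial_capital=100000.0, position_size=0.05)
```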

## Strategy Management

### list_strategies

List all available backtesting strategies with descriptions and parameters.

**Function**: `list_strategies`

**Parameters**: None

**Returns**:
```json
{
  "available_strategies": {
    "sma_cross": {
      "type": "sma_cross",
      "name": "SMA Crossover",
      "description": "Buy when fast SMA crosses above slow SMA, sell when it crosses below",
      "default_parameters": {"fast_period": 10, "slow_period": 20},
      "optimization_ranges": {
        "fast_period": [5, 10, 15, 20],
        "slow_period": [20, 30, 50, 100]
      }
    }
  },
  "total_count": 9,
  "categories": {
    "trend_following": ["sma_cross", "ema_cross", "macd", "breakout"],
    "mean_reversion": ["rsi", "bollinger", "mean_reversion"],
    "momentum": ["momentum", "volume_momentum"]
  }
}
```

### parse_strategy

Parse natural language strategy description into VectorBT parameters.

**Function**: `parse_strategy`

**Parameters**:
- `description` (str, required): Natural language description of trading strategy

**Returns**:
```json
{
  "success": true,
  "strategy": {
    "strategy_type": "rsi",
    "parameters": {
      "period": 14,
      "oversold": 30,
      "overbought": 70
    }
  },
  "message": "Successfully parsed as rsi strategy"
}
```

**Examples**:
```python
# Parse natural language descriptions
parse_strategy("Buy when RSI is below 30 and sell when above 70")
parse_strategy("Use 10-day and 20-day moving average crossover")
parse_strategy("MACD strategy with standard parameters")
```

## Visualization Tools

### generate_backtest_charts

Generate comprehensive charts for a backtest including equity curve, trades, and performance dashboard.

**Function**: `generate_backtest_charts`

**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `theme` (str, default: "light"): Chart theme ("light" or "dark")

**Returns**:
```json
{
  "equity_curve": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
  "trade_scatter": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
  "performance_dashboard": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA..."
}
```
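
**Examples** (illustrative values):
```python
# Light-theme charts for a default SMA crossover backtest
generate_backtest_charts("AAPL")

# Dark-theme charts over a specific period
generate_backtest_charts("TSLA", "rsi", start_date="2023-01-01", end_date="2024-01-01", theme="dark")
```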

### generate_optimization_charts

Generate heatmap charts for strategy parameter optimization results.

**Function**: `generate_optimization_charts`

**Parameters**:
- `symbol` (str, required): Stock symbol
- `strategy` (str, default: "sma_cross"): Strategy type
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `theme` (str, default: "light"): Chart theme ("light" or "dark")

**Returns**:
```json
{
  "optimization_heatmap": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA..."
}
```
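
**Example** (illustrative values):
```python
# Parameter heatmap for an SMA crossover optimization
generate_optimization_charts("AAPL", "sma_cross", theme="dark")
```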

## ML-Enhanced Strategies

### run_ml_strategy_backtest

Run backtest using machine learning-enhanced strategies with adaptive capabilities.

**Function**: `run_ml_strategy_backtest`

**Parameters**:
- `symbol` (str, required): Stock symbol to backtest
- `strategy_type` (str, default: "ml_predictor"): ML strategy type ("ml_predictor", "adaptive", "ensemble", "regime_aware")
- `start_date` (str, optional): Start date (YYYY-MM-DD)
- `end_date` (str, optional): End date (YYYY-MM-DD)
- `initial_capital` (float, default: 10000.0): Initial capital amount
- `train_ratio` (float, default: 0.8): Ratio of data for training (0.0-1.0)
- `model_type` (str, default: "random_forest"): ML model type
- `n_estimators` (int, default: 100): Number of estimators for ensemble models
- `max_depth` (int, optional): Maximum tree depth
- `learning_rate` (float, default: 0.01): Learning rate for adaptive strategies
- `adaptation_method` (str, default: "gradient"): Adaptation method ("gradient", "momentum")

**Returns**:
```json
{
  "symbol": "AAPL",
  "strategy_type": "ml_predictor",
  "metrics": {
    "total_return": 0.24,
    "sharpe_ratio": 1.35,
    "max_drawdown": -0.09
  },
  "ml_metrics": {
    "training_period": 400,
    "testing_period": 100,
    "train_test_split": 0.8,
    "feature_importance": {
      "rsi": 0.25,
      "macd": 0.22,
      "volume_ratio": 0.18,
      "price_momentum": 0.16
    },
    "model_accuracy": 0.68,
    "prediction_confidence": 0.72
  }
}
```
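
**Examples** (illustrative values):
```python
# ML predictor with the default random forest model
run_ml_strategy_backtest("AAPL", strategy_type="ml_predictor")

# Adaptive strategy with momentum-based parameter adaptation
run_ml_strategy_backtest("TSLA", strategy_type="adaptive", learning_rate=0.05, adaptation_method="momentum")
```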

### train_ml_predictor

Train a machine learning predictor model for generating trading signals.

**Function**: `train_ml_predictor`

**Parameters**:
- `symbol` (str, required): Stock symbol to train on
- `start_date` (str, optional): Start date for training data
- `end_date` (str, optional): End date for training data
- `model_type` (str, default: "random_forest"): ML model type
- `target_periods` (int, default: 5): Forward periods for target variable
- `return_threshold` (float, default: 0.02): Return threshold for signal classification
- `n_estimators` (int, default: 100): Number of estimators
- `max_depth` (int, optional): Maximum tree depth
- `min_samples_split` (int, default: 2): Minimum samples to split

**Returns**:
```json
{
  "symbol": "AAPL",
  "model_type": "random_forest",
  "training_period": "2022-01-01 to 2024-01-01",
  "data_points": 500,
  "target_periods": 5,
  "return_threshold": 0.02,
  "model_parameters": {
    "n_estimators": 100,
    "max_depth": 10,
    "min_samples_split": 2
  },
  "training_metrics": {
    "accuracy": 0.68,
    "precision": 0.72,
    "recall": 0.65,
    "f1_score": 0.68,
    "feature_importance": {
      "rsi_14": 0.25,
      "macd_signal": 0.22,
      "volume_sma_ratio": 0.18
    }
  }
}
```
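
**Examples** (illustrative values):
```python
# Train with defaults (5-day forward target, 2% return threshold)
train_ml_predictor("AAPL")

# Deeper forest trained on a longer history
train_ml_predictor("MSFT", start_date="2020-01-01", end_date="2024-01-01", n_estimators=200, max_depth=10)
```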

### analyze_market_regimes

Analyze market regimes using machine learning to identify different market conditions.

**Function**: `analyze_market_regimes`

**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `start_date` (str, optional): Start date for analysis
- `end_date` (str, optional): End date for analysis
- `method` (str, default: "hmm"): Detection method ("hmm", "kmeans", "threshold")
- `n_regimes` (int, default: 3): Number of regimes to detect
- `lookback_period` (int, default: 50): Lookback period for regime detection

**Returns**:
```json
{
  "symbol": "AAPL",
  "analysis_period": "2023-01-01 to 2024-01-01",
  "method": "hmm",
  "n_regimes": 3,
  "regime_names": {
    "0": "Bear/Declining",
    "1": "Sideways/Uncertain",
    "2": "Bull/Trending"
  },
  "current_regime": 2,
  "regime_counts": {"0": 45, "1": 89, "2": 118},
  "regime_percentages": {"0": 17.9, "1": 35.3, "2": 46.8},
  "average_regime_durations": {"0": 15.2, "1": 22.3, "2": 28.7},
  "recent_regime_history": [
    {
      "date": "2024-01-15",
      "regime": 2,
      "probabilities": [0.05, 0.15, 0.80]
    }
  ],
  "total_regime_switches": 18
}
```
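
**Examples** (illustrative values):
```python
# Hidden Markov Model with three regimes (the defaults)
analyze_market_regimes("AAPL")

# K-means clustering with four regimes
analyze_market_regimes("SPY", method="kmeans", n_regimes=4)
```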

### create_strategy_ensemble

Create and backtest a strategy ensemble that combines multiple base strategies.

**Function**: `create_strategy_ensemble`

**Parameters**:
- `symbols` (list[str], required): List of stock symbols
- `base_strategies` (list[str], optional): List of base strategy names (defaults to ["sma_cross", "rsi", "macd"])
- `weighting_method` (str, default: "performance"): Weighting method ("performance", "equal", "volatility")
- `start_date` (str, optional): Start date for backtesting
- `end_date` (str, optional): End date for backtesting
- `initial_capital` (float, default: 10000.0): Initial capital per symbol

**Returns**:
```json
{
  "ensemble_summary": {
    "symbols_tested": 5,
    "base_strategies": ["sma_cross", "rsi", "macd"],
    "weighting_method": "performance",
    "average_return": 0.19,
    "total_trades": 87,
    "average_trades_per_symbol": 17.4
  },
  "individual_results": [
    {
      "symbol": "AAPL",
      "results": {
        "total_return": 0.21,
        "sharpe_ratio": 1.18
      },
      "ensemble_metrics": {
        "strategy_weights": {"sma_cross": 0.4, "rsi": 0.3, "macd": 0.3},
        "strategy_performance": {"sma_cross": 0.15, "rsi": 0.12, "macd": 0.18}
      }
    }
  ],
  "final_strategy_weights": {"sma_cross": 0.42, "rsi": 0.28, "macd": 0.30}
}
```
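
**Examples** (illustrative values):
```python
# Performance-weighted ensemble of the default base strategies
create_strategy_ensemble(["AAPL", "MSFT", "GOOGL"])

# Equal-weighted ensemble of custom base strategies
create_strategy_ensemble(["TSLA"], base_strategies=["rsi", "bollinger"], weighting_method="equal")
```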

## Intelligent Backtesting Workflow

### run_intelligent_backtest

Run comprehensive intelligent backtesting workflow with automatic market regime analysis and strategy optimization.

**Function**: `run_intelligent_backtest`

**Parameters**:
- `symbol` (str, required): Stock symbol to analyze (e.g., 'AAPL', 'TSLA')
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today
- `initial_capital` (float, default: 10000.0): Starting capital for backtest
- `requested_strategy` (str, optional): User-preferred strategy (e.g., 'sma_cross', 'rsi', 'macd')

**Returns**:
```json
{
  "symbol": "AAPL",
  "analysis_period": "2023-01-01 to 2024-01-01",
  "execution_metadata": {
    "total_execution_time": 45.2,
    "steps_completed": 6,
    "confidence_score": 0.87
  },
  "market_regime_analysis": {
    "current_regime": "trending",
    "regime_confidence": 0.85,
    "market_characteristics": {
      "volatility_percentile": 35,
      "trend_strength": 0.72,
      "volume_profile": "above_average"
    }
  },
  "strategy_recommendations": [
    {
      "strategy": "macd",
      "fitness_score": 0.92,
      "recommended_parameters": {"fast_period": 12, "slow_period": 26, "signal_period": 9},
      "expected_performance": {"sharpe_ratio": 1.45, "total_return": 0.28}
    },
    {
      "strategy": "sma_cross",
      "fitness_score": 0.88,
      "recommended_parameters": {"fast_period": 8, "slow_period": 21},
      "expected_performance": {"sharpe_ratio": 1.32, "total_return": 0.24}
    }
  ],
  "optimization_results": {
    "best_strategy": "macd",
    "optimized_parameters": {"fast_period": 12, "slow_period": 26, "signal_period": 9},
    "optimization_method": "grid_search",
    "combinations_tested": 48
  },
  "validation_results": {
    "walk_forward_analysis": {
      "out_of_sample_sharpe": 1.28,
      "consistency_score": 0.82,
      "overfitting_risk": "low"
    },
    "monte_carlo_simulation": {
      "probability_of_loss": 0.12,
      "95_percent_confidence_interval": {"lower": 0.08, "upper": 0.35}
    }
  },
  "final_recommendation": {
    "recommended_strategy": "macd",
    "confidence_level": "high",
    "expected_annual_return": 0.28,
    "expected_sharpe_ratio": 1.45,
    "maximum_expected_drawdown": -0.09,
    "risk_assessment": "moderate",
    "implementation_notes": [
      "Strategy performs well in trending markets",
      "Consider position sizing based on volatility",
      "Monitor for regime changes"
    ]
  }
}
```
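
**Examples** (illustrative values):
```python
# Full intelligent workflow with automatic strategy selection
run_intelligent_backtest("AAPL")

# Seed the workflow with a preferred strategy and a fixed period
run_intelligent_backtest("MSFT", start_date="2022-01-01", end_date="2023-12-31", requested_strategy="macd")
```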

### quick_market_regime_analysis

Perform fast market regime analysis and basic strategy recommendations without full optimization.

**Function**: `quick_market_regime_analysis`

**Parameters**:
- `symbol` (str, required): Stock symbol to analyze
- `start_date` (str, optional): Start date (YYYY-MM-DD), defaults to 1 year ago
- `end_date` (str, optional): End date (YYYY-MM-DD), defaults to today

**Returns**:
```json
{
  "symbol": "AAPL",
  "analysis_type": "quick_analysis",
  "execution_time": 8.5,
  "market_regime": {
    "classification": "trending",
    "confidence": 0.78,
    "characteristics": {
      "trend_direction": "bullish",
      "volatility_level": "moderate",
      "volume_profile": "above_average"
    }
  },
  "strategy_recommendations": [
    {
      "strategy": "sma_cross",
      "fitness_score": 0.85,
      "reasoning": "Strong trend favors moving average strategies"
    },
    {
      "strategy": "macd",
      "fitness_score": 0.82,
      "reasoning": "MACD works well in trending environments"
    },
    {
      "strategy": "momentum",
      "fitness_score": 0.79,
      "reasoning": "Momentum strategies benefit from clear trends"
    }
  ],
  "market_conditions_summary": {
    "overall_assessment": "favorable_for_trend_following",
    "risk_level": "moderate",
    "recommended_position_sizing": "standard"
  }
}
```
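
**Example** (illustrative values):
```python
# Fast regime check before committing to a full optimization run
quick_market_regime_analysis("AAPL")
```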

### explain_market_regime

Get detailed explanation of market regime characteristics and suitable strategies.

**Function**: `explain_market_regime`

**Parameters**:
- `regime` (str, required): Market regime to explain ("trending", "ranging", "volatile", "volatile_trending", "low_volume")

**Returns**:
```json
{
  "regime": "trending",
  "explanation": {
    "description": "A market in a clear directional movement (up or down trend)",
    "characteristics": [
      "Strong directional price movement",
      "Higher highs and higher lows (uptrend) or lower highs and lower lows (downtrend)",
      "Good momentum indicators",
      "Volume supporting the trend direction"
    ],
    "best_strategies": ["sma_cross", "ema_cross", "macd", "breakout", "momentum"],
    "avoid_strategies": ["rsi", "mean_reversion", "bollinger"],
    "risk_factors": [
      "Trend reversals can be sudden",
      "False breakouts in weak trends",
      "Momentum strategies can give late signals"
    ]
  },
  "trading_tips": [
    "Focus on sma_cross, ema_cross, macd, breakout, momentum strategies",
    "Avoid rsi, mean_reversion, bollinger strategies",
    "Always use proper risk management",
    "Consider the broader market context"
  ]
}
```
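
**Examples** (illustrative values):
```python
# Explain regimes before choosing a strategy family
explain_market_regime("trending")
explain_market_regime("ranging")
```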

## Available Strategies

### Traditional Technical Analysis Strategies

#### 1. SMA Crossover (`sma_cross`)
- **Description**: Buy when the fast SMA crosses above the slow SMA, sell when it crosses below
- **Default Parameters**: `fast_period=10, slow_period=20`
- **Best For**: Trending markets
- **Optimization Ranges**: fast_period [5-20], slow_period [20-100]

#### 2. EMA Crossover (`ema_cross`)
- **Description**: Exponential moving average crossover with faster response than SMA
- **Default Parameters**: `fast_period=12, slow_period=26`
- **Best For**: Trending markets with more responsiveness
- **Optimization Ranges**: fast_period [8-20], slow_period [20-50]

#### 3. RSI Mean Reversion (`rsi`)
- **Description**: Buy when oversold (RSI < 30), sell when overbought (RSI > 70)
- **Default Parameters**: `period=14, oversold=30, overbought=70`
- **Best For**: Ranging/sideways markets
- **Optimization Ranges**: period [7-21], oversold [20-35], overbought [65-80]

#### 4. MACD Signal (`macd`)
- **Description**: Buy when the MACD line crosses above the signal line, sell when it crosses below
- **Default Parameters**: `fast_period=12, slow_period=26, signal_period=9`
- **Best For**: Trending markets with momentum confirmation
- **Optimization Ranges**: fast_period [8-14], slow_period [21-30], signal_period [7-11]

#### 5. Bollinger Bands (`bollinger`)
- **Description**: Buy at the lower band (oversold), sell at the upper band (overbought)
- **Default Parameters**: `period=20, std_dev=2.0`
- **Best For**: Mean-reverting/ranging markets
- **Optimization Ranges**: period [10-25], std_dev [1.5-3.0]

#### 6. Momentum (`momentum`)
- **Description**: Buy on strong momentum and sell on weak momentum, based on a returns threshold
- **Default Parameters**: `lookback=20, threshold=0.05`
- **Best For**: Trending markets with clear momentum
- **Optimization Ranges**: lookback [10-30], threshold [0.02-0.10]

#### 7. Mean Reversion (`mean_reversion`)
- **Description**: Buy when price falls below the moving average by the entry threshold
- **Default Parameters**: `ma_period=20, entry_threshold=0.02, exit_threshold=0.01`
- **Best For**: Sideways/ranging markets
- **Optimization Ranges**: ma_period [15-50], entry_threshold [0.01-0.05]

#### 8. Channel Breakout (`breakout`)
- **Description**: Buy on a breakout above the rolling high, sell on a breakdown below the rolling low
- **Default Parameters**: `lookback=20, exit_lookback=10`
- **Best For**: Volatile trending markets
- **Optimization Ranges**: lookback [10-50], exit_lookback [5-20]

#### 9. Volume-Weighted Momentum (`volume_momentum`)
- **Description**: Momentum strategy filtered by volume surge
- **Default Parameters**: `momentum_period=20, volume_period=20, momentum_threshold=0.05, volume_multiplier=1.5`
- **Best For**: Markets with significant volume participation
- **Optimization Ranges**: momentum_period [10-30], volume_multiplier [1.2-2.0]

### ML-Enhanced Strategies

#### 1. ML Predictor (`ml_predictor`)
- Uses machine learning models (Random Forest, etc.) to predict future price movements
- Features: Technical indicators, price patterns, volume analysis
- Training/testing split with out-of-sample validation

#### 2. Adaptive Strategy (`adaptive`)
- Adapts base strategy parameters based on recent performance
- Uses gradient-based or momentum-based adaptation methods
- Continuously learns from market feedback

#### 3. Strategy Ensemble (`ensemble`)
- Combines multiple base strategies with dynamic weighting
- Weighting methods: performance-based, equal-weight, volatility-adjusted
- Provides diversification benefits

#### 4. Regime-Aware Strategy (`regime_aware`)
- Automatically switches between different strategies based on detected market regime
- Uses Hidden Markov Models or clustering for regime detection
- Optimizes strategy selection for current market conditions

## Performance Considerations

### Execution Times
- **Simple Backtest**: 2-5 seconds
- **Strategy Optimization**: 30-120 seconds (depending on level)
- **Walk-Forward Analysis**: 60-300 seconds
- **Monte Carlo Simulation**: 45-90 seconds
- **ML Strategy Training**: 60-180 seconds
- **Intelligent Backtest**: 120-300 seconds (full workflow)

### Memory Usage
- **Single Symbol**: 50-200 MB
- **Portfolio (5 symbols)**: 200-500 MB
- **ML Training**: 100-1000 MB (depending on data size)

### Optimization Levels
- **Coarse**: 16-36 parameter combinations, fastest
- **Medium**: 36-100 combinations, balanced speed/accuracy
- **Fine**: 100-500+ combinations, most thorough

## Error Handling

### Common Errors

#### Insufficient Data
```json
{
  "error": "Insufficient data for backtest (minimum 100 data points)",
  "symbol": "PENNY_STOCK",
  "message": "Please use a longer time period or different symbol"
}
```

#### Invalid Strategy
```json
{
  "error": "Unknown strategy type: invalid_strategy",
  "available_strategies": ["sma_cross", "rsi", "macd", ...],
  "message": "Please use one of the available strategy types"
}
```

#### Parameter Validation
```json
{
  "error": "Invalid parameter value",
  "parameter": "fast_period",
  "value": -5,
  "message": "fast_period must be positive integer"
}
```

#### ML Training Errors
```json
{
  "error": "ML training failed: Insufficient data for training (minimum 200 data points)",
  "symbol": "LOW_DATA_STOCK",
  "message": "ML strategies require more historical data"
}
```

### Troubleshooting

1. **Data Issues**: Ensure sufficient historical data (minimum 100 points, 200+ for ML)
2. **Parameter Validation**: Check parameter types and ranges
3. **Memory Issues**: Reduce number of symbols in portfolio backtests
4. **Timeout Issues**: Use coarse optimization for faster results
5. **Strategy Parsing**: Use exact strategy names from `list_strategies`

## Integration Examples

### Claude Desktop Usage

```
# Basic backtest
"Run a backtest for AAPL using RSI strategy with 14-day period"

# Strategy comparison
"Compare SMA crossover, RSI, and MACD strategies on Tesla stock"

# Intelligent analysis
"Run intelligent backtest on Microsoft stock and recommend the best strategy"

# Portfolio backtest
"Backtest momentum strategy on AAPL, MSFT, GOOGL, AMZN, and TSLA"

# Optimization
"Optimize MACD parameters for Netflix stock over the last 2 years"

# ML strategies
"Train an ML predictor on Amazon stock and test its performance"
```

### API Integration

```python
# Using MCP client
import mcp

client = mcp.Client("maverick-mcp")

# Run backtest
result = await client.call_tool("run_backtest", {
    "symbol": "AAPL",
    "strategy": "sma_cross",
    "fast_period": 10,
    "slow_period": 20,
    "initial_capital": 50000
})

# Optimize strategy
optimization = await client.call_tool("optimize_strategy", {
    "symbol": "TSLA",
    "strategy": "rsi",
    "optimization_level": "medium",
    "optimization_metric": "sharpe_ratio"
})

# Intelligent backtest
intelligent_result = await client.call_tool("run_intelligent_backtest", {
    "symbol": "MSFT",
    "start_date": "2022-01-01",
    "end_date": "2023-12-31"
})
```

### Workflow Integration

```python
# Complete backtesting workflow
symbols = ["AAPL", "MSFT", "GOOGL"]
strategies = ["sma_cross", "rsi", "macd"]

for symbol in symbols:
    # 1. Quick regime analysis
    regime = await client.call_tool("quick_market_regime_analysis", {
        "symbol": symbol
    })

    # 2. Strategy comparison
    comparison = await client.call_tool("compare_strategies", {
        "symbol": symbol,
        "strategies": strategies
    })

    # 3. Optimize best strategy
    best_strategy = comparison["best_overall"]
    optimization = await client.call_tool("optimize_strategy", {
        "symbol": symbol,
        "strategy": best_strategy
    })

    # 4. Validate with walk-forward
    validation = await client.call_tool("walk_forward_analysis", {
        "symbol": symbol,
        "strategy": best_strategy
    })
```

## Best Practices

### Strategy Selection
1. **Trending Markets**: Use sma_cross, ema_cross, macd, breakout, momentum
2. **Ranging Markets**: Use rsi, bollinger, mean_reversion
3. **Volatile Markets**: Use breakout with wider stops
4. **Unknown Conditions**: Use intelligent_backtest for automatic selection

### Parameter Optimization
1. **Start with Default**: Test default parameters first
2. **Use Medium Level**: Good balance of thoroughness and speed
3. **Validate Results**: Always use walk-forward analysis for final validation
4. **Avoid Overfitting**: Check for consistent out-of-sample performance

### Risk Management
1. **Position Sizing**: Never risk more than 1-2% per trade
2. **Diversification**: Test strategies across multiple symbols
3. **Regime Awareness**: Monitor market regime changes
4. **Drawdown Limits**: Set maximum acceptable drawdown levels

### Performance Optimization
1. **Parallel Processing**: Use portfolio backtests for batch analysis
2. **Caching**: Results are cached for faster repeated analysis
3. **Data Efficiency**: Use appropriate date ranges to balance data needs and speed
4. **ML Considerations**: Ensure sufficient training data for ML strategies

This comprehensive API documentation provides everything needed to effectively use the MaverickMCP backtesting system. Each tool is designed to work independently or as part of larger workflows, with extensive error handling and performance optimization built-in.
```

--------------------------------------------------------------------------------
/maverick_mcp/agents/optimized_research.py:
--------------------------------------------------------------------------------

```python
"""
Optimized Deep Research Agent with LLM-side optimizations to prevent timeouts.

This module integrates the comprehensive LLM optimization strategies including:
- Adaptive model selection based on time constraints
- Progressive token budgeting with confidence tracking
- Parallel LLM processing with intelligent load balancing
- Optimized prompt engineering for speed
- Early termination based on confidence thresholds
- Content filtering to reduce processing overhead
"""

import asyncio
import logging
import time
from datetime import datetime
from typing import Any

from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.checkpoint.memory import MemorySaver

from maverick_mcp.agents.deep_research import (
    PERSONA_RESEARCH_FOCUS,
    RESEARCH_DEPTH_LEVELS,
    ContentAnalyzer,
    DeepResearchAgent,
)
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider, TaskType
from maverick_mcp.utils.llm_optimization import (
    AdaptiveModelSelector,
    ConfidenceTracker,
    IntelligentContentFilter,
    OptimizedPromptEngine,
    ParallelLLMProcessor,
    ProgressiveTokenBudgeter,
)
from maverick_mcp.utils.orchestration_logging import (
    get_orchestration_logger,
    log_method_call,
    log_performance_metrics,
)

# Import moved to avoid circular dependency

logger = logging.getLogger(__name__)


class OptimizedContentAnalyzer(ContentAnalyzer):
    """Enhanced ContentAnalyzer with LLM optimizations."""

    def __init__(self, openrouter_provider: OpenRouterProvider):
        # Initialize with OpenRouter provider instead of single LLM
        self.openrouter_provider = openrouter_provider
        self.model_selector = AdaptiveModelSelector(openrouter_provider)
        self.prompt_engine = OptimizedPromptEngine()
        self.parallel_processor = ParallelLLMProcessor(openrouter_provider)

    async def analyze_content_optimized(
        self,
        content: str,
        persona: str,
        analysis_focus: str = "general",
        time_budget_seconds: float = 30.0,
        current_confidence: float = 0.0,
    ) -> dict[str, Any]:
        """Analyze content with time-optimized LLM selection and prompting."""

        if not content or not content.strip():
            return self._create_empty_analysis()

        # Calculate content complexity
        complexity_score = self.model_selector.calculate_task_complexity(
            content, TaskType.SENTIMENT_ANALYSIS, [analysis_focus]
        )

        # Select optimal model for time budget
        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.SENTIMENT_ANALYSIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=complexity_score,
            content_size_tokens=len(content) // 4,  # Rough token estimate
            current_confidence=current_confidence,
        )

        # Create optimized prompt
        optimized_prompt = self.prompt_engine.get_optimized_prompt(
            prompt_type="content_analysis",
            time_remaining=time_budget_seconds,
            confidence_level=current_confidence,
            content=content[: model_config.max_tokens * 3],  # Limit content size
            persona=persona,
            focus_areas=analysis_focus,
        )

        # Execute with optimized LLM
        try:
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )

            start_time = time.time()
            response = await asyncio.wait_for(
                llm.ainvoke(
                    [
                        SystemMessage(
                            content="You are a financial content analyst. Return structured JSON analysis."
                        ),
                        HumanMessage(content=optimized_prompt),
                    ]
                ),
                timeout=model_config.timeout_seconds,
            )

            execution_time = time.time() - start_time

            # Parse response
            analysis = self._parse_optimized_response(response.content, persona)
            analysis["execution_time"] = execution_time
            analysis["model_used"] = model_config.model_id
            analysis["optimization_applied"] = True

            return analysis

        except TimeoutError:
            logger.warning(
                f"Content analysis timed out after {model_config.timeout_seconds}s"
            )
            return self._fallback_analysis(content, persona)
        except Exception as e:
            logger.warning(f"Optimized content analysis failed: {e}")
            return self._fallback_analysis(content, persona)

    async def batch_analyze_content(
        self,
        sources: list[dict],
        persona: str,
        analysis_type: str,
        time_budget_seconds: float,
        current_confidence: float = 0.0,
    ) -> list[dict]:
        """Analyze multiple sources using parallel processing."""

        return await self.parallel_processor.parallel_content_analysis(
            sources=sources,
            analysis_type=analysis_type,
            persona=persona,
            time_budget_seconds=time_budget_seconds,
            current_confidence=current_confidence,
        )

    def _parse_optimized_response(
        self, response_content: str, persona: str
    ) -> dict[str, Any]:
        """Parse LLM response with fallback handling."""

        try:
            # Try to parse as JSON first
            import json

            if response_content.strip().startswith("{"):
                return json.loads(response_content.strip())
        except Exception:
            pass

        # Try structured text parsing
        try:
            return self._parse_structured_response(response_content, persona)
        except Exception:
            # Final fallback
            return self._fallback_analysis(response_content, persona)

    def _parse_structured_response(self, response: str, persona: str) -> dict[str, Any]:
        """Parse structured text response."""

        import re

        # Extract sentiment
        sentiment_match = re.search(
            r"sentiment:?\s*(\w+)[,\s]*(?:confidence:?\s*([\d.]+))?", response.lower()
        )
        if sentiment_match:
            direction = sentiment_match.group(1).lower()
            confidence = float(sentiment_match.group(2) or 0.6)

            # Normalize sentiment terms
            if direction in ["bull", "bullish", "positive", "buy"]:
                direction = "bullish"
            elif direction in ["bear", "bearish", "negative", "sell"]:
                direction = "bearish"
            else:
                direction = "neutral"
        else:
            direction = "neutral"
            confidence = 0.5

        # Extract insights
        insights = []
        insight_patterns = [
            r"insight:?\s*([^\n.]+)",
            r"key point:?\s*([^\n.]+)",
            r"finding:?\s*([^\n.]+)",
        ]
        for pattern in insight_patterns:
            matches = re.findall(pattern, response, re.IGNORECASE)
            insights.extend([m.strip() for m in matches if m.strip()])

        # Extract risks and opportunities
        risks = re.findall(r"risk:?\s*([^\n.]+)", response, re.IGNORECASE)
        opportunities = re.findall(
            r"opportunit(?:y|ies):?\s*([^\n.]+)", response, re.IGNORECASE
        )

        # Extract scores
        relevance_match = re.search(r"relevance:?\s*([\d.]+)", response.lower())
        relevance_score = float(relevance_match.group(1)) if relevance_match else 0.6

        credibility_match = re.search(r"credibility:?\s*([\d.]+)", response.lower())
        credibility_score = (
            float(credibility_match.group(1)) if credibility_match else 0.7
        )

        return {
            "insights": insights[:5],
            "sentiment": {"direction": direction, "confidence": confidence},
            "risk_factors": [r.strip() for r in risks[:3]],
            "opportunities": [o.strip() for o in opportunities[:3]],
            "credibility_score": credibility_score,
            "relevance_score": relevance_score,
            "summary": f"Analysis for {persona} investor using optimized processing",
            "analysis_timestamp": datetime.now(),
            "structured_parsing": True,
        }

    def _create_empty_analysis(self) -> dict[str, Any]:
        """Create empty analysis for invalid content."""
        return {
            "insights": [],
            "sentiment": {"direction": "neutral", "confidence": 0.0},
            "risk_factors": [],
            "opportunities": [],
            "credibility_score": 0.0,
            "relevance_score": 0.0,
            "summary": "No content to analyze",
            "analysis_timestamp": datetime.now(),
            "empty_content": True,
        }


class OptimizedDeepResearchAgent(DeepResearchAgent):
    """
    Deep research agent with comprehensive LLM-side optimizations to prevent timeouts.

    Integrates all optimization strategies:
    - Adaptive model selection
    - Progressive token budgeting
    - Parallel LLM processing
    - Optimized prompting
    - Early termination
    - Content filtering
    """

    def __init__(
        self,
        openrouter_provider: OpenRouterProvider,
        persona: str = "moderate",
        checkpointer: MemorySaver | None = None,
        ttl_hours: int = 24,
        exa_api_key: str | None = None,
        default_depth: str = "standard",
        max_sources: int | None = None,
        research_depth: str | None = None,
        enable_parallel_execution: bool = True,
        parallel_config=None,  # Type: ParallelResearchConfig | None
        optimization_enabled: bool = True,
    ):
        """Initialize optimized deep research agent."""

        # Import here to avoid circular dependency

        self.openrouter_provider = openrouter_provider
        self.optimization_enabled = optimization_enabled

        # Initialize optimization components
        if optimization_enabled:
            self.model_selector = AdaptiveModelSelector(openrouter_provider)
            self.token_budgeter = None  # Will be created per request
            self.prompt_engine = OptimizedPromptEngine()
            self.confidence_tracker = None  # Will be created per request
            self.content_filter = IntelligentContentFilter()
            self.parallel_processor = ParallelLLMProcessor(openrouter_provider)

            # Replace content analyzer with optimized version
            self.optimized_analyzer = OptimizedContentAnalyzer(openrouter_provider)

        # Initialize base class with dummy LLM (we'll use OpenRouter provider instead)
        dummy_llm = openrouter_provider.get_llm(TaskType.GENERAL)

        super().__init__(
            llm=dummy_llm,
            persona=persona,
            checkpointer=checkpointer,
            ttl_hours=ttl_hours,
            exa_api_key=exa_api_key,
            default_depth=default_depth,
            max_sources=max_sources,
            research_depth=research_depth,
            enable_parallel_execution=enable_parallel_execution,
            parallel_config=parallel_config,
        )

        logger.info("OptimizedDeepResearchAgent initialized")

    @log_method_call(component="OptimizedDeepResearchAgent", include_timing=True)
    async def research_comprehensive(
        self,
        topic: str,
        session_id: str,
        depth: str | None = None,
        focus_areas: list[str] | None = None,
        timeframe: str = "30d",
        time_budget_seconds: float = 120.0,  # Default 2 minutes
        target_confidence: float = 0.75,
        **kwargs,
    ) -> dict[str, Any]:
        """
        Comprehensive research with LLM optimizations to prevent timeouts.

        Args:
            topic: Research topic or company/symbol
            session_id: Session identifier
            depth: Research depth (basic/standard/comprehensive/exhaustive)
            focus_areas: Specific areas to focus on
            timeframe: Time range for research
            time_budget_seconds: Maximum time allowed for research
            target_confidence: Target confidence level for early termination
            **kwargs: Additional parameters

        Returns:
            Comprehensive research results with optimization metrics
        """

        if not self.optimization_enabled:
            # Fall back to parent implementation
            return await super().research_comprehensive(
                topic, session_id, depth, focus_areas, timeframe, **kwargs
            )

        # Check if search providers are available
        if not self.search_providers:
            return {
                "error": "Research functionality unavailable - no search providers configured",
                "details": "Please configure EXA_API_KEY or TAVILY_API_KEY environment variables",
                "topic": topic,
                "optimization_enabled": self.optimization_enabled,
            }

        start_time = time.time()
        depth = depth or self.default_depth

        # Initialize optimization components for this request
        self.token_budgeter = ProgressiveTokenBudgeter(
            total_time_budget_seconds=time_budget_seconds,
            confidence_target=target_confidence,
        )
        self.confidence_tracker = ConfidenceTracker(
            target_confidence=target_confidence,
            min_sources=3,
            max_sources=RESEARCH_DEPTH_LEVELS[depth]["max_sources"],
        )

        orchestration_logger = get_orchestration_logger("OptimizedDeepResearchAgent")
        orchestration_logger.set_request_context(
            session_id=session_id,
            topic=topic[:50],
            time_budget=time_budget_seconds,
            target_confidence=target_confidence,
        )

        orchestration_logger.info(
            "🚀 OPTIMIZED_RESEARCH_START",
            depth=depth,
            focus_areas=focus_areas,
        )

        try:
            # Phase 1: Search and Content Filtering
            orchestration_logger.info("📋 PHASE_1_SEARCH_START")
            search_time_budget = min(
                time_budget_seconds * 0.2, 30
            )  # 20% of budget, max 30s

            search_results = await self._optimized_search_phase(
                topic, depth, focus_areas, search_time_budget
            )

            orchestration_logger.info(
                "✅ PHASE_1_COMPLETE",
                sources_found=len(search_results.get("filtered_sources", [])),
            )

            # Phase 2: Content Analysis with Parallel Processing
            remaining_time = time_budget_seconds - (time.time() - start_time)
            if remaining_time < 10:
                orchestration_logger.warning(
                    "⚠️ TIME_CONSTRAINT_CRITICAL", remaining=f"{remaining_time:.1f}s"
                )
                return self._create_emergency_response(
                    topic, search_results, start_time
                )

            orchestration_logger.info("🔬 PHASE_2_ANALYSIS_START")
            analysis_time_budget = remaining_time * 0.7  # 70% of remaining time

            analysis_results = await self._optimized_analysis_phase(
                search_results["filtered_sources"],
                topic,
                focus_areas,
                analysis_time_budget,
            )

            orchestration_logger.info(
                "✅ PHASE_2_COMPLETE",
                sources_analyzed=len(analysis_results["analyzed_sources"]),
                confidence=f"{analysis_results['final_confidence']:.2f}",
            )

            # Phase 3: Synthesis with Remaining Time
            remaining_time = time_budget_seconds - (time.time() - start_time)
            if remaining_time < 5:
                # Skip synthesis if very little time left
                synthesis_results = {
                    "synthesis": "Time constraints prevented full synthesis"
                }
            else:
                orchestration_logger.info("🧠 PHASE_3_SYNTHESIS_START")
                synthesis_results = await self._optimized_synthesis_phase(
                    analysis_results["analyzed_sources"], topic, remaining_time
                )
                orchestration_logger.info("✅ PHASE_3_COMPLETE")

            # Compile final results
            execution_time = time.time() - start_time
            final_results = self._compile_optimized_results(
                topic=topic,
                session_id=session_id,
                depth=depth,
                search_results=search_results,
                analysis_results=analysis_results,
                synthesis_results=synthesis_results,
                execution_time=execution_time,
                time_budget=time_budget_seconds,
            )

            # Log performance metrics
            log_performance_metrics(
                "OptimizedDeepResearchAgent",
                {
                    "total_execution_time": execution_time,
                    "time_budget_used_pct": (execution_time / time_budget_seconds)
                    * 100,
                    "sources_processed": len(analysis_results["analyzed_sources"]),
                    "final_confidence": analysis_results["final_confidence"],
                    "optimization_enabled": True,
                    "phases_completed": 3,
                },
            )

            orchestration_logger.info(
                "🎉 OPTIMIZED_RESEARCH_COMPLETE",
                duration=f"{execution_time:.2f}s",
                confidence=f"{analysis_results['final_confidence']:.2f}",
            )

            return final_results

        except Exception as e:
            execution_time = time.time() - start_time
            orchestration_logger.error(
                "💥 OPTIMIZED_RESEARCH_FAILED",
                error=str(e),
                execution_time=f"{execution_time:.2f}s",
            )

            return {
                "status": "error",
                "error": str(e),
                "execution_time_ms": execution_time * 1000,
                "agent_type": "optimized_deep_research",
                "optimization_enabled": True,
                "topic": topic,
            }

    async def _optimized_search_phase(
        self, topic: str, depth: str, focus_areas: list[str], time_budget_seconds: float
    ) -> dict[str, Any]:
        """Execute search phase with content filtering."""

        # Generate search queries (reuse parent logic)
        persona_focus = PERSONA_RESEARCH_FOCUS[self.persona.name.lower()]
        search_queries = await self._generate_search_queries(
            topic, persona_focus, RESEARCH_DEPTH_LEVELS[depth]
        )

        # Execute searches (reuse parent logic but with time limits)
        all_results = []
        max_searches = min(len(search_queries), 4)  # Limit searches for speed

        search_tasks = []
        for query in search_queries[:max_searches]:
            for provider in self.search_providers[:1]:  # Use only the first provider for speed
                task = self._search_with_timeout(
                    provider, query, time_budget_seconds / max_searches
                )
                search_tasks.append(task)

        search_results = await asyncio.gather(*search_tasks, return_exceptions=True)

        # Collect valid results
        for result in search_results:
            if isinstance(result, list):
                all_results.extend(result)

        # Apply intelligent content filtering
        current_confidence = 0.0  # Starting confidence
        research_focus = focus_areas[0] if focus_areas else "fundamental"

        filtered_sources = await self.content_filter.filter_and_prioritize_sources(
            sources=all_results,
            research_focus=research_focus,
            time_budget=time_budget_seconds,
            current_confidence=current_confidence,
        )

        return {
            "raw_results": all_results,
            "filtered_sources": filtered_sources,
            "search_queries": search_queries[:max_searches],
            "filtering_applied": True,
        }

    async def _search_with_timeout(
        self, provider, query: str, timeout: float
    ) -> list[dict]:
        """Execute search with timeout."""
        try:
            return await asyncio.wait_for(
                provider.search(query, num_results=5), timeout=timeout
            )
        except TimeoutError:
            logger.warning(f"Search timeout for query: {query}")
            return []
        except Exception as e:
            logger.warning(f"Search failed for {query}: {e}")
            return []

    async def _optimized_analysis_phase(
        self,
        sources: list[dict],
        topic: str,
        focus_areas: list[str],
        time_budget_seconds: float,
    ) -> dict[str, Any]:
        """Execute content analysis with optimizations and early termination."""

        if not sources:
            return {
                "analyzed_sources": [],
                "final_confidence": 0.0,
                "early_terminated": False,
                "termination_reason": "no_sources",
            }

        analyzed_sources = []
        current_confidence = 0.0
        sources_to_process = sources.copy()

        # Calculate time per source
        time_per_source = time_budget_seconds / len(sources_to_process)

        # Use batch processing if time allows
        if len(sources_to_process) > 3 and time_per_source < 8:
            # Use parallel batch processing

            analyzed_sources = await self.optimized_analyzer.batch_analyze_content(
                sources=sources_to_process,
                persona=self.persona.name.lower(),
                analysis_type=focus_areas[0] if focus_areas else "general",
                time_budget_seconds=time_budget_seconds,
                current_confidence=current_confidence,
            )

            # Calculate final confidence from batch results
            confidence_sum = 0
            for source in analyzed_sources:
                analysis = source.get("analysis", {})
                sentiment = analysis.get("sentiment", {})
                source_confidence = sentiment.get("confidence", 0.5)
                credibility = analysis.get("credibility_score", 0.5)
                confidence_sum += source_confidence * credibility

            final_confidence = (
                confidence_sum / len(analyzed_sources) if analyzed_sources else 0.0
            )

            return {
                "analyzed_sources": analyzed_sources,
                "final_confidence": final_confidence,
                "early_terminated": False,
                "termination_reason": "batch_processing_complete",
                "processing_mode": "parallel_batch",
            }

        else:
            # Use sequential processing with early termination

            for source in sources_to_process:
                remaining_time = time_budget_seconds - (
                    len(analyzed_sources) * time_per_source
                )

                if remaining_time < 5:  # Reserve minimum time
                    break

                # Analyze source with optimizations
                analysis_result = (
                    await self.optimized_analyzer.analyze_content_optimized(
                        content=source.get("content", ""),
                        persona=self.persona.name.lower(),
                        analysis_focus=focus_areas[0] if focus_areas else "general",
                        time_budget_seconds=min(
                            remaining_time / 2, 15
                        ),  # Max 15s per source
                        current_confidence=current_confidence,
                    )
                )

                # Add analysis to source
                source["analysis"] = analysis_result
                analyzed_sources.append(source)

                # Update confidence tracker
                credibility_score = analysis_result.get("credibility_score", 0.5)
                confidence_update = self.confidence_tracker.update_confidence(
                    analysis_result, credibility_score
                )

                current_confidence = confidence_update["current_confidence"]

                # Check for early termination
                if not confidence_update["should_continue"]:
                    logger.info(
                        f"Early termination after {len(analyzed_sources)} sources: {confidence_update['early_termination_reason']}"
                    )
                    return {
                        "analyzed_sources": analyzed_sources,
                        "final_confidence": current_confidence,
                        "early_terminated": True,
                        "termination_reason": confidence_update[
                            "early_termination_reason"
                        ],
                        "processing_mode": "sequential_early_termination",
                    }

            return {
                "analyzed_sources": analyzed_sources,
                "final_confidence": current_confidence,
                "early_terminated": False,
                "termination_reason": "all_sources_processed",
                "processing_mode": "sequential_complete",
            }

    async def _optimized_synthesis_phase(
        self, analyzed_sources: list[dict], topic: str, time_budget_seconds: float
    ) -> dict[str, Any]:
        """Execute synthesis with optimized model selection."""

        if not analyzed_sources:
            return {"synthesis": "No sources available for synthesis"}

        # Select optimal model for synthesis
        combined_content = "\n".join(
            [str(source.get("analysis", {})) for source in analyzed_sources[:5]]
        )

        complexity_score = self.model_selector.calculate_task_complexity(
            combined_content, TaskType.RESULT_SYNTHESIS
        )

        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.RESULT_SYNTHESIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=complexity_score,
            content_size_tokens=len(combined_content) // 4,
        )

        # Create optimized synthesis prompt
        synthesis_prompt = self.prompt_engine.create_time_optimized_synthesis_prompt(
            sources=analyzed_sources,
            persona=self.persona.name,
            time_remaining=time_budget_seconds,
            current_confidence=0.8,  # Assume good confidence at synthesis stage
        )

        # Execute synthesis
        try:
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )

            response = await asyncio.wait_for(
                llm.ainvoke(
                    [
                        SystemMessage(
                            content="You are a financial research synthesizer."
                        ),
                        HumanMessage(content=synthesis_prompt),
                    ]
                ),
                timeout=model_config.timeout_seconds,
            )

            return {
                "synthesis": response.content,
                "model_used": model_config.model_id,
                "synthesis_optimized": True,
            }

        except Exception as e:
            logger.warning(f"Optimized synthesis failed: {e}")
            return {
                "synthesis": f"Synthesis of {len(analyzed_sources)} sources completed with basic processing due to constraints.",
                "fallback_used": True,
            }

    def _create_emergency_response(
        self, topic: str, search_results: dict, start_time: float
    ) -> dict[str, Any]:
        """Create emergency response when time is critically low."""

        execution_time = time.time() - start_time
        source_count = len(search_results.get("filtered_sources", []))

        return {
            "status": "partial_success",
            "agent_type": "optimized_deep_research",
            "emergency_mode": True,
            "topic": topic,
            "sources_found": source_count,
            "execution_time_ms": execution_time * 1000,
            "findings": {
                "synthesis": f"Emergency mode: Found {source_count} relevant sources for {topic}. "
                "Full analysis was prevented by time constraints.",
                "confidence_score": 0.3,
                "sources_analyzed": source_count,
            },
            "optimization_metrics": {
                "time_budget_exceeded": True,
                "phases_completed": 1,
                "emergency_fallback": True,
            },
        }

    def _compile_optimized_results(
        self,
        topic: str,
        session_id: str,
        depth: str,
        search_results: dict,
        analysis_results: dict,
        synthesis_results: dict,
        execution_time: float,
        time_budget: float,
    ) -> dict[str, Any]:
        """Compile final optimized research results."""

        analyzed_sources = analysis_results["analyzed_sources"]

        # Create citations
        citations = []
        for i, source in enumerate(analyzed_sources, 1):
            analysis = source.get("analysis", {})
            citation = {
                "id": i,
                "title": source.get("title", f"Source {i}"),
                "url": source.get("url", ""),
                "published_date": source.get("published_date"),
                "credibility_score": analysis.get("credibility_score", 0.5),
                "relevance_score": analysis.get("relevance_score", 0.5),
                "optimized_analysis": analysis.get("optimization_applied", False),
            }
            citations.append(citation)

        return {
            "status": "success",
            "agent_type": "optimized_deep_research",
            "optimization_enabled": True,
            "persona": self.persona.name,
            "research_topic": topic,
            "research_depth": depth,
            "findings": {
                "synthesis": synthesis_results.get(
                    "synthesis", "No synthesis available"
                ),
                "confidence_score": analysis_results["final_confidence"],
                "early_terminated": analysis_results.get("early_terminated", False),
                "termination_reason": analysis_results.get("termination_reason"),
                "processing_mode": analysis_results.get("processing_mode", "unknown"),
            },
            "sources_analyzed": len(analyzed_sources),
            "citations": citations,
            "execution_time_ms": execution_time * 1000,
            "optimization_metrics": {
                "time_budget_seconds": time_budget,
                "time_used_seconds": execution_time,
                "time_utilization_pct": (execution_time / time_budget) * 100,
                "sources_found": len(search_results.get("raw_results", [])),
                "sources_filtered": len(search_results.get("filtered_sources", [])),
                "sources_processed": len(analyzed_sources),
                "content_filtering_applied": search_results.get(
                    "filtering_applied", False
                ),
                "parallel_processing_used": "batch"
                in analysis_results.get("processing_mode", ""),
                "synthesis_optimized": synthesis_results.get(
                    "synthesis_optimized", False
                ),
                "optimization_features_used": [
                    "adaptive_model_selection",
                    "progressive_token_budgeting",
                    "content_filtering",
                    "optimized_prompts",
                ]
                + (
                    ["parallel_processing"]
                    if "batch" in analysis_results.get("processing_mode", "")
                    else []
                )
                + (
                    ["early_termination"]
                    if analysis_results.get("early_terminated")
                    else []
                ),
            },
            "search_queries_used": search_results.get("search_queries", []),
            "session_id": session_id,
        }


# Factory function for easy integration
def create_optimized_research_agent(
    openrouter_api_key: str,
    persona: str = "moderate",
    time_budget_seconds: float = 120.0,
    target_confidence: float = 0.75,
    **kwargs,
) -> OptimizedDeepResearchAgent:
    """Create an optimized deep research agent with recommended settings."""

    openrouter_provider = OpenRouterProvider(openrouter_api_key)

    return OptimizedDeepResearchAgent(
        openrouter_provider=openrouter_provider,
        persona=persona,
        time_budget_seconds=time_budget_seconds,
        target_confidence=target_confidence,
        optimization_enabled=True,
        **kwargs,
    )

```
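
A minimal construction sketch for the factory above; the API key value is a placeholder, and the research entry point on the returned agent is defined earlier in this module:

```python
from maverick_mcp.agents.optimized_research import create_optimized_research_agent

# Build an agent tuned for a tighter 90-second budget; persona and
# target_confidence keep their documented defaults otherwise.
agent = create_optimized_research_agent(
    openrouter_api_key="sk-or-...",  # placeholder key
    persona="moderate",
    time_budget_seconds=90.0,
)
```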

--------------------------------------------------------------------------------
/maverick_mcp/utils/circuit_breaker.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive circuit breaker implementation for all external API calls.

This module provides circuit breakers for:
- yfinance (Yahoo Finance)
- Tiingo API
- FRED API
- OpenRouter AI API
- Exa Search API
- Any other external services

Circuit breakers help prevent cascade failures and provide graceful degradation.
"""

import asyncio
import functools
import logging
import threading
import time
from collections import deque
from collections.abc import Callable
from enum import Enum
from typing import Any, ParamSpec, TypeVar, cast

from maverick_mcp.config.settings import get_settings
from maverick_mcp.exceptions import CircuitBreakerError, ExternalServiceError

logger = logging.getLogger(__name__)
settings = get_settings()

P = ParamSpec("P")
T = TypeVar("T")
F = TypeVar("F", bound=Callable[..., Any])


class CircuitState(Enum):
    """Circuit breaker states."""

    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if service recovered


class FailureDetectionStrategy(Enum):
    """Strategies for detecting failures."""

    CONSECUTIVE_FAILURES = "consecutive"  # N failures in a row
    FAILURE_RATE = "failure_rate"  # % of failures in time window
    TIMEOUT_RATE = "timeout_rate"  # % of timeouts in time window
    COMBINED = "combined"  # Any of the above


class CircuitBreakerConfig:
    """Configuration for a circuit breaker."""

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        failure_rate_threshold: float = 0.5,
        timeout_threshold: float = 10.0,
        recovery_timeout: int = 60,
        success_threshold: int = 3,
        window_size: int = 60,
        detection_strategy: FailureDetectionStrategy = FailureDetectionStrategy.COMBINED,
        expected_exceptions: tuple[type[Exception], ...] = (Exception,),
    ):
        """
        Initialize circuit breaker configuration.

        Args:
            name: Name of the circuit breaker
            failure_threshold: Number of consecutive failures before opening
            failure_rate_threshold: Failure rate (0-1) before opening
            timeout_threshold: Timeout in seconds for calls
            recovery_timeout: Seconds to wait before testing recovery
            success_threshold: Successes needed in half-open to close
            window_size: Time window in seconds for rate calculations
            detection_strategy: Strategy for detecting failures
            expected_exceptions: Exceptions to catch and count as failures
        """
        self.name = name
        self.failure_threshold = failure_threshold
        self.failure_rate_threshold = failure_rate_threshold
        self.timeout_threshold = timeout_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.window_size = window_size
        self.detection_strategy = detection_strategy
        self.expected_exceptions = expected_exceptions


class CircuitBreakerMetrics:
    """Metrics collection for circuit breakers."""

    def __init__(self, window_size: int = 300):
        """Initialize metrics with a time window."""
        self.window_size = window_size
        # Each entry: (timestamp, success, duration)
        self.calls: deque[tuple[float, bool, float]] = deque()
        self.state_changes: deque[tuple[float, CircuitState]] = deque()
        self._lock = threading.RLock()

    def record_call(self, success: bool, duration: float):
        """Record a call result."""
        with self._lock:
            now = time.time()
            self.calls.append((now, success, duration))
            self._cleanup_old_data(now)

    def record_state_change(self, new_state: CircuitState):
        """Record a state change."""
        with self._lock:
            now = time.time()
            self.state_changes.append((now, new_state))
            self._cleanup_old_data(now)

    def get_stats(self) -> dict[str, Any]:
        """Get current statistics."""
        with self._lock:
            now = time.time()
            self._cleanup_old_data(now)

            if not self.calls:
                return {
                    "total_calls": 0,
                    "success_rate": 1.0,
                    "failure_rate": 0.0,
                    "avg_duration": 0.0,
                    "timeout_rate": 0.0,
                    "min_duration": 0.0,
                    "max_duration": 0.0,
                }

            total = len(self.calls)
            successes = sum(1 for _, success, _ in self.calls if success)
            failures = total - successes
            durations = [duration for _, _, duration in self.calls]
            timeouts = sum(
                1
                for _, success, duration in self.calls
                # Heuristic: failures that took >= 10s are counted as timeouts
                if not success and duration >= 10.0
            )

            return {
                "total_calls": total,
                "success_rate": successes / total if total > 0 else 1.0,
                "failure_rate": failures / total if total > 0 else 0.0,
                "avg_duration": sum(durations) / len(durations) if durations else 0.0,
                "timeout_rate": timeouts / total if total > 0 else 0.0,
                "min_duration": min(durations) if durations else 0.0,
                "max_duration": max(durations) if durations else 0.0,
            }

    def get_total_calls(self) -> int:
        """Get total number of calls in the window."""
        with self._lock:
            now = time.time()
            self._cleanup_old_data(now)
            return len(self.calls)

    def get_success_rate(self) -> float:
        """Get success rate in the window."""
        stats = self.get_stats()
        return stats["success_rate"]

    def get_failure_rate(self) -> float:
        """Get failure rate in the window."""
        stats = self.get_stats()
        return stats["failure_rate"]

    def get_average_response_time(self) -> float:
        """Get average response time in the window."""
        stats = self.get_stats()
        return stats["avg_duration"]

    def get_last_failure_time(self) -> float | None:
        """Get timestamp of last failure."""
        with self._lock:
            for timestamp, success, _ in reversed(self.calls):
                if not success:
                    return timestamp
            return None

    def get_uptime_percentage(self) -> float:
        """Get uptime percentage based on state changes."""
        with self._lock:
            if not self.state_changes:
                return 100.0

            now = time.time()
            window_start = now - self.window_size
            uptime = 0.0
            last_time = window_start
            last_state = CircuitState.CLOSED

            for timestamp, state in self.state_changes:
                if timestamp < window_start:
                    last_state = state
                    continue

                if last_state == CircuitState.CLOSED:
                    uptime += timestamp - last_time

                last_time = timestamp
                last_state = state

            if last_state == CircuitState.CLOSED:
                uptime += now - last_time

            total_time = now - window_start
            return (uptime / total_time * 100) if total_time > 0 else 100.0

    def _cleanup_old_data(self, now: float):
        """Remove data outside the window."""
        cutoff = now - self.window_size

        # Clean up calls
        while self.calls and self.calls[0][0] < cutoff:
            self.calls.popleft()

        # Clean up state changes (keep longer history)
        state_cutoff = now - (self.window_size * 10)
        while self.state_changes and self.state_changes[0][0] < state_cutoff:
            self.state_changes.popleft()


class EnhancedCircuitBreaker:
    """
    Enhanced circuit breaker with failure rate tracking, timeouts, and metrics.
    Thread-safe and supports both sync and async operations.
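
    Example (direct use; the wrapped callable ``fetch_data`` is illustrative):
        config = CircuitBreakerConfig(name="demo", failure_threshold=3)
        breaker = EnhancedCircuitBreaker(config)
        result = breaker.call_sync(fetch_data, "AAPL")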
    """

    def __init__(self, config: CircuitBreakerConfig):
        """Initialize enhanced circuit breaker."""
        self.config = config
        self._state = CircuitState.CLOSED
        self._consecutive_failures = 0
        self._half_open_successes = 0
        self._last_failure_time: float | None = None
        self._metrics = CircuitBreakerMetrics(config.window_size)

        # Thread-safe locks
        self._lock = threading.RLock()
        self._async_lock = asyncio.Lock()

    @property
    def state(self) -> CircuitState:
        """Get current circuit state."""
        with self._lock:
            return self._state

    @property
    def consecutive_failures(self) -> int:
        """Get consecutive failures count."""
        with self._lock:
            return self._consecutive_failures

    @property
    def is_open(self) -> bool:
        """Check if circuit is open."""
        return self.state == CircuitState.OPEN

    @property
    def is_closed(self) -> bool:
        """Check if circuit is closed."""
        return self.state == CircuitState.CLOSED

    def get_metrics(self) -> CircuitBreakerMetrics:
        """Get circuit breaker metrics."""
        return self._metrics

    def time_until_retry(self) -> float | None:
        """Get time until next retry attempt."""
        with self._lock:
            if self._state == CircuitState.OPEN and self._last_failure_time:
                return max(
                    0,
                    self.config.recovery_timeout
                    - (time.time() - self._last_failure_time),
                )
            return None

    def _should_open(self) -> bool:
        """Determine if circuit should open based on detection strategy."""
        stats = self._metrics.get_stats()

        if (
            self.config.detection_strategy
            == FailureDetectionStrategy.CONSECUTIVE_FAILURES
        ):
            return self._consecutive_failures >= self.config.failure_threshold

        elif self.config.detection_strategy == FailureDetectionStrategy.FAILURE_RATE:
            return (
                stats["total_calls"] >= 5  # Minimum calls for rate calculation
                and stats["failure_rate"] >= self.config.failure_rate_threshold
            )

        elif self.config.detection_strategy == FailureDetectionStrategy.TIMEOUT_RATE:
            return (
                stats["total_calls"] >= 5
                and stats["timeout_rate"] >= self.config.failure_rate_threshold
            )

        else:  # COMBINED
            return (
                self._consecutive_failures >= self.config.failure_threshold
                or (
                    stats["total_calls"] >= 5
                    and stats["failure_rate"] >= self.config.failure_rate_threshold
                )
                or (
                    stats["total_calls"] >= 5
                    and stats["timeout_rate"] >= self.config.failure_rate_threshold
                )
            )

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset."""
        if self._last_failure_time is None:
            return True
        return (time.time() - self._last_failure_time) >= self.config.recovery_timeout

    def _transition_state(self, new_state: CircuitState):
        """Transition to a new state."""
        if self._state != new_state:
            logger.info(
                f"Circuit breaker '{self.config.name}' transitioning from {self._state.value} to {new_state.value}"
            )
            self._state = new_state
            self._metrics.record_state_change(new_state)

    def _on_success(self, duration: float):
        """Handle successful call."""
        with self._lock:
            self._metrics.record_call(True, duration)
            self._consecutive_failures = 0

            if self._state == CircuitState.HALF_OPEN:
                self._half_open_successes += 1
                if self._half_open_successes >= self.config.success_threshold:
                    self._transition_state(CircuitState.CLOSED)
                    self._half_open_successes = 0

    def _on_failure(self, duration: float):
        """Handle failed call."""
        with self._lock:
            self._metrics.record_call(False, duration)
            self._consecutive_failures += 1
            self._last_failure_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                self._transition_state(CircuitState.OPEN)
                self._half_open_successes = 0
            elif self._state == CircuitState.CLOSED and self._should_open():
                self._transition_state(CircuitState.OPEN)

    def call(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """Call function through circuit breaker (sync version)."""
        return self.call_sync(func, *args, **kwargs)

    async def call_async(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Call async function through circuit breaker with timeout support.

        Args:
            func: Async function to call
            *args: Function arguments
            **kwargs: Function keyword arguments

        Returns:
            Function result

        Raises:
            CircuitBreakerError: If circuit is open
            Exception: If function fails
        """
        # Check if we should attempt reset
        async with self._async_lock:
            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._transition_state(CircuitState.HALF_OPEN)
                    self._half_open_successes = 0
                else:
                    time_until_retry = self.config.recovery_timeout
                    if self._last_failure_time:
                        time_until_retry = max(
                            0,
                            self.config.recovery_timeout
                            - (time.time() - self._last_failure_time),
                        )
                    raise CircuitBreakerError(
                        service=self.config.name,
                        failure_count=self._consecutive_failures,
                        threshold=self.config.failure_threshold,
                        context={
                            "state": self._state.value,
                            "time_until_retry": round(time_until_retry, 1),
                        },
                    )

        start_time = time.time()
        try:
            # Execute with timeout
            result = await asyncio.wait_for(
                func(*args, **kwargs), timeout=self.config.timeout_threshold
            )
            duration = time.time() - start_time
            self._on_success(duration)
            return result

        except TimeoutError as e:
            duration = time.time() - start_time
            self._on_failure(duration)
            logger.warning(
                f"Circuit breaker '{self.config.name}' timeout after {duration:.2f}s"
            )
            raise ExternalServiceError(
                service=self.config.name,
                message=f"Service timed out after {self.config.timeout_threshold}s",
                context={
                    "timeout": self.config.timeout_threshold,
                },
            ) from e

        except self.config.expected_exceptions:
            duration = time.time() - start_time
            self._on_failure(duration)
            raise

    def call_sync(self, func: Callable[..., Any], *args, **kwargs) -> Any:
        """
        Call sync function through circuit breaker.

        Note: unlike call_async, no timeout is enforced by the breaker itself;
        synchronous callers (e.g., HTTP clients) should set their own timeouts.
        """
        # Check if we should attempt reset
        with self._lock:
            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._transition_state(CircuitState.HALF_OPEN)
                    self._half_open_successes = 0
                else:
                    time_until_retry = self.config.recovery_timeout
                    if self._last_failure_time:
                        time_until_retry = max(
                            0,
                            self.config.recovery_timeout
                            - (time.time() - self._last_failure_time),
                        )
                    raise CircuitBreakerError(
                        service=self.config.name,
                        failure_count=self._consecutive_failures,
                        threshold=self.config.failure_threshold,
                        context={
                            "state": self._state.value,
                            "time_until_retry": round(time_until_retry, 1),
                        },
                    )

        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            self._on_success(duration)
            return result

        except self.config.expected_exceptions:
            duration = time.time() - start_time
            self._on_failure(duration)
            raise

    def reset(self):
        """Manually reset the circuit breaker."""
        with self._lock:
            self._transition_state(CircuitState.CLOSED)
            self._consecutive_failures = 0
            self._half_open_successes = 0
            self._last_failure_time = None
            logger.info(f"Circuit breaker '{self.config.name}' manually reset")

    def get_status(self) -> dict[str, Any]:
        """Get detailed circuit breaker status."""
        with self._lock:
            stats = self._metrics.get_stats()
            time_until_retry = None

            if self._state == CircuitState.OPEN and self._last_failure_time:
                time_until_retry = max(
                    0,
                    self.config.recovery_timeout
                    - (time.time() - self._last_failure_time),
                )

            return {
                "name": self.config.name,
                "state": self._state.value,
                "consecutive_failures": self._consecutive_failures,
                "time_until_retry": round(time_until_retry, 1)
                if time_until_retry
                else None,
                "metrics": stats,
                "config": {
                    "failure_threshold": self.config.failure_threshold,
                    "failure_rate_threshold": self.config.failure_rate_threshold,
                    "timeout_threshold": self.config.timeout_threshold,
                    "recovery_timeout": self.config.recovery_timeout,
                    "detection_strategy": self.config.detection_strategy.value,
                },
            }


# Global registry of circuit breakers
_breakers: dict[str, EnhancedCircuitBreaker] = {}
_breakers_lock = threading.Lock()


def _get_or_create_breaker(config: CircuitBreakerConfig) -> EnhancedCircuitBreaker:
    """Get or create a circuit breaker."""
    with _breakers_lock:
        if config.name not in _breakers:
            _breakers[config.name] = EnhancedCircuitBreaker(config)
        return _breakers[config.name]


def register_circuit_breaker(name: str, breaker: EnhancedCircuitBreaker):
    """Register a circuit breaker in the global registry."""
    with _breakers_lock:
        _breakers[name] = breaker
        logger.debug(f"Registered circuit breaker: {name}")


def get_circuit_breaker(name: str) -> EnhancedCircuitBreaker | None:
    """Get a circuit breaker by name."""
    return _breakers.get(name)


def get_all_circuit_breakers() -> dict[str, EnhancedCircuitBreaker]:
    """Get all circuit breakers."""
    return _breakers.copy()


def reset_all_circuit_breakers():
    """Reset all circuit breakers."""
    # Snapshot under the lock so concurrent registration doesn't mutate
    # the dict mid-iteration.
    with _breakers_lock:
        breakers = list(_breakers.values())
    for breaker in breakers:
        breaker.reset()


def get_circuit_breaker_status() -> dict[str, dict[str, Any]]:
    """Get status of all circuit breakers."""
    return {name: breaker.get_status() for name, breaker in _breakers.items()}


def circuit_breaker(
    name: str | None = None,
    failure_threshold: int | None = None,
    failure_rate_threshold: float | None = None,
    timeout_threshold: float | None = None,
    recovery_timeout: int | None = None,
    expected_exceptions: tuple[type[Exception], ...] | None = None,
) -> Callable:
    """
    Decorator to apply circuit breaker to a function.

    Args:
        name: Circuit breaker name (defaults to function name)
        failure_threshold: Override default failure threshold
        failure_rate_threshold: Override default failure rate threshold
        timeout_threshold: Override default timeout threshold
        recovery_timeout: Override default recovery timeout
        expected_exceptions: Exceptions to catch (defaults to Exception)
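
    Usage (names illustrative):
        @circuit_breaker(name="tiingo_quotes", timeout_threshold=10.0)
        async def fetch_quote(symbol: str):
            ...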
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Create config with overrides
        cb_name = name or f"{func.__module__}.{getattr(func, '__name__', 'unknown')}"
        config = CircuitBreakerConfig(
            name=cb_name,
            failure_threshold=failure_threshold
            or settings.agent.circuit_breaker_failure_threshold,
            failure_rate_threshold=failure_rate_threshold or 0.5,
            timeout_threshold=timeout_threshold or 30.0,
            recovery_timeout=recovery_timeout
            or settings.agent.circuit_breaker_recovery_timeout,
            expected_exceptions=expected_exceptions or (Exception,),
        )

        # Get or create circuit breaker for this function
        breaker = _get_or_create_breaker(config)

        if asyncio.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                return await breaker.call_async(func, *args, **kwargs)

            return cast(Callable[..., T], async_wrapper)
        else:

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                return breaker.call_sync(func, *args, **kwargs)

            return cast(Callable[..., T], sync_wrapper)

    return decorator


# Circuit breaker configurations for different services
CIRCUIT_BREAKER_CONFIGS = {
    "yfinance": CircuitBreakerConfig(
        name="yfinance",
        failure_threshold=3,
        failure_rate_threshold=0.6,
        timeout_threshold=30.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "tiingo": CircuitBreakerConfig(
        name="tiingo",
        failure_threshold=5,
        failure_rate_threshold=0.7,
        timeout_threshold=15.0,
        recovery_timeout=60,
        success_threshold=3,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "fred_api": CircuitBreakerConfig(
        name="fred_api",
        failure_threshold=3,
        failure_rate_threshold=0.5,
        timeout_threshold=20.0,
        recovery_timeout=180,
        success_threshold=2,
        window_size=600,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "openrouter": CircuitBreakerConfig(
        name="openrouter",
        failure_threshold=5,
        failure_rate_threshold=0.6,
        timeout_threshold=60.0,  # AI APIs can be slower
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "exa": CircuitBreakerConfig(
        name="exa",
        failure_threshold=4,
        failure_rate_threshold=0.6,
        timeout_threshold=30.0,
        recovery_timeout=90,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "news_api": CircuitBreakerConfig(
        name="news_api",
        failure_threshold=3,
        failure_rate_threshold=0.5,
        timeout_threshold=25.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "finviz": CircuitBreakerConfig(
        name="finviz",
        failure_threshold=3,
        failure_rate_threshold=0.6,
        timeout_threshold=20.0,
        recovery_timeout=150,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
    "external_api": CircuitBreakerConfig(
        name="external_api",
        failure_threshold=4,
        failure_rate_threshold=0.6,
        timeout_threshold=25.0,
        recovery_timeout=120,
        success_threshold=2,
        window_size=300,
        detection_strategy=FailureDetectionStrategy.COMBINED,
        expected_exceptions=(Exception,),
    ),
}
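

# Sketch: an additional service (hypothetical "my_service") follows the same
# pattern and is picked up by initialize_circuit_breakers() below:
#
#     CIRCUIT_BREAKER_CONFIGS["my_service"] = CircuitBreakerConfig(
#         name="my_service",
#         failure_threshold=3,
#         timeout_threshold=20.0,
#         recovery_timeout=120,
#     )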


def initialize_circuit_breakers() -> dict[str, EnhancedCircuitBreaker]:
    """Initialize all circuit breakers for external services."""
    circuit_breakers = {}

    for service_name, config in CIRCUIT_BREAKER_CONFIGS.items():
        try:
            breaker = EnhancedCircuitBreaker(config)
            register_circuit_breaker(service_name, breaker)
            circuit_breakers[service_name] = breaker
            logger.info(f"Initialized circuit breaker for {service_name}")
        except Exception as e:
            logger.error(
                f"Failed to initialize circuit breaker for {service_name}: {e}"
            )

    logger.info(f"Initialized {len(circuit_breakers)} circuit breakers")
    return circuit_breakers


def with_circuit_breaker(service_name: str):
    """Decorator to wrap functions with a circuit breaker.

    Args:
        service_name: Name of the service/circuit breaker to use

    Usage:
        @with_circuit_breaker("yfinance")
        def fetch_stock_data(symbol: str):
            # API call code here
            pass
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            breaker = get_circuit_breaker(service_name)
            if not breaker:
                logger.warning(
                    f"Circuit breaker '{service_name}' not found, executing without protection"
                )
                return func(*args, **kwargs)

            return breaker.call(func, *args, **kwargs)

        return wrapper

    return decorator


def with_async_circuit_breaker(service_name: str):
    """Decorator to wrap async functions with a circuit breaker.

    Args:
        service_name: Name of the service/circuit breaker to use

    Usage:
        @with_async_circuit_breaker("tiingo")
        async def fetch_real_time_data(symbol: str):
            # Async API call code here
            pass
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @functools.wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            breaker = get_circuit_breaker(service_name)
            if not breaker:
                logger.warning(
                    f"Circuit breaker '{service_name}' not found, executing without protection"
                )
                return await func(*args, **kwargs)

            return await breaker.call_async(func, *args, **kwargs)

        return wrapper

    return decorator


class CircuitBreakerManager:
    """Manager for all circuit breakers in the application."""

    def __init__(self):
        self._breakers = {}
        self._initialized = False

    def initialize(self) -> bool:
        """Initialize all circuit breakers."""
        if self._initialized:
            return True

        try:
            self._breakers = initialize_circuit_breakers()
            self._initialized = True
            logger.info("Circuit breaker manager initialized successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to initialize circuit breaker manager: {e}")
            return False

    def get_breaker(self, service_name: str) -> EnhancedCircuitBreaker | None:
        """Get a circuit breaker by service name."""
        if not self._initialized:
            self.initialize()

        return self._breakers.get(service_name)

    def get_all_breakers(self) -> dict[str, EnhancedCircuitBreaker]:
        """Get all circuit breakers."""
        if not self._initialized:
            self.initialize()

        return self._breakers.copy()

    def reset_breaker(self, service_name: str) -> bool:
        """Reset a specific circuit breaker."""
        breaker = self.get_breaker(service_name)
        if breaker:
            breaker.reset()
            logger.info(f"Reset circuit breaker for {service_name}")
            return True
        return False

    def reset_all_breakers(self) -> int:
        """Reset all circuit breakers."""
        reset_count = 0
        for service_name, breaker in self._breakers.items():
            try:
                breaker.reset()
                reset_count += 1
                logger.info(f"Reset circuit breaker for {service_name}")
            except Exception as e:
                logger.error(f"Failed to reset circuit breaker for {service_name}: {e}")

        logger.info(f"Reset {reset_count} circuit breakers")
        return reset_count

    def get_health_status(self) -> dict[str, dict[str, Any]]:
        """Get health status of all circuit breakers."""
        if not self._initialized:
            self.initialize()

        status = {}
        for service_name, breaker in self._breakers.items():
            try:
                metrics = breaker.get_metrics()
                status[service_name] = {
                    "name": service_name,
                    "state": breaker.state.value,
                    "consecutive_failures": breaker.consecutive_failures,
                    "time_until_retry": breaker.time_until_retry(),
                    "metrics": {
                        "total_calls": metrics.get_total_calls(),
                        "success_rate": metrics.get_success_rate(),
                        "failure_rate": metrics.get_failure_rate(),
                        "avg_response_time": metrics.get_average_response_time(),
                        "last_failure_time": metrics.get_last_failure_time(),
                        "uptime_percentage": metrics.get_uptime_percentage(),
                    },
                }
            except Exception as e:
                status[service_name] = {
                    "name": service_name,
                    "state": "error",
                    "error": str(e),
                }

        return status


# Global circuit breaker manager instance
_circuit_breaker_manager = CircuitBreakerManager()


def get_circuit_breaker_manager() -> CircuitBreakerManager:
    """Get the global circuit breaker manager."""
    return _circuit_breaker_manager


def initialize_all_circuit_breakers() -> bool:
    """Initialize all circuit breakers (convenience function)."""
    return _circuit_breaker_manager.initialize()


def get_all_circuit_breaker_status() -> dict[str, dict[str, Any]]:
    """Get status of all circuit breakers (convenience function)."""
    return _circuit_breaker_manager.get_health_status()
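

# Sketch: surfacing breaker health in a monitoring endpoint (illustrative):
#
#     for name, info in get_all_circuit_breaker_status().items():
#         print(name, info["state"], info["metrics"]["success_rate"])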


# Specific circuit breaker decorators for common services


def with_yfinance_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Decorator for yfinance API calls."""
    return cast(F, with_circuit_breaker("yfinance")(func))


def with_tiingo_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Decorator for Tiingo API calls."""
    return cast(F, with_circuit_breaker("tiingo")(func))


def with_fred_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Decorator for FRED API calls."""
    return cast(F, with_circuit_breaker("fred_api")(func))


def with_openrouter_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Decorator for OpenRouter API calls."""
    return cast(F, with_circuit_breaker("openrouter")(func))


def with_exa_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Decorator for Exa API calls."""
    return cast(F, with_circuit_breaker("exa")(func))


# Async versions


def with_async_yfinance_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Async decorator for yfinance API calls."""
    return cast(F, with_async_circuit_breaker("yfinance")(func))


def with_async_tiingo_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Async decorator for Tiingo API calls."""
    return cast(F, with_async_circuit_breaker("tiingo")(func))


def with_async_fred_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Async decorator for FRED API calls."""
    return cast(F, with_async_circuit_breaker("fred_api")(func))


def with_async_openrouter_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Async decorator for OpenRouter API calls."""
    return cast(F, with_async_circuit_breaker("openrouter")(func))


def with_async_exa_circuit_breaker(func: F) -> F:  # noqa: UP047
    """Async decorator for Exa API calls."""
    return cast(F, with_async_circuit_breaker("exa")(func))

```
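
A short end-to-end sketch of the registry-backed decorators above; the fetch function and its yfinance call are illustrative, not part of this module:

```python
from maverick_mcp.utils.circuit_breaker import (
    initialize_all_circuit_breakers,
    with_yfinance_circuit_breaker,
)

# Populate the global registry once at startup.
initialize_all_circuit_breakers()

@with_yfinance_circuit_breaker
def fetch_prices(symbol: str):
    import yfinance as yf  # illustrative external dependency

    return yf.Ticker(symbol).history(period="1mo")

# Calls now flow through the "yfinance" breaker; once it opens,
# CircuitBreakerError is raised until the recovery timeout elapses.
```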