#
tokens: 48165/50000 4/437 files (page 27/28)
lines: off (toggle) GitHub
raw markdown copy
This is page 27 of 28. Use http://codebase.md/wshobson/maverick-mcp?page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .jules
│   └── bolt.md
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   ├── unit
│   │   └── test_stock_repository_adapter.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/maverick_mcp/providers/stock_data.py:
--------------------------------------------------------------------------------

```python
"""
Enhanced stock data provider with SQLAlchemy integration and screening recommendations.
Provides comprehensive stock data retrieval with database caching and maverick screening.
"""

# Suppress specific pyright warnings for pandas operations
# pyright: reportOperatorIssue=false

import logging
from datetime import UTC, datetime, timedelta

import pandas as pd
import pandas_market_calendars as mcal
import pytz
import yfinance as yf
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.orm import Session

from maverick_mcp.data.models import (
    MaverickBearStocks,
    MaverickStocks,
    PriceCache,
    SessionLocal,
    Stock,
    SupplyDemandBreakoutStocks,
    bulk_insert_price_data,
    get_latest_maverick_screening,
)
from maverick_mcp.data.session_management import get_db_session_read_only
from maverick_mcp.utils.circuit_breaker_decorators import (
    with_stock_data_circuit_breaker,
)
from maverick_mcp.utils.yfinance_pool import get_yfinance_pool

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.stock_data")


class EnhancedStockDataProvider:
    """
    Enhanced provider for stock data with database caching and screening recommendations.
    """

    def __init__(self, db_session: Session | None = None):
        """
        Initialize the stock data provider.

        Args:
            db_session: Optional database session for dependency injection.
                       If not provided, will get sessions as needed.
        """
        self.timeout = 30
        self.max_retries = 3
        self.cache_days = 1  # Cache data for 1 day by default
        # Initialize NYSE calendar for US stock market
        self.market_calendar = mcal.get_calendar("NYSE")
        self._db_session = db_session
        # Initialize yfinance connection pool
        self._yf_pool = get_yfinance_pool()
        if db_session:
            # Test the provided session
            self._test_db_connection_with_session(db_session)
        else:
            # Test creating a new session
            self._test_db_connection()

    def _test_db_connection(self):
        """Test database connection on initialization."""
        try:
            # Use read-only context manager for automatic session management
            with get_db_session_read_only() as session:
                # Try a simple query
                result = session.execute(text("SELECT 1"))
                result.fetchone()
                logger.info("Database connection successful")
        except Exception as e:
            logger.warning(
                f"Database connection test failed: {e}. Caching will be disabled."
            )

    def _test_db_connection_with_session(self, session: Session):
        """Test provided database session."""
        try:
            # Try a simple query
            result = session.execute(text("SELECT 1"))
            result.fetchone()
            logger.info("Database session test successful")
        except Exception as e:
            logger.warning(
                f"Database session test failed: {e}. Caching may not work properly."
            )

    def _get_data_with_smart_cache(
        self, symbol: str, start_date: str, end_date: str, interval: str
    ) -> pd.DataFrame:
        """
        Get stock data using smart caching strategy.

        This method:
        1. Gets all available data from cache
        2. Identifies missing date ranges
        3. Fetches only missing data from yfinance
        4. Combines and returns the complete dataset

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format
            interval: Data interval (only '1d' is cached)

        Returns:
            DataFrame with complete stock data
        """
        symbol = symbol.upper()
        session, should_close = self._get_db_session()

        try:
            # Step 1: Get ALL available cached data for the date range
            logger.info(f"Checking cache for {symbol} from {start_date} to {end_date}")
            cached_df = self._get_cached_data_flexible(
                session, symbol, start_date, end_date
            )

            # Convert dates for comparison - ensure timezone-naive for consistency
            start_dt = pd.to_datetime(start_date).tz_localize(None)
            end_dt = pd.to_datetime(end_date).tz_localize(None)

            # Step 2: Determine what data we need
            if cached_df is not None and not cached_df.empty:
                logger.info(f"Found {len(cached_df)} cached records for {symbol}")

                # Check if we have all the data we need - ensure timezone-naive for comparison
                cached_start = pd.to_datetime(cached_df.index.min()).tz_localize(None)
                cached_end = pd.to_datetime(cached_df.index.max()).tz_localize(None)

                # Identify missing ranges
                missing_ranges = []

                # Missing data at the beginning?
                if start_dt < cached_start:
                    # Get trading days in the missing range
                    missing_start_trading = self._get_trading_days(
                        start_dt, cached_start - timedelta(days=1)
                    )
                    if len(missing_start_trading) > 0:
                        # Only request data if there are trading days
                        missing_ranges.append(
                            (
                                missing_start_trading[0].strftime("%Y-%m-%d"),
                                missing_start_trading[-1].strftime("%Y-%m-%d"),
                            )
                        )

                # Missing recent data?
                if end_dt > cached_end:
                    # Check if there are any trading days after our cached data
                    if self._is_trading_day_between(cached_end, end_dt):
                        # Get the actual trading days we need
                        missing_end_trading = self._get_trading_days(
                            cached_end + timedelta(days=1), end_dt
                        )
                        if len(missing_end_trading) > 0:
                            missing_ranges.append(
                                (
                                    missing_end_trading[0].strftime("%Y-%m-%d"),
                                    missing_end_trading[-1].strftime("%Y-%m-%d"),
                                )
                            )

                # If no missing data, return cached data
                if not missing_ranges:
                    logger.info(
                        f"Cache hit! Returning {len(cached_df)} cached records for {symbol}"
                    )
                    # Filter to requested range - ensure index is timezone-naive
                    cached_df.index = pd.to_datetime(cached_df.index).tz_localize(None)
                    mask = (cached_df.index >= start_dt) & (cached_df.index <= end_dt)
                    return cached_df.loc[mask]

                # Step 3: Fetch only missing data
                logger.info(f"Cache partial hit. Missing ranges: {missing_ranges}")
                all_dfs = [cached_df]

                for miss_start, miss_end in missing_ranges:
                    logger.info(
                        f"Fetching missing data for {symbol} from {miss_start} to {miss_end}"
                    )
                    missing_df = self._fetch_stock_data_from_yfinance(
                        symbol, miss_start, miss_end, None, interval
                    )
                    if not missing_df.empty:
                        all_dfs.append(missing_df)
                        # Cache the new data
                        self._cache_price_data(session, symbol, missing_df)

                # Combine all data
                combined_df = pd.concat(all_dfs).sort_index()
                # Remove any duplicates (keep first)
                combined_df = combined_df[~combined_df.index.duplicated(keep="first")]

                # Filter to requested range - ensure index is timezone-naive
                combined_df.index = pd.to_datetime(combined_df.index).tz_localize(None)
                mask = (combined_df.index >= start_dt) & (combined_df.index <= end_dt)
                return combined_df.loc[mask]

            else:
                # No cached data, fetch everything but only for trading days
                logger.info(
                    f"No cached data found for {symbol}, fetching from yfinance"
                )

                # Adjust dates to trading days
                trading_days = self._get_trading_days(start_date, end_date)
                if len(trading_days) == 0:
                    logger.warning(
                        f"No trading days found between {start_date} and {end_date}"
                    )
                    return pd.DataFrame(
                        columns=[  # type: ignore[arg-type]
                            "Open",
                            "High",
                            "Low",
                            "Close",
                            "Volume",
                            "Dividends",
                            "Stock Splits",
                        ]
                    )

                # Fetch data only for the trading day range
                fetch_start = trading_days[0].strftime("%Y-%m-%d")
                fetch_end = trading_days[-1].strftime("%Y-%m-%d")

                logger.info(
                    f"Fetching data for trading days: {fetch_start} to {fetch_end}"
                )
                df = self._fetch_stock_data_from_yfinance(
                    symbol, fetch_start, fetch_end, None, interval
                )
                if not df.empty:
                    # Ensure stock exists and cache the data
                    self._get_or_create_stock(session, symbol)
                    self._cache_price_data(session, symbol, df)
                return df

        finally:
            if should_close:
                session.close()

    def _get_cached_data_flexible(
        self, session: Session, symbol: str, start_date: str, end_date: str
    ) -> pd.DataFrame | None:
        """
        Get cached data with flexible date range.

        Unlike the strict version, this returns whatever cached data exists
        within the requested range, even if incomplete.

        Args:
            session: Database session
            symbol: Stock ticker symbol (will be uppercased)
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format

        Returns:
            DataFrame with available cached data or None
        """
        try:
            # Get whatever data exists in the range
            df = PriceCache.get_price_data(session, symbol, start_date, end_date)

            if df.empty:
                return None

            # Add expected columns for compatibility
            for col in ["Dividends", "Stock Splits"]:
                if col not in df.columns:
                    df[col] = 0.0

            # Ensure column names match yfinance format
            column_mapping = {
                "open": "Open",
                "high": "High",
                "low": "Low",
                "close": "Close",
                "volume": "Volume",
            }
            df.rename(columns=column_mapping, inplace=True)

            # Ensure proper data types to match yfinance
            # Convert Decimal to float for price columns
            for col in ["Open", "High", "Low", "Close"]:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors="coerce").astype("float64")

            # Convert volume to int
            if "Volume" in df.columns:
                df["Volume"] = (
                    pd.to_numeric(df["Volume"], errors="coerce")
                    .fillna(0)
                    .astype("int64")
                )

            # Ensure index is timezone-naive for consistency
            df.index = pd.to_datetime(df.index).tz_localize(None)

            return df

        except Exception as e:
            logger.error(f"Error getting flexible cached data: {e}")
            return None

    def _is_trading_day_between(
        self, start_date: pd.Timestamp, end_date: pd.Timestamp
    ) -> bool:
        """
        Check if there's a trading day between two dates using market calendar.

        Args:
            start_date: Start date
            end_date: End date

        Returns:
            True if there's a trading day between the dates
        """
        # Add one day to start since we're checking "between"
        check_start = start_date + timedelta(days=1)

        if check_start > end_date:
            return False

        # Get trading days in the range
        trading_days = self._get_trading_days(check_start, end_date)
        return len(trading_days) > 0

    def _get_trading_days(self, start_date, end_date) -> pd.DatetimeIndex:
        """
        Get all trading days between start and end dates.

        Args:
            start_date: Start date (can be string or datetime)
            end_date: End date (can be string or datetime)

        Returns:
            DatetimeIndex of trading days (timezone-naive)
        """
        # Ensure dates are datetime objects (timezone-naive)
        if isinstance(start_date, str):
            start_date = pd.to_datetime(start_date).tz_localize(None)
        else:
            start_date = pd.to_datetime(start_date).tz_localize(None)
        if isinstance(end_date, str):
            end_date = pd.to_datetime(end_date).tz_localize(None)
        else:
            end_date = pd.to_datetime(end_date).tz_localize(None)

        # Get valid trading days from market calendar
        schedule = self.market_calendar.schedule(
            start_date=start_date, end_date=end_date
        )
        # Return timezone-naive index
        return schedule.index.tz_localize(None)

    def _get_last_trading_day(self, date) -> pd.Timestamp:
        """
        Get the last trading day on or before the given date.

        Args:
            date: Date to check (can be string or datetime)

        Returns:
            Last trading day as pd.Timestamp
        """
        if isinstance(date, str):
            date = pd.to_datetime(date)

        # Check if the date itself is a trading day
        if self._is_trading_day(date):
            return date

        # Otherwise, find the previous trading day
        for i in range(1, 10):  # Look back up to 10 days
            check_date = date - timedelta(days=i)
            if self._is_trading_day(check_date):
                return check_date

        # Fallback to the date itself if no trading day found
        return date

    def _is_trading_day(self, date) -> bool:
        """
        Check if a specific date is a trading day.

        Args:
            date: Date to check

        Returns:
            True if it's a trading day
        """
        if isinstance(date, str):
            date = pd.to_datetime(date)

        schedule = self.market_calendar.schedule(start_date=date, end_date=date)
        return len(schedule) > 0

    def _get_db_session(self) -> tuple[Session, bool]:
        """
        Get a database session.

        Returns:
            Tuple of (session, should_close) where should_close indicates
            whether the caller should close the session.
        """
        # Use injected session if available - should NOT be closed
        if self._db_session:
            return self._db_session, False

        # Otherwise, create a new session using session factory - should be closed
        try:
            session = SessionLocal()
            return session, True
        except Exception as e:
            logger.error(f"Failed to get database session: {e}", exc_info=True)
            raise

    def _get_or_create_stock(self, session: Session, symbol: str) -> Stock:
        """
        Get or create a stock in the database.

        Args:
            session: Database session
            symbol: Stock ticker symbol

        Returns:
            Stock object
        """
        stock = Stock.get_or_create(session, symbol)

        # Try to update stock info if it's missing
        company_name = getattr(stock, "company_name", None)
        if company_name is None or company_name == "":
            try:
                # Use connection pool for info retrieval
                info = self._yf_pool.get_info(symbol)

                stock.company_name = info.get("longName", info.get("shortName"))
                stock.sector = info.get("sector")
                stock.industry = info.get("industry")
                stock.exchange = info.get("exchange")
                stock.currency = info.get("currency", "USD")
                stock.country = info.get("country")

                session.commit()
            except Exception as e:
                logger.warning(f"Could not update stock info for {symbol}: {e}")
                session.rollback()

        return stock

    def _get_cached_price_data(
        self, session: Session, symbol: str, start_date: str, end_date: str
    ) -> pd.DataFrame | None:
        """
        DEPRECATED: Use _get_data_with_smart_cache instead.

        This method is kept for backward compatibility but is no longer used
        in the main flow. The new smart caching approach provides better
        database prioritization.
        """
        logger.warning("Using deprecated _get_cached_price_data method")
        return self._get_cached_data_flexible(
            session, symbol.upper(), start_date, end_date
        )

    def _cache_price_data(
        self, session: Session, symbol: str, df: pd.DataFrame
    ) -> None:
        """
        Cache price data in the database.

        Args:
            session: Database session
            symbol: Stock ticker symbol
            df: DataFrame with price data
        """
        try:
            if df.empty:
                return

            # Ensure symbol is uppercase to match database
            symbol = symbol.upper()

            # Ensure proper column names
            column_mapping = {
                "Open": "open",
                "High": "high",
                "Low": "low",
                "Close": "close",
                "Volume": "volume",
            }
            # Rename returns a new DataFrame, avoiding the need for an explicit copy first
            cache_df = df.rename(columns=column_mapping)

            # Log DataFrame info for debugging
            logger.debug(
                f"DataFrame columns before caching: {cache_df.columns.tolist()}"
            )
            logger.debug(f"DataFrame shape: {cache_df.shape}")
            logger.debug(f"DataFrame index type: {type(cache_df.index)}")
            if not cache_df.empty:
                logger.debug(f"Sample row: {cache_df.iloc[0].to_dict()}")

            # Insert data
            count = bulk_insert_price_data(session, symbol, cache_df)
            if count == 0:
                logger.info(
                    f"No new records cached for {symbol} (data may already exist)"
                )
            else:
                logger.info(f"Cached {count} new price records for {symbol}")

        except Exception as e:
            logger.error(f"Error caching price data for {symbol}: {e}", exc_info=True)
            session.rollback()

    def get_stock_data(
        self,
        symbol: str,
        start_date: str | None = None,
        end_date: str | None = None,
        period: str | None = None,
        interval: str = "1d",
        use_cache: bool = True,
    ) -> pd.DataFrame:
        """
        Fetch stock data with database caching support.

        This method prioritizes cached data from the database and only fetches
        missing data from yfinance when necessary.

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format
            period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
            interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
            use_cache: Whether to use cached data if available

        Returns:
            DataFrame with stock data
        """
        # For non-daily intervals or periods, always fetch fresh data
        if interval != "1d" or period:
            return self._fetch_stock_data_from_yfinance(
                symbol, start_date, end_date, period, interval
            )

        # Set default dates if not provided
        if start_date is None:
            start_date = (datetime.now(UTC) - timedelta(days=365)).strftime("%Y-%m-%d")
        if end_date is None:
            end_date = datetime.now(UTC).strftime("%Y-%m-%d")

        # For daily data, adjust end date to last trading day if it's not a trading day
        # This prevents unnecessary cache misses on weekends/holidays
        if interval == "1d" and use_cache:
            end_dt = pd.to_datetime(end_date)
            if not self._is_trading_day(end_dt):
                last_trading = self._get_last_trading_day(end_dt)
                logger.debug(
                    f"Adjusting end date from {end_date} to last trading day {last_trading.strftime('%Y-%m-%d')}"
                )
                end_date = last_trading.strftime("%Y-%m-%d")

        # If cache is disabled, fetch directly from yfinance
        if not use_cache:
            logger.info(f"Cache disabled, fetching from yfinance for {symbol}")
            return self._fetch_stock_data_from_yfinance(
                symbol, start_date, end_date, period, interval
            )

        # Try a smarter caching approach
        try:
            return self._get_data_with_smart_cache(
                symbol, start_date, end_date, interval
            )
        except Exception as e:
            logger.warning(f"Smart cache failed, falling back to yfinance: {e}")
            return self._fetch_stock_data_from_yfinance(
                symbol, start_date, end_date, period, interval
            )

    async def get_stock_data_async(
        self,
        symbol: str,
        start_date: str | None = None,
        end_date: str | None = None,
        period: str | None = None,
        interval: str = "1d",
        use_cache: bool = True,
    ) -> pd.DataFrame:
        """
        Async version of get_stock_data for parallel processing.

        This method wraps the synchronous get_stock_data method to provide
        an async interface for use in parallel backtesting operations.

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format
            period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
            interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
            use_cache: Whether to use cached data if available

        Returns:
            DataFrame with stock data
        """
        import asyncio
        import functools

        # Run the synchronous method in a thread pool to avoid blocking
        loop = asyncio.get_event_loop()

        # Use functools.partial to create a callable with all arguments
        sync_method = functools.partial(
            self.get_stock_data,
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            period=period,
            interval=interval,
            use_cache=use_cache,
        )

        # Execute in thread pool to avoid blocking the event loop
        return await loop.run_in_executor(None, sync_method)

    @with_stock_data_circuit_breaker(
        use_fallback=False
    )  # Fallback handled at higher level
    def _fetch_stock_data_from_yfinance(
        self,
        symbol: str,
        start_date: str | None = None,
        end_date: str | None = None,
        period: str | None = None,
        interval: str = "1d",
    ) -> pd.DataFrame:
        """
        Fetch stock data from yfinance with circuit breaker protection.

        Note: Circuit breaker is applied with use_fallback=False because
        fallback strategies are handled at the get_stock_data level.
        """
        logger.info(
            f"Fetching data from yfinance for {symbol} - Start: {start_date}, End: {end_date}, Period: {period}, Interval: {interval}"
        )
        # Use connection pool for better performance
        # The pool handles session management and retries internally

        # Use the optimized connection pool
        df = self._yf_pool.get_history(
            symbol=symbol,
            start=start_date,
            end=end_date,
            period=period,
            interval=interval,
        )

        # Check if dataframe is empty or if required columns are missing
        if df.empty:
            logger.warning(f"Empty dataframe returned for {symbol}")
            return pd.DataFrame(
                columns=["Open", "High", "Low", "Close", "Volume"]  # type: ignore[arg-type]
            )

        # Ensure all expected columns exist
        for col in ["Open", "High", "Low", "Close", "Volume"]:
            if col not in df.columns:
                logger.warning(
                    f"Column {col} missing from data for {symbol}, adding empty column"
                )
                # Use appropriate default values
                if col == "Volume":
                    df[col] = 0
                else:
                    df[col] = 0.0

        df.index.name = "Date"
        return df

    def get_maverick_recommendations(
        self, limit: int = 20, min_score: int | None = None
    ) -> list[dict]:
        """
        Get top Maverick stock recommendations from the database.

        Args:
            limit: Maximum number of recommendations
            min_score: Minimum combined score filter

        Returns:
            List of stock recommendations with details
        """
        session, should_close = self._get_db_session()
        try:
            # Build query with filtering at database level
            query = session.query(MaverickStocks)

            # Apply min_score filter in the query if specified
            if min_score:
                query = query.filter(MaverickStocks.combined_score >= min_score)

            # Order by score and limit results
            stocks = (
                query.order_by(MaverickStocks.combined_score.desc()).limit(limit).all()
            )

            # Process results with list comprehension for better performance
            recommendations = [
                {
                    **stock.to_dict(),
                    "recommendation_type": "maverick_bullish",
                    "reason": self._generate_maverick_reason(stock),
                }
                for stock in stocks
            ]

            return recommendations
        except Exception as e:
            logger.error(f"Error getting maverick recommendations: {e}")
            return []
        finally:
            if should_close:
                session.close()

    def get_maverick_bear_recommendations(
        self, limit: int = 20, min_score: int | None = None
    ) -> list[dict]:
        """
        Get top Maverick bear stock recommendations from the database.

        Args:
            limit: Maximum number of recommendations
            min_score: Minimum score filter

        Returns:
            List of bear stock recommendations with details
        """
        session, should_close = self._get_db_session()
        try:
            # Build query with filtering at database level
            query = session.query(MaverickBearStocks)

            # Apply min_score filter in the query if specified
            if min_score:
                query = query.filter(MaverickBearStocks.score >= min_score)

            # Order by score and limit results
            stocks = query.order_by(MaverickBearStocks.score.desc()).limit(limit).all()

            # Process results with list comprehension for better performance
            recommendations = [
                {
                    **stock.to_dict(),
                    "recommendation_type": "maverick_bearish",
                    "reason": self._generate_bear_reason(stock),
                }
                for stock in stocks
            ]

            return recommendations
        except Exception as e:
            logger.error(f"Error getting bear recommendations: {e}")
            return []
        finally:
            if should_close:
                session.close()

    def get_supply_demand_breakout_recommendations(
        self, limit: int = 20, min_momentum_score: float | None = None
    ) -> list[dict]:
        """
        Get stocks showing supply/demand breakout patterns from accumulation phases.

        Args:
            limit: Maximum number of recommendations
            min_momentum_score: Minimum momentum score filter

        Returns:
            List of supply/demand breakout recommendations with market structure analysis
        """
        session, should_close = self._get_db_session()
        try:
            # Build query with all filters at database level
            query = session.query(SupplyDemandBreakoutStocks).filter(
                # Supply/demand breakout criteria: price above all moving averages (demand zone)
                SupplyDemandBreakoutStocks.close_price
                > SupplyDemandBreakoutStocks.sma_50,
                SupplyDemandBreakoutStocks.close_price
                > SupplyDemandBreakoutStocks.sma_150,
                SupplyDemandBreakoutStocks.close_price
                > SupplyDemandBreakoutStocks.sma_200,
                # Moving average alignment indicates accumulation structure
                SupplyDemandBreakoutStocks.sma_50 > SupplyDemandBreakoutStocks.sma_150,
                SupplyDemandBreakoutStocks.sma_150 > SupplyDemandBreakoutStocks.sma_200,
            )

            # Apply min_momentum_score filter if specified
            if min_momentum_score:
                query = query.filter(
                    SupplyDemandBreakoutStocks.momentum_score >= min_momentum_score
                )

            # Order by momentum score and limit results
            stocks = (
                query.order_by(SupplyDemandBreakoutStocks.momentum_score.desc())
                .limit(limit)
                .all()
            )

            # Process results with list comprehension for better performance
            recommendations = [
                {
                    **stock.to_dict(),
                    "recommendation_type": "supply_demand_breakout",
                    "reason": self._generate_supply_demand_reason(stock),
                }
                for stock in stocks
            ]

            return recommendations
        except Exception as e:
            logger.error(f"Error getting trending recommendations: {e}")
            return []
        finally:
            if should_close:
                session.close()

    def get_all_screening_recommendations(self) -> dict[str, list[dict]]:
        """
        Get all screening recommendations in one call.

        Returns:
            Dictionary with all screening types and their recommendations
        """
        try:
            results = get_latest_maverick_screening()

            # Add recommendation reasons
            for stock in results.get("maverick_stocks", []):
                stock["recommendation_type"] = "maverick_bullish"
                stock["reason"] = self._generate_maverick_reason_from_dict(stock)

            for stock in results.get("maverick_bear_stocks", []):
                stock["recommendation_type"] = "maverick_bearish"
                stock["reason"] = self._generate_bear_reason_from_dict(stock)

            for stock in results.get("supply_demand_breakouts", []):
                stock["recommendation_type"] = "supply_demand_breakout"
                stock["reason"] = self._generate_supply_demand_reason_from_dict(stock)

            return results
        except Exception as e:
            logger.error(f"Error getting all screening recommendations: {e}")
            return {
                "maverick_stocks": [],
                "maverick_bear_stocks": [],
                "supply_demand_breakouts": [],
            }

    def _generate_maverick_reason(self, stock: MaverickStocks) -> str:
        """Generate recommendation reason for Maverick stock."""
        reasons = []

        combined_score = getattr(stock, "combined_score", None)
        if combined_score is not None and combined_score >= 90:
            reasons.append("Exceptional combined score")
        elif combined_score is not None and combined_score >= 80:
            reasons.append("Strong combined score")

        momentum_score = getattr(stock, "momentum_score", None)
        if momentum_score is not None and momentum_score >= 90:
            reasons.append("outstanding relative strength")
        elif momentum_score is not None and momentum_score >= 80:
            reasons.append("strong relative strength")

        pat = getattr(stock, "pat", None)
        if pat is not None and pat != "":
            reasons.append(f"{pat} pattern detected")

        consolidation = getattr(stock, "consolidation", None)
        if consolidation is not None and consolidation == "yes":
            reasons.append("consolidation characteristics")

        sqz = getattr(stock, "sqz", None)
        if sqz is not None and sqz != "":
            reasons.append(f"squeeze indicator: {sqz}")

        return (
            "Bullish setup with " + ", ".join(reasons)
            if reasons
            else "Strong technical setup"
        )

    def _generate_bear_reason(self, stock: MaverickBearStocks) -> str:
        """Generate recommendation reason for bear stock."""
        reasons = []

        score = getattr(stock, "score", None)
        if score is not None and score >= 90:
            reasons.append("Exceptional bear score")
        elif score is not None and score >= 80:
            reasons.append("Strong bear score")

        momentum_score = getattr(stock, "momentum_score", None)
        if momentum_score is not None and momentum_score <= 30:
            reasons.append("weak relative strength")

        rsi_14 = getattr(stock, "rsi_14", None)
        if rsi_14 is not None and rsi_14 <= 30:
            reasons.append("oversold RSI")

        atr_contraction = getattr(stock, "atr_contraction", False)
        if atr_contraction is True:
            reasons.append("ATR contraction")

        big_down_vol = getattr(stock, "big_down_vol", False)
        if big_down_vol is True:
            reasons.append("heavy selling volume")

        return (
            "Bearish setup with " + ", ".join(reasons)
            if reasons
            else "Weak technical setup"
        )

    def _generate_supply_demand_reason(self, stock: SupplyDemandBreakoutStocks) -> str:
        """Generate recommendation reason for supply/demand breakout stock."""
        reasons = ["Supply/demand breakout from accumulation"]

        momentum_score = getattr(stock, "momentum_score", None)
        if momentum_score is not None and momentum_score >= 90:
            reasons.append("exceptional relative strength")
        elif momentum_score is not None and momentum_score >= 80:
            reasons.append("strong relative strength")

        reasons.append("price above all major moving averages")
        reasons.append("moving averages in proper alignment")

        pat = getattr(stock, "pat", None)
        if pat is not None and pat != "":
            reasons.append(f"{pat} pattern")

        return " with ".join(reasons)

    def _generate_maverick_reason_from_dict(self, stock: dict) -> str:
        """Generate recommendation reason for Maverick stock from dict."""
        reasons = []

        score = stock.get("combined_score", 0)
        if score >= 90:
            reasons.append("Exceptional combined score")
        elif score >= 80:
            reasons.append("Strong combined score")

        momentum = stock.get("momentum_score", 0)
        if momentum >= 90:
            reasons.append("outstanding relative strength")
        elif momentum >= 80:
            reasons.append("strong relative strength")

        if stock.get("pattern"):
            reasons.append(f"{stock['pattern']} pattern detected")

        if stock.get("consolidation") == "yes":
            reasons.append("consolidation characteristics")

        if stock.get("squeeze"):
            reasons.append(f"squeeze indicator: {stock['squeeze']}")

        return (
            "Bullish setup with " + ", ".join(reasons)
            if reasons
            else "Strong technical setup"
        )

    def _generate_bear_reason_from_dict(self, stock: dict) -> str:
        """Generate recommendation reason for bear stock from dict."""
        reasons = []

        score = stock.get("score", 0)
        if score >= 90:
            reasons.append("Exceptional bear score")
        elif score >= 80:
            reasons.append("Strong bear score")

        momentum = stock.get("momentum_score", 100)
        if momentum <= 30:
            reasons.append("weak relative strength")

        rsi = stock.get("rsi_14")
        if rsi and rsi <= 30:
            reasons.append("oversold RSI")

        if stock.get("atr_contraction"):
            reasons.append("ATR contraction")

        if stock.get("big_down_vol"):
            reasons.append("heavy selling volume")

        return (
            "Bearish setup with " + ", ".join(reasons)
            if reasons
            else "Weak technical setup"
        )

    def _generate_supply_demand_reason_from_dict(self, stock: dict) -> str:
        """Generate recommendation reason for supply/demand breakout stock from dict."""
        reasons = ["Supply/demand breakout from accumulation"]

        momentum = stock.get("momentum_score", 0)
        if momentum >= 90:
            reasons.append("exceptional relative strength")
        elif momentum >= 80:
            reasons.append("strong relative strength")

        reasons.append("price above all major moving averages")
        reasons.append("moving averages in proper alignment")

        if stock.get("pattern"):
            reasons.append(f"{stock['pattern']} pattern")

        return " with ".join(reasons)

    # Keep all original methods for backward compatibility
    @with_stock_data_circuit_breaker(use_fallback=False)
    def get_stock_info(self, symbol: str) -> dict:
        """Get detailed stock information from yfinance with circuit breaker protection."""
        # Use connection pool for better performance
        return self._yf_pool.get_info(symbol)

    def get_realtime_data(self, symbol):
        """Get the latest real-time data for a symbol using yfinance."""
        try:
            # Use connection pool for real-time data
            data = self._yf_pool.get_history(symbol, period="1d")

            if data.empty:
                return None

            latest = data.iloc[-1]

            # Get previous close for change calculation
            info = self._yf_pool.get_info(symbol)
            prev_close = info.get("previousClose", None)
            if prev_close is None:
                # Try to get from 2-day history
                data_2d = self._yf_pool.get_history(symbol, period="2d")
                if len(data_2d) > 1:
                    prev_close = data_2d.iloc[0]["Close"]
                else:
                    prev_close = latest["Close"]

            # Calculate change
            price = latest["Close"]
            change = price - prev_close
            change_percent = (change / prev_close) * 100 if prev_close != 0 else 0

            return {
                "symbol": symbol,
                "price": round(price, 2),
                "change": round(change, 2),
                "change_percent": round(change_percent, 2),
                "volume": int(latest["Volume"]),
                "timestamp": data.index[-1],
                "timestamp_display": data.index[-1].strftime("%Y-%m-%d %H:%M:%S"),
                "is_real_time": False,  # yfinance data has some delay
            }
        except Exception as e:
            logger.error(f"Error fetching realtime data for {symbol}: {str(e)}")
            return None

    def get_all_realtime_data(self, symbols):
        """
        Get all latest real-time data for multiple symbols efficiently.
        Optimized to use batch downloading to reduce network requests.
        """
        if not symbols:
            return {}

        results = {}
        try:
            # Batch download 5 days of data to ensure we have previous close
            # Using group_by='ticker' makes the structure predictable: Level 0 = Ticker, Level 1 = Price Type
            batch_df = self._yf_pool.batch_download(
                symbols=symbols, period="5d", interval="1d", group_by="ticker"
            )

            # Check if we got any data
            if batch_df.empty:
                logger.warning("Batch download returned empty DataFrame")
                return {}

            # Handle both MultiIndex (multiple symbols) and single symbol cases
            is_multi_ticker = isinstance(batch_df.columns, pd.MultiIndex)

            for symbol in symbols:
                try:
                    symbol_data = None

                    if is_multi_ticker:
                        if symbol in batch_df.columns:
                            symbol_data = batch_df[symbol]
                    elif len(symbols) == 1 and symbols[0] == symbol:
                        # Single symbol case, columns are just price types
                        symbol_data = batch_df

                    if symbol_data is None or symbol_data.empty:
                        logger.debug(f"No batch data for {symbol}, falling back to individual fetch")
                        # Fallback to individual fetch
                        data = self.get_realtime_data(symbol)
                        if data:
                            results[symbol] = data
                        continue

                    # Drop NaNs (e.g., if one stock has missing data for a day)
                    symbol_data = symbol_data.dropna(how="all")

                    if len(symbol_data) < 1:
                        continue

                    latest = symbol_data.iloc[-1]
                    price = float(latest["Close"])
                    volume = int(latest["Volume"])

                    # Calculate change
                    if len(symbol_data) > 1:
                        prev_close = float(symbol_data.iloc[-2]["Close"])
                        change = price - prev_close
                        change_percent = (
                            (change / prev_close) * 100 if prev_close != 0 else 0
                        )
                    else:
                        change = 0.0
                        change_percent = 0.0

                    results[symbol] = {
                        "symbol": symbol,
                        "price": round(price, 2),
                        "change": round(change, 2),
                        "change_percent": round(change_percent, 2),
                        "volume": volume,
                        "timestamp": symbol_data.index[-1],
                        "timestamp_display": symbol_data.index[-1].strftime(
                            "%Y-%m-%d %H:%M:%S"
                        ),
                        "is_real_time": False,  # yfinance data has some delay
                    }

                except Exception as e:
                    logger.error(f"Error processing batch data for {symbol}: {e}")
                    # Try fallback
                    data = self.get_realtime_data(symbol)
                    if data:
                        results[symbol] = data

        except Exception as e:
            logger.error(f"Batch download failed: {e}")
            # Fallback to iterative approach
            for symbol in symbols:
                data = self.get_realtime_data(symbol)
                if data:
                    results[symbol] = data

        return results

    def is_market_open(self) -> bool:
        """Check if the US stock market is currently open."""
        now = datetime.now(pytz.timezone("US/Eastern"))

        # Check if it's a weekday
        if now.weekday() >= 5:  # 5 and 6 are Saturday and Sunday
            return False

        # Check if it's between 9:30 AM and 4:00 PM Eastern Time
        market_open = now.replace(hour=9, minute=30, second=0, microsecond=0)
        market_close = now.replace(hour=16, minute=0, second=0, microsecond=0)

        return market_open <= now <= market_close

    def get_news(self, symbol: str, limit: int = 10) -> pd.DataFrame:
        """Get news for a stock from yfinance."""
        try:
            ticker = yf.Ticker(symbol)
            news = ticker.news

            if not news:
                return pd.DataFrame(
                    columns=[  # type: ignore[arg-type]
                        "title",
                        "publisher",
                        "link",
                        "providerPublishTime",
                        "type",
                    ]
                )

            df = pd.DataFrame(news[:limit])

            # Convert timestamp to datetime
            if "providerPublishTime" in df.columns:
                df["providerPublishTime"] = pd.to_datetime(
                    df["providerPublishTime"], unit="s"
                )

            return df
        except Exception as e:
            logger.error(f"Error fetching news for {symbol}: {str(e)}")
            return pd.DataFrame(
                columns=["title", "publisher", "link", "providerPublishTime", "type"]  # type: ignore[arg-type]
            )

    def get_earnings(self, symbol: str) -> dict:
        """Get earnings information for a stock."""
        try:
            ticker = yf.Ticker(symbol)
            return {
                "earnings": ticker.earnings.to_dict()
                if hasattr(ticker, "earnings") and not ticker.earnings.empty
                else {},
                "earnings_dates": ticker.earnings_dates.to_dict()
                if hasattr(ticker, "earnings_dates") and not ticker.earnings_dates.empty
                else {},
                "earnings_trend": ticker.earnings_trend
                if hasattr(ticker, "earnings_trend")
                else {},
            }
        except Exception as e:
            logger.error(f"Error fetching earnings for {symbol}: {str(e)}")
            return {"earnings": {}, "earnings_dates": {}, "earnings_trend": {}}

    def get_recommendations(self, symbol: str) -> pd.DataFrame:
        """Get analyst recommendations for a stock."""
        try:
            ticker = yf.Ticker(symbol)
            recommendations = ticker.recommendations

            if recommendations is None or recommendations.empty:
                return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])  # type: ignore[arg-type]

            return recommendations
        except Exception as e:
            logger.error(f"Error fetching recommendations for {symbol}: {str(e)}")
            return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])  # type: ignore[arg-type]

    def is_etf(self, symbol: str) -> bool:
        """Check if a given symbol is an ETF."""
        try:
            stock = yf.Ticker(symbol)
            # Check if quoteType exists and is ETF
            if "quoteType" in stock.info:
                return stock.info["quoteType"].upper() == "ETF"  # type: ignore[no-any-return]
            # Fallback check for common ETF identifiers
            return any(
                [
                    symbol.endswith(("ETF", "FUND")),
                    symbol
                    in [
                        "SPY",
                        "QQQ",
                        "IWM",
                        "DIA",
                        "XLB",
                        "XLE",
                        "XLF",
                        "XLI",
                        "XLK",
                        "XLP",
                        "XLU",
                        "XLV",
                        "XLY",
                        "XLC",
                        "XLRE",
                        "XME",
                    ],
                    "ETF" in stock.info.get("longName", "").upper(),
                ]
            )
        except Exception as e:
            logger.error(f"Error checking if {symbol} is ETF: {e}")
            return False


# Maintain backward compatibility
StockDataProvider = EnhancedStockDataProvider

```

--------------------------------------------------------------------------------
/tests/fixtures/orchestration_fixtures.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive test fixtures for orchestration testing.

Provides realistic mock data for LLM responses, API responses, market data,
and test scenarios for the SupervisorAgent and DeepResearchAgent orchestration system.
"""

import json
from datetime import datetime, timedelta
from typing import Any
from unittest.mock import MagicMock

import numpy as np
import pandas as pd
import pytest
from langchain_core.messages import AIMessage

# ==============================================================================
# MOCK LLM RESPONSES
# ==============================================================================


class MockLLMResponses:
    """Realistic LLM responses for various orchestration scenarios."""

    @staticmethod
    def query_classification_response(
        category: str = "stock_investment_decision",
        confidence: float = 0.85,
        parallel_capable: bool = True,
    ) -> str:
        """Mock query classification response from LLM."""
        routing_agents_map = {
            "market_screening": ["market"],
            "technical_analysis": ["technical"],
            "stock_investment_decision": ["market", "technical"],
            "portfolio_analysis": ["market", "technical"],
            "deep_research": ["research"],
            "company_research": ["research"],
            "sentiment_analysis": ["research"],
            "risk_assessment": ["market", "technical"],
        }

        return json.dumps(
            {
                "category": category,
                "confidence": confidence,
                "required_agents": routing_agents_map.get(category, ["market"]),
                "complexity": "moderate" if confidence > 0.7 else "complex",
                "estimated_execution_time_ms": 45000
                if category == "deep_research"
                else 30000,
                "parallel_capable": parallel_capable,
                "reasoning": f"Query classified as {category} based on content analysis and intent detection.",
            }
        )

    @staticmethod
    def result_synthesis_response(
        persona: str = "moderate",
        agents_used: list[str] = None,
        confidence: float = 0.82,
    ) -> str:
        """Mock result synthesis response from LLM."""
        if agents_used is None:
            agents_used = ["market", "technical"]

        persona_focused_content = {
            "conservative": """
            Based on comprehensive analysis from our specialist agents, AAPL presents a balanced investment opportunity
            with strong fundamentals and reasonable risk profile. The market analysis indicates stable sector
            positioning with consistent dividend growth, while technical indicators suggest a consolidation phase
            with support at $170. For conservative investors, consider gradual position building with
            strict stop-loss at $165 to preserve capital. The risk-adjusted return profile aligns well
            with conservative portfolio objectives, offering both income stability and modest growth potential.
            """,
            "moderate": """
            Our multi-agent analysis reveals AAPL as a compelling investment opportunity with balanced risk-reward
            characteristics. Market screening identified strong fundamentals including 15% revenue growth and
            expanding services segment. Technical analysis shows bullish momentum with RSI at 58 and MACD
            trending positive. Entry points around $175-180 offer favorable risk-reward with targets at $200-210.
            Position sizing of 3-5% of portfolio aligns with moderate risk tolerance while capitalizing on
            the current uptrend momentum.
            """,
            "aggressive": """
            Multi-agent analysis identifies AAPL as a high-conviction growth play with exceptional upside potential.
            Market analysis reveals accelerating AI adoption driving hardware refresh cycles, while technical
            indicators signal strong breakout momentum above $185 resistance. The confluence of fundamental
            catalysts and technical setup supports aggressive position sizing up to 8-10% allocation.
            Target price of $220+ represents 25% upside with momentum likely to continue through earnings season.
            This represents a prime opportunity for growth-focused portfolios seeking alpha generation.
            """,
        }

        return persona_focused_content.get(
            persona, persona_focused_content["moderate"]
        ).strip()

    @staticmethod
    def content_analysis_response(
        sentiment: str = "bullish", confidence: float = 0.75, credibility: float = 0.8
    ) -> str:
        """Mock content analysis response from LLM."""
        return json.dumps(
            {
                "KEY_INSIGHTS": [
                    "Apple's Q4 earnings exceeded expectations with 15% revenue growth",
                    "Services segment continues to expand with 12% year-over-year growth",
                    "iPhone 15 sales showing strong adoption in key markets",
                    "Cash position remains robust at $165B supporting capital allocation",
                    "AI integration across product line driving next upgrade cycle",
                ],
                "SENTIMENT": {"direction": sentiment, "confidence": confidence},
                "RISK_FACTORS": [
                    "China market regulatory concerns persist",
                    "Supply chain dependencies in Taiwan and South Korea",
                    "Increasing competition in services market",
                    "Currency headwinds affecting international revenue",
                ],
                "OPPORTUNITIES": [
                    "AI-powered device upgrade cycle beginning",
                    "Vision Pro market penetration expanding",
                    "Services recurring revenue model strengthening",
                    "Emerging markets iPhone adoption accelerating",
                ],
                "CREDIBILITY": credibility,
                "RELEVANCE": 0.9,
                "SUMMARY": f"Comprehensive analysis suggests {sentiment} outlook for Apple with strong fundamentals and growth catalysts, though regulatory and competitive risks require monitoring.",
            }
        )

    @staticmethod
    def research_synthesis_response(persona: str = "moderate") -> str:
        """Mock research synthesis response for deep research agent."""
        synthesis_by_persona = {
            "conservative": """
            ## Executive Summary
            Apple represents a stable, dividend-paying technology stock suitable for conservative portfolios seeking
            balanced growth and income preservation.

            ## Key Findings
            • Consistent dividend growth averaging 8% annually over past 5 years
            • Strong balance sheet with $165B cash providing downside protection
            • Services revenue provides recurring income stream growing at 12% annually
            • P/E ratio of 28x reasonable for quality growth stock
            • Beta of 1.1 indicates moderate volatility relative to market
            • Debt-to-equity ratio of 0.3 shows conservative capital structure
            • Free cash flow yield of 3.2% supports dividend sustainability

            ## Investment Implications for Conservative Investors
            Apple's combination of dividend growth, balance sheet strength, and market leadership makes it suitable
            for conservative portfolios. The company's pivot to services provides recurring revenue stability while
            hardware sales offer moderate growth potential.

            ## Risk Considerations
            Primary risks include China market exposure (19% of revenue), technology obsolescence, and regulatory
            pressure on App Store policies. However, strong cash position provides significant downside protection.

            ## Recommended Actions
            Consider 2-3% portfolio allocation with gradual accumulation on pullbacks below $170.
            Appropriate stop-loss at $160 to limit downside risk.
            """,
            "moderate": """
            ## Executive Summary
            Apple presents a balanced investment opportunity combining growth potential with quality fundamentals,
            well-suited for diversified moderate-risk portfolios.

            ## Key Findings
            • Revenue growth acceleration to 15% driven by AI-enhanced products
            • Services segment margins expanding to 70%, improving overall profitability
            • Strong competitive moats in ecosystem and brand loyalty
            • Capital allocation balance between growth investment and shareholder returns
            • Technical indicators suggesting continued uptrend momentum
            • Valuation appears fair at current levels with room for multiple expansion
            • Market leadership position in premium smartphone and tablet segments

            ## Investment Implications for Moderate Investors
            Apple offers an attractive blend of stability and growth potential. The company's evolution toward
            services provides recurring revenue while hardware innovation drives periodic upgrade cycles.

            ## Risk Considerations
            Key risks include supply chain disruption, China regulatory issues, and increasing competition
            in services. Currency headwinds may impact international revenue growth.

            ## Recommended Actions
            Target 4-5% portfolio allocation with entry points between $175-185. Consider taking profits
            above $210 and adding on weakness below $170.
            """,
            "aggressive": """
            ## Executive Summary
            Apple stands at the forefront of the next technology revolution with AI integration across its ecosystem,
            presenting significant alpha generation potential for growth-focused investors.

            ## Key Findings
            • AI-driven product refresh cycle beginning with iPhone 15 Pro and Vision Pro launch
            • Services revenue trajectory accelerating with 18% growth potential
            • Market share expansion opportunities in emerging markets and enterprise
            • Vision Pro early adoption exceeding expectations, validating spatial computing thesis
            • Developer ecosystem strengthening with AI tools integration
            • Operating leverage improving with services mix shift
            • Stock momentum indicators showing bullish technical setup

            ## Investment Implications for Aggressive Investors
            Apple represents a high-conviction growth play with multiple expansion catalysts. The convergence
            of AI adoption, new product categories, and services growth creates exceptional upside potential.

            ## Risk Considerations
            Execution risk on Vision Pro adoption, competitive response from Android ecosystem, and
            regulatory pressure on App Store represent key downside risks requiring active monitoring.

            ## Recommended Actions
            Consider aggressive 8-10% allocation with momentum-based entry above $185 resistance.
            Target price $230+ over 12-month horizon with trailing stop at 15% to protect gains.
            """,
        }

        return synthesis_by_persona.get(
            persona, synthesis_by_persona["moderate"]
        ).strip()


# ==============================================================================
# MOCK EXA API RESPONSES
# ==============================================================================


class MockExaResponses:
    """Realistic Exa API responses for financial research."""

    @staticmethod
    def search_results_aapl() -> list[dict[str, Any]]:
        """Mock Exa search results for AAPL analysis."""
        return [
            {
                "url": "https://www.bloomberg.com/news/articles/2024-01-15/apple-earnings-beat",
                "title": "Apple Earnings Beat Expectations as iPhone Sales Surge",
                "content": "Apple Inc. reported quarterly revenue of $119.6 billion, surpassing analyst expectations as iPhone 15 sales showed strong momentum in key markets. The technology giant's services segment grew 12% year-over-year to $23.1 billion, demonstrating the recurring revenue model's strength. CEO Tim Cook highlighted AI integration across the product lineup as a key driver for the next upgrade cycle. Gross margins expanded to 45.9% compared to 43.3% in the prior year period, reflecting improved mix and operational efficiency. The company's cash position remains robust at $165.1 billion, providing flexibility for strategic investments and shareholder returns. China revenue declined 2% due to competitive pressures, though management expressed optimism about long-term opportunities in the region.",
                "summary": "Apple exceeded Q4 earnings expectations with strong iPhone 15 sales and services growth, while maintaining robust cash position and expanding margins despite China headwinds.",
                "highlights": [
                    "iPhone 15 strong sales momentum",
                    "Services grew 12% year-over-year",
                    "$165.1B cash position",
                ],
                "published_date": "2024-01-15T08:30:00Z",
                "author": "Mark Gurman",
                "score": 0.94,
                "provider": "exa",
            },
            {
                "url": "https://seekingalpha.com/article/4665432-apple-stock-analysis-ai-catalyst",
                "title": "Apple Stock: AI Integration Could Drive Next Super Cycle",
                "content": "Apple's integration of artificial intelligence across its ecosystem represents a potential catalyst for the next device super cycle. The company's on-device AI processing capabilities, enabled by the A17 Pro chip, position Apple uniquely in the mobile AI landscape. Industry analysts project AI-enhanced features could drive iPhone replacement cycles to accelerate from the current 3.5 years to approximately 2.8 years. The services ecosystem benefits significantly from AI integration, with enhanced Siri capabilities driving increased App Store engagement and subscription services adoption. Vision Pro early metrics suggest spatial computing adoption is tracking ahead of initial estimates, with developer interest surging 300% quarter-over-quarter. The convergence of AI, spatial computing, and services creates multiple revenue expansion vectors over the next 3-5 years.",
                "summary": "AI integration across Apple's ecosystem could accelerate device replacement cycles and expand services revenue through enhanced user engagement.",
                "highlights": [
                    "AI-driven replacement cycle acceleration",
                    "Vision Pro adoption tracking well",
                    "Services ecosystem AI benefits",
                ],
                "published_date": "2024-01-14T14:20:00Z",
                "author": "Tech Analyst Team",
                "score": 0.87,
                "provider": "exa",
            },
            {
                "url": "https://www.morningstar.com/stocks/aapl-valuation-analysis",
                "title": "Apple Valuation Analysis: Fair Value Assessment",
                "content": "Our discounted cash flow analysis suggests Apple's fair value ranges between $185-195 per share, indicating the stock trades near intrinsic value at current levels. The company's transition toward higher-margin services revenue supports multiple expansion, though hardware cycle dependency introduces valuation volatility. Key valuation drivers include services attach rates (currently 85% of active devices), gross margin trajectory (target 47-48% long-term), and capital allocation efficiency. The dividend yield of 0.5% appears sustainable with strong free cash flow generation of $95+ billion annually. Compared to technology peers, Apple trades at a 15% premium to the sector median, justified by superior return on invested capital and cash generation capabilities.",
                "summary": "DCF analysis places Apple's fair value at $185-195, with current valuation supported by services transition and strong cash generation.",
                "highlights": [
                    "Fair value $185-195 range",
                    "Services driving multiple expansion",
                    "Strong free cash flow $95B+",
                ],
                "published_date": "2024-01-13T11:45:00Z",
                "author": "Sarah Chen",
                "score": 0.91,
                "provider": "exa",
            },
            {
                "url": "https://www.reuters.com/technology/apple-china-challenges-2024-01-12",
                "title": "Apple Faces Growing Competition in China Market",
                "content": "Apple confronts intensifying competition in China as local brands gain market share and regulatory scrutiny increases. Huawei's Mate 60 Pro launch has resonated strongly with Chinese consumers, contributing to Apple's 2% revenue decline in Greater China for Q4. The Chinese government's restrictions on iPhone use in government agencies signal potential broader policy shifts. Despite challenges, Apple maintains premium market leadership with 47% share in smartphones priced above $600. Management highlighted ongoing investments in local partnerships and supply chain relationships to navigate the complex regulatory environment. The company's services revenue in China grew 8% despite hardware headwinds, demonstrating ecosystem stickiness among existing users.",
                "summary": "Apple faces competitive and regulatory challenges in China, though maintains premium market leadership and growing services revenue.",
                "highlights": [
                    "China revenue down 2%",
                    "Regulatory iPhone restrictions",
                    "Premium segment leadership maintained",
                ],
                "published_date": "2024-01-12T16:15:00Z",
                "author": "Reuters Technology Team",
                "score": 0.89,
                "provider": "exa",
            },
        ]

    @staticmethod
    def search_results_market_sentiment() -> list[dict[str, Any]]:
        """Mock Exa results for market sentiment analysis."""
        return [
            {
                "url": "https://www.cnbc.com/2024/01/16/market-outlook-tech-stocks",
                "title": "Tech Stocks Rally on AI Optimism Despite Rate Concerns",
                "content": "Technology stocks surged 2.3% as artificial intelligence momentum overcame Federal Reserve policy concerns. Investors rotated into AI-beneficiary names including Apple, Microsoft, and Nvidia following strong earnings guidance across the sector. The Technology Select Sector SPDR ETF (XLK) reached new 52-week highs despite 10-year Treasury yields hovering near 4.5%. Institutional flows show $12.8 billion net inflows to technology funds over the past month, the strongest since early 2023. Options activity indicates continued bullish sentiment with call volume exceeding puts by 1.8:1 across major tech names. Analyst upgrades accelerated with 67% of tech stocks carrying buy ratings versus 52% sector average.",
                "summary": "Tech stocks rally on AI optimism with strong institutional inflows and bullish options activity despite interest rate headwinds.",
                "highlights": [
                    "Tech sector +2.3%",
                    "$12.8B institutional inflows",
                    "Call/put ratio 1.8:1",
                ],
                "published_date": "2024-01-16T09:45:00Z",
                "author": "CNBC Markets Team",
                "score": 0.92,
                "provider": "exa",
            },
            {
                "url": "https://finance.yahoo.com/news/vix-fear-greed-market-sentiment",
                "title": "VIX Falls to Multi-Month Lows as Fear Subsides",
                "content": "The VIX volatility index dropped to 13.8, the lowest level since November 2021, signaling reduced market anxiety and increased risk appetite among investors. The CNN Fear & Greed Index shifted to 'Greed' territory at 72, up from 'Neutral' just two weeks ago. Credit spreads tightened across investment-grade and high-yield markets, with IG spreads at 85 basis points versus 110 in December. Equity put/call ratios declined to 0.45, indicating overwhelming bullish positioning. Margin debt increased 8% month-over-month as investors leverage up for continued market gains.",
                "summary": "Market sentiment indicators show reduced fear and increased greed with VIX at multi-month lows and bullish positioning accelerating.",
                "highlights": [
                    "VIX at 13.8 multi-month low",
                    "Fear & Greed at 72",
                    "Margin debt up 8%",
                ],
                "published_date": "2024-01-16T14:30:00Z",
                "author": "Market Sentiment Team",
                "score": 0.88,
                "provider": "exa",
            },
        ]

    @staticmethod
    def search_results_empty() -> list[dict[str, Any]]:
        """Mock empty Exa search results for testing edge cases."""
        return []

    @staticmethod
    def search_results_low_quality() -> list[dict[str, Any]]:
        """Mock low-quality Exa search results for credibility testing."""
        return [
            {
                "url": "https://sketchy-site.com/apple-prediction",
                "title": "AAPL Will 100X - Trust Me Bro Analysis",
                "content": "Apple stock is going to the moon because reasons. My uncle works at Apple and says they're releasing iPhones made of gold next year. This is not financial advice but also definitely is financial advice. Buy now or cry later. Diamond hands to the moon rockets.",
                "summary": "Questionable analysis with unsubstantiated claims about Apple's prospects.",
                "highlights": [
                    "Gold iPhones coming",
                    "100x returns predicted",
                    "Uncle insider info",
                ],
                "published_date": "2024-01-16T23:59:00Z",
                "author": "Random Internet User",
                "score": 0.12,
                "provider": "exa",
            }
        ]


# ==============================================================================
# MOCK TAVILY API RESPONSES
# ==============================================================================


class MockTavilyResponses:
    """Realistic Tavily API responses for web search."""

    @staticmethod
    def search_results_aapl() -> dict[str, Any]:
        """Mock Tavily search response for AAPL analysis."""
        return {
            "query": "Apple stock analysis AAPL investment outlook",
            "follow_up_questions": [
                "What are Apple's main revenue drivers?",
                "How does Apple compare to competitors?",
                "What are the key risks for Apple stock?",
            ],
            "answer": "Apple (AAPL) shows strong fundamentals with growing services revenue and AI integration opportunities, though faces competition in China and regulatory pressures.",
            "results": [
                {
                    "title": "Apple Stock Analysis: Strong Fundamentals Despite Headwinds",
                    "url": "https://www.fool.com/investing/2024/01/15/apple-stock-analysis",
                    "content": "Apple's latest quarter demonstrated the resilience of its business model, with services revenue hitting a new record and iPhone sales exceeding expectations. The company's focus on artificial intelligence integration across its product ecosystem positions it well for future growth cycles. However, investors should monitor China market dynamics and App Store regulatory challenges that could impact long-term growth trajectories.",
                    "raw_content": "Apple Inc. (AAPL) continues to demonstrate strong business fundamentals in its latest quarterly report, with services revenue reaching new records and iPhone sales beating analyst expectations across key markets. The technology giant has strategically positioned itself at the forefront of artificial intelligence integration, with on-device AI processing capabilities that differentiate its products from competitors. Looking ahead, the company's ecosystem approach and services transition provide multiple growth vectors, though challenges in China and regulatory pressures on App Store policies require careful monitoring. The stock's current valuation appears reasonable given the company's cash generation capabilities and market position.",
                    "published_date": "2024-01-15",
                    "score": 0.89,
                },
                {
                    "title": "Tech Sector Outlook: AI Revolution Drives Growth",
                    "url": "https://www.barrons.com/articles/tech-outlook-ai-growth",
                    "content": "The technology sector stands at the beginning of a multi-year artificial intelligence transformation that could reshape revenue models and competitive dynamics. Companies with strong AI integration capabilities, including Apple, Microsoft, and Google, are positioned to benefit from this shift. Apple's approach of on-device AI processing provides privacy advantages and reduces cloud infrastructure costs compared to competitors relying heavily on cloud-based AI services.",
                    "raw_content": "The technology sector is experiencing a fundamental transformation as artificial intelligence capabilities become central to product differentiation and user experience. Companies that can effectively integrate AI while maintaining user privacy and system performance are likely to capture disproportionate value creation over the next 3-5 years. Apple's strategy of combining custom silicon with on-device AI processing provides competitive advantages in both performance and privacy, potentially driving accelerated device replacement cycles and services engagement. This positions Apple favorably compared to competitors relying primarily on cloud-based AI infrastructure.",
                    "published_date": "2024-01-14",
                    "score": 0.85,
                },
                {
                    "title": "Investment Analysis: Apple's Services Transformation",
                    "url": "https://www.investopedia.com/apple-services-analysis",
                    "content": "Apple's transformation from a hardware-centric to services-enabled company continues to gain momentum, with services revenue now representing over 22% of total revenue and growing at double-digit rates. This shift toward recurring revenue streams improves business model predictability and supports higher valuation multiples. The company's services ecosystem benefits from its large installed base and strong customer loyalty metrics.",
                    "raw_content": "Apple Inc.'s strategic evolution toward a services-centric business model represents one of the most successful corporate transformations in technology sector history. The company has leveraged its installed base of over 2 billion active devices to create a thriving services ecosystem encompassing the App Store, Apple Music, iCloud, Apple Pay, and various subscription services. This services revenue now exceeds $85 billion annually and continues growing at rates exceeding 10% year-over-year, providing both revenue diversification and margin enhancement. The recurring nature of services revenue creates more predictable cash flows and justifies premium valuation multiples compared to pure hardware companies.",
                    "published_date": "2024-01-13",
                    "score": 0.91,
                },
            ],
            "response_time": 1.2,
        }

    @staticmethod
    def search_results_market_sentiment() -> dict[str, Any]:
        """Mock Tavily search response for market sentiment analysis."""
        return {
            "query": "stock market sentiment investor mood analysis 2024",
            "follow_up_questions": [
                "What are current market sentiment indicators?",
                "How do investors feel about tech stocks?",
                "What factors are driving market optimism?",
            ],
            "answer": "Current market sentiment shows cautious optimism with reduced volatility and increased risk appetite, driven by AI enthusiasm and strong corporate earnings despite interest rate concerns.",
            "results": [
                {
                    "title": "Market Sentiment Indicators Signal Bullish Mood",
                    "url": "https://www.marketwatch.com/story/market-sentiment-bullish",
                    "content": "Multiple sentiment indicators suggest investors have shifted from defensive to risk-on positioning as 2024 progresses. The VIX volatility index has declined to multi-month lows while institutional money flows accelerate into equities. Credit markets show tightening spreads and increased issuance activity, reflecting improved risk appetite across asset classes.",
                    "raw_content": "A comprehensive analysis of market sentiment indicators reveals a significant shift in investor psychology over the past month. The CBOE Volatility Index (VIX) has dropped below 14, its lowest level since late 2021, indicating reduced fear and increased complacency among options traders. Simultaneously, the American Association of Individual Investors (AAII) sentiment survey shows bullish respondents outnumbering bearish by a 2:1 margin, the widest spread since early 2023. Institutional flows data from EPFR shows $45 billion in net inflows to equity funds over the past four weeks, with technology and growth sectors receiving disproportionate allocation.",
                    "published_date": "2024-01-16",
                    "score": 0.93,
                },
                {
                    "title": "Investor Psychology: Fear of Missing Out Returns",
                    "url": "https://www.wsj.com/markets/stocks/fomo-returns-markets",
                    "content": "The fear of missing out (FOMO) mentality has returned to equity markets as investors chase performance and increase leverage. Margin debt has increased significantly while cash positions at major brokerages have declined to multi-year lows. This shift in behavior suggests sentiment has moved from cautious to optimistic, though some analysts warn of potential overextension.",
                    "raw_content": "Behavioral indicators suggest a fundamental shift in investor psychology from the cautious stance that characterized much of 2023 to a more aggressive, opportunity-seeking mindset. NYSE margin debt has increased 15% over the past two months, reaching $750 billion as investors leverage up to participate in market gains. Cash positions at major discount brokerages have declined to just 3.2% of assets, compared to 5.8% during peak uncertainty in October 2023. Options market activity shows call volume exceeding put volume by the widest margin in 18 months, with particular strength in technology and AI-related names.",
                    "published_date": "2024-01-15",
                    "score": 0.88,
                },
            ],
            "response_time": 1.4,
        }

    @staticmethod
    def search_results_error() -> dict[str, Any]:
        """Mock Tavily error response for testing error handling."""
        return {
            "error": "rate_limit_exceeded",
            "message": "API rate limit exceeded. Please try again later.",
            "retry_after": 60,
        }


# ==============================================================================
# MOCK MARKET DATA
# ==============================================================================


class MockMarketData:
    """Realistic market data for testing financial analysis."""

    @staticmethod
    def stock_price_history(
        symbol: str = "AAPL", days: int = 100, current_price: float = 185.0
    ) -> pd.DataFrame:
        """Generate realistic stock price history."""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        dates = pd.date_range(start=start_date, end=end_date, freq="D")

        # Generate realistic price movement
        np.random.seed(42)  # Consistent data for testing
        returns = np.random.normal(
            0.0008, 0.02, len(dates)
        )  # ~0.2% daily return, 2% volatility

        # Start with a base price and apply returns
        base_price = current_price * 0.9  # Start 10% lower
        prices = [base_price]

        for return_val in returns[1:]:
            next_price = prices[-1] * (1 + return_val)
            prices.append(max(next_price, 50))  # Floor price at $50

        # Create OHLCV data
        data = []
        for i, (date, close_price) in enumerate(zip(dates, prices, strict=False)):
            # Generate realistic OHLC from close price
            volatility = abs(np.random.normal(0, 0.015))  # Intraday volatility

            high = close_price * (1 + volatility)
            low = close_price * (1 - volatility)

            # Determine open based on previous close with gap
            if i == 0:
                open_price = close_price
            else:
                gap = np.random.normal(0, 0.005)  # Small gap
                open_price = prices[i - 1] * (1 + gap)

            # Ensure OHLC relationships are valid
            high = max(high, open_price, close_price)
            low = min(low, open_price, close_price)

            # Generate volume
            base_volume = 50_000_000  # Base volume
            volume_multiplier = np.random.uniform(0.5, 2.0)
            volume = int(base_volume * volume_multiplier)

            data.append(
                {
                    "Date": date,
                    "Open": round(open_price, 2),
                    "High": round(high, 2),
                    "Low": round(low, 2),
                    "Close": round(close_price, 2),
                    "Volume": volume,
                }
            )

        df = pd.DataFrame(data)
        df.set_index("Date", inplace=True)
        return df

    @staticmethod
    def technical_indicators(symbol: str = "AAPL") -> dict[str, Any]:
        """Mock technical indicators for a stock."""
        return {
            "symbol": symbol,
            "timestamp": datetime.now(),
            "rsi": {
                "value": 58.3,
                "signal": "neutral",
                "interpretation": "Neither overbought nor oversold",
            },
            "macd": {
                "value": 2.15,
                "signal_line": 1.89,
                "histogram": 0.26,
                "signal": "bullish",
                "interpretation": "MACD above signal line suggests bullish momentum",
            },
            "bollinger_bands": {
                "upper": 192.45,
                "middle": 185.20,
                "lower": 177.95,
                "position": "middle",
                "squeeze": False,
            },
            "moving_averages": {
                "sma_20": 183.45,
                "sma_50": 178.90,
                "sma_200": 172.15,
                "ema_12": 184.80,
                "ema_26": 181.30,
            },
            "support_resistance": {
                "support_levels": [175.00, 170.50, 165.25],
                "resistance_levels": [190.00, 195.75, 200.50],
                "current_level": "between_support_resistance",
            },
            "volume_analysis": {
                "average_volume": 52_000_000,
                "current_volume": 68_000_000,
                "relative_volume": 1.31,
                "volume_trend": "increasing",
            },
        }

    @staticmethod
    def market_overview() -> dict[str, Any]:
        """Mock market overview data."""
        return {
            "timestamp": datetime.now(),
            "indices": {
                "SPY": {"price": 485.30, "change": +2.15, "change_pct": +0.44},
                "QQQ": {"price": 412.85, "change": +5.42, "change_pct": +1.33},
                "IWM": {"price": 195.67, "change": -1.23, "change_pct": -0.62},
                "VIX": {"price": 13.8, "change": -1.2, "change_pct": -8.0},
            },
            "sector_performance": {
                "Technology": +1.85,
                "Healthcare": +0.45,
                "Financial Services": -0.32,
                "Consumer Cyclical": +0.78,
                "Industrials": -0.15,
                "Energy": -1.22,
                "Utilities": +0.33,
                "Real Estate": +0.91,
                "Materials": -0.67,
                "Consumer Defensive": +0.12,
                "Communication Services": +1.34,
            },
            "market_breadth": {
                "advancers": 1845,
                "decliners": 1230,
                "unchanged": 125,
                "new_highs": 89,
                "new_lows": 12,
                "up_volume": 8.2e9,
                "down_volume": 4.1e9,
            },
            "sentiment_indicators": {
                "fear_greed_index": 72,
                "vix_level": "low",
                "put_call_ratio": 0.45,
                "margin_debt_trend": "increasing",
            },
        }


# ==============================================================================
# TEST QUERY EXAMPLES
# ==============================================================================


class TestQueries:
    """Realistic user queries for different classification categories."""

    MARKET_SCREENING = [
        "Find me momentum stocks in the technology sector with strong earnings growth",
        "Screen for dividend-paying stocks with yields above 3% and consistent payout history",
        "Show me small-cap stocks with high revenue growth and low debt levels",
        "Find stocks breaking out of consolidation patterns with increasing volume",
        "Screen for value stocks trading below book value with improving fundamentals",
    ]

    COMPANY_RESEARCH = [
        "Analyze Apple's competitive position in the smartphone market",
        "Research Tesla's battery technology advantages and manufacturing scale",
        "Provide comprehensive analysis of Microsoft's cloud computing strategy",
        "Analyze Amazon's e-commerce margins and AWS growth potential",
        "Research Nvidia's AI chip market dominance and competitive threats",
    ]

    TECHNICAL_ANALYSIS = [
        "Analyze AAPL's chart patterns and provide entry/exit recommendations",
        "What do the technical indicators say about SPY's short-term direction?",
        "Analyze TSLA's support and resistance levels for swing trading",
        "Show me the RSI and MACD signals for QQQ",
        "Identify chart patterns in the Nasdaq that suggest market direction",
    ]

    SENTIMENT_ANALYSIS = [
        "What's the current market sentiment around tech stocks?",
        "Analyze investor sentiment toward electric vehicle companies",
        "How are traders feeling about the Fed's interest rate policy?",
        "What's the mood in crypto markets right now?",
        "Analyze sentiment around bank stocks after recent earnings",
    ]

    PORTFOLIO_ANALYSIS = [
        "Optimize my portfolio allocation for moderate risk tolerance",
        "Analyze the correlation between my holdings and suggest diversification",
        "Review my portfolio for sector concentration risk",
        "Suggest rebalancing strategy for my retirement portfolio",
        "Analyze my portfolio's beta and suggest hedging strategies",
    ]

    RISK_ASSESSMENT = [
        "Calculate appropriate position size for AAPL given my $100k account",
        "What's the maximum drawdown risk for a 60/40 portfolio?",
        "Analyze the tail risk in my growth stock positions",
        "Calculate VaR for my current portfolio allocation",
        "Assess concentration risk in my tech-heavy portfolio",
    ]

    @classmethod
    def get_random_query(cls, category: str) -> str:
        """Get a random query from the specified category."""
        queries_map = {
            "market_screening": cls.MARKET_SCREENING,
            "company_research": cls.COMPANY_RESEARCH,
            "technical_analysis": cls.TECHNICAL_ANALYSIS,
            "sentiment_analysis": cls.SENTIMENT_ANALYSIS,
            "portfolio_analysis": cls.PORTFOLIO_ANALYSIS,
            "risk_assessment": cls.RISK_ASSESSMENT,
        }

        queries = queries_map.get(category, cls.MARKET_SCREENING)
        return np.random.choice(queries)


# ==============================================================================
# PERSONA-SPECIFIC FIXTURES
# ==============================================================================


class PersonaFixtures:
    """Persona-specific test data and responses."""

    @staticmethod
    def conservative_investor_data() -> dict[str, Any]:
        """Data for conservative investor persona testing."""
        return {
            "persona": "conservative",
            "characteristics": [
                "capital preservation",
                "income generation",
                "low volatility",
                "dividend focus",
            ],
            "risk_tolerance": 0.3,
            "preferred_sectors": ["Utilities", "Consumer Defensive", "Healthcare"],
            "analysis_focus": [
                "dividend yield",
                "debt levels",
                "stability",
                "downside protection",
            ],
            "position_sizing": {
                "max_single_position": 0.05,  # 5% max
                "stop_loss_multiplier": 1.5,
                "target_volatility": 0.12,
            },
            "sample_recommendations": [
                "Consider gradual position building with strict risk management",
                "Focus on dividend-paying stocks with consistent payout history",
                "Maintain defensive positioning until market clarity improves",
                "Prioritize capital preservation over aggressive growth",
            ],
        }

    @staticmethod
    def moderate_investor_data() -> dict[str, Any]:
        """Data for moderate investor persona testing."""
        return {
            "persona": "moderate",
            "characteristics": [
                "balanced growth",
                "diversification",
                "moderate risk",
                "long-term focus",
            ],
            "risk_tolerance": 0.6,
            "preferred_sectors": [
                "Technology",
                "Healthcare",
                "Financial Services",
                "Industrials",
            ],
            "analysis_focus": [
                "risk-adjusted returns",
                "diversification",
                "growth potential",
                "fundamentals",
            ],
            "position_sizing": {
                "max_single_position": 0.08,  # 8% max
                "stop_loss_multiplier": 2.0,
                "target_volatility": 0.18,
            },
            "sample_recommendations": [
                "Balance growth opportunities with risk management",
                "Consider diversified allocation across sectors and market caps",
                "Target 4-6% position sizing for high-conviction ideas",
                "Monitor both technical and fundamental indicators",
            ],
        }

    @staticmethod
    def aggressive_investor_data() -> dict[str, Any]:
        """Data for aggressive investor persona testing."""
        return {
            "persona": "aggressive",
            "characteristics": [
                "high growth",
                "momentum",
                "concentrated positions",
                "active trading",
            ],
            "risk_tolerance": 0.9,
            "preferred_sectors": [
                "Technology",
                "Communication Services",
                "Consumer Cyclical",
            ],
            "analysis_focus": [
                "growth potential",
                "momentum",
                "catalysts",
                "alpha generation",
            ],
            "position_sizing": {
                "max_single_position": 0.15,  # 15% max
                "stop_loss_multiplier": 3.0,
                "target_volatility": 0.25,
            },
            "sample_recommendations": [
                "Consider concentrated positions in high-conviction names",
                "Target momentum stocks with strong catalysts",
                "Use 10-15% position sizing for best opportunities",
                "Focus on alpha generation over risk management",
            ],
        }


# ==============================================================================
# EDGE CASE AND ERROR FIXTURES
# ==============================================================================


class EdgeCaseFixtures:
    """Fixtures for testing edge cases and error conditions."""

    @staticmethod
    def api_failure_responses() -> dict[str, Any]:
        """Mock API failure responses for error handling testing."""
        return {
            "exa_rate_limit": {
                "error": "rate_limit_exceeded",
                "message": "You have exceeded your API rate limit",
                "retry_after": 3600,
                "status_code": 429,
            },
            "tavily_unauthorized": {
                "error": "unauthorized",
                "message": "Invalid API key provided",
                "status_code": 401,
            },
            "llm_timeout": {
                "error": "timeout",
                "message": "Request timed out after 30 seconds",
                "status_code": 408,
            },
            "network_error": {
                "error": "network_error",
                "message": "Unable to connect to external service",
                "status_code": 503,
            },
        }

    @staticmethod
    def conflicting_agent_results() -> dict[str, dict[str, Any]]:
        """Mock conflicting results from different agents for synthesis testing."""
        return {
            "market": {
                "recommendation": "BUY",
                "confidence": 0.85,
                "reasoning": "Strong fundamentals and sector rotation into technology",
                "target_price": 210.0,
                "sentiment": "bullish",
            },
            "technical": {
                "recommendation": "SELL",
                "confidence": 0.78,
                "reasoning": "Bearish divergence in RSI and approaching strong resistance",
                "target_price": 165.0,
                "sentiment": "bearish",
            },
            "research": {
                "recommendation": "HOLD",
                "confidence": 0.72,
                "reasoning": "Mixed signals from fundamental analysis and market conditions",
                "target_price": 185.0,
                "sentiment": "neutral",
            },
        }

    @staticmethod
    def incomplete_data() -> dict[str, Any]:
        """Mock incomplete or missing data scenarios."""
        return {
            "missing_price_data": {
                "symbol": "AAPL",
                "error": "Price data not available for requested timeframe",
                "available_data": None,
            },
            "partial_search_results": {
                "results_found": 2,
                "results_expected": 10,
                "provider_errors": ["exa_timeout", "tavily_rate_limit"],
                "partial_data": True,
            },
            "llm_partial_response": {
                "analysis": "Partial analysis completed before",
                "truncated": True,
                "completion_percentage": 0.6,
            },
        }

    @staticmethod
    def malformed_data() -> dict[str, Any]:
        """Mock malformed or invalid data for error testing."""
        return {
            "invalid_json": '{"analysis": "incomplete json"',  # Missing closing brace
            "wrong_schema": {
                "unexpected_field": "value",
                "missing_required_field": None,
            },
            "invalid_dates": {
                "published_date": "not-a-date",
                "timestamp": "invalid-timestamp",
            },
            "invalid_numbers": {"confidence": "not-a-number", "price": "invalid-price"},
        }


# ==============================================================================
# PYTEST FIXTURES
# ==============================================================================


@pytest.fixture
def mock_llm_responses():
    """Fixture providing mock LLM responses."""
    return MockLLMResponses()


@pytest.fixture
def mock_exa_responses():
    """Fixture providing mock Exa API responses."""
    return MockExaResponses()


@pytest.fixture
def mock_tavily_responses():
    """Fixture providing mock Tavily API responses."""
    return MockTavilyResponses()


@pytest.fixture
def mock_market_data():
    """Fixture providing mock market data."""
    return MockMarketData()


@pytest.fixture
def test_queries():
    """Fixture providing test queries."""
    return TestQueries()


@pytest.fixture
def persona_fixtures():
    """Fixture providing persona-specific data."""
    return PersonaFixtures()


@pytest.fixture
def edge_case_fixtures():
    """Fixture providing edge case test data."""
    return EdgeCaseFixtures()


@pytest.fixture(params=["conservative", "moderate", "aggressive"])
def investor_persona(request):
    """Parametrized fixture for testing across all investor personas."""
    return request.param


@pytest.fixture(
    params=[
        "market_screening",
        "company_research",
        "technical_analysis",
        "sentiment_analysis",
    ]
)
def query_category(request):
    """Parametrized fixture for testing across all query categories."""
    return request.param


# ==============================================================================
# HELPER FUNCTIONS
# ==============================================================================


def create_mock_llm_with_responses(responses: list[str]) -> MagicMock:
    """Create a mock LLM that returns specific responses in order."""
    mock_llm = MagicMock()

    # Create AIMessage objects for each response
    ai_messages = [AIMessage(content=response) for response in responses]
    mock_llm.ainvoke.side_effect = ai_messages

    return mock_llm


def create_mock_agent_result(
    agent_type: str,
    confidence: float = 0.8,
    recommendation: str = "BUY",
    additional_data: dict[str, Any] = None,
) -> dict[str, Any]:
    """Create a mock agent result with realistic structure."""
    base_result = {
        "status": "success",
        "agent_type": agent_type,
        "confidence_score": confidence,
        "recommendation": recommendation,
        "timestamp": datetime.now(),
        "execution_time_ms": np.random.uniform(1000, 5000),
    }

    if additional_data:
        base_result.update(additional_data)

    return base_result


def create_realistic_stock_data(
    symbol: str = "AAPL", price: float = 185.0, volume: int = 50_000_000
) -> dict[str, Any]:
    """Create realistic stock data for testing."""
    return {
        "symbol": symbol,
        "current_price": price,
        "volume": volume,
        "market_cap": 2_850_000_000_000,  # $2.85T for AAPL
        "pe_ratio": 28.5,
        "dividend_yield": 0.005,
        "beta": 1.1,
        "52_week_high": 198.23,
        "52_week_low": 164.08,
        "average_volume": 48_000_000,
        "sector": "Technology",
        "industry": "Consumer Electronics",
    }


# Export main classes for easy importing
__all__ = [
    "MockLLMResponses",
    "MockExaResponses",
    "MockTavilyResponses",
    "MockMarketData",
    "TestQueries",
    "PersonaFixtures",
    "EdgeCaseFixtures",
    "create_mock_llm_with_responses",
    "create_mock_agent_result",
    "create_realistic_stock_data",
]

```

--------------------------------------------------------------------------------
/tests/test_exa_research_integration.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive test suite for ExaSearch integration with research agents.

This test suite validates the complete research agent architecture with ExaSearch provider,
including timeout handling, parallel execution, specialized subagents, and performance
benchmarking across all research depths and focus areas.
"""

import asyncio
import time
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from exa_py import Exa

from maverick_mcp.agents.deep_research import (
    RESEARCH_DEPTH_LEVELS,
    CompetitiveResearchAgent,
    ContentAnalyzer,
    DeepResearchAgent,
    ExaSearchProvider,
    FundamentalResearchAgent,
    SentimentResearchAgent,
    TechnicalResearchAgent,
)
from maverick_mcp.api.routers.research import (
    ResearchRequest,
    comprehensive_research,
    get_research_agent,
)
from maverick_mcp.exceptions import WebSearchError
from maverick_mcp.utils.parallel_research import (
    ParallelResearchConfig,
    ParallelResearchOrchestrator,
    ResearchResult,
    ResearchTask,
    TaskDistributionEngine,
)

# Test Data Factories and Fixtures


@pytest.fixture
def mock_llm():
    """Mock LLM with realistic response patterns for research scenarios."""
    llm = MagicMock()
    llm.ainvoke = AsyncMock()

    # Mock different response types for different research phases
    def mock_response_content(messages):
        """Generate realistic mock responses based on message content."""
        content = str(messages[-1].content).lower()

        if "synthesis" in content:
            return MagicMock(
                content='{"synthesis": "Comprehensive analysis shows positive outlook", "confidence": 0.8}'
            )
        elif "analyze" in content or "financial" in content:
            return MagicMock(
                content='{"KEY_INSIGHTS": ["Strong earnings growth", "Market share expansion"], "SENTIMENT": {"direction": "bullish", "confidence": 0.75}, "RISK_FACTORS": ["Interest rate sensitivity"], "OPPORTUNITIES": ["Market expansion"], "CREDIBILITY": 0.8, "RELEVANCE": 0.9, "SUMMARY": "Positive financial outlook"}'
            )
        else:
            return MagicMock(content="Analysis completed successfully")

    llm.ainvoke.side_effect = lambda messages, **kwargs: mock_response_content(messages)
    return llm


@pytest.fixture
def mock_exa_client():
    """Mock Exa client with realistic search responses."""
    mock_client = MagicMock(spec=Exa)

    def create_mock_result(title, text, url_suffix=""):
        """Create mock Exa result object."""
        result = MagicMock()
        result.url = f"https://example.com/{url_suffix}"
        result.title = title
        result.text = text
        result.published_date = "2024-01-15T10:00:00Z"
        result.score = 0.85
        result.author = "Financial Analyst"
        return result

    def mock_search_and_contents(query, num_results=5, **kwargs):
        """Generate mock search results based on query content."""
        response = MagicMock()
        results = []

        query_lower = query.lower()

        if "aapl" in query_lower or "apple" in query_lower:
            results.extend(
                [
                    create_mock_result(
                        "Apple Q4 Earnings Beat Expectations",
                        "Apple reported strong quarterly earnings with iPhone sales growth of 15% and services revenue reaching new highs. The company's financial position remains robust with strong cash flow.",
                        "apple-earnings",
                    ),
                    create_mock_result(
                        "Apple Stock Technical Analysis",
                        "Apple stock shows bullish technical patterns with support at $180 and resistance at $200. RSI indicates oversold conditions presenting buying opportunity.",
                        "apple-technical",
                    ),
                ]
            )
        elif "sentiment" in query_lower:
            results.extend(
                [
                    create_mock_result(
                        "Market Sentiment Turns Positive",
                        "Investor sentiment shows improvement with increased confidence in tech sector. Analyst upgrades and positive earnings surprises drive optimism.",
                        "market-sentiment",
                    ),
                ]
            )
        elif "competitive" in query_lower or "industry" in query_lower:
            results.extend(
                [
                    create_mock_result(
                        "Tech Industry Competitive Landscape",
                        "The technology sector shows fierce competition with market leaders maintaining strong positions. Innovation and market share battles intensify.",
                        "competitive-analysis",
                    ),
                ]
            )
        else:
            # Default financial research results
            results.extend(
                [
                    create_mock_result(
                        "Financial Market Analysis",
                        "Current market conditions show mixed signals with growth prospects balanced against economic uncertainties. Investors remain cautiously optimistic.",
                        "market-analysis",
                    ),
                    create_mock_result(
                        "Investment Outlook 2024",
                        "Investment opportunities emerge in technology and healthcare sectors despite ongoing market volatility. Diversification remains key strategy.",
                        "investment-outlook",
                    ),
                ]
            )

        # Limit results to requested number
        response.results = results[:num_results]
        return response

    mock_client.search_and_contents.side_effect = mock_search_and_contents
    return mock_client


@pytest.fixture
def sample_research_tasks():
    """Sample research tasks for parallel execution testing."""
    return [
        ResearchTask(
            task_id="session_123_fundamental",
            task_type="fundamental",
            target_topic="AAPL financial analysis",
            focus_areas=["earnings", "valuation", "growth"],
            priority=8,
            timeout=20,
        ),
        ResearchTask(
            task_id="session_123_technical",
            task_type="technical",
            target_topic="AAPL technical analysis",
            focus_areas=["chart_patterns", "support_resistance"],
            priority=7,
            timeout=15,
        ),
        ResearchTask(
            task_id="session_123_sentiment",
            task_type="sentiment",
            target_topic="AAPL market sentiment",
            focus_areas=["news_sentiment", "analyst_ratings"],
            priority=6,
            timeout=15,
        ),
    ]


@pytest.fixture
def mock_settings():
    """Mock settings with ExaSearch configuration."""
    settings = MagicMock()
    settings.research.exa_api_key = "test_exa_api_key"
    settings.data_limits.max_parallel_agents = 4
    settings.performance.search_timeout_failure_threshold = 12
    settings.performance.search_circuit_breaker_failure_threshold = 8
    settings.performance.search_circuit_breaker_recovery_timeout = 30
    return settings


# ExaSearchProvider Tests


class TestExaSearchProvider:
    """Test ExaSearch provider integration and functionality."""

    @pytest.mark.unit
    def test_exa_provider_initialization(self):
        """Test ExaSearchProvider initialization."""
        api_key = "test_api_key_123"
        provider = ExaSearchProvider(api_key)

        assert provider.api_key == api_key
        assert provider._api_key_verified is True
        assert provider.is_healthy() is True
        assert provider._failure_count == 0

    @pytest.mark.unit
    def test_exa_provider_initialization_without_key(self):
        """Test ExaSearchProvider initialization without API key."""
        provider = ExaSearchProvider("")

        assert provider.api_key == ""
        assert provider._api_key_verified is False
        assert provider.is_healthy() is True  # Still healthy, but searches will fail

    @pytest.mark.unit
    def test_timeout_calculation(self):
        """Test adaptive timeout calculation for different query complexities."""
        provider = ExaSearchProvider("test_key")

        # Simple query
        timeout = provider._calculate_timeout("AAPL", None)
        assert timeout >= 4.0  # Minimum for Exa reliability

        # Complex query
        complex_query = "comprehensive analysis of Apple Inc financial performance and market position with competitive analysis"
        timeout_complex = provider._calculate_timeout(complex_query, None)
        assert timeout_complex >= timeout

        # Budget constrained query
        timeout_budget = provider._calculate_timeout("AAPL", 8.0)
        assert 4.0 <= timeout_budget <= 8.0

    @pytest.mark.unit
    def test_failure_recording_and_health_status(self):
        """Test failure recording and health status management."""
        provider = ExaSearchProvider("test_key")

        # Initially healthy
        assert provider.is_healthy() is True

        # Record several timeout failures
        for _ in range(5):
            provider._record_failure("timeout")

        assert provider._failure_count == 5
        assert provider.is_healthy() is True  # Still healthy, threshold not reached

        # Exceed timeout threshold (default 12)
        for _ in range(8):
            provider._record_failure("timeout")

        assert provider._failure_count == 13
        assert provider.is_healthy() is False  # Now unhealthy

        # Test recovery
        provider._record_success()
        assert provider.is_healthy() is True
        assert provider._failure_count == 0

    @pytest.mark.unit
    @patch("exa_py.Exa")
    async def test_exa_search_success(self, mock_exa_class, mock_exa_client):
        """Test successful ExaSearch operation."""
        mock_exa_class.return_value = mock_exa_client
        provider = ExaSearchProvider("test_key")

        results = await provider.search("AAPL financial analysis", num_results=3)

        assert len(results) >= 1
        assert all("url" in result for result in results)
        assert all("title" in result for result in results)
        assert all("content" in result for result in results)
        assert all(result["provider"] == "exa" for result in results)

    @pytest.mark.unit
    @patch("exa_py.Exa")
    async def test_exa_search_timeout(self, mock_exa_class):
        """Test ExaSearch timeout handling."""
        # Mock Exa client that takes too long
        mock_client = MagicMock()

        def slow_search(*args, **kwargs):
            import time

            time.sleep(10)  # Simulate slow synchronous response

        mock_client.search_and_contents.side_effect = slow_search
        mock_exa_class.return_value = mock_client

        provider = ExaSearchProvider("test_key")

        with pytest.raises(WebSearchError, match="timed out"):
            await provider.search("test query", timeout_budget=2.0)

        # Check that failure was recorded
        assert not provider.is_healthy() or provider._failure_count > 0

    @pytest.mark.unit
    @patch("exa_py.Exa")
    async def test_exa_search_unhealthy_provider(self, mock_exa_class):
        """Test behavior when provider is marked as unhealthy."""
        provider = ExaSearchProvider("test_key")
        provider._is_healthy = False

        with pytest.raises(WebSearchError, match="disabled due to repeated failures"):
            await provider.search("test query")


# DeepResearchAgent Tests


class TestDeepResearchAgent:
    """Test DeepResearchAgent with ExaSearch integration."""

    @pytest.mark.unit
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    async def test_agent_initialization_with_exa(self, mock_provider, mock_llm):
        """Test DeepResearchAgent initialization with ExaSearch provider."""
        mock_exa_provider = MagicMock(spec=ExaSearchProvider)
        mock_provider.return_value = mock_exa_provider

        agent = DeepResearchAgent(
            llm=mock_llm,
            persona="moderate",
            exa_api_key="test_key",
            research_depth="standard",
        )

        await agent.initialize()

        assert agent.search_providers == [mock_exa_provider]
        assert agent._search_providers_loaded is True
        assert agent.default_depth == "standard"

    @pytest.mark.unit
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    async def test_agent_initialization_without_providers(
        self, mock_provider, mock_llm
    ):
        """Test agent behavior when no search providers are available."""
        mock_provider.return_value = None

        agent = DeepResearchAgent(
            llm=mock_llm,
            persona="moderate",
            exa_api_key=None,
        )

        await agent.initialize()

        assert agent.search_providers == []
        assert agent._search_providers_loaded is True

    @pytest.mark.unit
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    async def test_research_comprehensive_no_providers(self, mock_provider, mock_llm):
        """Test research behavior when no search providers are configured."""
        mock_provider.return_value = None

        agent = DeepResearchAgent(llm=mock_llm, exa_api_key=None)

        result = await agent.research_comprehensive(
            topic="AAPL analysis", session_id="test_session", depth="basic"
        )

        assert "error" in result
        assert "no search providers configured" in result["error"]
        assert result["topic"] == "AAPL analysis"

    @pytest.mark.integration
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    @patch("exa_py.Exa")
    async def test_research_comprehensive_success(
        self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
    ):
        """Test successful comprehensive research with ExaSearch."""
        # Setup mocks
        mock_exa_provider = ExaSearchProvider("test_key")
        mock_provider.return_value = mock_exa_provider
        mock_exa_class.return_value = mock_exa_client

        agent = DeepResearchAgent(
            llm=mock_llm,
            persona="moderate",
            exa_api_key="test_key",
            research_depth="basic",
        )

        # Execute research
        result = await agent.research_comprehensive(
            topic="AAPL financial analysis",
            session_id="test_session_123",
            depth="basic",
            timeout_budget=15.0,
        )

        # Verify result structure
        assert result["status"] == "success"
        assert result["agent_type"] == "deep_research"
        assert result["research_topic"] == "AAPL financial analysis"
        assert result["research_depth"] == "basic"
        assert "findings" in result
        assert "confidence_score" in result
        assert "execution_time_ms" in result

    @pytest.mark.unit
    def test_research_depth_levels(self):
        """Test research depth level configurations."""
        assert "basic" in RESEARCH_DEPTH_LEVELS
        assert "standard" in RESEARCH_DEPTH_LEVELS
        assert "comprehensive" in RESEARCH_DEPTH_LEVELS
        assert "exhaustive" in RESEARCH_DEPTH_LEVELS

        # Verify basic level has minimal settings for speed
        basic = RESEARCH_DEPTH_LEVELS["basic"]
        assert basic["max_sources"] <= 5
        assert basic["max_searches"] <= 2
        assert basic["validation_required"] is False

        # Verify exhaustive has maximum settings
        exhaustive = RESEARCH_DEPTH_LEVELS["exhaustive"]
        assert exhaustive["max_sources"] >= 10
        assert exhaustive["validation_required"] is True


# Specialized Subagent Tests


class TestSpecializedSubagents:
    """Test specialized research subagents."""

    @pytest.fixture
    def mock_parent_agent(self, mock_llm):
        """Mock parent DeepResearchAgent for subagent testing."""
        agent = MagicMock()
        agent.llm = mock_llm
        agent.search_providers = [MagicMock(spec=ExaSearchProvider)]
        agent.content_analyzer = MagicMock(spec=ContentAnalyzer)
        agent.persona = MagicMock()
        agent.persona.name = "moderate"
        agent._calculate_source_credibility = MagicMock(return_value=0.8)
        return agent

    @pytest.mark.unit
    async def test_fundamental_research_agent(
        self, mock_parent_agent, sample_research_tasks
    ):
        """Test FundamentalResearchAgent execution."""
        task = sample_research_tasks[0]  # fundamental task
        agent = FundamentalResearchAgent(mock_parent_agent)

        # Mock search results
        mock_search_results = [
            {
                "title": "AAPL Q4 Earnings Report",
                "url": "https://example.com/earnings",
                "content": "Apple reported strong quarterly earnings with revenue growth of 12%...",
                "published_date": "2024-01-15",
            }
        ]

        agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
        agent._analyze_search_results = AsyncMock(
            return_value=[
                {
                    **mock_search_results[0],
                    "analysis": {
                        "insights": [
                            "Strong earnings growth",
                            "Revenue diversification",
                        ],
                        "risk_factors": ["Market competition"],
                        "opportunities": ["Market expansion"],
                        "sentiment": {"direction": "bullish", "confidence": 0.8},
                    },
                    "credibility_score": 0.8,
                }
            ]
        )

        result = await agent.execute_research(task)

        assert result["research_type"] == "fundamental"
        assert "insights" in result
        assert "risk_factors" in result
        assert "opportunities" in result
        assert "sentiment" in result
        assert "sources" in result
        assert len(result["focus_areas"]) > 0
        assert "earnings" in result["focus_areas"]

    @pytest.mark.unit
    async def test_technical_research_agent(
        self, mock_parent_agent, sample_research_tasks
    ):
        """Test TechnicalResearchAgent execution."""
        task = sample_research_tasks[1]  # technical task
        agent = TechnicalResearchAgent(mock_parent_agent)

        # Mock search results with technical analysis
        mock_search_results = [
            {
                "title": "AAPL Technical Analysis",
                "url": "https://example.com/technical",
                "content": "AAPL shows bullish chart patterns with support at $180 and resistance at $200...",
                "published_date": "2024-01-15",
            }
        ]

        agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
        agent._analyze_search_results = AsyncMock(
            return_value=[
                {
                    **mock_search_results[0],
                    "analysis": {
                        "insights": [
                            "Bullish breakout pattern",
                            "Strong support levels",
                        ],
                        "risk_factors": ["Overbought conditions"],
                        "opportunities": ["Momentum continuation"],
                        "sentiment": {"direction": "bullish", "confidence": 0.7},
                    },
                    "credibility_score": 0.7,
                }
            ]
        )

        result = await agent.execute_research(task)

        assert result["research_type"] == "technical"
        assert "price_action" in result["focus_areas"]
        assert "chart_patterns" in result["focus_areas"]

    @pytest.mark.unit
    async def test_sentiment_research_agent(
        self, mock_parent_agent, sample_research_tasks
    ):
        """Test SentimentResearchAgent execution."""
        task = sample_research_tasks[2]  # sentiment task
        agent = SentimentResearchAgent(mock_parent_agent)

        # Mock search results with sentiment data
        mock_search_results = [
            {
                "title": "Apple Stock Sentiment Analysis",
                "url": "https://example.com/sentiment",
                "content": "Analyst sentiment remains positive on Apple with multiple upgrades...",
                "published_date": "2024-01-15",
            }
        ]

        agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
        agent._analyze_search_results = AsyncMock(
            return_value=[
                {
                    **mock_search_results[0],
                    "analysis": {
                        "insights": ["Positive analyst sentiment", "Upgrade momentum"],
                        "risk_factors": ["Market volatility concerns"],
                        "opportunities": ["Institutional accumulation"],
                        "sentiment": {"direction": "bullish", "confidence": 0.85},
                    },
                    "credibility_score": 0.9,
                }
            ]
        )

        result = await agent.execute_research(task)

        assert result["research_type"] == "sentiment"
        assert "market_sentiment" in result["focus_areas"]
        assert result["sentiment"]["direction"] == "bullish"

    @pytest.mark.unit
    async def test_competitive_research_agent(self, mock_parent_agent):
        """Test CompetitiveResearchAgent execution."""
        task = ResearchTask(
            task_id="test_competitive",
            task_type="competitive",
            target_topic="AAPL competitive analysis",
            focus_areas=["competitive_position", "market_share"],
        )

        agent = CompetitiveResearchAgent(mock_parent_agent)

        # Mock search results with competitive data
        mock_search_results = [
            {
                "title": "Apple vs Samsung Market Share",
                "url": "https://example.com/competitive",
                "content": "Apple maintains strong competitive position in premium smartphone market...",
                "published_date": "2024-01-15",
            }
        ]

        agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
        agent._analyze_search_results = AsyncMock(
            return_value=[
                {
                    **mock_search_results[0],
                    "analysis": {
                        "insights": [
                            "Strong market position",
                            "Premium segment dominance",
                        ],
                        "risk_factors": ["Android competition"],
                        "opportunities": ["Emerging markets"],
                        "sentiment": {"direction": "bullish", "confidence": 0.75},
                    },
                    "credibility_score": 0.8,
                }
            ]
        )

        result = await agent.execute_research(task)

        assert result["research_type"] == "competitive"
        assert "competitive_position" in result["focus_areas"]
        assert "industry_trends" in result["focus_areas"]


# Parallel Research Tests


class TestParallelResearchOrchestrator:
    """Test parallel research execution and orchestration."""

    @pytest.mark.unit
    def test_orchestrator_initialization(self):
        """Test ParallelResearchOrchestrator initialization."""
        config = ParallelResearchConfig(max_concurrent_agents=6, timeout_per_agent=20)
        orchestrator = ParallelResearchOrchestrator(config)

        assert orchestrator.config.max_concurrent_agents == 6
        assert orchestrator.config.timeout_per_agent == 20
        assert orchestrator._semaphore._value == 6  # Semaphore initialized correctly

    @pytest.mark.unit
    async def test_task_preparation(self, sample_research_tasks):
        """Test task preparation and prioritization."""
        orchestrator = ParallelResearchOrchestrator()

        prepared_tasks = await orchestrator._prepare_tasks(sample_research_tasks)

        # Should be sorted by priority (descending)
        assert prepared_tasks[0].priority >= prepared_tasks[1].priority

        # All tasks should have timeouts set
        for task in prepared_tasks:
            assert task.timeout is not None
            assert task.status == "pending"
            assert task.task_id in orchestrator.active_tasks

    @pytest.mark.integration
    async def test_parallel_execution_success(self, sample_research_tasks):
        """Test successful parallel execution of research tasks."""
        orchestrator = ParallelResearchOrchestrator(
            ParallelResearchConfig(max_concurrent_agents=3, timeout_per_agent=10)
        )

        # Mock research executor
        async def mock_executor(task):
            """Mock research executor that simulates successful execution."""
            await asyncio.sleep(0.1)  # Simulate work
            return {
                "research_type": task.task_type,
                "insights": [
                    f"{task.task_type} insight 1",
                    f"{task.task_type} insight 2",
                ],
                "sentiment": {"direction": "bullish", "confidence": 0.8},
                "sources": [
                    {"title": f"{task.task_type} source", "url": "https://example.com"}
                ],
            }

        # Mock synthesis callback
        async def mock_synthesis(task_results):
            return {
                "synthesis": f"Synthesized results from {len(task_results)} tasks",
                "confidence_score": 0.8,
            }

        result = await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks,
            research_executor=mock_executor,
            synthesis_callback=mock_synthesis,
        )

        assert isinstance(result, ResearchResult)
        assert result.successful_tasks == len(sample_research_tasks)
        assert result.failed_tasks == 0
        assert result.parallel_efficiency > 1.0  # Should be faster than sequential
        assert result.synthesis is not None
        assert "synthesis" in result.synthesis

    @pytest.mark.unit
    async def test_parallel_execution_with_failures(self, sample_research_tasks):
        """Test parallel execution with some task failures."""
        orchestrator = ParallelResearchOrchestrator()

        # Mock research executor that fails for certain task types
        async def mock_executor_with_failures(task):
            if task.task_type == "technical":
                raise TimeoutError("Task timed out")
            elif task.task_type == "sentiment":
                raise Exception("Network error")
            else:
                return {"research_type": task.task_type, "insights": ["Success"]}

        result = await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks,
            research_executor=mock_executor_with_failures,
        )

        assert result.successful_tasks == 1  # Only fundamental should succeed
        assert result.failed_tasks == 2

        # Check that failed tasks have error information
        failed_tasks = [
            task for task in result.task_results.values() if task.status == "failed"
        ]
        assert len(failed_tasks) == 2
        for task in failed_tasks:
            assert task.error is not None

    @pytest.mark.unit
    async def test_circuit_breaker_integration(self, sample_research_tasks):
        """Test circuit breaker integration in parallel execution."""
        orchestrator = ParallelResearchOrchestrator()

        # Mock executor that consistently fails
        failure_count = 0

        async def failing_executor(task):
            nonlocal failure_count
            failure_count += 1
            raise Exception(f"Failure {failure_count}")

        result = await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks,
            research_executor=failing_executor,
        )

        # All tasks should fail
        assert result.failed_tasks == len(sample_research_tasks)
        assert result.successful_tasks == 0


class TestTaskDistributionEngine:
    """Test intelligent task distribution for research topics."""

    @pytest.mark.unit
    def test_topic_relevance_analysis(self):
        """Test topic relevance analysis for different task types."""
        engine = TaskDistributionEngine()

        # Test financial topic
        relevance = engine._analyze_topic_relevance(
            "apple earnings financial performance",
            focus_areas=["fundamentals", "financials"],
        )

        assert "fundamental" in relevance
        assert "technical" in relevance
        assert "sentiment" in relevance
        assert "competitive" in relevance

        # Fundamental should have highest relevance for earnings query
        assert relevance["fundamental"] > relevance["technical"]
        assert relevance["fundamental"] > relevance["competitive"]

    @pytest.mark.unit
    def test_task_distribution_basic(self):
        """Test basic task distribution for a research topic."""
        engine = TaskDistributionEngine()

        tasks = engine.distribute_research_tasks(
            topic="AAPL financial analysis and market outlook",
            session_id="test_session",
            focus_areas=["fundamentals", "technical_analysis"],
        )

        assert len(tasks) > 0

        # Should have variety of task types
        task_types = {task.task_type for task in tasks}
        assert "fundamental" in task_types  # High relevance for financial analysis

        # Tasks should be properly configured
        for task in tasks:
            assert task.session_id == "test_session"
            assert task.target_topic == "AAPL financial analysis and market outlook"
            assert task.priority > 0
            assert len(task.focus_areas) > 0

    @pytest.mark.unit
    def test_task_distribution_fallback(self):
        """Test task distribution fallback when no relevant tasks found."""
        engine = TaskDistributionEngine()

        # Mock the relevance analysis to return very low scores
        with patch.object(
            engine,
            "_analyze_topic_relevance",
            return_value={
                "fundamental": 0.1,
                "technical": 0.1,
                "sentiment": 0.1,
                "competitive": 0.1,
            },
        ):
            tasks = engine.distribute_research_tasks(
                topic="obscure topic with no clear relevance",
                session_id="test_session",
            )

        # Should still create at least one task (fallback)
        assert len(tasks) >= 1

        # Fallback should be fundamental analysis
        assert any(task.task_type == "fundamental" for task in tasks)

    @pytest.mark.unit
    def test_task_priority_assignment(self):
        """Test priority assignment based on relevance scores."""
        engine = TaskDistributionEngine()

        tasks = engine.distribute_research_tasks(
            topic="AAPL fundamental analysis earnings valuation",
            session_id="test_session",
        )

        # Find fundamental task (should have high priority)
        fundamental_tasks = [t for t in tasks if t.task_type == "fundamental"]
        if fundamental_tasks:
            fundamental_task = fundamental_tasks[0]
            assert fundamental_task.priority >= 7  # Should be high priority


# Timeout and Circuit Breaker Tests


class TestTimeoutAndCircuitBreaker:
    """Test timeout handling and circuit breaker patterns."""

    @pytest.mark.unit
    async def test_timeout_budget_allocation(self, mock_llm):
        """Test timeout budget allocation across research phases."""
        agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key")

        # Test basic timeout allocation
        timeout_budget = 20.0
        result = await agent.research_comprehensive(
            topic="test topic",
            session_id="test_session",
            depth="basic",
            timeout_budget=timeout_budget,
        )

        # Should either complete or timeout gracefully
        assert "status" in result or "error" in result

        # If timeout occurred, should have appropriate error structure
        if result.get("status") == "error" or "error" in result:
            # Should be a timeout-related error for very short budget
            assert (
                "timeout" in str(result).lower()
                or "search providers" in str(result).lower()
            )

    @pytest.mark.unit
    def test_provider_health_monitoring(self):
        """Test search provider health monitoring and recovery."""
        provider = ExaSearchProvider("test_key")

        # Initially healthy
        assert provider.is_healthy()

        # Simulate multiple timeout failures
        for _i in range(15):  # Exceed default threshold of 12
            provider._record_failure("timeout")

        # Should be marked unhealthy
        assert not provider.is_healthy()

        # Recovery after success
        provider._record_success()
        assert provider.is_healthy()
        assert provider._failure_count == 0

    @pytest.mark.integration
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    async def test_research_with_provider_failures(self, mock_provider, mock_llm):
        """Test research behavior when provider failures occur."""
        # Create a provider that will fail
        failing_provider = MagicMock(spec=ExaSearchProvider)
        failing_provider.is_healthy.return_value = True
        failing_provider.search = AsyncMock(side_effect=WebSearchError("Search failed"))

        mock_provider.return_value = failing_provider

        agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key")

        result = await agent.research_comprehensive(
            topic="test topic",
            session_id="test_session",
            depth="basic",
        )

        # Should handle provider failure gracefully
        assert "status" in result
        # May succeed with fallback or fail gracefully


# Performance and Benchmarking Tests


class TestPerformanceBenchmarks:
    """Test performance across different research depths and configurations."""

    @pytest.mark.slow
    @pytest.mark.integration
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    @patch("exa_py.Exa")
    async def test_research_depth_performance(
        self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
    ):
        """Benchmark performance across different research depths."""
        mock_provider.return_value = ExaSearchProvider("test_key")
        mock_exa_class.return_value = mock_exa_client

        performance_results = {}

        for depth in ["basic", "standard", "comprehensive"]:
            agent = DeepResearchAgent(
                llm=mock_llm,
                exa_api_key="test_key",
                research_depth=depth,
            )

            start_time = time.time()

            result = await agent.research_comprehensive(
                topic="AAPL financial analysis",
                session_id=f"perf_test_{depth}",
                depth=depth,
                timeout_budget=30.0,
            )

            execution_time = time.time() - start_time
            performance_results[depth] = {
                "execution_time": execution_time,
                "success": result.get("status") == "success",
                "sources_analyzed": result.get("sources_analyzed", 0),
            }

        # Verify performance characteristics
        assert (
            performance_results["basic"]["execution_time"]
            <= performance_results["comprehensive"]["execution_time"]
        )

        # Basic should be fastest
        if performance_results["basic"]["success"]:
            assert (
                performance_results["basic"]["execution_time"] < 15.0
            )  # Should be fast

    @pytest.mark.slow
    async def test_parallel_vs_sequential_performance(self, sample_research_tasks):
        """Compare parallel vs sequential execution performance."""
        config = ParallelResearchConfig(max_concurrent_agents=4, timeout_per_agent=10)
        orchestrator = ParallelResearchOrchestrator(config)

        async def mock_executor(task):
            await asyncio.sleep(1)  # Simulate 1 second work per task
            return {"research_type": task.task_type, "insights": ["Mock insight"]}

        # Parallel execution
        start_time = time.time()
        parallel_result = await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks,
            research_executor=mock_executor,
        )
        parallel_time = time.time() - start_time

        # Sequential simulation
        start_time = time.time()
        for task in sample_research_tasks:
            await mock_executor(task)
        sequential_time = time.time() - start_time

        # Parallel should be significantly faster
        assert parallel_result.parallel_efficiency > 1.5  # At least 50% improvement
        assert parallel_time < sequential_time * 0.7  # Should be at least 30% faster

    @pytest.mark.unit
    async def test_memory_usage_monitoring(self, sample_research_tasks):
        """Test memory usage stays reasonable during parallel execution."""
        import os

        import psutil

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        config = ParallelResearchConfig(max_concurrent_agents=4)
        orchestrator = ParallelResearchOrchestrator(config)

        async def mock_executor(task):
            # Create some data but not excessive
            data = {"results": ["data"] * 1000}  # Small amount of data
            await asyncio.sleep(0.1)
            return data

        await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks * 5,  # More tasks to test scaling
            research_executor=mock_executor,
        )

        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_growth = final_memory - initial_memory

        # Memory growth should be reasonable (less than 100MB for test)
        assert memory_growth < 100, f"Memory grew by {memory_growth:.1f}MB"


# MCP Integration Tests


class TestMCPIntegration:
    """Test MCP tool endpoints and research router integration."""

    @pytest.mark.integration
    @patch("maverick_mcp.api.routers.research.get_settings")
    async def test_comprehensive_research_mcp_tool(self, mock_settings):
        """Test the comprehensive research MCP tool endpoint."""
        mock_settings.return_value.research.exa_api_key = "test_key"

        result = await comprehensive_research(
            query="AAPL financial analysis",
            persona="moderate",
            research_scope="basic",
            max_sources=5,
            timeframe="1m",
        )

        # Should return structured response
        assert isinstance(result, dict)
        assert "success" in result

        # If successful, should have proper structure
        if result.get("success"):
            assert "research_results" in result
            assert "research_metadata" in result
            assert "request_id" in result
            assert "timestamp" in result

    @pytest.mark.unit
    @patch("maverick_mcp.api.routers.research.get_settings")
    async def test_research_without_exa_key(self, mock_settings):
        """Test research behavior without ExaSearch API key."""
        mock_settings.return_value.research.exa_api_key = None

        result = await comprehensive_research(
            query="test query",
            persona="moderate",
            research_scope="basic",
        )

        assert result["success"] is False
        assert "Exa search provider not configured" in result["error"]
        assert "setup_instructions" in result["details"]

    @pytest.mark.unit
    def test_research_request_validation(self):
        """Test ResearchRequest model validation."""
        # Valid request
        request = ResearchRequest(
            query="AAPL analysis",
            persona="moderate",
            research_scope="standard",
            max_sources=15,
            timeframe="1m",
        )

        assert request.query == "AAPL analysis"
        assert request.persona == "moderate"
        assert request.research_scope == "standard"
        assert request.max_sources == 15
        assert request.timeframe == "1m"

        # Test defaults
        minimal_request = ResearchRequest(query="test")
        assert minimal_request.persona == "moderate"
        assert minimal_request.research_scope == "standard"
        assert minimal_request.max_sources == 10
        assert minimal_request.timeframe == "1m"

    @pytest.mark.unit
    def test_get_research_agent_optimization(self):
        """Test research agent creation with optimization parameters."""
        # Test optimized agent creation
        agent = get_research_agent(
            query="complex financial analysis of multiple companies",
            research_scope="comprehensive",
            timeout_budget=25.0,
            max_sources=20,
        )

        assert isinstance(agent, DeepResearchAgent)
        assert agent.max_sources <= 20  # Should respect or optimize max sources
        assert agent.default_depth in [
            "basic",
            "standard",
            "comprehensive",
            "exhaustive",
        ]

        # Test standard agent creation
        standard_agent = get_research_agent()
        assert isinstance(standard_agent, DeepResearchAgent)


# Content Analysis Tests


class TestContentAnalyzer:
    """Test AI-powered content analysis functionality."""

    @pytest.mark.unit
    async def test_content_analysis_success(self, mock_llm):
        """Test successful content analysis."""
        analyzer = ContentAnalyzer(mock_llm)

        content = "Apple reported strong quarterly earnings with revenue growth of 12% and expanding market share in the services segment."

        result = await analyzer.analyze_content(
            content=content, persona="moderate", analysis_focus="financial"
        )

        assert "insights" in result
        assert "sentiment" in result
        assert "risk_factors" in result
        assert "opportunities" in result
        assert "credibility_score" in result
        assert "relevance_score" in result
        assert "summary" in result
        assert "analysis_timestamp" in result

    @pytest.mark.unit
    async def test_content_analysis_fallback(self, mock_llm):
        """Test content analysis fallback when AI analysis fails."""
        analyzer = ContentAnalyzer(mock_llm)

        # Make LLM fail
        mock_llm.ainvoke.side_effect = Exception("LLM error")

        result = await analyzer.analyze_content(
            content="Test content", persona="moderate"
        )

        # Should fall back to keyword-based analysis
        assert result["fallback_used"] is True
        assert "sentiment" in result
        assert result["sentiment"]["direction"] in ["bullish", "bearish", "neutral"]

    @pytest.mark.unit
    async def test_batch_content_analysis(self, mock_llm):
        """Test batch content analysis functionality."""
        analyzer = ContentAnalyzer(mock_llm)

        content_items = [
            ("Apple shows strong growth", "source1"),
            ("Market conditions remain volatile", "source2"),
            ("Tech sector outlook positive", "source3"),
        ]

        results = await analyzer.analyze_content_batch(
            content_items=content_items, persona="moderate", analysis_focus="general"
        )

        assert len(results) == len(content_items)
        for i, result in enumerate(results):
            assert result["source_identifier"] == f"source{i + 1}"
            assert result["batch_processed"] is True
            assert "sentiment" in result


# Error Handling and Edge Cases


class TestErrorHandlingAndEdgeCases:
    """Test comprehensive error handling and edge cases."""

    @pytest.mark.unit
    async def test_empty_search_results(self, mock_llm):
        """Test behavior when search returns no results."""
        provider = ExaSearchProvider("test_key")

        with patch("exa_py.Exa") as mock_exa:
            # Mock empty results
            mock_client = MagicMock()
            mock_response = MagicMock()
            mock_response.results = []
            mock_client.search_and_contents.return_value = mock_response
            mock_exa.return_value = mock_client

            results = await provider.search("nonexistent topic", num_results=5)

            assert results == []

    @pytest.mark.unit
    async def test_malformed_search_response(self, mock_llm):
        """Test handling of malformed search responses."""
        provider = ExaSearchProvider("test_key")

        with patch("exa_py.Exa") as mock_exa:
            # Mock malformed response
            mock_client = MagicMock()
            mock_client.search_and_contents.side_effect = Exception(
                "Invalid response format"
            )
            mock_exa.return_value = mock_client

            with pytest.raises(WebSearchError):
                await provider.search("test query")

    @pytest.mark.unit
    async def test_network_timeout_recovery(self):
        """Test network timeout recovery mechanisms."""
        provider = ExaSearchProvider("test_key")

        # Simulate multiple timeouts followed by success
        with patch("exa_py.Exa") as mock_exa:
            call_count = 0

            async def mock_search_with_recovery(*args, **kwargs):
                nonlocal call_count
                call_count += 1
                if call_count <= 2:
                    raise TimeoutError("Network timeout")
                else:
                    # Success on third try
                    mock_response = MagicMock()
                    mock_result = MagicMock()
                    mock_result.url = "https://example.com"
                    mock_result.title = "Test Result"
                    mock_result.text = "Test content"
                    mock_result.published_date = "2024-01-15"
                    mock_result.score = 0.8
                    mock_response.results = [mock_result]
                    return mock_response

            mock_client = MagicMock()
            mock_client.search_and_contents.side_effect = mock_search_with_recovery
            mock_exa.return_value = mock_client

            # First two calls should fail and record failures
            for _ in range(2):
                with pytest.raises(WebSearchError):
                    await provider.search("test query", timeout_budget=1.0)

            # Provider should still be healthy (failures recorded but not exceeded threshold)
            assert provider._failure_count == 2

            # Third call should succeed and reset failure count
            results = await provider.search("test query")
            assert len(results) > 0
            assert provider._failure_count == 0  # Reset on success

    @pytest.mark.unit
    async def test_concurrent_request_limits(self, sample_research_tasks):
        """Test that concurrent request limits are respected."""
        config = ParallelResearchConfig(max_concurrent_agents=2)  # Very low limit
        orchestrator = ParallelResearchOrchestrator(config)

        execution_times = []

        async def tracking_executor(task):
            start = time.time()
            await asyncio.sleep(0.5)  # Simulate work
            end = time.time()
            execution_times.append((start, end))
            return {"result": "success"}

        await orchestrator.execute_parallel_research(
            tasks=sample_research_tasks,  # 3 tasks
            research_executor=tracking_executor,
        )

        # With max_concurrent_agents=2, the third task should start after one of the first two finishes
        # This means there should be overlap but not all three running simultaneously
        assert len(execution_times) == 3

        # Sort by start time
        execution_times.sort()

        # The third task should start after the first task finishes
        # (allowing for some timing tolerance)
        third_start = execution_times[2][0]
        first_end = execution_times[0][1]

        # Third should start after first ends (with small tolerance for async timing)
        assert third_start >= (first_end - 0.1)


# Integration Test Suite


class TestFullIntegrationScenarios:
    """End-to-end integration tests for complete research workflows."""

    @pytest.mark.integration
    @pytest.mark.slow
    @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
    @patch("exa_py.Exa")
    async def test_complete_research_workflow(
        self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
    ):
        """Test complete research workflow from query to final report."""
        # Setup comprehensive mocks
        mock_provider.return_value = ExaSearchProvider("test_key")
        mock_exa_class.return_value = mock_exa_client

        agent = DeepResearchAgent(
            llm=mock_llm,
            persona="moderate",
            exa_api_key="test_key",
            research_depth="standard",
            enable_parallel_execution=True,
        )

        # Execute complete research workflow
        result = await agent.research_comprehensive(
            topic="Apple Inc (AAPL) investment analysis with market sentiment and competitive position",
            session_id="integration_test_session",
            depth="standard",
            focus_areas=["fundamentals", "market_sentiment", "competitive_analysis"],
            timeframe="1m",
            use_parallel_execution=True,
        )

        # Verify comprehensive result structure
        if result.get("status") == "success":
            assert "findings" in result
            assert "confidence_score" in result
            assert isinstance(result["confidence_score"], int | float)
            assert 0.0 <= result["confidence_score"] <= 1.0
            assert "citations" in result
            assert "execution_time_ms" in result

            # Check for parallel execution indicators
            if "parallel_execution_stats" in result:
                assert "successful_tasks" in result["parallel_execution_stats"]
                assert "parallel_efficiency" in result["parallel_execution_stats"]

        # Should handle both success and controlled failure scenarios
        assert "status" in result or "error" in result

    @pytest.mark.integration
    async def test_multi_persona_consistency(self, mock_llm, mock_exa_client):
        """Test research consistency across different investor personas."""
        personas = ["conservative", "moderate", "aggressive", "day_trader"]
        results = {}

        for persona in personas:
            with (
                patch(
                    "maverick_mcp.agents.deep_research.get_cached_search_provider"
                ) as mock_provider,
                patch("exa_py.Exa") as mock_exa_class,
            ):
                mock_provider.return_value = ExaSearchProvider("test_key")
                mock_exa_class.return_value = mock_exa_client

                agent = DeepResearchAgent(
                    llm=mock_llm,
                    persona=persona,
                    exa_api_key="test_key",
                    research_depth="basic",
                )

                result = await agent.research_comprehensive(
                    topic="AAPL investment outlook",
                    session_id=f"persona_test_{persona}",
                    depth="basic",
                )

                results[persona] = result

        # All personas should provide valid responses
        for persona, result in results.items():
            assert isinstance(result, dict), f"Invalid result for {persona}"
            # Should have some form of result (success or controlled failure)
            assert "status" in result or "error" in result or "success" in result


if __name__ == "__main__":
    # Run specific test categories based on markers
    pytest.main(
        [
            __file__,
            "-v",
            "--tb=short",
            "-m",
            "unit",  # Run unit tests by default
        ]
    )

```

--------------------------------------------------------------------------------
/maverick_mcp/utils/llm_optimization.py:
--------------------------------------------------------------------------------

```python
"""
LLM-side optimizations for research agents to prevent timeouts.

This module provides comprehensive optimization strategies including:
- Adaptive model selection based on time constraints
- Progressive token budgeting with confidence tracking
- Parallel LLM processing with intelligent load balancing
- Optimized prompt engineering for speed
- Early termination based on confidence thresholds
- Content filtering to reduce processing overhead
"""

import asyncio
import logging
import re
import time
from datetime import datetime
from enum import Enum
from typing import Any

from langchain_core.messages import HumanMessage, SystemMessage
from pydantic import BaseModel, Field

from maverick_mcp.providers.openrouter_provider import (
    OpenRouterProvider,
    TaskType,
)
from maverick_mcp.utils.orchestration_logging import (
    get_orchestration_logger,
    log_method_call,
)

logger = logging.getLogger(__name__)


class ResearchPhase(str, Enum):
    """Research phases for token allocation."""

    SEARCH = "search"
    CONTENT_ANALYSIS = "content_analysis"
    SYNTHESIS = "synthesis"
    VALIDATION = "validation"


class ModelConfiguration(BaseModel):
    """Configuration for model selection with time optimization."""

    model_id: str = Field(description="OpenRouter model identifier")
    max_tokens: int = Field(description="Maximum output tokens")
    temperature: float = Field(description="Model temperature")
    timeout_seconds: float = Field(description="Request timeout")
    parallel_batch_size: int = Field(
        default=1, description="Sources per batch for this model"
    )


class TokenAllocation(BaseModel):
    """Token allocation for a research phase."""

    input_tokens: int = Field(description="Maximum input tokens")
    output_tokens: int = Field(description="Maximum output tokens")
    per_source_tokens: int = Field(description="Tokens per source")
    emergency_reserve: int = Field(description="Emergency reserve tokens")
    timeout_seconds: float = Field(description="Processing timeout")


class AdaptiveModelSelector:
    """Intelligent model selection based on time budgets and task complexity."""

    def __init__(self, openrouter_provider: OpenRouterProvider):
        self.provider = openrouter_provider
        self.performance_cache = {}  # Cache model performance metrics

    def select_model_for_time_budget(
        self,
        task_type: TaskType,
        time_remaining_seconds: float,
        complexity_score: float,
        content_size_tokens: int,
        confidence_threshold: float = 0.8,
        current_confidence: float = 0.0,
    ) -> ModelConfiguration:
        """Select optimal model based on available time and requirements."""

        # Time pressure categories with adaptive thresholds
        if time_remaining_seconds < 10:
            return self._select_emergency_model(task_type, content_size_tokens)
        elif time_remaining_seconds < 25:
            return self._select_fast_quality_model(task_type, complexity_score)
        elif time_remaining_seconds < 45:
            return self._select_balanced_model(
                task_type, complexity_score, current_confidence
            )
        else:
            return self._select_optimal_model(
                task_type, complexity_score, confidence_threshold
            )

    def _select_emergency_model(
        self, task_type: TaskType, content_size: int
    ) -> ModelConfiguration:
        """Ultra-fast models for time-critical situations."""
        # OPTIMIZATION: Prioritize speed with increased batch sizes
        if content_size > 20000:  # Large content needs fast + capable models
            return ModelConfiguration(
                model_id="google/gemini-2.5-flash",  # 199 tokens/sec - fastest available
                max_tokens=min(800, content_size // 25),  # Adaptive token limit
                temperature=0.05,  # OPTIMIZATION: Minimal temp for deterministic fast response
                timeout_seconds=5,  # OPTIMIZATION: Reduced from 8s
                parallel_batch_size=8,  # OPTIMIZATION: Doubled for faster processing
            )
        else:
            return ModelConfiguration(
                model_id="openai/gpt-4o-mini",  # 126 tokens/sec - excellent speed/cost balance
                max_tokens=min(500, content_size // 20),
                temperature=0.03,  # OPTIMIZATION: Near-zero for fastest response
                timeout_seconds=4,  # OPTIMIZATION: Reduced from 6s
                parallel_batch_size=10,  # OPTIMIZATION: Doubled for maximum parallelism
            )

    def _select_fast_quality_model(
        self, task_type: TaskType, complexity_score: float
    ) -> ModelConfiguration:
        """Balance speed and quality for time-constrained situations."""
        if complexity_score > 0.7 or task_type == TaskType.COMPLEX_REASONING:
            # Complex tasks - use fast model with good quality
            return ModelConfiguration(
                model_id="openai/gpt-4o-mini",  # 126 tokens/sec + good quality
                max_tokens=1200,
                temperature=0.1,  # OPTIMIZATION: Reduced for faster response
                timeout_seconds=10,  # OPTIMIZATION: Reduced from 18s
                parallel_batch_size=6,  # OPTIMIZATION: Doubled for better parallelism
            )
        else:
            # Simple tasks - use the fastest model available
            return ModelConfiguration(
                model_id="google/gemini-2.5-flash",  # 199 tokens/sec - fastest
                max_tokens=1000,
                temperature=0.1,  # OPTIMIZATION: Reduced for faster response
                timeout_seconds=8,  # OPTIMIZATION: Reduced from 12s
                parallel_batch_size=8,  # OPTIMIZATION: Doubled for maximum speed
            )

    def _select_balanced_model(
        self, task_type: TaskType, complexity_score: float, current_confidence: float
    ) -> ModelConfiguration:
        """Standard mode with cost-effectiveness focus."""
        # If confidence is already high, use fastest models for validation
        if current_confidence > 0.7:
            return ModelConfiguration(
                model_id="google/gemini-2.5-flash",  # 199 tokens/sec - fastest validation
                max_tokens=1500,
                temperature=0.25,
                timeout_seconds=20,  # Reduced for fastest model
                parallel_batch_size=4,  # Increased for speed
            )

        # Standard balanced approach - prioritize speed-optimized models
        if task_type in [TaskType.DEEP_RESEARCH, TaskType.RESULT_SYNTHESIS]:
            return ModelConfiguration(
                model_id="openai/gpt-4o-mini",  # Speed + quality balance for research
                max_tokens=2000,
                temperature=0.3,
                timeout_seconds=25,  # Reduced for faster model
                parallel_batch_size=3,  # Increased for speed
            )
        else:
            return ModelConfiguration(
                model_id="google/gemini-2.5-flash",  # Fastest for general tasks
                max_tokens=1500,
                temperature=0.25,
                timeout_seconds=20,  # Reduced for fastest model
                parallel_batch_size=4,  # Increased for speed
            )

    def _select_optimal_model(
        self, task_type: TaskType, complexity_score: float, confidence_threshold: float
    ) -> ModelConfiguration:
        """Comprehensive mode for complex analysis."""
        # Use premium models for the most complex tasks when time allows
        if complexity_score > 0.8 and task_type == TaskType.DEEP_RESEARCH:
            return ModelConfiguration(
                model_id="google/gemini-2.5-pro",
                max_tokens=3000,
                temperature=0.3,
                timeout_seconds=45,
                parallel_batch_size=1,  # Deep thinking models work better individually
            )

        # High-quality cost-effective models for standard comprehensive analysis
        return ModelConfiguration(
            model_id="anthropic/claude-sonnet-4",
            max_tokens=2500,
            temperature=0.3,
            timeout_seconds=40,
            parallel_batch_size=2,
        )

    def calculate_task_complexity(
        self, content: str, task_type: TaskType, focus_areas: list[str] | None = None
    ) -> float:
        """Calculate complexity score based on content and task requirements."""
        if not content:
            return 0.3  # Default low complexity

        content_lower = content.lower()

        # Financial complexity indicators
        complexity_indicators = {
            "financial_jargon": len(
                re.findall(
                    r"\b(?:ebitda|dcf|roic?|wacc|beta|volatility|sharpe)\b",
                    content_lower,
                )
            ),
            "numerical_data": len(re.findall(r"\$?[\d,]+\.?\d*[%kmbKMB]?", content)),
            "comparative_analysis": len(
                re.findall(
                    r"\b(?:versus|compared|relative|outperform|underperform)\b",
                    content_lower,
                )
            ),
            "temporal_analysis": len(
                re.findall(r"\b(?:quarterly|q[1-4]|fy|yoy|qoq|annual)\b", content_lower)
            ),
            "market_terms": len(
                re.findall(
                    r"\b(?:bullish|bearish|catalyst|headwind|tailwind)\b", content_lower
                )
            ),
            "technical_terms": len(
                re.findall(
                    r"\b(?:support|resistance|breakout|rsi|macd|sma|ema)\b",
                    content_lower,
                )
            ),
        }

        # Calculate base complexity
        total_indicators = sum(complexity_indicators.values())
        content_length = len(content.split())
        base_complexity = min(total_indicators / max(content_length / 100, 1), 1.0)

        # Task-specific complexity adjustments
        task_multipliers = {
            TaskType.DEEP_RESEARCH: 1.4,
            TaskType.COMPLEX_REASONING: 1.6,
            TaskType.RESULT_SYNTHESIS: 1.2,
            TaskType.TECHNICAL_ANALYSIS: 1.3,
            TaskType.SENTIMENT_ANALYSIS: 0.8,
            TaskType.QUICK_ANSWER: 0.5,
        }

        # Focus area adjustments
        focus_multiplier = 1.0
        if focus_areas:
            complex_focus_areas = [
                "competitive_analysis",
                "fundamental_analysis",
                "complex_reasoning",
            ]
            if any(area in focus_areas for area in complex_focus_areas):
                focus_multiplier = 1.2

        final_complexity = (
            base_complexity * task_multipliers.get(task_type, 1.0) * focus_multiplier
        )
        return min(final_complexity, 1.0)


class ProgressiveTokenBudgeter:
    """Manages token budgets across research phases with time awareness."""

    def __init__(
        self, total_time_budget_seconds: float, confidence_target: float = 0.75
    ):
        self.total_time_budget = total_time_budget_seconds
        self.confidence_target = confidence_target
        self.phase_budgets = self._calculate_base_phase_budgets()
        self.time_started = time.time()

    def _calculate_base_phase_budgets(self) -> dict[ResearchPhase, int]:
        """Calculate base token budgets for each research phase."""
        # Allocate tokens based on typical phase requirements
        if self.total_time_budget < 30:
            # Emergency mode - minimal tokens
            return {
                ResearchPhase.SEARCH: 500,
                ResearchPhase.CONTENT_ANALYSIS: 2000,
                ResearchPhase.SYNTHESIS: 800,
                ResearchPhase.VALIDATION: 300,
            }
        elif self.total_time_budget < 60:
            # Fast mode
            return {
                ResearchPhase.SEARCH: 1000,
                ResearchPhase.CONTENT_ANALYSIS: 4000,
                ResearchPhase.SYNTHESIS: 1500,
                ResearchPhase.VALIDATION: 500,
            }
        else:
            # Standard mode
            return {
                ResearchPhase.SEARCH: 1500,
                ResearchPhase.CONTENT_ANALYSIS: 6000,
                ResearchPhase.SYNTHESIS: 2500,
                ResearchPhase.VALIDATION: 1000,
            }

    def allocate_tokens_for_phase(
        self,
        phase: ResearchPhase,
        sources_count: int,
        current_confidence: float,
        complexity_score: float = 0.5,
    ) -> TokenAllocation:
        """Allocate tokens for a research phase based on current state."""

        time_elapsed = time.time() - self.time_started
        time_remaining = max(0, self.total_time_budget - time_elapsed)

        base_budget = self.phase_budgets[phase]

        # Confidence-based scaling
        if current_confidence > self.confidence_target:
            # High confidence - focus on validation with fewer tokens
            confidence_multiplier = 0.7
        elif current_confidence < 0.4:
            # Low confidence - increase token usage if time allows
            confidence_multiplier = 1.3 if time_remaining > 30 else 0.9
        else:
            confidence_multiplier = 1.0

        # Time pressure scaling
        time_multiplier = self._calculate_time_multiplier(time_remaining)

        # Complexity scaling
        complexity_multiplier = 0.8 + (complexity_score * 0.4)  # Range: 0.8 to 1.2

        # Source count scaling (diminishing returns)
        if sources_count > 0:
            source_multiplier = min(1.0 + (sources_count - 3) * 0.05, 1.3)
        else:
            source_multiplier = 1.0

        # Calculate final budget
        final_budget = int(
            base_budget
            * confidence_multiplier
            * time_multiplier
            * complexity_multiplier
            * source_multiplier
        )

        # Calculate timeout based on available time and token budget
        base_timeout = min(time_remaining * 0.8, 45)  # Max 45 seconds per phase
        adjusted_timeout = base_timeout * (final_budget / base_budget) ** 0.5

        return TokenAllocation(
            input_tokens=min(int(final_budget * 0.75), 15000),  # Cap input tokens
            output_tokens=min(int(final_budget * 0.25), 3000),  # Cap output tokens
            per_source_tokens=final_budget // max(sources_count, 1)
            if sources_count > 0
            else final_budget,
            emergency_reserve=200,  # Always keep emergency reserve
            timeout_seconds=max(adjusted_timeout, 5),  # Minimum 5 seconds
        )

    def get_next_allocation(
        self,
        sources_remaining: int,
        current_confidence: float,
        time_elapsed_seconds: float,
    ) -> dict[str, Any]:
        """Get the next token allocation for processing sources."""
        time_remaining = max(0, self.total_time_budget - time_elapsed_seconds)

        # Determine priority based on confidence and time pressure
        if current_confidence < 0.4 and time_remaining > 30:
            priority = "high"
        elif current_confidence < 0.6 and time_remaining > 15:
            priority = "medium"
        else:
            priority = "low"

        # Calculate time budget per remaining source
        if sources_remaining > 0:
            time_per_source = time_remaining / sources_remaining
        else:
            time_per_source = 0

        # Calculate token budget
        base_tokens = self.phase_budgets.get(ResearchPhase.CONTENT_ANALYSIS, 2000)

        if priority == "high":
            max_tokens = min(int(base_tokens * 1.2), 4000)
        elif priority == "medium":
            max_tokens = base_tokens
        else:
            max_tokens = int(base_tokens * 0.8)

        return {
            "time_budget": min(time_per_source, 30.0),  # Cap at 30 seconds
            "max_tokens": max_tokens,
            "priority": priority,
            "sources_remaining": sources_remaining,
        }

    def _calculate_time_multiplier(self, time_remaining: float) -> float:
        """Scale token budget based on time pressure."""
        if time_remaining < 5:
            return 0.2  # Extreme emergency mode
        elif time_remaining < 15:
            return 0.4  # Emergency mode
        elif time_remaining < 30:
            return 0.7  # Time-constrained
        elif time_remaining < 60:
            return 0.9  # Slightly reduced
        else:
            return 1.0  # Full budget available


class ParallelLLMProcessor:
    """Handles parallel LLM operations with intelligent load balancing."""

    def __init__(
        self,
        openrouter_provider: OpenRouterProvider,
        max_concurrent: int = 5,  # OPTIMIZATION: Increased from 3
    ):
        self.provider = openrouter_provider
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.BoundedSemaphore(
            max_concurrent
        )  # OPTIMIZATION: Use BoundedSemaphore
        self.model_selector = AdaptiveModelSelector(openrouter_provider)
        self.orchestration_logger = get_orchestration_logger("ParallelLLMProcessor")
        # OPTIMIZATION: Track active requests for better coordination
        self._active_requests = 0
        self._request_lock = asyncio.Lock()

    @log_method_call(component="ParallelLLMProcessor", include_timing=True)
    async def parallel_content_analysis(
        self,
        sources: list[dict],
        analysis_type: str,
        persona: str,
        time_budget_seconds: float,
        current_confidence: float = 0.0,
    ) -> list[dict]:
        """Analyze multiple sources in parallel with adaptive optimization."""

        if not sources:
            return []

        self.orchestration_logger.set_request_context(
            analysis_type=analysis_type,
            source_count=len(sources),
            time_budget=time_budget_seconds,
        )

        # Calculate complexity for all sources
        combined_content = "\n".join(
            [source.get("content", "")[:1000] for source in sources[:5]]
        )
        overall_complexity = self.model_selector.calculate_task_complexity(
            combined_content,
            TaskType.SENTIMENT_ANALYSIS
            if analysis_type == "sentiment"
            else TaskType.MARKET_ANALYSIS,
        )

        # Determine optimal batching strategy
        model_config = self.model_selector.select_model_for_time_budget(
            task_type=TaskType.SENTIMENT_ANALYSIS
            if analysis_type == "sentiment"
            else TaskType.MARKET_ANALYSIS,
            time_remaining_seconds=time_budget_seconds,
            complexity_score=overall_complexity,
            content_size_tokens=len(combined_content) // 4,
            current_confidence=current_confidence,
        )

        # Create batches based on model configuration
        batches = self._create_optimal_batches(
            sources, model_config.parallel_batch_size
        )

        self.orchestration_logger.info(
            "🔄 PARALLEL_ANALYSIS_START",
            total_sources=len(sources),
            batch_count=len(batches),
        )

        # OPTIMIZATION: Process batches using create_task for immediate parallelism
        running_tasks = []
        for i, batch in enumerate(batches):
            # Create task immediately without awaiting
            task_future = asyncio.create_task(
                self._analyze_source_batch(
                    batch=batch,
                    batch_id=i,
                    analysis_type=analysis_type,
                    persona=persona,
                    model_config=model_config,
                    overall_complexity=overall_complexity,
                )
            )
            running_tasks.append((i, task_future))  # Track batch ID with future

            # OPTIMIZATION: Minimal stagger to prevent API overload
            if i < len(batches) - 1:  # Don't delay after last batch
                await asyncio.sleep(0.01)  # 10ms micro-delay

        # OPTIMIZATION: Use as_completed for progressive result handling
        batch_results = [None] * len(batches)  # Pre-allocate results list
        timeout_at = time.time() + (time_budget_seconds * 0.9)

        try:
            for batch_id, task_future in running_tasks:
                remaining_time = timeout_at - time.time()
                if remaining_time <= 0:
                    raise TimeoutError()

                try:
                    result = await asyncio.wait_for(task_future, timeout=remaining_time)
                    batch_results[batch_id] = result
                except Exception as e:
                    batch_results[batch_id] = e
        except TimeoutError:
            self.orchestration_logger.warning(
                "⏰ PARALLEL_ANALYSIS_TIMEOUT", timeout=time_budget_seconds
            )
            return self._create_fallback_results(sources)

        # Flatten and process results
        final_results = []
        successful_batches = 0
        for i, batch_result in enumerate(batch_results):
            if isinstance(batch_result, Exception):
                self.orchestration_logger.warning(
                    "⚠️ BATCH_FAILED", batch_id=i, error=str(batch_result)
                )
                # Add fallback results for failed batch
                final_results.extend(self._create_fallback_results(batches[i]))
            else:
                final_results.extend(batch_result)
                successful_batches += 1

        self.orchestration_logger.info(
            "✅ PARALLEL_ANALYSIS_COMPLETE",
            successful_batches=successful_batches,
            results_count=len(final_results),
        )

        return final_results

    def _create_optimal_batches(
        self, sources: list[dict], batch_size: int
    ) -> list[list[dict]]:
        """Create optimal batches for parallel processing."""
        if batch_size <= 1:
            return [[source] for source in sources]

        batches = []
        for i in range(0, len(sources), batch_size):
            batch = sources[i : i + batch_size]
            batches.append(batch)

        return batches

    async def _analyze_source_batch(
        self,
        batch: list[dict],
        batch_id: int,
        analysis_type: str,
        persona: str,
        model_config: ModelConfiguration,
        overall_complexity: float,
    ) -> list[dict]:
        """Analyze a batch of sources with optimized LLM call."""

        # OPTIMIZATION: Track active requests for better coordination
        async with self._request_lock:
            self._active_requests += 1

        try:
            # OPTIMIZATION: Acquire semaphore without blocking other task creation
            await self.semaphore.acquire()
            try:
                # Create batch analysis prompt
                batch_prompt = self._create_batch_analysis_prompt(
                    batch, analysis_type, persona, model_config.max_tokens
                )

                # Get LLM instance
                llm = self.provider.get_llm(
                    model_override=model_config.model_id,
                    temperature=model_config.temperature,
                    max_tokens=model_config.max_tokens,
                )

                # Execute with timeout
                start_time = time.time()
                result = await asyncio.wait_for(
                    llm.ainvoke(
                        [
                            SystemMessage(
                                content="You are a financial analyst. Provide structured, concise analysis."
                            ),
                            HumanMessage(content=batch_prompt),
                        ]
                    ),
                    timeout=model_config.timeout_seconds,
                )

                execution_time = time.time() - start_time

                # Parse batch results
                parsed_results = self._parse_batch_analysis_result(
                    result.content, batch
                )

                self.orchestration_logger.debug(
                    "✨ BATCH_SUCCESS",
                    batch_id=batch_id,
                    duration=f"{execution_time:.2f}s",
                )

                return parsed_results

            except TimeoutError:
                self.orchestration_logger.warning(
                    "⏰ BATCH_TIMEOUT",
                    batch_id=batch_id,
                    timeout=model_config.timeout_seconds,
                )
                return self._create_fallback_results(batch)
            except Exception as e:
                self.orchestration_logger.error(
                    "💥 BATCH_ERROR", batch_id=batch_id, error=str(e)
                )
                return self._create_fallback_results(batch)
            finally:
                # OPTIMIZATION: Always release semaphore
                self.semaphore.release()
        finally:
            # OPTIMIZATION: Track active requests
            async with self._request_lock:
                self._active_requests -= 1

    def _create_batch_analysis_prompt(
        self, batch: list[dict], analysis_type: str, persona: str, max_tokens: int
    ) -> str:
        """Create optimized prompt for batch analysis."""

        # Determine prompt style based on token budget
        if max_tokens < 800:
            style = "ultra_concise"
        elif max_tokens < 1500:
            style = "concise"
        else:
            style = "detailed"

        prompt_templates = {
            "ultra_concise": """URGENT BATCH ANALYSIS - {analysis_type} for {persona} investor.

Analyze {source_count} sources. For EACH source, provide:
SOURCE_N: SENTIMENT:Bull/Bear/Neutral|CONFIDENCE:0-1|INSIGHT:one key point|RISK:main risk

{sources}

Keep total response under 500 words.""",
            "concise": """BATCH ANALYSIS - {analysis_type} for {persona} investor perspective.

Analyze these {source_count} sources. For each source provide:
- Sentiment: Bull/Bear/Neutral + confidence (0-1)
- Key insight (1 sentence)
- Main risk (1 sentence)
- Relevance score (0-1)

{sources}

Format consistently. Target ~100 words per source.""",
            "detailed": """Comprehensive {analysis_type} analysis for {persona} investor.

Analyze these {source_count} sources with structured output for each:

{sources}

For each source provide:
1. Sentiment (direction, confidence 0-1, brief reasoning)
2. Key insights (2-3 main points)
3. Risk factors (1-2 key risks)
4. Opportunities (1-2 opportunities if any)
5. Credibility assessment (0-1 score)
6. Relevance score (0-1)

Maintain {persona} investor perspective throughout.""",
        }

        # Format sources for prompt
        sources_text = ""
        for i, source in enumerate(batch, 1):
            content = source.get("content", "")[:1500]  # Limit content length
            title = source.get("title", f"Source {i}")
            sources_text += f"\nSOURCE {i} - {title}:\n{content}\n{'---' * 20}\n"

        template = prompt_templates[style]
        return template.format(
            analysis_type=analysis_type,
            persona=persona,
            source_count=len(batch),
            sources=sources_text.strip(),
        )

    def _parse_batch_analysis_result(
        self, result_content: str, batch: list[dict]
    ) -> list[dict]:
        """Parse LLM batch analysis result into structured data."""

        results = []

        # Try structured parsing first
        source_sections = re.split(r"\n(?:SOURCE\s+\d+|---+)", result_content)

        if len(source_sections) >= len(batch):
            # Structured parsing successful
            for _i, (source, section) in enumerate(
                zip(batch, source_sections[1 : len(batch) + 1], strict=False)
            ):
                parsed = self._parse_source_analysis(section, source)
                results.append(parsed)
        else:
            # Fallback to simple parsing
            for i, source in enumerate(batch):
                fallback_analysis = self._create_simple_fallback_analysis(
                    result_content, source, i
                )
                results.append(fallback_analysis)

        return results

    def _parse_source_analysis(self, analysis_text: str, source: dict) -> dict:
        """Parse analysis text for a single source."""

        # Extract sentiment
        sentiment_match = re.search(
            r"sentiment:?\s*(\w+)[,\s]*(?:confidence:?\s*([\d.]+))?",
            analysis_text.lower(),
        )
        if sentiment_match:
            direction = sentiment_match.group(1).lower()
            confidence = float(sentiment_match.group(2) or 0.5)

            # Map common sentiment terms
            if direction in ["bull", "bullish", "positive"]:
                direction = "bullish"
            elif direction in ["bear", "bearish", "negative"]:
                direction = "bearish"
            else:
                direction = "neutral"
        else:
            direction = "neutral"
            confidence = 0.5

        # Extract other information
        insights = self._extract_insights(analysis_text)
        risks = self._extract_risks(analysis_text)
        opportunities = self._extract_opportunities(analysis_text)

        # Extract scores
        relevance_match = re.search(r"relevance:?\s*([\d.]+)", analysis_text.lower())
        relevance_score = float(relevance_match.group(1)) if relevance_match else 0.6

        credibility_match = re.search(
            r"credibility:?\s*([\d.]+)", analysis_text.lower()
        )
        credibility_score = (
            float(credibility_match.group(1)) if credibility_match else 0.7
        )

        return {
            **source,
            "analysis": {
                "insights": insights,
                "sentiment": {"direction": direction, "confidence": confidence},
                "risk_factors": risks,
                "opportunities": opportunities,
                "credibility_score": credibility_score,
                "relevance_score": relevance_score,
                "analysis_timestamp": datetime.now(),
                "batch_processed": True,
            },
        }

    def _extract_insights(self, text: str) -> list[str]:
        """Extract insights from analysis text."""
        insights = []

        # Look for insight patterns
        insight_patterns = [
            r"insight:?\s*([^.\n]+)",
            r"key point:?\s*([^.\n]+)",
            r"main finding:?\s*([^.\n]+)",
        ]

        for pattern in insight_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            insights.extend([m.strip() for m in matches if m.strip()])

        # If no structured insights found, extract bullet points
        if not insights:
            bullet_matches = re.findall(r"[•\-\*]\s*([^.\n]+)", text)
            insights.extend([m.strip() for m in bullet_matches if m.strip()][:3])

        return insights[:5]  # Limit to 5 insights

    def _extract_risks(self, text: str) -> list[str]:
        """Extract risk factors from analysis text."""
        risk_patterns = [
            r"risk:?\s*([^.\n]+)",
            r"concern:?\s*([^.\n]+)",
            r"headwind:?\s*([^.\n]+)",
        ]

        risks = []
        for pattern in risk_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            risks.extend([m.strip() for m in matches if m.strip()])

        return risks[:3]

    def _extract_opportunities(self, text: str) -> list[str]:
        """Extract opportunities from analysis text."""
        opp_patterns = [
            r"opportunit(?:y|ies):?\s*([^.\n]+)",
            r"catalyst:?\s*([^.\n]+)",
            r"tailwind:?\s*([^.\n]+)",
        ]

        opportunities = []
        for pattern in opp_patterns:
            matches = re.findall(pattern, text, re.IGNORECASE)
            opportunities.extend([m.strip() for m in matches if m.strip()])

        return opportunities[:3]

    def _create_simple_fallback_analysis(
        self, full_analysis: str, source: dict, index: int
    ) -> dict:
        """Create simple fallback analysis when parsing fails."""

        # Basic sentiment analysis from text
        analysis_lower = full_analysis.lower()

        positive_words = ["positive", "bullish", "strong", "growth", "opportunity"]
        negative_words = ["negative", "bearish", "weak", "decline", "risk"]

        pos_count = sum(1 for word in positive_words if word in analysis_lower)
        neg_count = sum(1 for word in negative_words if word in analysis_lower)

        if pos_count > neg_count:
            sentiment = "bullish"
            confidence = 0.6
        elif neg_count > pos_count:
            sentiment = "bearish"
            confidence = 0.6
        else:
            sentiment = "neutral"
            confidence = 0.5

        return {
            **source,
            "analysis": {
                "insights": [f"Analysis based on source content (index {index})"],
                "sentiment": {"direction": sentiment, "confidence": confidence},
                "risk_factors": ["Unable to extract specific risks"],
                "opportunities": ["Unable to extract specific opportunities"],
                "credibility_score": 0.5,
                "relevance_score": 0.5,
                "analysis_timestamp": datetime.now(),
                "fallback_used": True,
                "batch_processed": True,
            },
        }

    def _create_fallback_results(self, sources: list[dict]) -> list[dict]:
        """Create fallback results when batch processing fails."""
        results = []
        for source in sources:
            fallback_result = {
                **source,
                "analysis": {
                    "insights": ["Analysis failed - using fallback"],
                    "sentiment": {"direction": "neutral", "confidence": 0.3},
                    "risk_factors": ["Analysis timeout - unable to assess risks"],
                    "opportunities": [],
                    "credibility_score": 0.5,
                    "relevance_score": 0.5,
                    "analysis_timestamp": datetime.now(),
                    "fallback_used": True,
                    "batch_timeout": True,
                },
            }
            results.append(fallback_result)
        return results


class OptimizedPromptEngine:
    """Creates optimized prompts for different time constraints and confidence levels."""

    def __init__(self):
        self.prompt_cache = {}  # Cache for generated prompts

        self.prompt_templates = {
            "emergency": {
                "content_analysis": """URGENT: Quick 3-point analysis of financial content for {persona} investor.

Content: {content}

Provide ONLY:
1. SENTIMENT: Bull/Bear/Neutral + confidence (0-1)
2. KEY_RISK: Primary risk factor
3. KEY_OPPORTUNITY: Main opportunity (if any)

Format: SENTIMENT:Bull|0.8 KEY_RISK:Market volatility KEY_OPPORTUNITY:Earnings growth
Max 50 words total. No explanations.""",
                "synthesis": """URGENT: 2-sentence summary from {source_count} sources for {persona} investor.

Key findings: {key_points}

Provide: 1) Overall sentiment direction 2) Primary investment implication
Max 40 words total.""",
            },
            "fast": {
                "content_analysis": """Quick financial analysis for {persona} investor - 5 points max.

Content: {content}

Provide concisely:
• Sentiment: Bull/Bear/Neutral (confidence 0-1)
• Key insight (1 sentence)
• Main risk (1 sentence)
• Main opportunity (1 sentence)
• Relevance score (0-1)

Target: Under 150 words total.""",
                "synthesis": """Synthesize research findings for {persona} investor.

Sources: {source_count} | Key insights: {insights}

4-part summary:
1. Overall sentiment + confidence
2. Top 2 opportunities
3. Top 2 risks
4. Recommended action

Limit: 200 words max.""",
            },
            "standard": {
                "content_analysis": """Financial content analysis for {persona} investor.

Content: {content}
Focus areas: {focus_areas}

Structured analysis:
- Sentiment (direction, confidence 0-1, brief reasoning)
- Key insights (3-5 bullet points)
- Risk factors (2-3 main risks)
- Opportunities (2-3 opportunities)
- Credibility assessment (0-1)
- Relevance score (0-1)

Target: 300-500 words.""",
                "synthesis": """Comprehensive research synthesis for {persona} investor.

Research Summary:
- Sources analyzed: {source_count}
- Key insights: {insights}
- Time horizon: {time_horizon}

Provide detailed analysis:
1. Executive Summary (2-3 sentences)
2. Key Findings (5-7 bullet points)
3. Investment Implications
4. Risk Assessment
5. Recommended Actions
6. Confidence Level + reasoning

Tailor specifically for {persona} investment characteristics.""",
            },
        }

    def get_optimized_prompt(
        self,
        prompt_type: str,
        time_remaining: float,
        confidence_level: float,
        **context,
    ) -> str:
        """Generate optimized prompt based on time constraints and confidence."""

        # Create cache key
        cache_key = f"{prompt_type}_{time_remaining:.0f}_{confidence_level:.1f}_{hash(str(sorted(context.items())))}"

        if cache_key in self.prompt_cache:
            return self.prompt_cache[cache_key]

        # Select template based on time pressure
        if time_remaining < 15:
            template_category = "emergency"
        elif time_remaining < 45:
            template_category = "fast"
        else:
            template_category = "standard"

        template = self.prompt_templates[template_category].get(prompt_type)

        if not template:
            # Fallback to fast template
            template = self.prompt_templates["fast"].get(
                prompt_type, "Analyze the content quickly and provide key insights."
            )

        # Add confidence-based instructions
        confidence_instructions = ""
        if confidence_level > 0.7:
            confidence_instructions = "\n\nNOTE: High confidence already achieved. Focus on validation and contradictory evidence."
        elif confidence_level < 0.4:
            confidence_instructions = "\n\nNOTE: Low confidence. Look for strong supporting evidence to build confidence."

        # Format template with context
        formatted_prompt = template.format(**context) + confidence_instructions

        # Cache the result
        self.prompt_cache[cache_key] = formatted_prompt

        return formatted_prompt

    def create_time_optimized_synthesis_prompt(
        self,
        sources: list[dict],
        persona: str,
        time_remaining: float,
        current_confidence: float,
    ) -> str:
        """Create synthesis prompt optimized for available time."""

        # Extract key information from sources
        insights = []
        sentiments = []
        for source in sources:
            analysis = source.get("analysis", {})
            insights.extend(analysis.get("insights", [])[:2])  # Limit per source
            sentiment = analysis.get("sentiment", {})
            if sentiment:
                sentiments.append(sentiment.get("direction", "neutral"))

        # Prepare context
        context = {
            "persona": persona,
            "source_count": len(sources),
            "insights": "; ".join(insights[:8]),  # Top 8 insights
            "key_points": "; ".join(insights[:8]),  # For backward compatibility
            "time_horizon": "short-term" if time_remaining < 30 else "medium-term",
        }

        return self.get_optimized_prompt(
            "synthesis", time_remaining, current_confidence, **context
        )


class ConfidenceTracker:
    """Tracks research confidence and triggers early termination when appropriate."""

    def __init__(
        self,
        target_confidence: float = 0.75,
        min_sources: int = 3,
        max_sources: int = 15,
    ):
        self.target_confidence = target_confidence
        self.min_sources = min_sources
        self.max_sources = max_sources
        self.confidence_history = []
        self.evidence_history = []
        self.source_count = 0
        self.sources_analyzed = 0  # For backward compatibility
        self.last_significant_improvement = 0
        self.sentiment_votes = {"bullish": 0, "bearish": 0, "neutral": 0}

    def update_confidence(
        self,
        new_evidence: dict,
        source_credibility: float | None = None,
        credibility_score: float | None = None,
    ) -> dict[str, Any]:
        """Update confidence based on new evidence and return continuation decision."""

        # Handle both parameter names for backward compatibility
        if source_credibility is None and credibility_score is not None:
            source_credibility = credibility_score
        elif source_credibility is None and credibility_score is None:
            source_credibility = 0.5  # Default value

        self.source_count += 1
        self.sources_analyzed += 1  # Keep both for compatibility

        # Store evidence
        self.evidence_history.append(
            {
                "evidence": new_evidence,
                "credibility": source_credibility,
                "timestamp": datetime.now(),
            }
        )

        # Update sentiment voting
        sentiment = new_evidence.get("sentiment", {})
        direction = sentiment.get("direction", "neutral")
        confidence = sentiment.get("confidence", 0.5)

        # Weight vote by source credibility and sentiment confidence
        vote_weight = source_credibility * confidence
        self.sentiment_votes[direction] += vote_weight

        # Calculate evidence strength
        evidence_strength = self._calculate_evidence_strength(
            new_evidence, source_credibility
        )

        # Update confidence using Bayesian-style updating
        current_confidence = self._update_bayesian_confidence(evidence_strength)
        self.confidence_history.append(current_confidence)

        # Check for significant improvement
        if len(self.confidence_history) >= 2:
            improvement = current_confidence - self.confidence_history[-2]
            if improvement > 0.1:  # 10% improvement
                self.last_significant_improvement = self.source_count

        # Make continuation decision
        should_continue = self._should_continue_research(current_confidence)

        return {
            "current_confidence": current_confidence,
            "should_continue": should_continue,
            "sources_processed": self.source_count,
            "sources_analyzed": self.source_count,  # For backward compatibility
            "confidence_trend": self._calculate_confidence_trend(),
            "early_termination_reason": None
            if should_continue
            else self._get_termination_reason(current_confidence),
            "sentiment_consensus": self._calculate_sentiment_consensus(),
        }

    def _calculate_evidence_strength(self, evidence: dict, credibility: float) -> float:
        """Calculate the strength of new evidence."""

        # Base strength from sentiment confidence
        sentiment = evidence.get("sentiment", {})
        sentiment_confidence = sentiment.get("confidence", 0.5)

        # Adjust for source credibility
        credibility_adjusted = sentiment_confidence * credibility

        # Factor in evidence richness
        insights_count = len(evidence.get("insights", []))
        risk_factors_count = len(evidence.get("risk_factors", []))
        opportunities_count = len(evidence.get("opportunities", []))

        # Evidence richness score (0-1)
        evidence_richness = min(
            (insights_count + risk_factors_count + opportunities_count) / 12, 1.0
        )

        # Relevance factor
        relevance_score = evidence.get("relevance_score", 0.5)

        # Final evidence strength calculation
        final_strength = credibility_adjusted * (
            0.5 + 0.3 * evidence_richness + 0.2 * relevance_score
        )

        return min(final_strength, 1.0)

    def _update_bayesian_confidence(self, evidence_strength: float) -> float:
        """Update confidence using Bayesian approach."""

        if not self.confidence_history:
            # First evidence - base confidence
            return evidence_strength

        # Current prior
        prior = self.confidence_history[-1]

        # Bayesian update with evidence strength as likelihood
        # Simple approximation: weighted average with decay
        decay_factor = 0.9 ** (self.source_count - 1)  # Diminishing returns

        updated = prior * decay_factor + evidence_strength * (1 - decay_factor)

        # Ensure within bounds
        return max(0.1, min(updated, 0.95))

    def _should_continue_research(self, current_confidence: float) -> bool:
        """Determine if research should continue based on multiple factors."""

        # Always process minimum sources
        if self.source_count < self.min_sources:
            return True

        # Stop at maximum sources
        if self.source_count >= self.max_sources:
            return False

        # High confidence reached
        if current_confidence >= self.target_confidence:
            return False

        # Check for diminishing returns
        if self.source_count - self.last_significant_improvement > 4:
            # No significant improvement in last 4 sources
            return False

        # Check sentiment consensus
        consensus_score = self._calculate_sentiment_consensus()
        if consensus_score > 0.8 and self.source_count >= 5:
            # Strong consensus with adequate sample
            return False

        # Check confidence plateau
        if len(self.confidence_history) >= 3:
            recent_change = abs(current_confidence - self.confidence_history[-3])
            if recent_change < 0.03:  # Less than 3% change in last 3 sources
                return False

        return True

    def _calculate_confidence_trend(self) -> str:
        """Calculate the trend in confidence over recent sources."""

        if len(self.confidence_history) < 3:
            return "insufficient_data"

        recent = self.confidence_history[-3:]

        # Calculate trend
        if recent[-1] > recent[0] + 0.05:
            return "increasing"
        elif recent[-1] < recent[0] - 0.05:
            return "decreasing"
        else:
            return "stable"

    def _calculate_sentiment_consensus(self) -> float:
        """Calculate how much sources agree on sentiment."""

        total_votes = sum(self.sentiment_votes.values())
        if total_votes == 0:
            return 0.0

        # Calculate consensus as max vote share
        max_votes = max(self.sentiment_votes.values())
        consensus = max_votes / total_votes

        return consensus

    def _get_termination_reason(self, current_confidence: float) -> str:
        """Get reason for early termination."""

        if current_confidence >= self.target_confidence:
            return "target_confidence_reached"
        elif self.source_count >= self.max_sources:
            return "max_sources_reached"
        elif self._calculate_sentiment_consensus() > 0.8:
            return "strong_consensus"
        elif self.source_count - self.last_significant_improvement > 4:
            return "diminishing_returns"
        else:
            return "confidence_plateau"


class IntelligentContentFilter:
    """Pre-filters and prioritizes content to reduce LLM processing overhead."""

    def __init__(self):
        self.relevance_keywords = {
            "fundamental": {
                "high": [
                    "earnings",
                    "revenue",
                    "profit",
                    "ebitda",
                    "cash flow",
                    "debt",
                    "valuation",
                ],
                "medium": [
                    "balance sheet",
                    "income statement",
                    "financial",
                    "quarterly",
                    "annual",
                ],
                "context": ["company", "business", "financial results", "guidance"],
            },
            "technical": {
                "high": [
                    "price",
                    "chart",
                    "trend",
                    "support",
                    "resistance",
                    "breakout",
                ],
                "medium": ["volume", "rsi", "macd", "moving average", "pattern"],
                "context": ["technical analysis", "trading", "momentum"],
            },
            "sentiment": {
                "high": ["rating", "upgrade", "downgrade", "buy", "sell", "hold"],
                "medium": ["analyst", "recommendation", "target price", "outlook"],
                "context": ["opinion", "sentiment", "market mood"],
            },
            "competitive": {
                "high": [
                    "market share",
                    "competitor",
                    "competitive advantage",
                    "industry",
                ],
                "medium": ["peer", "comparison", "market position", "sector"],
                "context": ["competitive landscape", "industry analysis"],
            },
        }

        self.domain_credibility_scores = {
            "reuters.com": 0.95,
            "bloomberg.com": 0.95,
            "wsj.com": 0.90,
            "ft.com": 0.90,
            "marketwatch.com": 0.85,
            "cnbc.com": 0.80,
            "yahoo.com": 0.75,
            "seekingalpha.com": 0.80,
            "fool.com": 0.70,
            "investing.com": 0.75,
        }

    async def filter_and_prioritize_sources(
        self,
        sources: list[dict],
        research_focus: str,
        time_budget: float,
        target_source_count: int | None = None,
        current_confidence: float = 0.0,
    ) -> list[dict]:
        """Filter and prioritize sources based on relevance, quality, and time constraints."""

        if not sources:
            return []

        # Determine target count based on time budget and confidence
        if target_source_count is None:
            target_source_count = self._calculate_optimal_source_count(
                time_budget, current_confidence, len(sources)
            )

        # Quick relevance scoring without LLM
        scored_sources = []
        for source in sources:
            relevance_score = self._calculate_relevance_score(source, research_focus)
            credibility_score = self._get_source_credibility(source)
            recency_score = self._calculate_recency_score(source.get("published_date"))

            # Combined score with weights
            combined_score = (
                relevance_score * 0.5 + credibility_score * 0.3 + recency_score * 0.2
            )

            if combined_score > 0.3:  # Relevance threshold
                scored_sources.append((combined_score, source))

        # Sort by combined score
        scored_sources.sort(key=lambda x: x[0], reverse=True)

        # Select diverse sources
        selected_sources = self._select_diverse_sources(
            scored_sources, target_source_count, research_focus
        )

        # Pre-process content for faster LLM processing
        processed_sources = []
        for score, source in selected_sources:
            processed_source = self._preprocess_content(
                source, research_focus, time_budget
            )
            processed_source["relevance_score"] = score
            processed_sources.append(processed_source)

        return processed_sources

    def _calculate_optimal_source_count(
        self, time_budget: float, current_confidence: float, available_sources: int
    ) -> int:
        """Calculate optimal number of sources to process given constraints."""

        # Base count from time budget
        if time_budget < 20:
            base_count = 3
        elif time_budget < 40:
            base_count = 6
        elif time_budget < 80:
            base_count = 10
        else:
            base_count = 15

        # Adjust for confidence level
        if current_confidence > 0.7:
            # High confidence - fewer sources needed
            confidence_multiplier = 0.7
        elif current_confidence < 0.4:
            # Low confidence - more sources helpful
            confidence_multiplier = 1.2
        else:
            confidence_multiplier = 1.0

        # Final calculation
        target_count = int(base_count * confidence_multiplier)

        # Ensure we don't exceed available sources
        return min(target_count, available_sources, 20)  # Cap at 20

    def _calculate_relevance_score(self, source: dict, research_focus: str) -> float:
        """Calculate relevance score using keyword matching and heuristics."""

        content = source.get("content", "").lower()
        title = source.get("title", "").lower()

        if not content and not title:
            return 0.0

        focus_keywords = self.relevance_keywords.get(research_focus, {})

        # High-value keywords
        high_keywords = focus_keywords.get("high", [])
        high_score = sum(1 for keyword in high_keywords if keyword in content) / max(
            len(high_keywords), 1
        )

        # Medium-value keywords
        medium_keywords = focus_keywords.get("medium", [])
        medium_score = sum(
            1 for keyword in medium_keywords if keyword in content
        ) / max(len(medium_keywords), 1)

        # Context keywords
        context_keywords = focus_keywords.get("context", [])
        context_score = sum(
            1 for keyword in context_keywords if keyword in content
        ) / max(len(context_keywords), 1)

        # Title relevance (titles are more focused)
        title_high_score = sum(
            1 for keyword in high_keywords if keyword in title
        ) / max(len(high_keywords), 1)

        # Combine scores with weights
        relevance_score = (
            high_score * 0.4
            + medium_score * 0.25
            + context_score * 0.15
            + title_high_score * 0.2
        )

        # Boost for very relevant titles
        if any(keyword in title for keyword in high_keywords):
            relevance_score *= 1.2

        return min(relevance_score, 1.0)

    def _get_source_credibility(self, source: dict) -> float:
        """Calculate source credibility based on domain and other factors."""

        url = source.get("url", "").lower()

        # Domain-based credibility
        domain_score = 0.5  # Default
        for domain, score in self.domain_credibility_scores.items():
            if domain in url:
                domain_score = score
                break

        # Boost for specific high-quality indicators
        if any(indicator in url for indicator in [".gov", ".edu", "sec.gov"]):
            domain_score = min(domain_score + 0.2, 1.0)

        # Penalty for low-quality indicators
        if any(indicator in url for indicator in ["blog", "forum", "reddit"]):
            domain_score *= 0.8

        return domain_score

    def _calculate_recency_score(self, published_date: str) -> float:
        """Calculate recency score based on publication date."""

        if not published_date:
            return 0.5  # Default for unknown dates

        try:
            # Parse date (handle various formats)
            if "T" in published_date:
                pub_date = datetime.fromisoformat(published_date.replace("Z", "+00:00"))
            else:
                pub_date = datetime.strptime(published_date, "%Y-%m-%d")

            # Calculate days old
            days_old = (datetime.now() - pub_date.replace(tzinfo=None)).days

            # Scoring based on age
            if days_old <= 1:
                return 1.0  # Very recent
            elif days_old <= 7:
                return 0.9  # Recent
            elif days_old <= 30:
                return 0.7  # Fairly recent
            elif days_old <= 90:
                return 0.5  # Moderately old
            else:
                return 0.3  # Old

        except (ValueError, TypeError):
            return 0.5  # Default for unparseable dates

    def _select_diverse_sources(
        self,
        scored_sources: list[tuple[float, dict]],
        target_count: int,
        research_focus: str,
    ) -> list[tuple[float, dict]]:
        """Select diverse sources to avoid redundancy."""

        if len(scored_sources) <= target_count:
            return scored_sources

        selected = []
        used_domains = set()

        # First pass: select high-scoring diverse sources
        for score, source in scored_sources:
            if len(selected) >= target_count:
                break

            url = source.get("url", "")
            domain = self._extract_domain(url)

            # Ensure diversity by domain (max 2 from same domain initially)
            domain_count = sum(
                1
                for _, s in selected
                if self._extract_domain(s.get("url", "")) == domain
            )

            if domain_count < 2 or len(selected) < target_count // 2:
                selected.append((score, source))
                used_domains.add(domain)

        # Second pass: fill remaining slots with best remaining sources
        remaining_needed = target_count - len(selected)
        if remaining_needed > 0:
            remaining_sources = scored_sources[len(selected) :]
            selected.extend(remaining_sources[:remaining_needed])

        return selected[:target_count]

    def _extract_domain(self, url: str) -> str:
        """Extract domain from URL."""
        try:
            if "//" in url:
                domain = url.split("//")[1].split("/")[0]
                return domain.replace("www.", "")
            return url
        except Exception:
            return url

    def _preprocess_content(
        self, source: dict, research_focus: str, time_budget: float
    ) -> dict:
        """Pre-process content to optimize for LLM analysis."""

        content = source.get("content", "")
        if not content:
            return source

        # Determine content length limit based on time budget
        if time_budget < 30:
            max_length = 800  # Emergency mode
        elif time_budget < 60:
            max_length = 1200  # Fast mode
        else:
            max_length = 2000  # Standard mode

        # If content is already short enough, return as-is
        if len(content) <= max_length:
            source_copy = source.copy()
            source_copy["original_length"] = len(content)
            source_copy["filtered"] = False
            return source_copy

        # Extract most relevant sentences/paragraphs
        sentences = re.split(r"[.!?]+", content)
        focus_keywords = self.relevance_keywords.get(research_focus, {})
        all_keywords = (
            focus_keywords.get("high", [])
            + focus_keywords.get("medium", [])
            + focus_keywords.get("context", [])
        )

        # Score sentences by keyword relevance
        scored_sentences = []
        for sentence in sentences:
            if len(sentence.strip()) < 20:  # Skip very short sentences
                continue

            sentence_lower = sentence.lower()
            keyword_count = sum(
                1 for keyword in all_keywords if keyword in sentence_lower
            )

            # Boost for financial numbers and percentages
            has_numbers = bool(re.search(r"\$?[\d,]+\.?\d*[%kmbKMB]?", sentence))
            number_boost = 0.5 if has_numbers else 0

            sentence_score = keyword_count + number_boost
            if sentence_score > 0:
                scored_sentences.append((sentence_score, sentence.strip()))

        # Sort by relevance and select top sentences
        scored_sentences.sort(key=lambda x: x[0], reverse=True)

        # Build filtered content
        filtered_content = ""
        for _score, sentence in scored_sentences:
            if len(filtered_content) + len(sentence) > max_length:
                break
            filtered_content += sentence + ". "

        # If no relevant sentences found, take first part of original content
        if not filtered_content:
            filtered_content = content[:max_length]

        # Create processed source
        source_copy = source.copy()
        source_copy["content"] = filtered_content.strip()
        source_copy["original_length"] = len(content)
        source_copy["filtered_length"] = len(filtered_content)
        source_copy["filtered"] = True
        source_copy["compression_ratio"] = len(filtered_content) / len(content)

        return source_copy


# Export main classes for integration
__all__ = [
    "AdaptiveModelSelector",
    "ProgressiveTokenBudgeter",
    "ParallelLLMProcessor",
    "OptimizedPromptEngine",
    "ConfidenceTracker",
    "IntelligentContentFilter",
    "ModelConfiguration",
    "TokenAllocation",
    "ResearchPhase",
]

```
Page 27/28FirstPrevNextLast