This is page 19 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/alembic/versions/011_remove_proprietary_terms.py:
--------------------------------------------------------------------------------

```python
"""Remove proprietary terminology from columns

Revision ID: 011_remove_proprietary_terms
Revises: 010_self_contained_schema
Create Date: 2025-01-10

This migration removes proprietary terminology from database columns:
- rs_rating → momentum_score (more descriptive of what it measures)
- vcp_status → consolidation_status (generic pattern description)

Updates all related indexes and handles both PostgreSQL and SQLite databases.
"""

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

from alembic import op

# revision identifiers
revision = "011_remove_proprietary_terms"
down_revision = "010_self_contained_schema"
branch_labels = None
depends_on = None


def upgrade():
    """Remove proprietary terminology from columns."""

    # Check if we're using PostgreSQL or SQLite
    bind = op.get_bind()
    dialect_name = bind.dialect.name

    if dialect_name == "postgresql":
        print("🗃️  PostgreSQL: Renaming columns and indexes...")

        # 1. Rename columns in mcp_maverick_stocks
        print("   📊 Updating mcp_maverick_stocks...")
        op.alter_column(
            "mcp_maverick_stocks", "rs_rating", new_column_name="momentum_score"
        )
        op.alter_column(
            "mcp_maverick_stocks", "vcp_status", new_column_name="consolidation_status"
        )

        # 2. Rename columns in mcp_maverick_bear_stocks
        print("   🐻 Updating mcp_maverick_bear_stocks...")
        op.alter_column(
            "mcp_maverick_bear_stocks", "rs_rating", new_column_name="momentum_score"
        )
        op.alter_column(
            "mcp_maverick_bear_stocks",
            "vcp_status",
            new_column_name="consolidation_status",
        )

        # 3. Rename columns in mcp_supply_demand_breakouts
        print("   📈 Updating mcp_supply_demand_breakouts...")
        op.alter_column(
            "mcp_supply_demand_breakouts", "rs_rating", new_column_name="momentum_score"
        )
        op.alter_column(
            "mcp_supply_demand_breakouts",
            "vcp_status",
            new_column_name="consolidation_status",
        )

        # 4. Rename indexes to use new column names
        print("   🔍 Updating indexes...")
        op.execute(
            "ALTER INDEX IF EXISTS mcp_maverick_stocks_rs_rating_idx RENAME TO mcp_maverick_stocks_momentum_score_idx"
        )
        op.execute(
            "ALTER INDEX IF EXISTS mcp_maverick_bear_stocks_rs_rating_idx RENAME TO mcp_maverick_bear_stocks_momentum_score_idx"
        )
        op.execute(
            "ALTER INDEX IF EXISTS mcp_supply_demand_breakouts_rs_rating_idx RENAME TO mcp_supply_demand_breakouts_momentum_score_idx"
        )

        # 5. Update any legacy indexes that might still exist
        op.execute(
            "ALTER INDEX IF EXISTS idx_stocks_supply_demand_breakouts_rs_rating_desc RENAME TO idx_stocks_supply_demand_breakouts_momentum_score_desc"
        )
        op.execute(
            "ALTER INDEX IF EXISTS idx_supply_demand_breakouts_rs_rating RENAME TO idx_supply_demand_breakouts_momentum_score"
        )

    elif dialect_name == "sqlite":
        print("🗃️  SQLite: Recreating tables with new column names...")

        # SQLite doesn't support column renaming well, need to recreate tables

        # 1. Recreate mcp_maverick_stocks table
        print("   📊 Recreating mcp_maverick_stocks...")
        op.rename_table("mcp_maverick_stocks", "mcp_maverick_stocks_old")

        op.create_table(
            "mcp_maverick_stocks",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_150", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("momentum_score", sa.Numeric(5, 2), default=0),  # was rs_rating
            sa.Column("avg_vol_30d", sa.Numeric(15, 2), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            # Pattern Analysis
            sa.Column("pattern_type", sa.String(50)),
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("consolidation_status", sa.String(50)),  # was vcp_status
            sa.Column("entry_signal", sa.String(50)),
            # Scoring
            sa.Column("compression_score", sa.Integer(), default=0),
            sa.Column("pattern_detected", sa.Integer(), default=0),
            sa.Column("combined_score", sa.Integer(), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data with column mapping
        op.execute("""
            INSERT INTO mcp_maverick_stocks
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                ema_21, sma_50, sma_150, sma_200, rs_rating, avg_vol_30d, adr_pct, atr,
                pattern_type, squeeze_status, vcp_status, entry_signal,
                compression_score, pattern_detected, combined_score, created_at, updated_at
            FROM mcp_maverick_stocks_old
        """)

        op.drop_table("mcp_maverick_stocks_old")

        # Create indexes for maverick stocks
        op.create_index(
            "mcp_maverick_stocks_combined_score_idx",
            "mcp_maverick_stocks",
            ["combined_score"],
        )
        op.create_index(
            "mcp_maverick_stocks_momentum_score_idx",
            "mcp_maverick_stocks",
            ["momentum_score"],
        )
        op.create_index(
            "mcp_maverick_stocks_date_analyzed_idx",
            "mcp_maverick_stocks",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_maverick_stocks_stock_date_idx",
            "mcp_maverick_stocks",
            ["stock_id", "date_analyzed"],
        )

        # 2. Recreate mcp_maverick_bear_stocks table
        print("   🐻 Recreating mcp_maverick_bear_stocks...")
        op.rename_table("mcp_maverick_bear_stocks", "mcp_maverick_bear_stocks_old")

        op.create_table(
            "mcp_maverick_bear_stocks",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("momentum_score", sa.Numeric(5, 2), default=0),  # was rs_rating
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("rsi_14", sa.Numeric(5, 2), default=0),
            # MACD Indicators
            sa.Column("macd", sa.Numeric(12, 6), default=0),
            sa.Column("macd_signal", sa.Numeric(12, 6), default=0),
            sa.Column("macd_histogram", sa.Numeric(12, 6), default=0),
            # Bear Market Indicators
            sa.Column("dist_days_20", sa.Integer(), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr_contraction", sa.Boolean(), default=False),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            sa.Column("avg_vol_30d", sa.Numeric(15, 2), default=0),
            sa.Column("big_down_vol", sa.Boolean(), default=False),
            # Pattern Analysis
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("consolidation_status", sa.String(50)),  # was vcp_status
            # Scoring
            sa.Column("score", sa.Integer(), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data with column mapping
        op.execute("""
            INSERT INTO mcp_maverick_bear_stocks
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                rs_rating, ema_21, sma_50, sma_200, rsi_14,
                macd, macd_signal, macd_histogram, dist_days_20, adr_pct, atr_contraction, atr, avg_vol_30d, big_down_vol,
                squeeze_status, vcp_status, score, created_at, updated_at
            FROM mcp_maverick_bear_stocks_old
        """)

        op.drop_table("mcp_maverick_bear_stocks_old")

        # Create indexes for bear stocks
        op.create_index(
            "mcp_maverick_bear_stocks_score_idx", "mcp_maverick_bear_stocks", ["score"]
        )
        op.create_index(
            "mcp_maverick_bear_stocks_momentum_score_idx",
            "mcp_maverick_bear_stocks",
            ["momentum_score"],
        )
        op.create_index(
            "mcp_maverick_bear_stocks_date_analyzed_idx",
            "mcp_maverick_bear_stocks",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_maverick_bear_stocks_stock_date_idx",
            "mcp_maverick_bear_stocks",
            ["stock_id", "date_analyzed"],
        )

        # 3. Recreate mcp_supply_demand_breakouts table
        print("   📈 Recreating mcp_supply_demand_breakouts...")
        op.rename_table(
            "mcp_supply_demand_breakouts", "mcp_supply_demand_breakouts_old"
        )

        op.create_table(
            "mcp_supply_demand_breakouts",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_150", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("momentum_score", sa.Numeric(5, 2), default=0),  # was rs_rating
            sa.Column("avg_volume_30d", sa.Numeric(15, 2), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            # Pattern Analysis
            sa.Column("pattern_type", sa.String(50)),
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("consolidation_status", sa.String(50)),  # was vcp_status
            sa.Column("entry_signal", sa.String(50)),
            # Supply/Demand Analysis
            sa.Column("accumulation_rating", sa.Numeric(5, 2), default=0),
            sa.Column("distribution_rating", sa.Numeric(5, 2), default=0),
            sa.Column("breakout_strength", sa.Numeric(5, 2), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data with column mapping
        op.execute("""
            INSERT INTO mcp_supply_demand_breakouts
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                ema_21, sma_50, sma_150, sma_200, rs_rating, avg_volume_30d, adr_pct, atr,
                pattern_type, squeeze_status, vcp_status, entry_signal,
                accumulation_rating, distribution_rating, breakout_strength, created_at, updated_at
            FROM mcp_supply_demand_breakouts_old
        """)

        op.drop_table("mcp_supply_demand_breakouts_old")

        # Create indexes for supply/demand breakouts
        op.create_index(
            "mcp_supply_demand_breakouts_momentum_score_idx",
            "mcp_supply_demand_breakouts",
            ["momentum_score"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_date_analyzed_idx",
            "mcp_supply_demand_breakouts",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_stock_date_idx",
            "mcp_supply_demand_breakouts",
            ["stock_id", "date_analyzed"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_ma_filter_idx",
            "mcp_supply_demand_breakouts",
            ["close_price", "sma_50", "sma_150", "sma_200"],
        )

    # Log successful migration
    print("✅ Successfully removed proprietary terminology from database columns:")
    print("   - rs_rating → momentum_score (more descriptive)")
    print("   - vcp_status → consolidation_status (generic pattern description)")
    print("   - All related indexes have been updated")


def downgrade():
    """Revert column names back to original proprietary terms."""

    bind = op.get_bind()
    dialect_name = bind.dialect.name

    if dialect_name == "postgresql":
        print("🗃️  PostgreSQL: Reverting column names...")

        # 1. Revert indexes first
        print("   🔍 Reverting indexes...")
        op.execute(
            "ALTER INDEX IF EXISTS mcp_maverick_stocks_momentum_score_idx RENAME TO mcp_maverick_stocks_rs_rating_idx"
        )
        op.execute(
            "ALTER INDEX IF EXISTS mcp_maverick_bear_stocks_momentum_score_idx RENAME TO mcp_maverick_bear_stocks_rs_rating_idx"
        )
        op.execute(
            "ALTER INDEX IF EXISTS mcp_supply_demand_breakouts_momentum_score_idx RENAME TO mcp_supply_demand_breakouts_rs_rating_idx"
        )

        # Revert any legacy indexes
        op.execute(
            "ALTER INDEX IF EXISTS idx_stocks_supply_demand_breakouts_momentum_score_desc RENAME TO idx_stocks_supply_demand_breakouts_rs_rating_desc"
        )
        op.execute(
            "ALTER INDEX IF EXISTS idx_supply_demand_breakouts_momentum_score RENAME TO idx_supply_demand_breakouts_rs_rating"
        )

        # 2. Revert columns in mcp_maverick_stocks
        print("   📊 Reverting mcp_maverick_stocks...")
        op.alter_column(
            "mcp_maverick_stocks", "momentum_score", new_column_name="rs_rating"
        )
        op.alter_column(
            "mcp_maverick_stocks", "consolidation_status", new_column_name="vcp_status"
        )

        # 3. Revert columns in mcp_maverick_bear_stocks
        print("   🐻 Reverting mcp_maverick_bear_stocks...")
        op.alter_column(
            "mcp_maverick_bear_stocks", "momentum_score", new_column_name="rs_rating"
        )
        op.alter_column(
            "mcp_maverick_bear_stocks",
            "consolidation_status",
            new_column_name="vcp_status",
        )

        # 4. Revert columns in mcp_supply_demand_breakouts
        print("   📈 Reverting mcp_supply_demand_breakouts...")
        op.alter_column(
            "mcp_supply_demand_breakouts", "momentum_score", new_column_name="rs_rating"
        )
        op.alter_column(
            "mcp_supply_demand_breakouts",
            "consolidation_status",
            new_column_name="vcp_status",
        )

    elif dialect_name == "sqlite":
        print("🗃️  SQLite: Recreating tables with original column names...")

        # SQLite: Recreate tables with original names

        # 1. Recreate mcp_maverick_stocks table with original columns
        print("   📊 Recreating mcp_maverick_stocks...")
        op.rename_table("mcp_maverick_stocks", "mcp_maverick_stocks_new")

        op.create_table(
            "mcp_maverick_stocks",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_150", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("rs_rating", sa.Numeric(5, 2), default=0),  # restored
            sa.Column("avg_vol_30d", sa.Numeric(15, 2), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            # Pattern Analysis
            sa.Column("pattern_type", sa.String(50)),
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("vcp_status", sa.String(50)),  # restored
            sa.Column("entry_signal", sa.String(50)),
            # Scoring
            sa.Column("compression_score", sa.Integer(), default=0),
            sa.Column("pattern_detected", sa.Integer(), default=0),
            sa.Column("combined_score", sa.Integer(), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data back with column mapping
        op.execute("""
            INSERT INTO mcp_maverick_stocks
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                ema_21, sma_50, sma_150, sma_200, momentum_score, avg_vol_30d, adr_pct, atr,
                pattern_type, squeeze_status, consolidation_status, entry_signal,
                compression_score, pattern_detected, combined_score, created_at, updated_at
            FROM mcp_maverick_stocks_new
        """)

        op.drop_table("mcp_maverick_stocks_new")

        # Create original indexes
        op.create_index(
            "mcp_maverick_stocks_combined_score_idx",
            "mcp_maverick_stocks",
            ["combined_score"],
        )
        op.create_index(
            "mcp_maverick_stocks_rs_rating_idx", "mcp_maverick_stocks", ["rs_rating"]
        )
        op.create_index(
            "mcp_maverick_stocks_date_analyzed_idx",
            "mcp_maverick_stocks",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_maverick_stocks_stock_date_idx",
            "mcp_maverick_stocks",
            ["stock_id", "date_analyzed"],
        )

        # 2. Recreate mcp_maverick_bear_stocks with original columns
        print("   🐻 Recreating mcp_maverick_bear_stocks...")
        op.rename_table("mcp_maverick_bear_stocks", "mcp_maverick_bear_stocks_new")

        op.create_table(
            "mcp_maverick_bear_stocks",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("rs_rating", sa.Numeric(5, 2), default=0),  # restored
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("rsi_14", sa.Numeric(5, 2), default=0),
            # MACD Indicators
            sa.Column("macd", sa.Numeric(12, 6), default=0),
            sa.Column("macd_signal", sa.Numeric(12, 6), default=0),
            sa.Column("macd_histogram", sa.Numeric(12, 6), default=0),
            # Bear Market Indicators
            sa.Column("dist_days_20", sa.Integer(), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr_contraction", sa.Boolean(), default=False),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            sa.Column("avg_vol_30d", sa.Numeric(15, 2), default=0),
            sa.Column("big_down_vol", sa.Boolean(), default=False),
            # Pattern Analysis
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("vcp_status", sa.String(50)),  # restored
            # Scoring
            sa.Column("score", sa.Integer(), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data back
        op.execute("""
            INSERT INTO mcp_maverick_bear_stocks
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                momentum_score, ema_21, sma_50, sma_200, rsi_14,
                macd, macd_signal, macd_histogram, dist_days_20, adr_pct, atr_contraction, atr, avg_vol_30d, big_down_vol,
                squeeze_status, consolidation_status, score, created_at, updated_at
            FROM mcp_maverick_bear_stocks_new
        """)

        op.drop_table("mcp_maverick_bear_stocks_new")

        # Create original indexes
        op.create_index(
            "mcp_maverick_bear_stocks_score_idx", "mcp_maverick_bear_stocks", ["score"]
        )
        op.create_index(
            "mcp_maverick_bear_stocks_rs_rating_idx",
            "mcp_maverick_bear_stocks",
            ["rs_rating"],
        )
        op.create_index(
            "mcp_maverick_bear_stocks_date_analyzed_idx",
            "mcp_maverick_bear_stocks",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_maverick_bear_stocks_stock_date_idx",
            "mcp_maverick_bear_stocks",
            ["stock_id", "date_analyzed"],
        )

        # 3. Recreate mcp_supply_demand_breakouts with original columns
        print("   📈 Recreating mcp_supply_demand_breakouts...")
        op.rename_table(
            "mcp_supply_demand_breakouts", "mcp_supply_demand_breakouts_new"
        )

        op.create_table(
            "mcp_supply_demand_breakouts",
            sa.Column("id", sa.BigInteger(), primary_key=True, autoincrement=True),
            sa.Column(
                "stock_id", postgresql.UUID(as_uuid=True), nullable=False, index=True
            ),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            # OHLCV Data
            sa.Column("open_price", sa.Numeric(12, 4), default=0),
            sa.Column("high_price", sa.Numeric(12, 4), default=0),
            sa.Column("low_price", sa.Numeric(12, 4), default=0),
            sa.Column("close_price", sa.Numeric(12, 4), default=0),
            sa.Column("volume", sa.BigInteger(), default=0),
            # Technical Indicators
            sa.Column("ema_21", sa.Numeric(12, 4), default=0),
            sa.Column("sma_50", sa.Numeric(12, 4), default=0),
            sa.Column("sma_150", sa.Numeric(12, 4), default=0),
            sa.Column("sma_200", sa.Numeric(12, 4), default=0),
            sa.Column("rs_rating", sa.Numeric(5, 2), default=0),  # restored
            sa.Column("avg_volume_30d", sa.Numeric(15, 2), default=0),
            sa.Column("adr_pct", sa.Numeric(5, 2), default=0),
            sa.Column("atr", sa.Numeric(12, 4), default=0),
            # Pattern Analysis
            sa.Column("pattern_type", sa.String(50)),
            sa.Column("squeeze_status", sa.String(50)),
            sa.Column("vcp_status", sa.String(50)),  # restored
            sa.Column("entry_signal", sa.String(50)),
            # Supply/Demand Analysis
            sa.Column("accumulation_rating", sa.Numeric(5, 2), default=0),
            sa.Column("distribution_rating", sa.Numeric(5, 2), default=0),
            sa.Column("breakout_strength", sa.Numeric(5, 2), default=0),
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
        )

        # Copy data back
        op.execute("""
            INSERT INTO mcp_supply_demand_breakouts
            SELECT
                id, stock_id, date_analyzed, open_price, high_price, low_price, close_price, volume,
                ema_21, sma_50, sma_150, sma_200, momentum_score, avg_volume_30d, adr_pct, atr,
                pattern_type, squeeze_status, consolidation_status, entry_signal,
                accumulation_rating, distribution_rating, breakout_strength, created_at, updated_at
            FROM mcp_supply_demand_breakouts_new
        """)

        op.drop_table("mcp_supply_demand_breakouts_new")

        # Create original indexes
        op.create_index(
            "mcp_supply_demand_breakouts_rs_rating_idx",
            "mcp_supply_demand_breakouts",
            ["rs_rating"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_date_analyzed_idx",
            "mcp_supply_demand_breakouts",
            ["date_analyzed"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_stock_date_idx",
            "mcp_supply_demand_breakouts",
            ["stock_id", "date_analyzed"],
        )
        op.create_index(
            "mcp_supply_demand_breakouts_ma_filter_idx",
            "mcp_supply_demand_breakouts",
            ["close_price", "sma_50", "sma_150", "sma_200"],
        )

    print("✅ Successfully reverted column names back to original proprietary terms")

```
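
A minimal sketch of how this revision could be applied or rolled back programmatically through Alembic's command API, assuming the repository's `alembic.ini` at the project root is used; the same effect is available from the `alembic upgrade` / `alembic downgrade` CLI.

```python
# Illustrative sketch: apply or revert this revision via Alembic's command API.
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")  # assumes the repo-root alembic.ini

# Apply the rename (rs_rating -> momentum_score, vcp_status -> consolidation_status).
command.upgrade(alembic_cfg, "011_remove_proprietary_terms")

# Roll back to the prior revision, restoring the original column names.
command.downgrade(alembic_cfg, "010_self_contained_schema")
```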

--------------------------------------------------------------------------------
/maverick_mcp/config/database.py:
--------------------------------------------------------------------------------

```python
"""
Enhanced database pool configuration with validation and monitoring capabilities.

This module provides the DatabasePoolConfig class that extends the basic database
configuration with comprehensive connection pool management, validation, and monitoring.

This enhances the existing DatabaseConfig class from providers.interfaces.persistence
with advanced validation, monitoring capabilities, and production-ready features.
"""

import logging
import os
import warnings
from typing import Any

from pydantic import BaseModel, Field, model_validator
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool

# Import the existing DatabaseConfig for compatibility
from maverick_mcp.providers.interfaces.persistence import DatabaseConfig

# Set up logging
logger = logging.getLogger("maverick_mcp.config.database")


class DatabasePoolConfig(BaseModel):
    """
    Enhanced database pool configuration with comprehensive validation and monitoring.

    This class provides advanced connection pool management with:
    - Validation to prevent connection pool exhaustion
    - Monitoring capabilities with event listeners
    - Automatic threshold calculations for pool sizing
    - Protection against database connection limits
    """

    # Core pool configuration
    pool_size: int = Field(
        default_factory=lambda: int(os.getenv("DB_POOL_SIZE", "20")),
        ge=1,
        le=100,
        description="Number of connections to maintain in the pool (1-100)",
    )

    max_overflow: int = Field(
        default_factory=lambda: int(os.getenv("DB_MAX_OVERFLOW", "10")),
        ge=0,
        le=50,
        description="Maximum overflow connections above pool size (0-50)",
    )

    pool_timeout: int = Field(
        default_factory=lambda: int(os.getenv("DB_POOL_TIMEOUT", "30")),
        ge=1,
        le=300,
        description="Timeout in seconds to get connection from pool (1-300)",
    )

    pool_recycle: int = Field(
        default_factory=lambda: int(os.getenv("DB_POOL_RECYCLE", "3600")),
        ge=300,
        le=7200,
        description="Seconds before connection is recycled (300-7200, 1 hour default)",
    )

    # Database capacity configuration
    max_database_connections: int = Field(
        default_factory=lambda: int(os.getenv("DB_MAX_CONNECTIONS", "100")),
        description="Maximum connections allowed by database server",
    )

    reserved_superuser_connections: int = Field(
        default_factory=lambda: int(
            os.getenv("DB_RESERVED_SUPERUSER_CONNECTIONS", "3")
        ),
        description="Connections reserved for superuser access",
    )

    # Application usage configuration
    expected_concurrent_users: int = Field(
        default_factory=lambda: int(os.getenv("DB_EXPECTED_CONCURRENT_USERS", "20")),
        description="Expected number of concurrent users",
    )

    connections_per_user: float = Field(
        default_factory=lambda: float(os.getenv("DB_CONNECTIONS_PER_USER", "1.2")),
        description="Average connections per concurrent user",
    )

    # Additional pool settings
    pool_pre_ping: bool = Field(
        default_factory=lambda: os.getenv("DB_POOL_PRE_PING", "true").lower() == "true",
        description="Enable connection validation before use",
    )

    echo_pool: bool = Field(
        default_factory=lambda: os.getenv("DB_ECHO_POOL", "false").lower() == "true",
        description="Enable pool debugging logs",
    )

    # Monitoring thresholds (computed by validator)
    pool_warning_threshold: float = Field(
        default=0.8, description="Pool usage warning threshold"
    )
    pool_critical_threshold: float = Field(
        default=0.95, description="Pool usage critical threshold"
    )

    @model_validator(mode="after")
    def validate_pool_configuration(self) -> "DatabasePoolConfig":
        """
        Comprehensive validation of database pool configuration.

        This validator ensures:
        1. Total pool connections don't exceed available database connections
        2. Pool sizing is appropriate for expected load
        3. Warning and critical thresholds are set appropriately

        Returns:
            Validated DatabasePoolConfig instance

        Raises:
            ValueError: If configuration is invalid or unsafe
        """
        # Calculate total possible connections from this application
        total_app_connections = self.pool_size + self.max_overflow

        # Calculate available connections (excluding superuser reserved)
        available_connections = (
            self.max_database_connections - self.reserved_superuser_connections
        )

        # Validate total connections don't exceed database limits
        if total_app_connections > available_connections:
            raise ValueError(
                f"Pool configuration exceeds database capacity: "
                f"total_app_connections={total_app_connections} > "
                f"available_connections={available_connections} "
                f"(max_db_connections={self.max_database_connections} - "
                f"reserved_superuser={self.reserved_superuser_connections})"
            )

        # Calculate expected connection demand
        expected_demand = int(
            self.expected_concurrent_users * self.connections_per_user
        )

        # Warn if pool size may be insufficient for expected load
        if self.pool_size < expected_demand:
            warning_msg = (
                f"Pool size ({self.pool_size}) may be insufficient for expected load. "
                f"Expected demand: {expected_demand} connections "
                f"({self.expected_concurrent_users} users × {self.connections_per_user} conn/user). "
                f"Consider increasing pool_size or max_overflow."
            )
            logger.warning(warning_msg)
            warnings.warn(warning_msg, UserWarning, stacklevel=2)

        # Validate overflow capacity
        if total_app_connections < expected_demand:
            raise ValueError(
                f"Total connection capacity ({total_app_connections}) is insufficient "
                f"for expected demand ({expected_demand}). "
                f"Increase pool_size and/or max_overflow."
            )

        # Set monitoring thresholds based on pool size
        self.pool_warning_threshold = 0.8  # 80% of pool_size
        self.pool_critical_threshold = 0.95  # 95% of pool_size

        # Log configuration summary
        logger.info(
            f"Database pool configured: pool_size={self.pool_size}, "
            f"max_overflow={self.max_overflow}, total_capacity={total_app_connections}, "
            f"expected_demand={expected_demand}, available_db_connections={available_connections}"
        )

        return self
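
    # Worked example with the environment defaults above: pool_size=20 and
    # max_overflow=10 give total_app_connections=30, which fits within
    # available_connections = 100 - 3 = 97. Expected demand is
    # int(20 users * 1.2) = 24, so pool_size=20 alone triggers the
    # insufficiency warning, while the total capacity of 30 still passes
    # the hard validation check.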

    def get_pool_kwargs(self) -> dict[str, Any]:
        """
        Get SQLAlchemy pool configuration keywords.

        Returns:
            Dictionary of pool configuration parameters for SQLAlchemy engine creation
        """
        return {
            "poolclass": QueuePool,
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "pool_timeout": self.pool_timeout,
            "pool_recycle": self.pool_recycle,
            "pool_pre_ping": self.pool_pre_ping,
            "echo_pool": self.echo_pool,
        }

    def get_monitoring_thresholds(self) -> dict[str, int]:
        """
        Get connection pool monitoring thresholds.

        Returns:
            Dictionary with warning and critical thresholds for pool monitoring
        """
        warning_count = int(self.pool_size * self.pool_warning_threshold)
        critical_count = int(self.pool_size * self.pool_critical_threshold)

        return {
            "warning_threshold": warning_count,
            "critical_threshold": critical_count,
            "pool_size": self.pool_size,
            "max_overflow": self.max_overflow,
            "total_capacity": self.pool_size + self.max_overflow,
        }
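
    # With the default pool_size of 20, these thresholds evaluate to a
    # warning at int(20 * 0.8) = 16 checked-out connections and a critical
    # alert at int(20 * 0.95) = 19.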

    def setup_pool_monitoring(self, engine: Engine) -> None:
        """
        Set up connection pool monitoring event listeners.

        This method registers SQLAlchemy event listeners to monitor pool usage
        and log warnings when thresholds are exceeded.

        Args:
            engine: SQLAlchemy Engine instance to monitor
        """
        thresholds = self.get_monitoring_thresholds()

        @event.listens_for(engine, "connect")
        def receive_connect(dbapi_connection, connection_record):
            """Log successful connection events."""
            pool = engine.pool
            checked_out = pool.checkedout()
            checked_in = pool.checkedin()
            total_checked_out = checked_out

            if self.echo_pool:
                logger.debug(
                    f"Connection established. Pool status: "
                    f"checked_out={checked_out}, checked_in={checked_in}, "
                    f"total_checked_out={total_checked_out}"
                )

            # Check warning threshold
            if total_checked_out >= thresholds["warning_threshold"]:
                logger.warning(
                    f"Pool usage approaching capacity: {total_checked_out}/{thresholds['pool_size']} "
                    f"connections in use (warning threshold: {thresholds['warning_threshold']})"
                )

            # Check critical threshold
            if total_checked_out >= thresholds["critical_threshold"]:
                logger.error(
                    f"Pool usage critical: {total_checked_out}/{thresholds['pool_size']} "
                    f"connections in use (critical threshold: {thresholds['critical_threshold']})"
                )

        @event.listens_for(engine, "checkout")
        def receive_checkout(dbapi_connection, connection_record, connection_proxy):
            """Log connection checkout events."""
            pool = engine.pool
            checked_out = pool.checkedout()

            if self.echo_pool:
                logger.debug(
                    f"Connection checked out. Active connections: {checked_out}"
                )

        @event.listens_for(engine, "checkin")
        def receive_checkin(dbapi_connection, connection_record):
            """Log connection checkin events."""
            pool = engine.pool
            checked_out = pool.checkedout()
            checked_in = pool.checkedin()

            if self.echo_pool:
                logger.debug(
                    f"Connection checked in. Pool status: "
                    f"checked_out={checked_out}, checked_in={checked_in}"
                )

        @event.listens_for(engine, "invalidate")
        def receive_invalidate(dbapi_connection, connection_record, exception):
            """Log connection invalidation events."""
            logger.warning(
                f"Connection invalidated due to error: {exception}. "
                f"Connection will be recycled."
            )

        @event.listens_for(engine, "soft_invalidate")
        def receive_soft_invalidate(dbapi_connection, connection_record, exception):
            """Log soft connection invalidation events."""
            logger.info(
                f"Connection soft invalidated: {exception}. "
                f"Connection will be recycled on next use."
            )

        logger.info(
            f"Pool monitoring enabled for engine. Thresholds: "
            f"warning={thresholds['warning_threshold']}, "
            f"critical={thresholds['critical_threshold']}, "
            f"capacity={thresholds['total_capacity']}"
        )

    def validate_against_database_limits(self, actual_max_connections: int) -> None:
        """
        Validate configuration against actual database connection limits.

        This method should be called after connecting to the database to verify
        that the actual database limits match the configured expectations.

        Args:
            actual_max_connections: Actual max_connections setting from database

        Raises:
            ValueError: If actual limits don't match configuration
        """
        if actual_max_connections != self.max_database_connections:
            if actual_max_connections < self.max_database_connections:
                # Actual limit is lower than expected - this is dangerous
                total_app_connections = self.pool_size + self.max_overflow
                available_connections = (
                    actual_max_connections - self.reserved_superuser_connections
                )

                if total_app_connections > available_connections:
                    raise ValueError(
                        f"Configuration invalid for actual database limits: "
                        f"actual_max_connections={actual_max_connections} < "
                        f"configured_max_connections={self.max_database_connections}. "
                        f"Pool requires {total_app_connections} connections but only "
                        f"{available_connections} are available."
                    )
                else:
                    logger.warning(
                        f"Database max_connections ({actual_max_connections}) is lower than "
                        f"configured ({self.max_database_connections}), but pool still fits."
                    )
            else:
                # Actual limit is higher - update our understanding
                logger.info(
                    f"Database max_connections ({actual_max_connections}) is higher than "
                    f"configured ({self.max_database_connections}). Configuration is safe."
                )
                self.max_database_connections = actual_max_connections

    def to_legacy_config(self, database_url: str) -> DatabaseConfig:
        """
        Convert to legacy DatabaseConfig for backward compatibility.

        This method creates a DatabaseConfig instance (from persistence interface)
        that can be used with existing code while preserving the enhanced
        configuration settings.

        Args:
            database_url: Database connection URL

        Returns:
            DatabaseConfig instance compatible with existing interfaces
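
        Example:
            Illustrative values only; the returned object mirrors this config:

            >>> config = DatabasePoolConfig(pool_size=20, max_overflow=10)
            >>> legacy = config.to_legacy_config("postgresql://localhost/db")
            >>> legacy.pool_size
            20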
        """
        return DatabaseConfig(
            database_url=database_url,
            pool_size=self.pool_size,
            max_overflow=self.max_overflow,
            pool_timeout=self.pool_timeout,
            pool_recycle=self.pool_recycle,
            echo=self.echo_pool,
            autocommit=False,  # Always False for safety
            autoflush=True,  # Default behavior
            expire_on_commit=True,  # Default behavior
        )

    @classmethod
    def from_legacy_config(
        cls, legacy_config: DatabaseConfig, **overrides
    ) -> "DatabasePoolConfig":
        """
        Create enhanced config from legacy DatabaseConfig.

        This method allows upgrading from the basic DatabaseConfig to the
        enhanced DatabasePoolConfig while preserving existing settings.

        Args:
            legacy_config: Existing DatabaseConfig instance
            **overrides: Additional configuration overrides

        Returns:
            DatabasePoolConfig with enhanced features
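
        Example:
            A minimal sketch; ``DatabaseConfig`` comes from
            ``maverick_mcp.providers.interfaces.persistence``:

            >>> legacy = DatabaseConfig(pool_size=20)
            >>> enhanced = DatabasePoolConfig.from_legacy_config(legacy, echo_pool=True)
            >>> enhanced.echo_pool
            True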
        """
        # Extract basic configuration
        base_config = {
            "pool_size": legacy_config.pool_size,
            "max_overflow": legacy_config.max_overflow,
            "pool_timeout": legacy_config.pool_timeout,
            "pool_recycle": legacy_config.pool_recycle,
            "echo_pool": legacy_config.echo,
        }

        # Apply any overrides
        base_config.update(overrides)

        return cls(**base_config)


def create_monitored_engine_kwargs(
    database_url: str, pool_config: DatabasePoolConfig
) -> dict[str, Any]:
    """
    Create SQLAlchemy engine kwargs with monitoring enabled.

    This is a convenience function that combines database URL with pool configuration
    and returns kwargs suitable for creating a monitored SQLAlchemy engine.

    Args:
        database_url: Database connection URL
        pool_config: DatabasePoolConfig instance

    Returns:
        Dictionary of kwargs for SQLAlchemy create_engine()

    Example:
        >>> config = DatabasePoolConfig(pool_size=10, max_overflow=5)
        >>> kwargs = create_monitored_engine_kwargs("postgresql://...", config)
        >>> engine = create_engine(database_url, **kwargs)
        >>> config.setup_pool_monitoring(engine)
    """
    engine_kwargs = {
        "url": database_url,
        **pool_config.get_pool_kwargs(),
        "connect_args": {
            "application_name": "maverick_mcp",
        },
    }

    return engine_kwargs


# Example usage and factory functions
def get_default_pool_config() -> DatabasePoolConfig:
    """
    Get default database pool configuration.

    This function provides a pre-configured DatabasePoolConfig suitable for
    most applications. Environment variables can override defaults.

    Returns:
        DatabasePoolConfig with default settings
    """
    return DatabasePoolConfig()


def get_high_concurrency_pool_config() -> DatabasePoolConfig:
    """
    Get database pool configuration optimized for high concurrency.

    Returns:
        DatabasePoolConfig optimized for high-traffic scenarios
    """
    return DatabasePoolConfig(
        pool_size=50,
        max_overflow=30,
        pool_timeout=60,
        pool_recycle=1800,  # 30 minutes
        expected_concurrent_users=60,
        connections_per_user=1.3,
        max_database_connections=200,
        reserved_superuser_connections=5,
    )


def get_development_pool_config() -> DatabasePoolConfig:
    """
    Get database pool configuration optimized for development.

    Returns:
        DatabasePoolConfig optimized for development scenarios
    """
    return DatabasePoolConfig(
        pool_size=5,
        max_overflow=2,
        pool_timeout=30,
        pool_recycle=3600,  # 1 hour
        expected_concurrent_users=5,
        connections_per_user=1.0,
        max_database_connections=20,
        reserved_superuser_connections=2,
        echo_pool=True,  # Enable debugging in development
    )


def get_pool_config_from_settings() -> DatabasePoolConfig:
    """
    Create DatabasePoolConfig from existing settings system.

    This function integrates with the existing maverick_mcp.config.settings
    to create an enhanced pool configuration while maintaining compatibility.

    Returns:
        DatabasePoolConfig based on current application settings
    """
    try:
        from maverick_mcp.config.settings import settings

        # Get environment for configuration selection
        environment = getattr(settings, "environment", "development").lower()

        if environment in ["development", "dev", "test"]:
            base_config = get_development_pool_config()
        elif environment == "production":
            base_config = get_high_concurrency_pool_config()
        else:
            base_config = get_default_pool_config()

        # Override with any specific database settings from the config
        if hasattr(settings, "db"):
            db_settings = settings.db
            overrides = {}

            if hasattr(db_settings, "pool_size"):
                overrides["pool_size"] = db_settings.pool_size
            if hasattr(db_settings, "pool_max_overflow"):
                overrides["max_overflow"] = db_settings.pool_max_overflow
            if hasattr(db_settings, "pool_timeout"):
                overrides["pool_timeout"] = db_settings.pool_timeout

            # Apply overrides if any exist
            if overrides:
                # Create new config with overrides
                config_dict = base_config.model_dump()
                config_dict.update(overrides)
                base_config = DatabasePoolConfig(**config_dict)

        logger.info(
            f"Database pool configuration loaded for environment: {environment}"
        )
        return base_config

    except ImportError:
        logger.warning("Could not import settings, using default pool configuration")
        return get_default_pool_config()


# Integration examples and utilities
def create_engine_with_enhanced_config(
    database_url: str, pool_config: DatabasePoolConfig | None = None
):
    """
    Create SQLAlchemy engine with enhanced pool configuration and monitoring.

    This is a complete example showing how to integrate the enhanced configuration
    with SQLAlchemy engine creation and monitoring setup.

    Args:
        database_url: Database connection URL
        pool_config: Optional DatabasePoolConfig, uses settings-based config if None

    Returns:
        Configured SQLAlchemy Engine with monitoring enabled

    Example:
        >>> from maverick_mcp.config.database import create_engine_with_enhanced_config
        >>> engine = create_engine_with_enhanced_config("postgresql://user:pass@localhost/db")
        >>> # Engine is now configured with validation, monitoring, and optimal settings
    """
    from sqlalchemy import create_engine

    if pool_config is None:
        pool_config = get_pool_config_from_settings()

    # Create engine with enhanced configuration
    engine_kwargs = create_monitored_engine_kwargs(database_url, pool_config)
    engine = create_engine(**engine_kwargs)

    # Set up monitoring
    pool_config.setup_pool_monitoring(engine)

    logger.info(
        f"Database engine created with enhanced pool configuration: "
        f"pool_size={pool_config.pool_size}, max_overflow={pool_config.max_overflow}"
    )

    return engine


def validate_production_config(pool_config: DatabasePoolConfig) -> bool:
    """
    Validate that pool configuration is suitable for production use.

    This function performs additional validation checks specifically for
    production environments to ensure optimal and safe configuration.

    Args:
        pool_config: DatabasePoolConfig to validate

    Returns:
        True if configuration is production-ready

    Raises:
        ValueError: If configuration is not suitable for production
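
    Example:
        Using the high-concurrency preset defined in this module:

        >>> config = get_high_concurrency_pool_config()
        >>> validate_production_config(config)
        True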
    """
    errors = []
    warnings_list = []

    # Check minimum pool size for production
    if pool_config.pool_size < 10:
        warnings_list.append(
            f"Pool size ({pool_config.pool_size}) may be too small for production. "
            "Consider at least 10-20 connections."
        )

    # Check maximum pool size isn't excessive
    if pool_config.pool_size > 100:
        warnings_list.append(
            f"Pool size ({pool_config.pool_size}) may be excessive. "
            "Consider if this many connections are truly needed."
        )

    # Check timeout settings
    if pool_config.pool_timeout < 10:
        errors.append(
            f"Pool timeout ({pool_config.pool_timeout}s) is too aggressive for production. "
            "Consider at least 30 seconds."
        )

    # Check recycle settings
    if pool_config.pool_recycle > 7200:  # 2 hours
        warnings_list.append(
            f"Pool recycle time ({pool_config.pool_recycle}s) is very long. "
            "Consider 1-2 hours maximum."
        )

    # Check overflow settings
    if pool_config.max_overflow == 0:
        warnings_list.append(
            "No overflow connections configured. Consider allowing some overflow for traffic spikes."
        )

    # Log warnings
    for warning in warnings_list:
        logger.warning(f"Production config warning: {warning}")

    # Raise errors
    if errors:
        error_msg = "Production configuration validation failed:\n" + "\n".join(errors)
        raise ValueError(error_msg)

    if warnings_list:
        logger.info(
            f"Production configuration validation passed with {len(warnings_list)} warnings"
        )
    else:
        logger.info("Production configuration validation passed")

    return True


# Usage Examples and Documentation
"""
## Usage Examples

### Basic Usage

```python
from maverick_mcp.config.database import (
    DatabasePoolConfig,
    create_engine_with_enhanced_config
)

# Create enhanced database engine with monitoring
engine = create_engine_with_enhanced_config("postgresql://user:pass@localhost/db")
```

### Custom Configuration

```python
from maverick_mcp.config.database import DatabasePoolConfig

# Create custom pool configuration
config = DatabasePoolConfig(
    pool_size=25,
    max_overflow=15,
    pool_timeout=45,
    expected_concurrent_users=30,
    connections_per_user=1.5,
    max_database_connections=150
)

# Create engine with custom config
engine_kwargs = create_monitored_engine_kwargs(database_url, config)
engine = create_engine(**engine_kwargs)
config.setup_pool_monitoring(engine)
```

### Environment-Specific Configurations

```python
from maverick_mcp.config.database import (
    get_development_pool_config,
    get_high_concurrency_pool_config,
    validate_production_config
)

# Development
dev_config = get_development_pool_config()  # Small pool, debug enabled

# Production
prod_config = get_high_concurrency_pool_config()  # Large pool, optimized
validate_production_config(prod_config)  # Ensure production-ready
```

### Integration with Existing Settings

```python
from maverick_mcp.config.database import get_pool_config_from_settings

# Automatically use settings from maverick_mcp.config.settings
config = get_pool_config_from_settings()
```

### Legacy Compatibility

```python
from maverick_mcp.config.database import DatabasePoolConfig
from maverick_mcp.providers.interfaces.persistence import DatabaseConfig

# Convert enhanced config to legacy format
enhanced_config = DatabasePoolConfig(pool_size=30)
legacy_config = enhanced_config.to_legacy_config("postgresql://...")

# Upgrade legacy config to enhanced format
legacy_config = DatabaseConfig(pool_size=20)
enhanced_config = DatabasePoolConfig.from_legacy_config(legacy_config)
```

### Production Validation

```python
from maverick_mcp.config.database import validate_production_config

try:
    validate_production_config(pool_config)
    print("✅ Configuration is production-ready")
except ValueError as e:
    print(f"❌ Configuration issues: {e}")
```

### Monitoring Integration

The enhanced configuration automatically provides the following (a short usage sketch follows the list):

1. **Connection Pool Monitoring**: Real-time logging of pool usage
2. **Threshold Alerts**: Warnings at 80% usage, critical alerts at 95%
3. **Connection Lifecycle Tracking**: Logs for connect/disconnect/invalidate events
4. **Production Validation**: Ensures safe configuration for production use
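
For example, the same pool statistics that the event listeners log can be read
directly from the engine (a minimal sketch; `engine` is any engine created with
`create_engine_with_enhanced_config`):

```python
# Snapshot of current pool usage between requests.
print(engine.pool.status())        # human-readable summary of the pool
in_use = engine.pool.checkedout()  # connections currently checked out
idle = engine.pool.checkedin()     # connections sitting idle in the pool
```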

### Environment Variables

All configuration can be overridden via environment variables:

```bash
# Core pool settings
export DB_POOL_SIZE=30
export DB_MAX_OVERFLOW=15
export DB_POOL_TIMEOUT=45
export DB_POOL_RECYCLE=1800

# Database capacity
export DB_MAX_CONNECTIONS=150
export DB_RESERVED_SUPERUSER_CONNECTIONS=5

# Usage expectations
export DB_EXPECTED_CONCURRENT_USERS=40
export DB_CONNECTIONS_PER_USER=1.3

# Debugging
export DB_POOL_PRE_PING=true
export DB_ECHO_POOL=false
```

This enhanced configuration provides production-ready database connection management
with comprehensive validation, monitoring, and safety checks while maintaining
backward compatibility with existing code.
"""

```

--------------------------------------------------------------------------------
/tests/test_security_penetration.py:
--------------------------------------------------------------------------------

```python
"""
Security Penetration Testing Suite for MaverickMCP.

This suite performs security penetration testing to validate that
security protections are active and effective against real attack vectors.

Tests include:
- Authentication bypass attempts
- CSRF attack vectors
- Rate limiting evasion
- Input validation bypass
- Session hijacking attempts
- SQL injection prevention
- XSS protection validation
- Information disclosure prevention
"""

import time
from datetime import UTC, datetime, timedelta
from uuid import uuid4

import pytest
from fastapi.testclient import TestClient

from maverick_mcp.api.api_server import create_api_app


@pytest.fixture
def security_test_app():
    """Create app for security testing."""
    return create_api_app()


@pytest.fixture
def security_client(security_test_app):
    """Create client for security testing."""
    return TestClient(security_test_app)


@pytest.fixture
def test_user():
    """Test user for security testing."""
    return {
        "email": f"sectest{uuid4().hex[:8]}@example.com",
        "password": "SecurePass123!",
        "name": "Security Test User",
        "company": "Test Security Inc",
    }


class TestAuthenticationSecurity:
    """Test authentication security against bypass attempts."""

    @pytest.mark.integration
    def test_jwt_token_manipulation_resistance(self, security_client, test_user):
        """Test resistance to JWT token manipulation attacks."""

        # Register and login
        security_client.post("/auth/register", json=test_user)
        login_response = security_client.post(
            "/auth/login",
            json={"email": test_user["email"], "password": test_user["password"]},
        )

        # Extract tokens from cookies
        cookies = login_response.cookies
        access_token_cookie = cookies.get("maverick_access_token")

        if not access_token_cookie:
            pytest.skip("JWT tokens not in cookies - may be test environment")

        # Attempt 1: Modified JWT signature
        tampered_token = access_token_cookie[:-10] + "tampered123"

        response = security_client.get(
            "/user/profile", cookies={"maverick_access_token": tampered_token}
        )
        assert response.status_code == 401  # Should reject tampered token

        # Attempt 2: Algorithm confusion attack (trying "none" algorithm)
        none_algorithm_token = "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJ1c2VyX2lkIjoxLCJleHAiOjk5OTk5OTk5OTl9."

        response = security_client.get(
            "/user/profile", cookies={"maverick_access_token": none_algorithm_token}
        )
        assert response.status_code == 401  # Should reject "none" algorithm

        # Attempt 3: Expired token
        _expired_claims = {
            "user_id": 1,
            "exp": int((datetime.now(UTC) - timedelta(hours=1)).timestamp()),
            "iat": int((datetime.now(UTC) - timedelta(hours=2)).timestamp()),
            "jti": "expired_token",
        }

        # Producing a genuinely expired token would require signing these claims
        # with the server's secret, so they are only documented here; the
        # middleware is expected to reject any token whose "exp" is in the past.

    @pytest.mark.integration
    def test_session_fixation_protection(self, security_client, test_user):
        """Test protection against session fixation attacks."""

        # Get initial session state
        initial_response = security_client.get("/auth/login")
        initial_cookies = initial_response.cookies

        # Login with potential pre-set session
        security_client.post("/auth/register", json=test_user)
        login_response = security_client.post(
            "/auth/login",
            json={"email": test_user["email"], "password": test_user["password"]},
            cookies=initial_cookies,  # Try to maintain old session
        )

        # Verify new session is created (cookies should be different)
        new_cookies = login_response.cookies

        # Session should be regenerated after login
        if "maverick_access_token" in new_cookies:
            # New token should be different from any pre-existing one
            assert login_response.status_code == 200

    @pytest.mark.integration
    def test_concurrent_session_limits(self, security_client, test_user):
        """Test limits on concurrent sessions."""

        # Register user
        security_client.post("/auth/register", json=test_user)

        # Create multiple concurrent sessions
        session_responses = []
        for _i in range(5):
            client_instance = TestClient(security_client.app)
            response = client_instance.post(
                "/auth/login",
                json={"email": test_user["email"], "password": test_user["password"]},
            )
            session_responses.append(response)

        # All should succeed (or be limited if concurrent session limits implemented)
        success_count = sum(1 for r in session_responses if r.status_code == 200)
        assert success_count >= 1  # At least one should succeed

        # If concurrent session limits are implemented, test that old sessions are invalidated

    @pytest.mark.integration
    def test_password_brute_force_protection(self, security_client, test_user):
        """Test protection against password brute force attacks."""

        # Register user
        security_client.post("/auth/register", json=test_user)

        # Attempt multiple failed logins
        failed_attempts = []
        for i in range(10):
            response = security_client.post(
                "/auth/login",
                json={"email": test_user["email"], "password": f"wrong_password_{i}"},
            )
            failed_attempts.append(response.status_code)

            # Small delay to avoid overwhelming the system
            time.sleep(0.1)

        # Should have multiple failures
        assert all(status == 401 for status in failed_attempts)

        # After multiple failures, account should be locked or rate limited
        # Test with correct password - should be blocked if protection is active
        final_attempt = security_client.post(
            "/auth/login",
            json={"email": test_user["email"], "password": test_user["password"]},
        )

        # If brute force protection is active, should be rate limited
        # Otherwise, should succeed
        assert final_attempt.status_code in [200, 401, 429]


class TestCSRFAttackVectors:
    """Test CSRF protection against various attack vectors."""

    @pytest.mark.integration
    def test_csrf_attack_simulation(self, security_client, test_user):
        """Simulate CSRF attacks to test protection."""

        # Setup authenticated session
        security_client.post("/auth/register", json=test_user)
        login_response = security_client.post(
            "/auth/login",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        csrf_token = login_response.json().get("csrf_token")

        # Attack 1: Missing CSRF token
        attack_response_1 = security_client.post(
            "/user/profile", json={"name": "Attacked Name"}
        )
        assert attack_response_1.status_code == 403
        assert "CSRF" in attack_response_1.json()["detail"]

        # Attack 2: Invalid CSRF token
        attack_response_2 = security_client.post(
            "/user/profile",
            json={"name": "Attacked Name"},
            headers={"X-CSRF-Token": "invalid_token_123"},
        )
        assert attack_response_2.status_code == 403

        # Attack 3: CSRF token from different session
        # Create second user and get their CSRF token
        other_user = {
            "email": f"other{uuid4().hex[:8]}@example.com",
            "password": "OtherPass123!",
            "name": "Other User",
        }

        other_client = TestClient(security_client.app)
        other_client.post("/auth/register", json=other_user)
        other_login = other_client.post(
            "/auth/login",
            json={"email": other_user["email"], "password": other_user["password"]},
        )
        other_csrf = other_login.json().get("csrf_token")

        # Try to use other user's CSRF token
        attack_response_3 = security_client.post(
            "/user/profile",
            json={"name": "Cross-User Attack"},
            headers={"X-CSRF-Token": other_csrf},
        )
        assert attack_response_3.status_code == 403

        # Legitimate request should work
        legitimate_response = security_client.post(
            "/user/profile",
            json={"name": "Legitimate Update"},
            headers={"X-CSRF-Token": csrf_token},
        )
        assert legitimate_response.status_code == 200

    @pytest.mark.integration
    def test_csrf_double_submit_validation(self, security_client, test_user):
        """Test CSRF double-submit cookie validation."""

        # Setup session
        security_client.post("/auth/register", json=test_user)
        login_response = security_client.post(
            "/auth/login",
            json={"email": test_user["email"], "password": test_user["password"]},
        )
        csrf_token = login_response.json().get("csrf_token")
        cookies = login_response.cookies

        # Attack: Modify CSRF cookie but keep header the same
        modified_cookies = cookies.copy()
        if "maverick_csrf_token" in modified_cookies:
            modified_cookies["maverick_csrf_token"] = "modified_csrf_token"

        attack_response = security_client.post(
            "/user/profile",
            json={"name": "CSRF Cookie Attack"},
            headers={"X-CSRF-Token": csrf_token},
            cookies=modified_cookies,
        )
        assert attack_response.status_code == 403

    @pytest.mark.integration
    def test_csrf_token_entropy_and_uniqueness(self, security_client, test_user):
        """Test CSRF tokens have sufficient entropy and are unique."""

        # Register user
        security_client.post("/auth/register", json=test_user)

        # Generate multiple CSRF tokens
        csrf_tokens = []
        for _i in range(5):
            response = security_client.post(
                "/auth/login",
                json={"email": test_user["email"], "password": test_user["password"]},
            )
            csrf_token = response.json().get("csrf_token")
            if csrf_token:
                csrf_tokens.append(csrf_token)

        if csrf_tokens:
            # All tokens should be unique
            assert len(set(csrf_tokens)) == len(csrf_tokens)

            # Tokens should have sufficient length (at least 32 chars)
            for token in csrf_tokens:
                assert len(token) >= 32

            # Tokens should not be predictable patterns
            for i, token in enumerate(csrf_tokens[1:], 1):
                # Should not be sequential or pattern-based
                assert token != csrf_tokens[0] + str(i)
                assert not token.startswith(csrf_tokens[0][:-5])


class TestRateLimitingEvasion:
    """Test rate limiting against evasion attempts."""

    @pytest.mark.integration
    def test_ip_based_rate_limit_evasion(self, security_client):
        """Test attempts to evade IP-based rate limiting."""

        # Test basic rate limiting
        responses = []
        for _i in range(25):
            response = security_client.get("/api/data")
            responses.append(response.status_code)

        # Should hit the rate limit after sustained requests
        rate_limited_count = sum(1 for status in responses if status == 429)
        assert rate_limited_count > 0  # Should have some rate limited responses

        # Attempt 1: X-Forwarded-For header spoofing
        spoofed_responses = []
        for i in range(10):
            response = security_client.get(
                "/api/data", headers={"X-Forwarded-For": f"192.168.1.{i}"}
            )
            spoofed_responses.append(response.status_code)

        # Should still be rate limited (a proper implementation keys on the real
        # client IP rather than spoofable headers); the count is not asserted
        # strictly because header handling varies by deployment.

        # Attempt 2: X-Real-IP header spoofing
        real_ip_responses = []
        for i in range(5):
            response = security_client.get(
                "/api/data", headers={"X-Real-IP": f"10.0.0.{i}"}
            )
            real_ip_responses.append(response.status_code)

        # Rate limiting should not be easily bypassed

    @pytest.mark.integration
    def test_user_agent_rotation_evasion(self, security_client):
        """Test rate limiting against user agent rotation."""

        user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:91.0) Gecko/20100101",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 14_7_1 like Mac OS X)",
        ]

        # Attempt to evade rate limiting by rotating user agents
        ua_responses = []
        for i in range(15):
            ua = user_agents[i % len(user_agents)]
            response = security_client.get("/api/data", headers={"User-Agent": ua})
            ua_responses.append(response.status_code)

        # Should still enforce rate limiting regardless of user agent; the number
        # of throttled responses is not asserted because limits depend on the
        # deployed configuration.

    @pytest.mark.integration
    def test_distributed_rate_limit_evasion(self, security_client):
        """Test against distributed rate limit evasion attempts."""

        # Simulate requests with small delays (trying to stay under rate limits)
        distributed_responses = []
        for _i in range(10):
            response = security_client.get("/api/data")
            distributed_responses.append(response.status_code)
            time.sleep(0.1)  # Small delay

        # Even with delays, sustained high-rate requests should be limited
        # This tests if rate limiting has proper time windows


class TestInputValidationBypass:
    """Test input validation against bypass attempts."""

    @pytest.mark.integration
    def test_sql_injection_prevention(self, security_client, test_user):
        """Test SQL injection prevention."""

        # SQL injection payloads
        sql_payloads = [
            "'; DROP TABLE users; --",
            "' OR '1'='1",
            "' UNION SELECT * FROM users --",
            "'; DELETE FROM users WHERE '1'='1",
            "' OR 1=1 --",
            "admin'--",
            "admin'/*",
            "' OR 'x'='x",
            "' AND id IS NULL; --",
            "'OR 1=1#",
        ]

        # Test SQL injection in login email field
        for payload in sql_payloads:
            response = security_client.post(
                "/auth/login", json={"email": payload, "password": "any_password"}
            )

            # Should handle gracefully without SQL errors
            assert response.status_code in [400, 401, 422]  # Not 500 (SQL error)

            # Response should not contain SQL error messages
            response_text = response.text.lower()
            sql_error_indicators = [
                "syntax error",
                "sql",
                "mysql",
                "postgresql",
                "sqlite",
                "database",
                "column",
                "table",
                "select",
                "union",
            ]

            for indicator in sql_error_indicators:
                assert indicator not in response_text

        # Test SQL injection in registration fields
        for field in ["name", "company"]:
            malicious_user = test_user.copy()
            malicious_user[field] = "'; DROP TABLE users; --"

            response = security_client.post("/auth/register", json=malicious_user)

            # Should either reject or sanitize the input
            assert response.status_code in [200, 201, 400, 422]

            if response.status_code in [200, 201]:
                # If accepted, verify it's sanitized
                login_response = security_client.post(
                    "/auth/login",
                    json={
                        "email": malicious_user["email"],
                        "password": malicious_user["password"],
                    },
                )

                if login_response.status_code == 200:
                    csrf_token = login_response.json().get("csrf_token")
                    profile_response = security_client.get(
                        "/user/profile", headers={"X-CSRF-Token": csrf_token}
                    )

                    if profile_response.status_code == 200:
                        profile_data = profile_response.json()
                        # SQL injection should be sanitized
                        assert "DROP TABLE" not in profile_data.get(field, "")

    @pytest.mark.integration
    def test_xss_prevention(self, security_client, test_user):
        """Test XSS prevention."""

        xss_payloads = [
            "<script>alert('XSS')</script>",
            "<img src=x onerror=alert('XSS')>",
            "javascript:alert('XSS')",
            "<svg onload=alert('XSS')>",
            "<iframe src=javascript:alert('XSS')>",
            "';alert('XSS');//",
            "<body onload=alert('XSS')>",
            "<input onfocus=alert('XSS') autofocus>",
            "<select onfocus=alert('XSS') autofocus>",
            "<textarea onfocus=alert('XSS') autofocus>",
        ]

        for payload in xss_payloads:
            # Test XSS in user registration
            malicious_user = test_user.copy()
            malicious_user["email"] = f"xss{uuid4().hex[:8]}@example.com"
            malicious_user["name"] = payload

            response = security_client.post("/auth/register", json=malicious_user)

            if response.status_code in [200, 201]:
                # Login and check profile
                login_response = security_client.post(
                    "/auth/login",
                    json={
                        "email": malicious_user["email"],
                        "password": malicious_user["password"],
                    },
                )

                if login_response.status_code == 200:
                    csrf_token = login_response.json().get("csrf_token")
                    profile_response = security_client.get(
                        "/user/profile", headers={"X-CSRF-Token": csrf_token}
                    )

                    if profile_response.status_code == 200:
                        profile_data = profile_response.json()
                        stored_name = profile_data.get("name", "")

                        # XSS should be escaped or removed
                        assert "<script>" not in stored_name
                        assert "javascript:" not in stored_name
                        assert "onerror=" not in stored_name
                        assert "onload=" not in stored_name
                        assert "alert(" not in stored_name

    @pytest.mark.integration
    def test_path_traversal_prevention(self, security_client):
        """Test path traversal prevention."""

        path_traversal_payloads = [
            "../../../etc/passwd",
            "..\\..\\..\\windows\\system32\\config\\sam",
            "....//....//....//etc/passwd",
            "%2e%2e%2f%2e%2e%2f%2e%2e%2fetc%2fpasswd",
            "..%252f..%252f..%252fetc%252fpasswd",
            "..%c0%af..%c0%af..%c0%afetc%c0%afpasswd",
        ]

        # Test path traversal in file access endpoints (if any)
        for payload in path_traversal_payloads:
            # Test in URL path
            response = security_client.get(f"/api/files/{payload}")

            # Should return 404 or 400, not 500 or file contents
            assert response.status_code in [400, 404, 422]

            # Should not return file system contents
            response_text = response.text.lower()
            sensitive_file_indicators = [
                "root:",
                "daemon:",
                "bin:",
                "sys:",  # /etc/passwd content
                "[boot loader]",
                "[operating systems]",  # Windows boot.ini
                "password",
                "hash",
                "secret",
            ]

            for indicator in sensitive_file_indicators:
                assert indicator not in response_text

    @pytest.mark.integration
    def test_command_injection_prevention(self, security_client, test_user):
        """Test command injection prevention."""

        command_injection_payloads = [
            "; cat /etc/passwd",
            "| cat /etc/passwd",
            "& dir",
            "`cat /etc/passwd`",
            "$(cat /etc/passwd)",
            "; rm -rf /",
            "&& rm -rf /",
            "|| rm -rf /",
            "; shutdown -h now",
            "'; whoami; echo '",
        ]

        # Test command injection in various fields
        for payload in command_injection_payloads:
            malicious_user = test_user.copy()
            malicious_user["email"] = f"cmd{uuid4().hex[:8]}@example.com"
            malicious_user["company"] = payload

            response = security_client.post("/auth/register", json=malicious_user)

            # Should handle gracefully
            assert response.status_code in [200, 201, 400, 422]

            # Should not execute system commands
            response_text = response.text
            command_output_indicators = [
                "root:",
                "daemon:",
                "bin:",  # Output of cat /etc/passwd
                "total ",
                "drwx",  # Output of ls -la
                "uid=",
                "gid=",  # Output of whoami/id
            ]

            for indicator in command_output_indicators:
                assert indicator not in response_text


class TestInformationDisclosure:
    """Test prevention of information disclosure."""

    @pytest.mark.integration
    def test_error_message_sanitization(self, security_client):
        """Test that error messages don't leak sensitive information."""

        # Test 404 error
        response = security_client.get("/nonexistent/endpoint/123")
        assert response.status_code == 404

        error_data = response.json()
        error_message = str(error_data).lower()

        # Should not contain sensitive system information
        sensitive_info = [
            "/users/",
            "/home/",
            "\\users\\",
            "\\home\\",  # File paths
            "password",
            "secret",
            "key",
            "token",
            "jwt",  # Credentials
            "localhost",
            "127.0.0.1",
            "redis://",
            "postgresql://",  # Internal addresses
            "traceback",
            "stack trace",
            "exception",
            "error at",  # Stack traces
            "python",
            "uvicorn",
            "fastapi",
            "sqlalchemy",  # Framework details
            "database",
            "sql",
            "query",
            "connection",  # Database details
        ]

        for info in sensitive_info:
            assert info not in error_message

        # Should include request ID for tracking
        assert "request_id" in error_data or "error_id" in error_data

    @pytest.mark.integration
    def test_debug_information_disclosure(self, security_client):
        """Test that debug information is not disclosed."""

        # Attempt to trigger various error conditions
        error_test_cases = [
            ("/auth/login", {"invalid": "json_structure"}),
            ("/user/profile", {}),  # Missing authentication
        ]

        for endpoint, data in error_test_cases:
            response = security_client.post(endpoint, json=data)

            # Should not contain debug information
            response_text = response.text.lower()
            debug_indicators = [
                "traceback",
                "stack trace",
                "file ",
                "line ",
                "exception",
                "raise ",
                "assert",
                "debug",
                "__file__",
                "__name__",
                "locals()",
                "globals()",
            ]

            for indicator in debug_indicators:
                assert indicator not in response_text

    @pytest.mark.integration
    def test_version_information_disclosure(self, security_client):
        """Test that version information is not disclosed."""

        # Test common endpoints that might leak version info
        test_endpoints = [
            "/health",
            "/",
            "/api/docs",
            "/metrics",
        ]

        for endpoint in test_endpoints:
            response = security_client.get(endpoint)

            if response.status_code == 200:
                response_text = response.text.lower()

                # Should not contain detailed version information
                version_indicators = [
                    "python/",
                    "fastapi/",
                    "uvicorn/",
                    "nginx/",
                    "version",
                    "build",
                    "commit",
                    "git",
                    "dev",
                    "debug",
                    "staging",
                    "test",
                ]

                # Some version info might be acceptable in health endpoints
                if endpoint != "/health":
                    for indicator in version_indicators:
                        assert indicator not in response_text

    @pytest.mark.integration
    def test_user_enumeration_prevention(self, security_client):
        """Test prevention of user enumeration attacks."""

        # Test with valid email (user exists)
        existing_user = {
            "email": f"existing{uuid4().hex[:8]}@example.com",
            "password": "ValidPass123!",
            "name": "Existing User",
        }
        security_client.post("/auth/register", json=existing_user)

        # Test login with existing user but wrong password
        response_existing = security_client.post(
            "/auth/login",
            json={"email": existing_user["email"], "password": "wrong_password"},
        )

        # Test login with non-existing user
        response_nonexisting = security_client.post(
            "/auth/login",
            json={
                "email": f"nonexisting{uuid4().hex[:8]}@example.com",
                "password": "any_password",
            },
        )

        # Both should return similar error messages and status codes
        assert response_existing.status_code == response_nonexisting.status_code

        # Error messages should not distinguish between cases
        error_1 = response_existing.json().get("detail", "")
        error_2 = response_nonexisting.json().get("detail", "")

        # Should not contain user-specific information
        user_specific_terms = [
            "user not found",
            "user does not exist",
            "invalid user",
            "email not found",
            "account not found",
            "user unknown",
        ]

        for term in user_specific_terms:
            assert term.lower() not in error_1.lower()
            assert term.lower() not in error_2.lower()


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])

```

--------------------------------------------------------------------------------
/maverick_mcp/tests/test_models_functional.py:
--------------------------------------------------------------------------------

```python
"""
Functional tests for SQLAlchemy models that test the actual data operations.

These tests verify model functionality by reading from the existing production database
without creating any new tables or modifying data.
"""

import os
import uuid
from datetime import datetime, timedelta
from decimal import Decimal

import pytest
from sqlalchemy import create_engine, text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import sessionmaker

from maverick_mcp.data.models import (
    DATABASE_URL,
    MaverickBearStocks,
    MaverickStocks,
    PriceCache,
    Stock,
    SupplyDemandBreakoutStocks,
    get_latest_maverick_screening,
)


@pytest.fixture(scope="session")
def read_only_engine():
    """Create a read-only database engine for the existing database."""
    # Use the existing database URL from environment or default
    db_url = os.getenv("POSTGRES_URL", DATABASE_URL)

    try:
        # Create engine with read-only intent
        engine = create_engine(db_url, echo=False)
        # Test the connection
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    except Exception as e:
        pytest.skip(f"Database not available: {e}")
        return

    yield engine

    engine.dispose()


@pytest.fixture(scope="function")
def db_session(read_only_engine):
    """Create a read-only database session for each test."""
    SessionLocal = sessionmaker(bind=read_only_engine)
    session = SessionLocal()

    yield session

    session.rollback()  # Rollback any potential changes
    session.close()


class TestStockModelReadOnly:
    """Test the Stock model functionality with read-only operations."""

    def test_query_stocks(self, db_session):
        """Test querying existing stocks from the database."""
        # Query for any existing stocks
        stocks = db_session.query(Stock).limit(5).all()

        # If there are stocks in the database, verify their structure
        for stock in stocks:
            assert hasattr(stock, "stock_id")
            assert hasattr(stock, "ticker_symbol")
            assert hasattr(stock, "created_at")
            assert hasattr(stock, "updated_at")

            # Verify timestamps are timezone-aware
            if stock.created_at:
                assert stock.created_at.tzinfo is not None
            if stock.updated_at:
                assert stock.updated_at.tzinfo is not None

    def test_query_by_ticker(self, db_session):
        """Test querying stock by ticker symbol."""
        # Try to find a common stock like AAPL
        stock = db_session.query(Stock).filter_by(ticker_symbol="AAPL").first()

        if stock:
            assert stock.ticker_symbol == "AAPL"
            assert isinstance(stock.stock_id, uuid.UUID)

    def test_stock_repr(self, db_session):
        """Test string representation of Stock."""
        stock = db_session.query(Stock).first()
        if stock:
            repr_str = repr(stock)
            assert "<Stock(" in repr_str
            assert "ticker=" in repr_str
            assert stock.ticker_symbol in repr_str

    def test_stock_relationships(self, db_session):
        """Test stock relationships with price caches."""
        # Find a stock that has price data
        stock_with_prices = db_session.query(Stock).join(PriceCache).distinct().first()

        if stock_with_prices:
            # Access the relationship
            price_caches = stock_with_prices.price_caches
            assert isinstance(price_caches, list)

            # Verify each price cache belongs to this stock
            for pc in price_caches[:5]:  # Check first 5
                assert pc.stock_id == stock_with_prices.stock_id
                assert pc.stock == stock_with_prices


class TestPriceCacheModelReadOnly:
    """Test the PriceCache model functionality with read-only operations."""

    def test_query_price_cache(self, db_session):
        """Test querying existing price cache entries."""
        # Query for any existing price data
        prices = db_session.query(PriceCache).limit(10).all()

        # Verify structure of price entries
        for price in prices:
            assert hasattr(price, "price_cache_id")
            assert hasattr(price, "stock_id")
            assert hasattr(price, "date")
            assert hasattr(price, "close_price")

            # Verify data types
            if price.price_cache_id:
                assert isinstance(price.price_cache_id, uuid.UUID)
            if price.close_price:
                assert isinstance(price.close_price, Decimal)

    def test_price_cache_repr(self, db_session):
        """Test string representation of PriceCache."""
        price = db_session.query(PriceCache).first()
        if price:
            repr_str = repr(price)
            assert "<PriceCache(" in repr_str
            assert "stock_id=" in repr_str
            assert "date=" in repr_str
            assert "close=" in repr_str

    def test_get_price_data(self, db_session):
        """Test retrieving price data as DataFrame for existing tickers."""
        # Try to get price data for a common stock
        # Use a recent date range that might have data
        end_date = datetime.now().date()
        start_date = end_date - timedelta(days=30)

        # Try common tickers
        for ticker in ["AAPL", "MSFT", "GOOGL"]:
            df = PriceCache.get_price_data(
                db_session,
                ticker,
                start_date.strftime("%Y-%m-%d"),
                end_date.strftime("%Y-%m-%d"),
            )

            if not df.empty:
                # Verify DataFrame structure
                assert df.index.name == "date"
                assert "open" in df.columns
                assert "high" in df.columns
                assert "low" in df.columns
                assert "close" in df.columns
                assert "volume" in df.columns
                assert "symbol" in df.columns
                assert df["symbol"].iloc[0] == ticker

                # Verify data types
                assert df["close"].dtype == float
                assert df["volume"].dtype == int
                break

    def test_stock_relationship(self, db_session):
        """Test relationship back to Stock."""
        # Find a price entry with stock relationship
        price = db_session.query(PriceCache).join(Stock).first()

        if price:
            assert price.stock is not None
            assert price.stock.stock_id == price.stock_id
            assert hasattr(price.stock, "ticker_symbol")


@pytest.mark.integration
class TestMaverickStocksReadOnly:
    """Test MaverickStocks model functionality with read-only operations."""

    def test_query_maverick_stocks(self, db_session):
        """Test querying existing maverick stock entries."""
        try:
            # Query for any existing maverick stocks
            mavericks = db_session.query(MaverickStocks).limit(10).all()

            # Verify structure of maverick entries
            for maverick in mavericks:
                assert hasattr(maverick, "id")
                assert hasattr(maverick, "stock")
                assert hasattr(maverick, "close")
                assert hasattr(maverick, "combined_score")
                assert hasattr(maverick, "momentum_score")
        except Exception as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickStocks table not found: {e}")
            else:
                raise

    def test_maverick_repr(self, db_session):
        """Test string representation of MaverickStocks."""
        try:
            maverick = db_session.query(MaverickStocks).first()
            if maverick:
                repr_str = repr(maverick)
                assert "<MaverickStock(" in repr_str
                assert "stock=" in repr_str
                assert "close=" in repr_str
                assert "score=" in repr_str
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickStocks table not found: {e}")
            else:
                raise

    def test_get_top_stocks(self, db_session):
        """Test retrieving top maverick stocks."""
        try:
            # Get top stocks from existing data
            top_stocks = MaverickStocks.get_top_stocks(db_session, limit=20)

            # Verify results are sorted by combined_score
            if len(top_stocks) > 1:
                for i in range(len(top_stocks) - 1):
                    assert (
                        top_stocks[i].combined_score >= top_stocks[i + 1].combined_score
                    )

            # Verify limit is respected
            assert len(top_stocks) <= 20
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickStocks table not found: {e}")
            else:
                raise

    def test_maverick_to_dict(self, db_session):
        """Test converting MaverickStocks to dictionary."""
        try:
            maverick = db_session.query(MaverickStocks).first()
            if maverick:
                data = maverick.to_dict()

                # Verify expected keys
                expected_keys = [
                    "stock",
                    "close",
                    "volume",
                    "momentum_score",
                    "adr_pct",
                    "pattern",
                    "squeeze",
                    "consolidation",
                    "entry",
                    "combined_score",
                    "compression_score",
                    "pattern_detected",
                ]
                for key in expected_keys:
                    assert key in data

                # Verify data types
                assert isinstance(data["stock"], str)
                assert isinstance(data["combined_score"], int | type(None))
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickStocks table not found: {e}")
            else:
                raise


@pytest.mark.integration
class TestMaverickBearStocksReadOnly:
    """Test MaverickBearStocks model functionality with read-only operations."""

    def test_query_bear_stocks(self, db_session):
        """Test querying existing maverick bear stock entries."""
        try:
            # Query for any existing bear stocks
            bears = db_session.query(MaverickBearStocks).limit(10).all()

            # Verify structure of bear entries
            for bear in bears:
                assert hasattr(bear, "id")
                assert hasattr(bear, "stock")
                assert hasattr(bear, "close")
                assert hasattr(bear, "score")
                assert hasattr(bear, "momentum_score")
                assert hasattr(bear, "rsi_14")
                assert hasattr(bear, "atr_contraction")
                assert hasattr(bear, "big_down_vol")
        except Exception as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickBearStocks table not found: {e}")
            else:
                raise

    def test_bear_repr(self, db_session):
        """Test string representation of MaverickBearStocks."""
        try:
            bear = db_session.query(MaverickBearStocks).first()
            if bear:
                repr_str = repr(bear)
                assert "<MaverickBearStock(" in repr_str
                assert "stock=" in repr_str
                assert "close=" in repr_str
                assert "score=" in repr_str
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickBearStocks table not found: {e}")
            else:
                raise

    def test_get_top_bear_stocks(self, db_session):
        """Test retrieving top bear stocks."""
        try:
            # Get top bear stocks from existing data
            top_bears = MaverickBearStocks.get_top_stocks(db_session, limit=20)

            # Verify results are sorted by score
            if len(top_bears) > 1:
                for i in range(len(top_bears) - 1):
                    assert top_bears[i].score >= top_bears[i + 1].score

            # Verify limit is respected
            assert len(top_bears) <= 20
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickBearStocks table not found: {e}")
            else:
                raise

    def test_bear_to_dict(self, db_session):
        """Test converting MaverickBearStocks to dictionary."""
        try:
            bear = db_session.query(MaverickBearStocks).first()
            if bear:
                data = bear.to_dict()

                # Verify expected keys
                expected_keys = [
                    "stock",
                    "close",
                    "volume",
                    "momentum_score",
                    "rsi_14",
                    "macd",
                    "macd_signal",
                    "macd_histogram",
                    "adr_pct",
                    "atr",
                    "atr_contraction",
                    "avg_vol_30d",
                    "big_down_vol",
                    "score",
                    "squeeze",
                    "consolidation",
                ]
                for key in expected_keys:
                    assert key in data

                # Verify boolean fields
                assert isinstance(data["atr_contraction"], bool)
                assert isinstance(data["big_down_vol"], bool)
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"MaverickBearStocks table not found: {e}")
            else:
                raise


@pytest.mark.integration
class TestSupplyDemandBreakoutStocksReadOnly:
    """Test SupplyDemandBreakoutStocks model functionality with read-only operations."""

    def test_query_supply_demand_stocks(self, db_session):
        """Test querying existing supply/demand breakout stock entries."""
        try:
            # Query for any existing supply/demand breakout stocks
            stocks = db_session.query(SupplyDemandBreakoutStocks).limit(10).all()

            # Verify structure of supply/demand breakout entries
            for stock in stocks:
                assert hasattr(stock, "id")
                assert hasattr(stock, "stock")
                assert hasattr(stock, "close")
                assert hasattr(stock, "momentum_score")
                assert hasattr(stock, "sma_50")
                assert hasattr(stock, "sma_150")
                assert hasattr(stock, "sma_200")
        except Exception as e:
            if "does not exist" in str(e):
                pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}")
            else:
                raise

    def test_supply_demand_repr(self, db_session):
        """Test string representation of SupplyDemandBreakoutStocks."""
        try:
            supply_demand = db_session.query(SupplyDemandBreakoutStocks).first()
            if supply_demand:
                repr_str = repr(supply_demand)
                assert "<supply/demand breakoutStock(" in repr_str
                assert "stock=" in repr_str
                assert "close=" in repr_str
                assert "rs=" in repr_str
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}")
            else:
                raise

    def test_get_top_supply_demand_stocks(self, db_session):
        """Test retrieving top supply/demand breakout stocks."""
        try:
            # Get top stocks from existing data
            top_stocks = SupplyDemandBreakoutStocks.get_top_stocks(db_session, limit=20)

            # Verify results are sorted by momentum_score
            if len(top_stocks) > 1:
                for i in range(len(top_stocks) - 1):
                    assert (
                        top_stocks[i].momentum_score >= top_stocks[i + 1].momentum_score
                    )

            # Verify limit is respected
            assert len(top_stocks) <= 20
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}")
            else:
                raise

    def test_get_stocks_above_moving_averages(self, db_session):
        """Test retrieving stocks meeting supply/demand breakout criteria."""
        try:
            # Get stocks that meet supply/demand breakout criteria from existing data
            stocks = SupplyDemandBreakoutStocks.get_stocks_above_moving_averages(
                db_session
            )

            # Verify all returned stocks meet the criteria
            for stock in stocks:
                assert stock.close > stock.sma_50
                assert stock.close > stock.sma_150
                assert stock.close > stock.sma_200
                assert stock.sma_50 > stock.sma_150
                assert stock.sma_150 > stock.sma_200

            # Verify they're sorted by momentum score
            if len(stocks) > 1:
                for i in range(len(stocks) - 1):
                    assert stocks[i].momentum_score >= stocks[i + 1].momentum_score
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}")
            else:
                raise

    def test_supply_demand_to_dict(self, db_session):
        """Test converting SupplyDemandBreakoutStocks to dictionary."""
        try:
            supply_demand = db_session.query(SupplyDemandBreakoutStocks).first()
            if supply_demand:
                data = supply_demand.to_dict()

                # Verify expected keys
                expected_keys = [
                    "stock",
                    "close",
                    "volume",
                    "momentum_score",
                    "adr_pct",
                    "pattern",
                    "squeeze",
                    "consolidation",
                    "entry",
                    "ema_21",
                    "sma_50",
                    "sma_150",
                    "sma_200",
                    "atr",
                    "avg_volume_30d",
                ]
                for key in expected_keys:
                    assert key in data

                # Verify data types
                assert isinstance(data["stock"], str)
                assert isinstance(data["momentum_score"], float | int)
        except ProgrammingError as e:
            if "does not exist" in str(e):
                pytest.skip(f"SupplyDemandBreakoutStocks table not found: {e}")
            else:
                raise


@pytest.mark.integration
class TestGetLatestMaverickScreeningReadOnly:
    """Test the get_latest_maverick_screening function with read-only operations."""

    def test_get_latest_screening(self):
        """Test retrieving latest screening results from existing data."""
        try:
            # Call the function directly - it creates its own session
            results = get_latest_maverick_screening()

            # Verify structure
            assert isinstance(results, dict)
            assert "maverick_stocks" in results
            assert "maverick_bear_stocks" in results
            assert "supply_demand_stocks" in results

            # Verify each result is a list of dictionaries
            assert isinstance(results["maverick_stocks"], list)
            assert isinstance(results["maverick_bear_stocks"], list)
            assert isinstance(results["supply_demand_stocks"], list)

            # If there are maverick stocks, verify their structure
            if results["maverick_stocks"]:
                stock_dict = results["maverick_stocks"][0]
                assert isinstance(stock_dict, dict)
                assert "stock" in stock_dict
                assert "combined_score" in stock_dict

                # Verify they're sorted by combined_score
                scores = [s["combined_score"] for s in results["maverick_stocks"]]
                assert scores == sorted(scores, reverse=True)

            # If there are bear stocks, verify their structure
            if results["maverick_bear_stocks"]:
                bear_dict = results["maverick_bear_stocks"][0]
                assert isinstance(bear_dict, dict)
                assert "stock" in bear_dict
                assert "score" in bear_dict

                # Verify they're sorted by score
                scores = [s["score"] for s in results["maverick_bear_stocks"]]
                assert scores == sorted(scores, reverse=True)

            # If there are supply/demand breakout stocks, verify their structure
            if results["supply_demand_stocks"]:
                min_dict = results["supply_demand_stocks"][0]
                assert isinstance(min_dict, dict)
                assert "stock" in min_dict
                assert "momentum_score" in min_dict

                # Verify they're sorted by momentum_score
                ratings = [s["momentum_score"] for s in results["supply_demand_stocks"]]
                assert ratings == sorted(ratings, reverse=True)

        except Exception as e:
            # If tables don't exist, that's okay for read-only tests
            if "does not exist" in str(e):
                pytest.skip(f"Screening tables not found in database: {e}")
            else:
                raise


class TestDatabaseStructureReadOnly:
    """Test database structure and relationships with read-only operations."""

    def test_stock_ticker_query_performance(self, db_session):
        """Test that ticker queries work efficiently (index should exist)."""
        # Query for a specific ticker - should be fast if indexed
        import time

        start_time = time.time()

        # Try to find a stock by ticker
        stock = db_session.query(Stock).filter_by(ticker_symbol="AAPL").first()

        query_time = time.time() - start_time

        # Query should be reasonably fast if properly indexed
        # Allow up to 1 second for connection overhead
        assert query_time < 1.0

        # If stock exists, verify it has expected fields
        if stock:
            assert stock.ticker_symbol == "AAPL"

    def test_price_cache_date_query_performance(self, db_session):
        """Test that price cache queries by stock and date are efficient."""
        # First find a stock with prices
        stock_with_prices = db_session.query(Stock).join(PriceCache).first()

        if stock_with_prices:
            # Get a recent date
            recent_price = (
                db_session.query(PriceCache)
                .filter_by(stock_id=stock_with_prices.stock_id)
                .order_by(PriceCache.date.desc())
                .first()
            )

            if recent_price:
                # Query for specific stock_id and date - should be fast
                import time

                start_time = time.time()

                result = (
                    db_session.query(PriceCache)
                    .filter_by(
                        stock_id=stock_with_prices.stock_id, date=recent_price.date
                    )
                    .first()
                )

                query_time = time.time() - start_time

                # Query should be reasonably fast if composite index exists
                assert query_time < 1.0
                assert result is not None
                assert result.price_cache_id == recent_price.price_cache_id


class TestDataTypesAndConstraintsReadOnly:
    """Test data types and constraints with read-only operations."""

    def test_null_values_in_existing_data(self, db_session):
        """Test handling of null values in optional fields in existing data."""
        # Query stocks that might have null values
        stocks = db_session.query(Stock).limit(20).all()

        for stock in stocks:
            # These fields are optional and can be None
            assert hasattr(stock, "company_name")
            assert hasattr(stock, "sector")
            assert hasattr(stock, "industry")

            # Verify ticker_symbol is never null (it's required)
            assert stock.ticker_symbol is not None
            assert isinstance(stock.ticker_symbol, str)

    def test_decimal_precision_in_existing_data(self, db_session):
        """Test decimal precision in existing price data."""
        # Query some price data
        prices = db_session.query(PriceCache).limit(10).all()

        for price in prices:
            # Verify decimal fields
            if price.close_price is not None:
                assert isinstance(price.close_price, Decimal)
                # Check precision (should have at most 2 decimal places)
                str_price = str(price.close_price)
                if "." in str_price:
                    decimal_places = len(str_price.split(".")[1])
                    assert decimal_places <= 2

            # Same for other price fields
            for field in ["open_price", "high_price", "low_price"]:
                value = getattr(price, field)
                if value is not None:
                    assert isinstance(value, Decimal)

    def test_volume_data_types(self, db_session):
        """Test volume data types in existing data."""
        # Query price data with volumes
        prices = (
            db_session.query(PriceCache)
            .filter(PriceCache.volume.isnot(None))
            .limit(10)
            .all()
        )

        for price in prices:
            assert isinstance(price.volume, int)
            assert price.volume >= 0

    def test_timezone_handling_in_existing_data(self, db_session):
        """Test that timestamps have timezone info in existing data."""
        # Query any model with timestamps
        stocks = db_session.query(Stock).limit(5).all()

        # Skip test if no stocks found
        if not stocks:
            pytest.skip("No stock data found in database")

        # Check if data has timezone info (newer data should, legacy data might not)
        has_tz_info = False
        for stock in stocks:
            if stock.created_at and stock.created_at.tzinfo is not None:
                has_tz_info = True
                # Data should have timezone info (not necessarily UTC for legacy data)
                # New data created by the app will be UTC

            if stock.updated_at and stock.updated_at.tzinfo is not None:
                has_tz_info = True
                # Data should have timezone info (not necessarily UTC for legacy data)

        # This test just verifies that timezone-aware timestamps are being used
        # Legacy data might not be UTC, but new data will be
        if not has_tz_info:
            pytest.skip(
                "Legacy data without timezone info - new data will have timezone info"
            )

    def test_relationships_integrity(self, db_session):
        """Test that relationships maintain referential integrity."""
        # Find prices with valid stock relationships
        prices_with_stocks = db_session.query(PriceCache).join(Stock).limit(10).all()

        for price in prices_with_stocks:
            # Verify the relationship is intact
            assert price.stock is not None
            assert price.stock.stock_id == price.stock_id

            # Verify reverse relationship
            assert price in price.stock.price_caches

```
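
The screening tests above repeat the same guard: run a read-only query and skip when the underlying table is missing. The snippet below is a sketch of how that pattern could be factored into a reusable context manager; the `skip_if_table_missing` helper is hypothetical and illustrative only, not part of the repository.

```python
from contextlib import contextmanager

import pytest
from sqlalchemy.exc import ProgrammingError


@contextmanager
def skip_if_table_missing(model_name: str):
    """Skip the surrounding test when the screening table does not exist."""
    try:
        yield
    except ProgrammingError as e:
        if "does not exist" in str(e):
            pytest.skip(f"{model_name} table not found: {e}")
        raise


# Usage inside a test body:
#     with skip_if_table_missing("MaverickBearStocks"):
#         top_bears = MaverickBearStocks.get_top_stocks(db_session, limit=20)
```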

--------------------------------------------------------------------------------
/examples/llm_optimization_example.py:
--------------------------------------------------------------------------------

```python
"""
LLM Optimization Example for Research Agents - Speed-Optimized Edition.

This example demonstrates how to use the comprehensive LLM optimization strategies
with new speed-optimized models to prevent research agent timeouts while maintaining
research quality. Features 2-3x speed improvements with Gemini 2.5 Flash and GPT-4o Mini.
"""

import asyncio
import logging
import os
import time
from typing import Any

from maverick_mcp.agents.optimized_research import (
    OptimizedDeepResearchAgent,
    create_optimized_research_agent,
)
from maverick_mcp.config.llm_optimization_config import (
    ModelSelectionStrategy,
    ResearchComplexity,
    create_adaptive_config,
    create_balanced_config,
    create_emergency_config,
    create_fast_config,
)
from maverick_mcp.providers.openrouter_provider import (
    OpenRouterProvider,
    TaskType,
)

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


class OptimizationExamples:
    """Examples demonstrating LLM optimization strategies."""

    def __init__(self, openrouter_api_key: str):
        """Initialize with OpenRouter API key."""
        self.openrouter_api_key = openrouter_api_key

    async def example_1_emergency_research(self) -> dict[str, Any]:
        """
        Example 1: Emergency research with <20 second time budget.

        Use case: Real-time alerts or urgent market events requiring immediate analysis.
        """
        logger.info("🚨 Example 1: Emergency Research (<20s)")

        # Create emergency configuration (for optimization reference)
        _ = create_emergency_config(time_budget=15.0)

        # Create optimized agent
        agent = create_optimized_research_agent(
            openrouter_api_key=self.openrouter_api_key,
            persona="aggressive",  # Aggressive for quick decisions
            time_budget_seconds=15.0,
            target_confidence=0.6,  # Lower bar for emergency
        )

        # Execute emergency research
        start_time = time.time()

        result = await agent.research_comprehensive(
            topic="NVDA earnings surprise impact",
            session_id="emergency_001",
            depth="basic",
            focus_areas=["sentiment", "catalyst"],
            time_budget_seconds=15.0,
            target_confidence=0.6,
        )

        execution_time = time.time() - start_time

        logger.info(f"✅ Emergency research completed in {execution_time:.2f}s")
        logger.info(
            f"Optimization features used: {result.get('optimization_metrics', {}).get('optimization_features_used', [])}"
        )

        return {
            "scenario": "emergency",
            "time_budget": 15.0,
            "actual_time": execution_time,
            "success": execution_time < 20,  # Success if under 20s
            "confidence": result.get("findings", {}).get("confidence_score", 0),
            "sources_processed": result.get("sources_analyzed", 0),
            "optimization_features": result.get("optimization_metrics", {}).get(
                "optimization_features_used", []
            ),
        }

    async def example_2_fast_research(self) -> dict[str, Any]:
        """
        Example 2: Fast research with 45 second time budget.

        Use case: Quick analysis for trading decisions or portfolio updates.
        """
        logger.info("⚡ Example 2: Fast Research (45s)")

        # Create fast configuration
        _ = create_fast_config(time_budget=45.0)

        # Create optimized agent
        agent = create_optimized_research_agent(
            openrouter_api_key=self.openrouter_api_key,
            persona="moderate",
            time_budget_seconds=45.0,
            target_confidence=0.7,
        )

        start_time = time.time()

        result = await agent.research_comprehensive(
            topic="Tesla Q4 2024 delivery numbers analysis",
            session_id="fast_001",
            depth="standard",
            focus_areas=["fundamental", "sentiment"],
            time_budget_seconds=45.0,
            target_confidence=0.7,
        )

        execution_time = time.time() - start_time

        logger.info(f"✅ Fast research completed in {execution_time:.2f}s")

        return {
            "scenario": "fast",
            "time_budget": 45.0,
            "actual_time": execution_time,
            "success": execution_time < 60,
            "confidence": result.get("findings", {}).get("confidence_score", 0),
            "sources_processed": result.get("sources_analyzed", 0),
            "early_terminated": result.get("findings", {}).get(
                "early_terminated", False
            ),
        }

    async def example_3_balanced_research(self) -> dict[str, Any]:
        """
        Example 3: Balanced research with 2 minute time budget.

        Use case: Standard research for investment decisions.
        """
        logger.info("⚖️ Example 3: Balanced Research (120s)")

        # Create balanced configuration
        _ = create_balanced_config(time_budget=120.0)

        agent = create_optimized_research_agent(
            openrouter_api_key=self.openrouter_api_key,
            persona="conservative",
            time_budget_seconds=120.0,
            target_confidence=0.75,
        )

        start_time = time.time()

        result = await agent.research_comprehensive(
            topic="Microsoft cloud services competitive position 2024",
            session_id="balanced_001",
            depth="comprehensive",
            focus_areas=["competitive", "fundamental", "technical"],
            time_budget_seconds=120.0,
            target_confidence=0.75,
        )

        execution_time = time.time() - start_time

        logger.info(f"✅ Balanced research completed in {execution_time:.2f}s")

        return {
            "scenario": "balanced",
            "time_budget": 120.0,
            "actual_time": execution_time,
            "success": execution_time < 150,  # 25% buffer
            "confidence": result.get("findings", {}).get("confidence_score", 0),
            "sources_processed": result.get("sources_analyzed", 0),
            "processing_mode": result.get("findings", {}).get(
                "processing_mode", "unknown"
            ),
        }

    async def example_4_adaptive_research(self) -> dict[str, Any]:
        """
        Example 4: Adaptive research that adjusts based on complexity and available time.

        Use case: Dynamic research where time constraints may vary.
        """
        logger.info("🎯 Example 4: Adaptive Research")

        # Simulate varying time constraints
        scenarios = [
            {
                "time_budget": 30,
                "complexity": ResearchComplexity.SIMPLE,
                "topic": "Apple stock price today",
            },
            {
                "time_budget": 90,
                "complexity": ResearchComplexity.MODERATE,
                "topic": "Federal Reserve interest rate policy impact on tech stocks",
            },
            {
                "time_budget": 180,
                "complexity": ResearchComplexity.COMPLEX,
                "topic": "Cryptocurrency regulation implications for financial institutions",
            },
        ]

        results = []

        for i, scenario in enumerate(scenarios):
            logger.info(
                f"📊 Adaptive scenario {i + 1}: {scenario['complexity'].value} complexity, {scenario['time_budget']}s budget"
            )

            # Create adaptive configuration
            config = create_adaptive_config(
                time_budget_seconds=scenario["time_budget"],
                complexity=scenario["complexity"],
            )

            agent = create_optimized_research_agent(
                openrouter_api_key=self.openrouter_api_key, persona="moderate"
            )

            start_time = time.time()

            result = await agent.research_comprehensive(
                topic=scenario["topic"],
                session_id=f"adaptive_{i + 1:03d}",
                time_budget_seconds=scenario["time_budget"],
                target_confidence=config.preset.target_confidence,
            )

            execution_time = time.time() - start_time

            scenario_result = {
                "scenario_id": i + 1,
                "complexity": scenario["complexity"].value,
                "time_budget": scenario["time_budget"],
                "actual_time": execution_time,
                "success": execution_time < scenario["time_budget"] * 1.1,  # 10% buffer
                "confidence": result.get("findings", {}).get("confidence_score", 0),
                "sources_processed": result.get("sources_analyzed", 0),
                "adaptations_used": result.get("optimization_metrics", {}).get(
                    "optimization_features_used", []
                ),
            }

            results.append(scenario_result)

            logger.info(
                f"✅ Adaptive scenario {i + 1} completed in {execution_time:.2f}s"
            )

        return {
            "scenario": "adaptive",
            "scenarios_tested": len(scenarios),
            "results": results,
            "overall_success": all(r["success"] for r in results),
        }

    async def example_5_optimization_comparison(self) -> dict[str, Any]:
        """
        Example 5: Compare optimized vs non-optimized research performance.

        Use case: Demonstrate the effectiveness of optimizations.
        """
        logger.info("📈 Example 5: Optimization Comparison")

        test_topic = "Amazon Web Services market share growth 2024"
        time_budget = 90.0

        results = {}

        # Test with optimizations enabled
        logger.info("🔧 Testing WITH optimizations...")

        optimized_agent = OptimizedDeepResearchAgent(
            openrouter_provider=OpenRouterProvider(self.openrouter_api_key),
            persona="moderate",
            optimization_enabled=True,
        )

        start_time = time.time()
        optimized_result = await optimized_agent.research_comprehensive(
            topic=test_topic,
            session_id="comparison_optimized",
            time_budget_seconds=time_budget,
            target_confidence=0.75,
        )
        optimized_time = time.time() - start_time

        results["optimized"] = {
            "execution_time": optimized_time,
            "success": optimized_time < time_budget,
            "confidence": optimized_result.get("findings", {}).get(
                "confidence_score", 0
            ),
            "sources_processed": optimized_result.get("sources_analyzed", 0),
            "optimization_features": optimized_result.get(
                "optimization_metrics", {}
            ).get("optimization_features_used", []),
        }

        # Test with optimizations disabled
        logger.info("🐌 Testing WITHOUT optimizations...")

        standard_agent = OptimizedDeepResearchAgent(
            openrouter_provider=OpenRouterProvider(self.openrouter_api_key),
            persona="moderate",
            optimization_enabled=False,
        )

        start_time = time.time()
        try:
            standard_result = await asyncio.wait_for(
                standard_agent.research_comprehensive(
                    topic=test_topic, session_id="comparison_standard", depth="standard"
                ),
                timeout=time_budget + 30,  # Give extra time for timeout demonstration
            )
            standard_time = time.time() - start_time

            results["standard"] = {
                "execution_time": standard_time,
                "success": standard_time < time_budget,
                "confidence": standard_result.get("findings", {}).get(
                    "confidence_score", 0
                ),
                "sources_processed": standard_result.get("sources_analyzed", 0),
                "timed_out": False,
            }

        except TimeoutError:
            standard_time = time_budget + 30
            results["standard"] = {
                "execution_time": standard_time,
                "success": False,
                "confidence": 0,
                "sources_processed": 0,
                "timed_out": True,
            }

        # Calculate improvement metrics
        time_improvement = (
            (
                results["standard"]["execution_time"]
                - results["optimized"]["execution_time"]
            )
            / results["standard"]["execution_time"]
            * 100
        )
        confidence_ratio = results["optimized"]["confidence"] / max(
            results["standard"]["confidence"], 0.01
        )

        results["comparison"] = {
            "time_improvement_pct": time_improvement,
            "optimized_faster": results["optimized"]["execution_time"]
            < results["standard"]["execution_time"],
            "confidence_ratio": confidence_ratio,
            "both_successful": results["optimized"]["success"]
            and results["standard"]["success"],
        }

        logger.info("📊 Optimization Results:")
        logger.info(
            f"   Optimized: {results['optimized']['execution_time']:.2f}s (success: {results['optimized']['success']})"
        )
        logger.info(
            f"   Standard: {results['standard']['execution_time']:.2f}s (success: {results['standard']['success']})"
        )
        logger.info(f"   Time improvement: {time_improvement:.1f}%")

        return results

    async def example_6_speed_optimized_models(self) -> dict[str, Any]:
        """
        Example 6: Test the new speed-optimized models (Gemini 2.5 Flash, GPT-4o Mini).

        Use case: Demonstrate 2-3x speed improvements with the fastest available models.
        """
        logger.info("🚀 Example 6: Speed-Optimized Models Test")

        speed_test_results = {}

        # Test Gemini 2.5 Flash (199 tokens/sec - fastest)
        logger.info("🔥 Testing Gemini 2.5 Flash (199 tokens/sec)...")
        provider = OpenRouterProvider(self.openrouter_api_key)

        gemini_llm = provider.get_llm(
            model_override="google/gemini-2.5-flash",
            task_type=TaskType.DEEP_RESEARCH,
            prefer_fast=True,
        )

        start_time = time.time()
        try:
            response = await gemini_llm.ainvoke(
                [
                    {
                        "role": "user",
                        "content": "Analyze Tesla's Q4 2024 performance in exactly 3 bullet points. Be concise and factual.",
                    }
                ]
            )
            gemini_time = time.time() - start_time

            # Safely handle content that could be string or list
            content_text = (
                response.content
                if isinstance(response.content, str)
                else str(response.content)
                if response.content
                else ""
            )
            speed_test_results["gemini_2_5_flash"] = {
                "execution_time": gemini_time,
                "tokens_per_second": len(content_text.split()) / gemini_time
                if gemini_time > 0
                else 0,
                "success": True,
                "response_quality": "high" if len(content_text) > 50 else "low",
            }
        except Exception as e:
            speed_test_results["gemini_2_5_flash"] = {
                "execution_time": 999,
                "success": False,
                "error": str(e),
            }

        # Test GPT-4o Mini (126 tokens/sec - excellent balance)
        logger.info("⚡ Testing GPT-4o Mini (126 tokens/sec)...")

        gpt_llm = provider.get_llm(
            model_override="openai/gpt-4o-mini",
            task_type=TaskType.MARKET_ANALYSIS,
            prefer_fast=True,
        )

        start_time = time.time()
        try:
            response = await gpt_llm.ainvoke(
                [
                    {
                        "role": "user",
                        "content": "Analyze Amazon's cloud services competitive position in exactly 3 bullet points. Be concise and factual.",
                    }
                ]
            )
            gpt_time = time.time() - start_time

            # Safely handle content that could be string or list
            content_text = (
                response.content
                if isinstance(response.content, str)
                else str(response.content)
                if response.content
                else ""
            )
            speed_test_results["gpt_4o_mini"] = {
                "execution_time": gpt_time,
                "tokens_per_second": len(content_text.split()) / gpt_time
                if gpt_time > 0
                else 0,
                "success": True,
                "response_quality": "high" if len(content_text) > 50 else "low",
            }
        except Exception as e:
            speed_test_results["gpt_4o_mini"] = {
                "execution_time": 999,
                "success": False,
                "error": str(e),
            }

        # Test Claude 3.5 Haiku (65.6 tokens/sec - old baseline)
        logger.info("🐌 Testing Claude 3.5 Haiku (65.6 tokens/sec - baseline)...")

        claude_llm = provider.get_llm(
            model_override="anthropic/claude-3.5-haiku",
            task_type=TaskType.QUICK_ANSWER,
            prefer_fast=True,
        )

        start_time = time.time()
        try:
            response = await claude_llm.ainvoke(
                [
                    {
                        "role": "user",
                        "content": "Analyze Microsoft's AI strategy in exactly 3 bullet points. Be concise and factual.",
                    }
                ]
            )
            claude_time = time.time() - start_time

            # Safely handle content that could be string or list
            content_text = (
                response.content
                if isinstance(response.content, str)
                else str(response.content)
                if response.content
                else ""
            )
            speed_test_results["claude_3_5_haiku"] = {
                "execution_time": claude_time,
                "tokens_per_second": len(content_text.split()) / claude_time
                if claude_time > 0
                else 0,
                "success": True,
                "response_quality": "high" if len(content_text) > 50 else "low",
            }
        except Exception as e:
            speed_test_results["claude_3_5_haiku"] = {
                "execution_time": 999,
                "success": False,
                "error": str(e),
            }

        # Calculate speed improvements
        baseline_time = speed_test_results.get("claude_3_5_haiku", {}).get(
            "execution_time", 10
        )

        if speed_test_results["gemini_2_5_flash"]["success"]:
            gemini_improvement = (
                (
                    baseline_time
                    - speed_test_results["gemini_2_5_flash"]["execution_time"]
                )
                / baseline_time
                * 100
            )
        else:
            gemini_improvement = 0

        if speed_test_results["gpt_4o_mini"]["success"]:
            gpt_improvement = (
                (baseline_time - speed_test_results["gpt_4o_mini"]["execution_time"])
                / baseline_time
                * 100
            )
        else:
            gpt_improvement = 0

        # Test emergency model selection
        emergency_models = ModelSelectionStrategy.get_model_priority(
            time_remaining=20.0,
            task_type=TaskType.DEEP_RESEARCH,
            complexity=ResearchComplexity.MODERATE,
        )

        logger.info("📊 Speed Test Results:")
        logger.info(
            f"   Gemini 2.5 Flash: {speed_test_results['gemini_2_5_flash']['execution_time']:.2f}s ({gemini_improvement:+.1f}% vs baseline)"
        )
        logger.info(
            f"   GPT-4o Mini: {speed_test_results['gpt_4o_mini']['execution_time']:.2f}s ({gpt_improvement:+.1f}% vs baseline)"
        )
        logger.info(
            f"   Claude 3.5 Haiku: {speed_test_results['claude_3_5_haiku']['execution_time']:.2f}s (baseline)"
        )
        logger.info(f"   Emergency models: {emergency_models[:2]}")

        return {
            "scenario": "speed_optimization",
            "models_tested": 3,
            "speed_results": speed_test_results,
            "improvements": {
                "gemini_2_5_flash_vs_baseline_pct": gemini_improvement,
                "gpt_4o_mini_vs_baseline_pct": gpt_improvement,
            },
            "emergency_models": emergency_models[:2],
            "success": all(
                result.get("success", False) for result in speed_test_results.values()
            ),
            "fastest_model": min(
                speed_test_results.items(),
                key=lambda x: x[1].get("execution_time", 999),
            )[0],
            "speed_optimization_effective": gemini_improvement > 30
            or gpt_improvement > 20,  # 30%+ or 20%+ improvement
        }

    def test_model_selection_strategy(self) -> dict[str, Any]:
        """Test the updated model selection strategy with speed-optimized models."""

        logger.info("🎯 Testing Model Selection Strategy...")

        test_scenarios = [
            {"time": 15, "task": TaskType.DEEP_RESEARCH, "desc": "Ultra Emergency"},
            {"time": 25, "task": TaskType.MARKET_ANALYSIS, "desc": "Emergency"},
            {"time": 45, "task": TaskType.TECHNICAL_ANALYSIS, "desc": "Fast"},
            {"time": 120, "task": TaskType.RESULT_SYNTHESIS, "desc": "Balanced"},
        ]

        strategy_results = {}

        for scenario in test_scenarios:
            models = ModelSelectionStrategy.get_model_priority(
                time_remaining=scenario["time"],
                task_type=scenario["task"],
                complexity=ResearchComplexity.MODERATE,
            )

            strategy_results[scenario["desc"].lower()] = {
                "time_budget": scenario["time"],
                "primary_model": models[0] if models else "None",
                "backup_models": models[1:3] if len(models) > 1 else [],
                "total_available": len(models),
                "uses_speed_optimized": any(
                    model in ["google/gemini-2.5-flash", "openai/gpt-4o-mini"]
                    for model in models[:2]
                ),
            }

            logger.info(
                f"   {scenario['desc']} ({scenario['time']}s): Primary = {models[0] if models else 'None'}"
            )

        return {
            "test_scenarios": len(test_scenarios),
            "strategy_results": strategy_results,
            "all_scenarios_use_speed_models": all(
                result["uses_speed_optimized"] for result in strategy_results.values()
            ),
            "success": True,
        }

    async def run_all_examples(self) -> dict[str, Any]:
        """Run all optimization examples and return combined results."""

        logger.info("🚀 Starting LLM Optimization Examples...")

        all_results = {}

        try:
            # Run each example
            all_results["emergency"] = await self.example_1_emergency_research()
            all_results["fast"] = await self.example_2_fast_research()
            all_results["balanced"] = await self.example_3_balanced_research()
            all_results["adaptive"] = await self.example_4_adaptive_research()
            all_results["comparison"] = await self.example_5_optimization_comparison()
            all_results[
                "speed_optimization"
            ] = await self.example_6_speed_optimized_models()
            all_results["model_strategy"] = self.test_model_selection_strategy()

            # Calculate overall success metrics
            successful_examples = sum(
                1
                for result in all_results.values()
                if result.get("success") or result.get("overall_success")
            )

            all_results["summary"] = {
                "total_examples": 7,  # Updated for new examples
                "successful_examples": successful_examples,
                "success_rate_pct": (successful_examples / 7) * 100,
                "optimization_effectiveness": "High"
                if successful_examples >= 6
                else "Moderate"
                if successful_examples >= 4
                else "Low",
                "speed_optimization_available": all_results.get(
                    "speed_optimization", {}
                ).get("success", False),
                "speed_improvement_demonstrated": all_results.get(
                    "speed_optimization", {}
                ).get("speed_optimization_effective", False),
            }

            logger.info(
                f"🎉 All examples completed! Success rate: {all_results['summary']['success_rate_pct']:.0f}%"
            )

        except Exception as e:
            logger.error(f"❌ Example execution failed: {e}")
            all_results["error"] = str(e)

        return all_results


async def main():
    """Main function to run optimization examples."""

    # Get OpenRouter API key
    openrouter_api_key = os.getenv("OPENROUTER_API_KEY")
    if not openrouter_api_key:
        logger.error("❌ OPENROUTER_API_KEY environment variable not set")
        return

    # Create examples instance
    examples = OptimizationExamples(openrouter_api_key)

    # Run all examples
    results = await examples.run_all_examples()

    # Print summary
    print("\n" + "=" * 80)
    print("LLM OPTIMIZATION RESULTS SUMMARY")
    print("=" * 80)

    if "summary" in results:
        summary = results["summary"]
        print(f"Total Examples: {summary['total_examples']}")
        print(f"Successful: {summary['successful_examples']}")
        print(f"Success Rate: {summary['success_rate_pct']:.0f}%")
        print(f"Effectiveness: {summary['optimization_effectiveness']}")

    if "comparison" in results and "comparison" in results["comparison"]:
        comp = results["comparison"]["comparison"]
        if comp.get("time_improvement_pct", 0) > 0:
            print(f"Speed Improvement: {comp['time_improvement_pct']:.1f}%")

    if "speed_optimization" in results and results["speed_optimization"].get("success"):
        speed_results = results["speed_optimization"]
        print(f"Fastest Model: {speed_results.get('fastest_model', 'Unknown')}")

        improvements = speed_results.get("improvements", {})
        if improvements.get("gemini_2_5_flash_vs_baseline_pct", 0) > 0:
            print(
                f"Gemini 2.5 Flash Speed Boost: {improvements['gemini_2_5_flash_vs_baseline_pct']:+.1f}%"
            )
        if improvements.get("gpt_4o_mini_vs_baseline_pct", 0) > 0:
            print(
                f"GPT-4o Mini Speed Boost: {improvements['gpt_4o_mini_vs_baseline_pct']:+.1f}%"
            )

    print("\nDetailed Results:")
    for example_name, result in results.items():
        if example_name not in ["summary", "error"]:
            if isinstance(result, dict):
                success = result.get("success") or result.get("overall_success")
                time_info = (
                    f"{result.get('actual_time', 0):.1f}s"
                    if "actual_time" in result
                    else "N/A"
                )
                print(
                    f"  {example_name.title()}: {'✅ SUCCESS' if success else '❌ FAILED'} ({time_info})"
                )

    print("=" * 80)


if __name__ == "__main__":
    # Run the examples
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/tests/test_parallel_research_orchestrator.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive test suite for ParallelResearchOrchestrator.

This test suite covers:
- Parallel task execution with concurrency control
- Task distribution and load balancing
- Error handling and timeout management
- Synthesis callback functionality
- Performance improvements over sequential execution
- Circuit breaker integration
- Resource usage monitoring
"""

import asyncio
import time
from typing import Any

import pytest

from maverick_mcp.utils.parallel_research import (
    ParallelResearchConfig,
    ParallelResearchOrchestrator,
    ResearchResult,
    ResearchTask,
    TaskDistributionEngine,
)


class TestParallelResearchConfig:
    """Test ParallelResearchConfig configuration class."""

    def test_default_configuration(self):
        """Test default configuration values."""
        config = ParallelResearchConfig()

        assert config.max_concurrent_agents == 4
        assert config.timeout_per_agent == 180
        assert config.enable_fallbacks is False
        assert config.rate_limit_delay == 0.5

    def test_custom_configuration(self):
        """Test custom configuration values."""
        config = ParallelResearchConfig(
            max_concurrent_agents=8,
            timeout_per_agent=180,
            enable_fallbacks=False,
            rate_limit_delay=0.5,
        )

        assert config.max_concurrent_agents == 8
        assert config.timeout_per_agent == 180
        assert config.enable_fallbacks is False
        assert config.rate_limit_delay == 0.5


class TestResearchTask:
    """Test ResearchTask data class."""

    def test_research_task_creation(self):
        """Test basic research task creation."""
        task = ResearchTask(
            task_id="test_123_fundamental",
            task_type="fundamental",
            target_topic="AAPL financial analysis",
            focus_areas=["earnings", "valuation", "growth"],
            priority=8,
            timeout=240,
        )

        assert task.task_id == "test_123_fundamental"
        assert task.task_type == "fundamental"
        assert task.target_topic == "AAPL financial analysis"
        assert task.focus_areas == ["earnings", "valuation", "growth"]
        assert task.priority == 8
        assert task.timeout == 240
        assert task.status == "pending"
        assert task.result is None
        assert task.error is None

    def test_task_lifecycle_tracking(self):
        """Test task lifecycle status tracking."""
        task = ResearchTask(
            task_id="lifecycle_test",
            task_type="sentiment",
            target_topic="TSLA sentiment analysis",
            focus_areas=["news", "social"],
        )

        # Initial state
        assert task.status == "pending"
        assert task.start_time is None
        assert task.end_time is None

        # Simulate task execution
        task.start_time = time.time()
        task.status = "running"

        # Simulate completion
        time.sleep(0.01)  # Small delay to ensure different timestamps
        task.end_time = time.time()
        task.status = "completed"
        task.result = {"insights": ["Test insight"]}

        assert task.status == "completed"
        assert task.start_time < task.end_time
        assert task.result is not None

    def test_task_error_handling(self):
        """Test task error state tracking."""
        task = ResearchTask(
            task_id="error_test",
            task_type="technical",
            target_topic="NVDA technical analysis",
            focus_areas=["chart_patterns"],
        )

        # Simulate error
        task.status = "failed"
        task.error = "API timeout occurred"
        task.end_time = time.time()

        assert task.status == "failed"
        assert task.error == "API timeout occurred"
        assert task.result is None


class TestParallelResearchOrchestrator:
    """Test ParallelResearchOrchestrator main functionality."""

    @pytest.fixture
    def config(self):
        """Create test configuration."""
        return ParallelResearchConfig(
            max_concurrent_agents=3,
            timeout_per_agent=5,  # Short timeout for tests
            enable_fallbacks=True,
            rate_limit_delay=0.1,  # Fast rate limit for tests
        )

    @pytest.fixture
    def orchestrator(self, config):
        """Create orchestrator with test configuration."""
        return ParallelResearchOrchestrator(config)

    @pytest.fixture
    def sample_tasks(self):
        """Create sample research tasks for testing."""
        return [
            ResearchTask(
                task_id="test_123_fundamental",
                task_type="fundamental",
                target_topic="AAPL analysis",
                focus_areas=["earnings", "valuation"],
                priority=8,
            ),
            ResearchTask(
                task_id="test_123_technical",
                task_type="technical",
                target_topic="AAPL analysis",
                focus_areas=["chart_patterns", "indicators"],
                priority=6,
            ),
            ResearchTask(
                task_id="test_123_sentiment",
                task_type="sentiment",
                target_topic="AAPL analysis",
                focus_areas=["news", "analyst_ratings"],
                priority=7,
            ),
        ]

    def test_orchestrator_initialization(self, config):
        """Test orchestrator initialization."""
        orchestrator = ParallelResearchOrchestrator(config)

        assert orchestrator.config == config
        assert orchestrator.active_tasks == {}
        assert orchestrator._semaphore._value == config.max_concurrent_agents
        assert orchestrator.orchestration_logger is not None

    def test_orchestrator_default_config(self):
        """Test orchestrator with default configuration."""
        orchestrator = ParallelResearchOrchestrator()

        assert orchestrator.config.max_concurrent_agents == 4
        assert orchestrator.config.timeout_per_agent == 180

    @pytest.mark.asyncio
    async def test_successful_parallel_execution(self, orchestrator, sample_tasks):
        """Test successful parallel execution of research tasks."""

        # Mock research executor that returns success
        async def mock_executor(task: ResearchTask) -> dict[str, Any]:
            await asyncio.sleep(0.1)  # Simulate work
            return {
                "research_type": task.task_type,
                "insights": [f"Insight for {task.task_type}"],
                "sentiment": {"direction": "bullish", "confidence": 0.8},
                "credibility_score": 0.9,
            }

        # Mock synthesis callback
        async def mock_synthesis(
            task_results: dict[str, ResearchTask],
        ) -> dict[str, Any]:
            return {
                "synthesis": "Combined analysis from parallel research",
                "confidence_score": 0.85,
                "key_findings": ["Finding 1", "Finding 2"],
            }

        # Execute parallel research
        start_time = time.time()
        result = await orchestrator.execute_parallel_research(
            tasks=sample_tasks,
            research_executor=mock_executor,
            synthesis_callback=mock_synthesis,
        )
        execution_time = time.time() - start_time

        # Verify results
        assert isinstance(result, ResearchResult)
        assert result.successful_tasks == 3
        assert result.failed_tasks == 0
        assert result.synthesis is not None
        assert (
            result.synthesis["synthesis"] == "Combined analysis from parallel research"
        )
        assert len(result.task_results) == 3

        # Verify parallel efficiency (should be faster than sequential)
        assert (
            execution_time < 0.5
        )  # Should complete much faster than 3 * 0.1s sequential
        assert result.parallel_efficiency > 0.0  # Should show some efficiency

    @pytest.mark.asyncio
    async def test_concurrency_control(self, orchestrator, config):
        """Test that concurrency is properly limited."""
        execution_order = []
        active_count = 0
        max_concurrent = 0

        async def mock_executor(task: ResearchTask) -> dict[str, Any]:
            nonlocal active_count, max_concurrent

            active_count += 1
            max_concurrent = max(max_concurrent, active_count)
            execution_order.append(f"start_{task.task_id}")

            await asyncio.sleep(0.1)  # Simulate work

            active_count -= 1
            execution_order.append(f"end_{task.task_id}")
            return {"result": f"completed_{task.task_id}"}

        # Create more tasks than max concurrent agents
        tasks = [
            ResearchTask(f"task_{i}", "fundamental", "topic", ["focus"], priority=i)
            for i in range(
                config.max_concurrent_agents + 2
            )  # 5 tasks, max 3 concurrent
        ]

        result = await orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=mock_executor,
        )

        # Verify concurrency was limited
        assert max_concurrent <= config.max_concurrent_agents
        assert (
            result.successful_tasks == config.max_concurrent_agents
        )  # Limited by config
        assert len(execution_order) > 0

    @pytest.mark.asyncio
    async def test_task_timeout_handling(self, orchestrator):
        """Test handling of task timeouts."""

        async def slow_executor(task: ResearchTask) -> dict[str, Any]:
            await asyncio.sleep(10)  # Longer than timeout
            return {"result": "should_not_complete"}

        tasks = [
            ResearchTask(
                "timeout_task",
                "fundamental",
                "slow topic",
                ["focus"],
                timeout=1,  # Very short timeout
            )
        ]

        result = await orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=slow_executor,
        )

        # Verify timeout was handled
        assert result.successful_tasks == 0
        assert result.failed_tasks == 1

        failed_task = result.task_results["timeout_task"]
        assert failed_task.status == "failed"
        assert "timeout" in failed_task.error.lower()

    @pytest.mark.asyncio
    async def test_task_error_handling(self, orchestrator, sample_tasks):
        """Test handling of task execution errors."""

        async def error_executor(task: ResearchTask) -> dict[str, Any]:
            if task.task_type == "technical":
                raise ValueError(f"Error in {task.task_type} analysis")
            return {"result": f"success_{task.task_type}"}

        result = await orchestrator.execute_parallel_research(
            tasks=sample_tasks,
            research_executor=error_executor,
        )

        # Verify mixed success/failure results
        assert result.successful_tasks == 2  # fundamental and sentiment should succeed
        assert result.failed_tasks == 1  # technical should fail

        # Check specific task status
        technical_task = next(
            task
            for task in result.task_results.values()
            if task.task_type == "technical"
        )
        assert technical_task.status == "failed"
        assert "Error in technical analysis" in technical_task.error

    @pytest.mark.asyncio
    async def test_task_preparation_and_prioritization(self, orchestrator):
        """Test task preparation and priority-based ordering."""
        tasks = [
            ResearchTask("low_priority", "technical", "topic", ["focus"], priority=2),
            ResearchTask(
                "high_priority", "fundamental", "topic", ["focus"], priority=9
            ),
            ResearchTask("med_priority", "sentiment", "topic", ["focus"], priority=5),
        ]

        async def track_executor(task: ResearchTask) -> dict[str, Any]:
            return {"task_id": task.task_id, "priority": task.priority}

        result = await orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=track_executor,
        )

        # Verify all tasks were prepared (limited by max_concurrent_agents = 3)
        assert len(result.task_results) == 3

        # Verify tasks have default timeout set
        for task in result.task_results.values():
            assert task.timeout == orchestrator.config.timeout_per_agent

    @pytest.mark.asyncio
    async def test_synthesis_callback_error_handling(self, orchestrator, sample_tasks):
        """Test synthesis callback error handling."""

        async def success_executor(task: ResearchTask) -> dict[str, Any]:
            return {"result": f"success_{task.task_type}"}

        async def failing_synthesis(
            task_results: dict[str, ResearchTask],
        ) -> dict[str, Any]:
            raise RuntimeError("Synthesis failed!")

        result = await orchestrator.execute_parallel_research(
            tasks=sample_tasks,
            research_executor=success_executor,
            synthesis_callback=failing_synthesis,
        )

        # Verify tasks succeeded but synthesis failed gracefully
        assert result.successful_tasks == 3
        assert result.synthesis is not None
        assert "error" in result.synthesis
        assert "Synthesis failed" in result.synthesis["error"]

    @pytest.mark.asyncio
    async def test_no_synthesis_callback(self, orchestrator, sample_tasks):
        """Test execution without synthesis callback."""

        async def success_executor(task: ResearchTask) -> dict[str, Any]:
            return {"result": f"success_{task.task_type}"}

        result = await orchestrator.execute_parallel_research(
            tasks=sample_tasks,
            research_executor=success_executor,
            # No synthesis callback provided
        )

        assert result.successful_tasks == 3
        assert result.synthesis is None  # Should be None when no callback

    @pytest.mark.asyncio
    async def test_rate_limiting_between_tasks(self, orchestrator):
        """Test rate limiting delays between task starts."""
        start_times = []

        async def timing_executor(task: ResearchTask) -> dict[str, Any]:
            start_times.append(time.time())
            await asyncio.sleep(0.05)
            return {"result": task.task_id}

        tasks = [
            ResearchTask(f"task_{i}", "fundamental", "topic", ["focus"])
            for i in range(3)
        ]

        await orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=timing_executor,
        )

        # Verify rate limiting created delays (approximately rate_limit_delay apart)
        assert len(start_times) == 3
        # Note: Due to parallel execution, exact timing is hard to verify
        # but we can check that execution completed

    @pytest.mark.asyncio
    async def test_empty_task_list(self, orchestrator):
        """Test handling of empty task list."""

        async def unused_executor(task: ResearchTask) -> dict[str, Any]:
            return {"result": "should_not_be_called"}

        result = await orchestrator.execute_parallel_research(
            tasks=[],
            research_executor=unused_executor,
        )

        assert result.successful_tasks == 0
        assert result.failed_tasks == 0
        assert result.task_results == {}
        assert result.synthesis is None

    @pytest.mark.asyncio
    async def test_performance_metrics_calculation(self, orchestrator, sample_tasks):
        """Test calculation of performance metrics."""
        task_durations = []

        async def tracked_executor(task: ResearchTask) -> dict[str, Any]:
            start = time.time()
            await asyncio.sleep(0.05)  # Simulate work
            duration = time.time() - start
            task_durations.append(duration)
            return {"result": task.task_id}

        result = await orchestrator.execute_parallel_research(
            tasks=sample_tasks,
            research_executor=tracked_executor,
        )

        # Verify performance metrics
        assert result.total_execution_time > 0
        assert result.parallel_efficiency > 0

        # Parallel efficiency should be roughly: sum(individual_durations) / total_wall_time
        expected_sequential_time = sum(task_durations)
        efficiency_ratio = expected_sequential_time / result.total_execution_time
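        # Illustrative arithmetic (hypothetical numbers, not asserted): three tasks of
        # ~0.05s each give ~0.15s of sequential work; if the parallel wall time is
        # ~0.06s, the efficiency ratio is roughly 0.15 / 0.06 ≈ 2.5.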

        # Allow some tolerance for timing variations
        assert abs(result.parallel_efficiency - efficiency_ratio) < 0.5

    @pytest.mark.asyncio
    async def test_circuit_breaker_integration(self, orchestrator):
        """Test integration with circuit breaker pattern."""
        failure_count = 0

        async def circuit_breaker_executor(task: ResearchTask) -> dict[str, Any]:
            nonlocal failure_count
            failure_count += 1
            if failure_count <= 2:  # First 2 calls fail
                raise RuntimeError("Circuit breaker test failure")
            return {"result": "success_after_failures"}

        tasks = [
            ResearchTask(f"cb_task_{i}", "fundamental", "topic", ["focus"])
            for i in range(3)
        ]

        # Note: The actual circuit breaker is applied in _execute_single_task
        # This test verifies that errors are properly handled
        result = await orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=circuit_breaker_executor,
        )

        # Should have some failures and potentially some successes
        assert result.failed_tasks >= 2  # At least 2 should fail
        assert result.total_execution_time > 0


class TestTaskDistributionEngine:
    """Test TaskDistributionEngine functionality."""

    def test_task_distribution_engine_creation(self):
        """Test creation of task distribution engine."""
        engine = TaskDistributionEngine()
        assert hasattr(engine, "TASK_TYPES")
        assert "fundamental" in engine.TASK_TYPES
        assert "technical" in engine.TASK_TYPES
        assert "sentiment" in engine.TASK_TYPES
        assert "competitive" in engine.TASK_TYPES

    def test_topic_relevance_analysis(self):
        """Test analysis of topic relevance to different research types."""
        engine = TaskDistributionEngine()

        # Test financial topic
        relevance = engine._analyze_topic_relevance(
            "AAPL earnings revenue profit analysis"
        )
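        # Illustrative shape of the mapping (hypothetical values): keyword hits such
        # as "earnings" and "revenue" push the fundamental score up, for example
        # {"fundamental": 0.8, "technical": 0.3, "sentiment": 0.4, "competitive": 0.2}.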

        assert (
            relevance["fundamental"] > relevance["technical"]
        )  # Should favor fundamental
        assert all(0 <= score <= 1 for score in relevance.values())  # Valid range
        assert len(relevance) == 4  # All task types

    def test_distribute_research_tasks(self):
        """Test distribution of research topic into specialized tasks."""
        engine = TaskDistributionEngine()

        tasks = engine.distribute_research_tasks(
            topic="Tesla financial performance and market sentiment",
            session_id="test_123",
            focus_areas=["earnings", "sentiment"],
        )

        assert len(tasks) > 0
        assert all(isinstance(task, ResearchTask) for task in tasks)
        # ResearchTask objects do not carry session_id directly, so only the shared
        # target_topic is verified below.
        assert all(
            task.target_topic == "Tesla financial performance and market sentiment"
            for task in tasks
        )

        # Verify task types are relevant
        task_types = {task.task_type for task in tasks}
        assert (
            "fundamental" in task_types or "sentiment" in task_types
        )  # Should include relevant types

    def test_fallback_task_creation(self):
        """Test fallback task creation when no relevant tasks found."""
        engine = TaskDistributionEngine()

        # Force the fallback path by stubbing _analyze_topic_relevance to return
        # uniformly low scores for every task type.
        original_method = engine._analyze_topic_relevance

        def mock_low_relevance(topic, focus_areas=None):
            return {
                "fundamental": 0.1,
                "technical": 0.1,
                "sentiment": 0.1,
                "competitive": 0.1,
            }

        engine._analyze_topic_relevance = mock_low_relevance
        try:
            tasks = engine.distribute_research_tasks(
                topic="fallback test topic", session_id="fallback_test"
            )
        finally:
            # Restore the original method even if task distribution raises
            engine._analyze_topic_relevance = original_method

        # Should create at least one fallback task
        assert len(tasks) >= 1
        # Should have fundamental as fallback
        fallback_task = tasks[0]
        assert fallback_task.task_type == "fundamental"
        assert fallback_task.priority == 5  # Default priority

    def test_task_priority_assignment(self):
        """Test priority assignment based on relevance scores."""
        engine = TaskDistributionEngine()

        tasks = engine.distribute_research_tasks(
            topic="Apple dividend yield earnings cash flow stability",  # Should favor fundamental
            session_id="priority_test",
        )

        # Find fundamental task (should have higher priority for this topic)
        fundamental_tasks = [task for task in tasks if task.task_type == "fundamental"]
        if fundamental_tasks:
            fundamental_task = fundamental_tasks[0]
            assert fundamental_task.priority >= 5  # Should have decent priority

    def test_focus_areas_integration(self):
        """Test integration of provided focus areas."""
        engine = TaskDistributionEngine()

        tasks = engine.distribute_research_tasks(
            topic="Microsoft analysis",
            session_id="focus_test",
            focus_areas=["technical_analysis", "chart_patterns"],
        )

        # Focus areas that suggest technical analysis should still produce tasks;
        # collect the generated task types so the distribution can be inspected.
        task_types = {task.task_type for task in tasks}
        assert len(tasks) > 0  # Should create some tasks
        assert task_types  # At least one task type should be present


class TestResearchResult:
    """Test ResearchResult data structure."""

    def test_research_result_initialization(self):
        """Test ResearchResult initialization."""
        result = ResearchResult()

        assert result.task_results == {}
        assert result.synthesis is None
        assert result.total_execution_time == 0.0
        assert result.successful_tasks == 0
        assert result.failed_tasks == 0
        assert result.parallel_efficiency == 0.0

    def test_research_result_data_storage(self):
        """Test storing data in ResearchResult."""
        result = ResearchResult()

        # Add sample task results
        task1 = ResearchTask("task_1", "fundamental", "topic", ["focus"])
        task1.status = "completed"
        task2 = ResearchTask("task_2", "technical", "topic", ["focus"])
        task2.status = "failed"

        result.task_results = {"task_1": task1, "task_2": task2}
        result.successful_tasks = 1
        result.failed_tasks = 1
        result.total_execution_time = 2.5
        result.parallel_efficiency = 1.8
        result.synthesis = {"findings": "Test findings"}

        assert len(result.task_results) == 2
        assert result.successful_tasks == 1
        assert result.failed_tasks == 1
        assert result.total_execution_time == 2.5
        assert result.parallel_efficiency == 1.8
        assert result.synthesis["findings"] == "Test findings"


@pytest.mark.integration
class TestParallelResearchIntegration:
    """Integration tests for complete parallel research workflow."""

    @pytest.fixture
    def full_orchestrator(self):
        """Create orchestrator with realistic configuration."""
        config = ParallelResearchConfig(
            max_concurrent_agents=2,  # Reduced for testing
            timeout_per_agent=10,
            enable_fallbacks=True,
            rate_limit_delay=0.1,
        )
        return ParallelResearchOrchestrator(config)

    @pytest.mark.asyncio
    async def test_end_to_end_parallel_research(self, full_orchestrator):
        """Test complete end-to-end parallel research workflow."""
        # Create realistic research tasks
        engine = TaskDistributionEngine()
        tasks = engine.distribute_research_tasks(
            topic="Apple Inc financial analysis and market outlook",
            session_id="integration_test",
        )

        # Mock a realistic research executor
        async def realistic_executor(task: ResearchTask) -> dict[str, Any]:
            await asyncio.sleep(0.1)  # Simulate API calls

            return {
                "research_type": task.task_type,
                "insights": [
                    f"{task.task_type} insight 1 for {task.target_topic}",
                    f"{task.task_type} insight 2 based on {task.focus_areas[0] if task.focus_areas else 'general'}",
                ],
                "sentiment": {
                    "direction": "bullish"
                    if task.task_type != "technical"
                    else "neutral",
                    "confidence": 0.75,
                },
                "risk_factors": [f"{task.task_type} risk factor"],
                "opportunities": [f"{task.task_type} opportunity"],
                "credibility_score": 0.8,
                "sources": [
                    {
                        "title": f"Source for {task.task_type} research",
                        "url": f"https://example.com/{task.task_type}",
                        "credibility_score": 0.85,
                    }
                ],
            }

        # Mock synthesis callback
        async def integration_synthesis(
            task_results: dict[str, ResearchTask],
        ) -> dict[str, Any]:
            successful_results = [
                task.result
                for task in task_results.values()
                if task.status == "completed" and task.result
            ]

            all_insights = []
            for result in successful_results:
                all_insights.extend(result.get("insights", []))

            return {
                "synthesis": f"Integrated analysis from {len(successful_results)} research angles",
                "confidence_score": 0.82,
                "key_findings": all_insights[:5],  # Top 5 insights
                "overall_sentiment": "bullish",
                "research_depth": "comprehensive",
            }

        # Execute the integration test
        start_time = time.time()
        result = await full_orchestrator.execute_parallel_research(
            tasks=tasks,
            research_executor=realistic_executor,
            synthesis_callback=integration_synthesis,
        )
        execution_time = time.time() - start_time

        # Comprehensive verification
        assert isinstance(result, ResearchResult)
        assert result.successful_tasks > 0
        assert result.total_execution_time > 0
        assert execution_time < 5  # Should complete reasonably quickly

        # Verify synthesis was generated
        assert result.synthesis is not None
        assert "synthesis" in result.synthesis
        assert result.synthesis["confidence_score"] > 0

        # Verify task results structure
        for task_id, task in result.task_results.items():
            assert isinstance(task, ResearchTask)
            assert task.task_id == task_id
            if task.status == "completed":
                assert task.result is not None
                assert "insights" in task.result
                assert "sentiment" in task.result

        # Verify performance characteristics
        if result.successful_tasks > 1:
            assert result.parallel_efficiency > 1.0  # Should show parallel benefit

```

--------------------------------------------------------------------------------
/examples/speed_optimization_demo.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Live Speed Optimization Demonstration for MaverickMCP Research Agent

This script validates the speed improvements through live API testing across
different research scenarios with actual performance metrics.

Demonstrates:
- Emergency research (<30s timeout)
- Simple research queries
- Model selection efficiency (Gemini 2.5 Flash for speed)
- Search provider performance
- Token generation speeds
- 2-3x speed improvement validation
"""

import asyncio
import os
import sys
import time
from datetime import datetime
from typing import Any

# Add the project root to Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from maverick_mcp.agents.optimized_research import OptimizedDeepResearchAgent
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider, TaskType
from maverick_mcp.utils.llm_optimization import AdaptiveModelSelector


class SpeedDemonstrationSuite:
    """Comprehensive speed optimization demonstration and validation."""

    def __init__(self):
        """Initialize the demonstration suite."""
        api_key = os.getenv("OPENROUTER_API_KEY")
        if not api_key:
            raise ValueError(
                "OPENROUTER_API_KEY environment variable is required. "
                "Please set it with your OpenRouter API key."
            )
        self.openrouter_provider = OpenRouterProvider(api_key=api_key)
        self.model_selector = AdaptiveModelSelector(self.openrouter_provider)
        self.results: list[dict[str, Any]] = []

        # Test scenarios with expected performance targets
        self.test_scenarios = [
            {
                "name": "Emergency Research - AI Earnings",
                "topic": "NVIDIA Q4 2024 earnings impact on AI market",
                "time_budget": 25.0,  # Emergency mode
                "target_time": 25.0,
                "description": "Emergency research under extreme time pressure",
            },
            {
                "name": "Simple Stock Analysis",
                "topic": "Apple stock technical analysis today",
                "time_budget": 40.0,  # Simple query
                "target_time": 35.0,
                "description": "Basic stock analysis query",
            },
            {
                "name": "Market Trend Research",
                "topic": "Federal Reserve interest rate impact on technology stocks",
                "time_budget": 60.0,  # Moderate complexity
                "target_time": 50.0,
                "description": "Moderate complexity market research",
            },
            {
                "name": "Sector Analysis",
                "topic": "Renewable energy sector outlook 2025 investment opportunities",
                "time_budget": 90.0,  # Standard research
                "target_time": 75.0,
                "description": "Standard sector analysis research",
            },
        ]

    def print_header(self, title: str):
        """Print formatted section header."""
        print("\n" + "=" * 80)
        print(f" {title}")
        print("=" * 80)

    def print_subheader(self, title: str):
        """Print formatted subsection header."""
        print(f"\n--- {title} ---")

    async def validate_api_connections(self) -> bool:
        """Validate that all required APIs are accessible."""
        self.print_header("🔧 API CONNECTION VALIDATION")

        connection_results = {}

        # Test OpenRouter connection
        try:
            test_llm = self.openrouter_provider.get_llm(TaskType.GENERAL)
            await asyncio.wait_for(
                test_llm.ainvoke([{"role": "user", "content": "test"}]), timeout=10.0
            )
            connection_results["OpenRouter"] = "✅ Connected"
            print("✅ OpenRouter API: Connected successfully")
        except Exception as e:
            connection_results["OpenRouter"] = f"❌ Failed: {e}"
            print(f"❌ OpenRouter API: Failed - {e}")
            return False

        # Test search providers using the actual deep_research imports
        try:
            from maverick_mcp.agents.deep_research import get_cached_search_provider

            search_provider = await get_cached_search_provider(
                exa_api_key=os.getenv("EXA_API_KEY")
            )

            if search_provider:
                # Test provider with a simple search
                await asyncio.wait_for(
                    search_provider.search("test query", num_results=1), timeout=15.0
                )
                connection_results["Search Providers"] = "✅ Connected (Exa provider)"
                print("✅ Search Providers: Connected (Exa provider)")
            else:
                connection_results["Search Providers"] = "⚠️ No providers configured"
                print("⚠️ Search Providers: No API keys configured, will use mock mode")

        except Exception as e:
            connection_results["Search Providers"] = f"❌ Failed: {e}"
            print(f"❌ Search Providers: Failed - {e}")
            print("   🔧 Will continue with mock search data for demonstration")

        print("\n🎉 API Validation Complete - Core systems ready")
        return True

    async def demonstrate_model_selection(self):
        """Demonstrate intelligent model selection for speed."""
        self.print_header("🧠 INTELLIGENT MODEL SELECTION DEMO")

        # Test different scenarios for model selection
        test_cases = [
            {
                "scenario": "Emergency Research (Time Critical)",
                "time_budget": 20.0,
                "task_type": TaskType.DEEP_RESEARCH,
                "content_size": 1000,
                "expected_model": "gemini-2.5-flash-199",
            },
            {
                "scenario": "Simple Query (Speed Focus)",
                "time_budget": 30.0,
                "task_type": TaskType.SENTIMENT_ANALYSIS,
                "content_size": 500,
                "expected_model": "gemini-2.5-flash-199",
            },
            {
                "scenario": "Complex Analysis (Balanced)",
                "time_budget": 60.0,
                "task_type": TaskType.RESULT_SYNTHESIS,
                "content_size": 2000,
                "expected_model": "claude-3.5-haiku-20241022",
            },
        ]

        for test_case in test_cases:
            print(f"\nTest: {test_case['scenario']}")
            print(f"  Time Budget: {test_case['time_budget']}s")
            print(f"  Task Type: {test_case['task_type'].value}")
            print(f"  Content Size: {test_case['content_size']} tokens")

            # Calculate task complexity
            complexity = self.model_selector.calculate_task_complexity(
                content="x" * test_case["content_size"],
                task_type=test_case["task_type"],
                focus_areas=["analysis"],
            )

            # Get model recommendation
            model_config = self.model_selector.select_model_for_time_budget(
                task_type=test_case["task_type"],
                time_remaining_seconds=test_case["time_budget"],
                complexity_score=complexity,
                content_size_tokens=test_case["content_size"],
            )

            print(f"  📊 Complexity Score: {complexity:.2f}")
            print(f"  🎯 Selected Model: {model_config.model_id}")
            print(f"  ⏱️  Timeout: {model_config.timeout_seconds}s")
            print(f"  🎛️  Temperature: {model_config.temperature}")
            print(f"  📝 Max Tokens: {model_config.max_tokens}")

            # Validate speed-optimized selection
            is_speed_optimized = (
                "gemini-2.5-flash" in model_config.model_id
                or "claude-3.5-haiku" in model_config.model_id
            )
            print(f"  🚀 Speed Optimized: {'✅' if is_speed_optimized else '❌'}")

    async def run_research_scenario(self, scenario: dict[str, Any]) -> dict[str, Any]:
        """Execute a single research scenario and collect metrics."""

        print(f"\n🔍 Running: {scenario['name']}")
        print(f"   Topic: {scenario['topic']}")
        print(f"   Time Budget: {scenario['time_budget']}s")
        print(f"   Target: <{scenario['target_time']}s")

        # Create optimized research agent
        agent = OptimizedDeepResearchAgent(
            openrouter_provider=self.openrouter_provider,
            persona="moderate",
            exa_api_key=os.getenv("EXA_API_KEY"),
            optimization_enabled=True,
        )

        # Execute research with timing
        start_time = time.time()
        session_id = f"demo_{int(start_time)}"

        try:
            result = await agent.research_comprehensive(
                topic=scenario["topic"],
                session_id=session_id,
                depth="standard",
                focus_areas=["fundamental", "technical"],
                time_budget_seconds=scenario["time_budget"],
                target_confidence=0.75,
            )

            execution_time = time.time() - start_time

            # Extract key metrics
            metrics = {
                "scenario_name": scenario["name"],
                "topic": scenario["topic"],
                "execution_time": execution_time,
                "time_budget": scenario["time_budget"],
                "target_time": scenario["target_time"],
                "budget_utilization": (execution_time / scenario["time_budget"]) * 100,
                "target_achieved": execution_time <= scenario["target_time"],
                "status": result.get("status", "unknown"),
                "sources_processed": result.get("sources_analyzed", 0),
                "final_confidence": result.get("findings", {}).get(
                    "confidence_score", 0.0
                ),
                "optimization_metrics": result.get("optimization_metrics", {}),
                "emergency_mode": result.get("emergency_mode", False),
                "early_terminated": result.get("findings", {}).get(
                    "early_terminated", False
                ),
                "synthesis_length": len(
                    result.get("findings", {}).get("synthesis", "")
                ),
            }

            # Print immediate results
            self.print_results_summary(metrics, result)

            return metrics

        except Exception as e:
            execution_time = time.time() - start_time
            print(f"   ❌ Failed: {str(e)}")

            # If search providers are unavailable, run LLM optimization demo instead
            if "search providers" in str(e).lower() or "no module" in str(e).lower():
                print("   🔧 Running LLM-only optimization demo instead...")
                return await self.run_llm_only_optimization_demo(scenario)

            return {
                "scenario_name": scenario["name"],
                "execution_time": execution_time,
                "status": "error",
                "error": str(e),
                "target_achieved": False,
            }

    async def run_llm_only_optimization_demo(
        self, scenario: dict[str, Any]
    ) -> dict[str, Any]:
        """Run an LLM-only demonstration of optimization features when search is unavailable."""

        start_time = time.time()

        try:
            # Demonstrate model selection for the scenario
            complexity = self.model_selector.calculate_task_complexity(
                content=scenario["topic"],
                task_type=TaskType.DEEP_RESEARCH,
                focus_areas=["analysis"],
            )

            model_config = self.model_selector.select_model_for_time_budget(
                task_type=TaskType.DEEP_RESEARCH,
                time_remaining_seconds=scenario["time_budget"],
                complexity_score=complexity,
                content_size_tokens=len(scenario["topic"]) // 4,
            )

            print(f"   🎯 Selected Model: {model_config.model_id}")
            print(f"   ⏱️  Timeout: {model_config.timeout_seconds}s")

            # Simulate optimized LLM processing
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )

            # Create a research-style query to demonstrate speed
            research_query = f"""Provide a brief analysis of {scenario["topic"]} covering:
1. Key market factors
2. Current sentiment
3. Risk assessment
4. Investment outlook

Keep response concise but comprehensive."""

            llm_start = time.time()
            response = await asyncio.wait_for(
                llm.ainvoke([{"role": "user", "content": research_query}]),
                timeout=model_config.timeout_seconds,
            )
            llm_time = time.time() - llm_start
            execution_time = time.time() - start_time

            # Calculate token generation metrics
            response_length = len(response.content)
            estimated_tokens = response_length // 4
            tokens_per_second = estimated_tokens / llm_time if llm_time > 0 else 0

            print(
                f"   🚀 LLM Execution: {llm_time:.2f}s (~{tokens_per_second:.0f} tok/s)"
            )
            print(f"   📝 Response Length: {response_length} chars")

            return {
                "scenario_name": scenario["name"],
                "topic": scenario["topic"],
                "execution_time": execution_time,
                "llm_execution_time": llm_time,
                "tokens_per_second": tokens_per_second,
                "time_budget": scenario["time_budget"],
                "target_time": scenario["target_time"],
                "budget_utilization": (execution_time / scenario["time_budget"]) * 100,
                "target_achieved": execution_time <= scenario["target_time"],
                "status": "llm_demo_success",
                "model_used": model_config.model_id,
                "response_length": response_length,
                "optimization_applied": True,
                "sources_processed": 0,  # No search performed
                "final_confidence": 0.8,  # Simulated high confidence for LLM analysis
            }

        except Exception as e:
            execution_time = time.time() - start_time
            print(f"   ❌ LLM Demo Failed: {str(e)}")

            return {
                "scenario_name": scenario["name"],
                "execution_time": execution_time,
                "status": "error",
                "error": str(e),
                "target_achieved": False,
            }

    def print_results_summary(
        self, metrics: dict[str, Any], full_result: dict[str, Any] | None = None
    ):
        """Print immediate results summary."""

        status_icon = "✅" if metrics.get("target_achieved") else "⚠️"
        emergency_icon = "🚨" if metrics.get("emergency_mode") else ""
        llm_demo_icon = "🧠" if metrics.get("status") == "llm_demo_success" else ""

        print(
            f"   {status_icon} {emergency_icon} {llm_demo_icon} Complete: {metrics['execution_time']:.2f}s"
        )
        print(f"      Budget Used: {metrics['budget_utilization']:.1f}%")

        if metrics.get("status") == "llm_demo_success":
            # LLM-only demo results
            print(f"      Model: {metrics.get('model_used', 'unknown')}")
            print(f"      LLM Speed: {metrics.get('tokens_per_second', 0):.0f} tok/s")
            print(f"      LLM Time: {metrics.get('llm_execution_time', 0):.2f}s")
        else:
            # Full research results
            print(f"      Sources: {metrics['sources_processed']}")
            print(f"      Confidence: {metrics['final_confidence']:.2f}")

        if metrics.get("early_terminated") and full_result:
            print(
                f"      Early Exit: {full_result.get('findings', {}).get('termination_reason', 'unknown')}"
            )

        # Show optimization features used
        opt_metrics = metrics.get("optimization_metrics", {})
        if opt_metrics:
            features_used = opt_metrics.get("optimization_features_used", [])
            if features_used:
                print(f"      Optimizations: {', '.join(features_used[:3])}")

        # Show a brief excerpt of findings
        if full_result:
            synthesis = full_result.get("findings", {}).get("synthesis", "")
            if synthesis and len(synthesis) > 100:
                excerpt = synthesis[:200] + "..."
                print(f"      Preview: {excerpt}")

    async def run_performance_comparison(self):
        """Run all scenarios and compare against previous baseline."""
        self.print_header("🚀 PERFORMANCE VALIDATION SUITE")

        print("Running comprehensive speed tests with live API calls...")
        print(
            "This validates our 2-3x speed improvements against 138s/129s timeout failures"
        )

        results = []
        total_start_time = time.time()

        # Run all test scenarios
        for scenario in self.test_scenarios:
            try:
                result = await self.run_research_scenario(scenario)
                results.append(result)

                # Brief pause between tests
                await asyncio.sleep(2)

            except Exception as e:
                print(f"❌ Scenario '{scenario['name']}' failed: {e}")
                results.append(
                    {
                        "scenario_name": scenario["name"],
                        "status": "error",
                        "error": str(e),
                        "target_achieved": False,
                    }
                )

        total_execution_time = time.time() - total_start_time

        # Analyze results
        self.analyze_performance_results(results, total_execution_time)

        return results

    def analyze_performance_results(
        self, results: list[dict[str, Any]], total_time: float
    ):
        """Analyze and report performance results."""
        self.print_header("📊 PERFORMANCE ANALYSIS REPORT")

        successful_tests = [
            r for r in results if r.get("status") in ["success", "llm_demo_success"]
        ]
        failed_tests = [
            r for r in results if r.get("status") not in ["success", "llm_demo_success"]
        ]
        targets_achieved = [r for r in results if r.get("target_achieved")]
        llm_demo_tests = [r for r in results if r.get("status") == "llm_demo_success"]

        print("📈 Overall Results:")
        print(f"   Total Tests: {len(results)}")
        print(
            f"   Successful: {len(successful_tests)} (Full Research: {len(successful_tests) - len(llm_demo_tests)}, LLM Demos: {len(llm_demo_tests)})"
        )
        print(f"   Failed: {len(failed_tests)}")
        print(f"   Targets Achieved: {len(targets_achieved)}/{len(results)}")
        print(f"   Success Rate: {(len(targets_achieved) / len(results) * 100):.1f}%")
        print(f"   Total Suite Time: {total_time:.2f}s")

        if successful_tests:
            avg_execution_time = sum(
                r["execution_time"] for r in successful_tests
            ) / len(successful_tests)
            avg_budget_utilization = sum(
                r["budget_utilization"] for r in successful_tests
            ) / len(successful_tests)
            avg_sources = sum(r["sources_processed"] for r in successful_tests) / len(
                successful_tests
            )
            avg_confidence = sum(r["final_confidence"] for r in successful_tests) / len(
                successful_tests
            )

            print("\n📊 Performance Metrics (Successful Tests):")
            print(f"   Average Execution Time: {avg_execution_time:.2f}s")
            print(f"   Average Budget Utilization: {avg_budget_utilization:.1f}%")
            print(f"   Average Sources Processed: {avg_sources:.1f}")
            print(f"   Average Confidence Score: {avg_confidence:.2f}")

        # Speed improvement validation
        self.print_subheader("🎯 SPEED OPTIMIZATION VALIDATION")

        # Historical baseline (previous timeout issues: 138s, 129s)
        historical_baseline = 130  # Approximate baseline from the 138s/129s timeout failures

        if successful_tests:
            max_execution_time = max(r["execution_time"] for r in successful_tests)
            speed_improvement = (
                historical_baseline / max_execution_time
                if max_execution_time > 0
                else 0
            )
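            # Worked example (hypothetical numbers): a 130s baseline and a 52s worst
            # case give 130 / 52 = 2.5x, clearing the 2x success threshold below.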

            print(f"   Historical Baseline (Timeout Issues): {historical_baseline}s")
            print(f"   Current Max Execution Time: {max_execution_time:.2f}s")
            print(f"   Speed Improvement Factor: {speed_improvement:.1f}x")

            if speed_improvement >= 2.0:
                print(
                    f"   🎉 SUCCESS: Achieved {speed_improvement:.1f}x speed improvement!"
                )
            elif speed_improvement >= 1.5:
                print(
                    f"   ✅ GOOD: Achieved {speed_improvement:.1f}x improvement (target: 2x)"
                )
            else:
                print(f"   ⚠️  NEEDS WORK: Only {speed_improvement:.1f}x improvement")

        # Emergency mode validation
        emergency_tests = [r for r in results if r.get("emergency_mode")]
        if emergency_tests:
            print("\n🚨 Emergency Mode Performance:")
            for test in emergency_tests:
                print(f"   {test['scenario_name']}: {test['execution_time']:.2f}s")

        # Feature utilization analysis
        self.print_subheader("🔧 OPTIMIZATION FEATURE UTILIZATION")

        feature_usage = {}
        for result in successful_tests:
            opt_metrics = result.get("optimization_metrics", {})
            features = opt_metrics.get("optimization_features_used", [])
            for feature in features:
                feature_usage[feature] = feature_usage.get(feature, 0) + 1

        if feature_usage:
            print("   Optimization Features Used:")
            for feature, count in sorted(
                feature_usage.items(), key=lambda x: x[1], reverse=True
            ):
                percentage = (count / len(successful_tests)) * 100
                print(
                    f"     {feature}: {count}/{len(successful_tests)} tests ({percentage:.0f}%)"
                )

    async def demonstrate_token_generation_speed(self):
        """Demonstrate token generation speeds with different models."""
        self.print_header("⚡ TOKEN GENERATION SPEED DEMO")

        models_to_test = [
            ("gemini-2.5-flash-199", "Ultra-fast model (199 tok/s)"),
            ("claude-3.5-haiku-20241022", "Balanced speed model"),
            ("gpt-4o-mini", "OpenAI speed model"),
        ]

        test_prompt = (
            "Analyze the current market sentiment for technology stocks in 200 words."
        )

        for model_id, description in models_to_test:
            print(f"\n🧠 Testing: {model_id}")
            print(f"   Description: {description}")

            try:
                llm = self.openrouter_provider.get_llm(
                    model_override=model_id,
                    temperature=0.7,
                    max_tokens=300,
                )

                start_time = time.time()
                response = await asyncio.wait_for(
                    llm.ainvoke([{"role": "user", "content": test_prompt}]),
                    timeout=30.0,
                )
                execution_time = time.time() - start_time

                # Calculate approximate token generation speed
                response_length = len(response.content)
                estimated_tokens = response_length // 4  # Rough estimate
                tokens_per_second = (
                    estimated_tokens / execution_time if execution_time > 0 else 0
                )
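                # Worked example (hypothetical numbers): an 800-character response is
                # roughly 200 tokens; generated in 2s, that is ~100 tokens/second.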

                print(f"   ⏱️  Execution Time: {execution_time:.2f}s")
                print(
                    f"   📝 Response Length: {response_length} chars (~{estimated_tokens} tokens)"
                )
                print(f"   🚀 Speed: ~{tokens_per_second:.0f} tokens/second")

                # Show brief response preview
                preview = (
                    response.content[:150] + "..."
                    if len(response.content) > 150
                    else response.content
                )
                print(f"   💬 Preview: {preview}")

            except Exception as e:
                print(f"   ❌ Failed: {str(e)}")

    async def run_comprehensive_demo(self):
        """Run the complete speed optimization demonstration."""
        print("🚀 MaverickMCP Speed Optimization Live Demonstration")
        print(f"⏰ Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("🎯 Goal: Validate 2-3x speed improvements with live API calls")

        # Step 1: Validate API connections
        if not await self.validate_api_connections():
            print("\n❌ Cannot proceed - API connections failed")
            return False

        # Step 2: Demonstrate model selection intelligence
        await self.demonstrate_model_selection()

        # Step 3: Demonstrate token generation speeds
        await self.demonstrate_token_generation_speed()

        # Step 4: Run comprehensive performance tests
        results = await self.run_performance_comparison()

        # Final summary
        self.print_header("🎉 DEMONSTRATION COMPLETE")

        successful_results = [
            r for r in results if r.get("status") in ("success", "llm_demo_success")
        ]
        targets_achieved = [r for r in results if r.get("target_achieved")]

        print("✅ Speed Optimization Demonstration Results:")
        print(f"   Tests Run: {len(results)}")
        print(f"   Successful: {len(successful_results)}")
        print(f"   Targets Achieved: {len(targets_achieved)}")
        print(f"   Success Rate: {(len(targets_achieved) / len(results) * 100):.1f}%")

        if successful_results:
            max_time = max(r["execution_time"] for r in successful_results)
            avg_time = sum(r["execution_time"] for r in successful_results) / len(
                successful_results
            )
            print(f"   Max Execution Time: {max_time:.2f}s")
            print(f"   Avg Execution Time: {avg_time:.2f}s")
            print("   Historical Baseline: 130s (timeout failures)")
            print(f"   Speed Improvement: {130 / max_time:.1f}x faster")

        print("\n📊 Key Optimizations Validated:")
        print("   ✅ Adaptive Model Selection (Gemini 2.5 Flash for speed)")
        print("   ✅ Progressive Token Budgeting")
        print("   ✅ Parallel Processing")
        print("   ✅ Early Termination Based on Confidence")
        print("   ✅ Intelligent Content Filtering")
        print("   ✅ Optimized Prompt Engineering")

        return len(targets_achieved) >= len(results) * 0.7  # 70% success threshold


async def main():
    """Main demonstration entry point."""
    demo = SpeedDemonstrationSuite()

    try:
        success = await demo.run_comprehensive_demo()

        if success:
            print("\n🎉 Demonstration PASSED - Speed optimizations validated!")
            return 0
        else:
            print("\n⚠️ Demonstration had issues - review results above")
            return 1

    except KeyboardInterrupt:
        print("\n\n⏹️  Demonstration interrupted by user")
        return 130
    except Exception as e:
        print(f"\n💥 Demonstration failed with error: {e}")
        import traceback

        traceback.print_exc()
        return 1


if __name__ == "__main__":
    # Ensure we have the required environment variables
    required_vars = ["OPENROUTER_API_KEY"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]

    if missing_vars:
        print(f"❌ Missing required environment variables: {missing_vars}")
        print("Please check your .env file")
        sys.exit(1)

    # Run the demonstration
    exit_code = asyncio.run(main())
    sys.exit(exit_code)

```