This is page 31 of 39. Use http://codebase.md/wshobson/maverick-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_speed_optimization_validation.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Speed Optimization Validation Test Suite for MaverickMCP Research Agents
3 |
4 | This comprehensive test suite validates the speed optimizations implemented in the research system:
5 | - Validates 2-3x speed improvement claims
6 | - Tests emergency mode completion under 30s
7 | - Verifies fast model selection (Gemini 2.5 Flash, GPT-4o Mini)
8 | - Verifies resolution of previous timeout issues (138s, 129s failures)
9 | - Compares before/after performance
10 |
11 | Speed Optimization Features Being Tested:
12 | 1. Adaptive Model Selection (emergency, fast, balanced modes)
13 | 2. Progressive Token Budgeting with time awareness
14 | 3. Parallel LLM Processing with intelligent batching
15 | 4. Optimized Prompt Engineering for speed
16 | 5. Early Termination based on confidence thresholds
17 | 6. Content Filtering to reduce processing overhead
18 | """
19 |
20 | import asyncio
21 | import logging
22 | import statistics
23 | import time
24 | from datetime import datetime
25 | from enum import Enum
26 | from typing import Any
27 | from unittest.mock import AsyncMock, MagicMock
28 |
29 | try:
30 | import pytest
31 | except ImportError:
32 | # For standalone use without pytest
33 | pytest = None
34 |
35 | from maverick_mcp.agents.deep_research import DeepResearchAgent
36 | from maverick_mcp.agents.optimized_research import OptimizedDeepResearchAgent
37 | from maverick_mcp.providers.openrouter_provider import TaskType
38 | from maverick_mcp.utils.llm_optimization import (
39 | AdaptiveModelSelector,
40 | ConfidenceTracker,
41 | IntelligentContentFilter,
42 | ParallelLLMProcessor,
43 | ProgressiveTokenBudgeter,
44 | )
45 |
46 | logger = logging.getLogger(__name__)
47 |
48 | # Speed optimization validation thresholds
49 | SPEED_THRESHOLDS = {
50 | "simple_query_max_time": 15.0, # Simple queries should complete in <15s
51 | "moderate_query_max_time": 25.0, # Moderate queries should complete in <25s
52 | "complex_query_max_time": 35.0, # Complex queries should complete in <35s
53 | "emergency_mode_max_time": 30.0, # Emergency mode should complete in <30s
54 | "minimum_speedup_factor": 2.0, # Minimum 2x speedup over baseline
55 | "target_speedup_factor": 3.0, # Target 3x speedup over baseline
56 | "timeout_failure_threshold": 0.05, # Max 5% timeout failures allowed
57 | }
58 |
59 |
60 | # Test query complexity definitions
61 | class QueryComplexity(Enum):
62 | SIMPLE = "simple"
63 | MODERATE = "moderate"
64 | COMPLEX = "complex"
65 | EMERGENCY = "emergency"
66 |
67 |
68 | # Test query templates by complexity
69 | SPEED_TEST_QUERIES = {
70 | QueryComplexity.SIMPLE: [
71 | "Apple Inc current stock price and basic sentiment",
72 | "Tesla recent news and market overview",
73 | "Microsoft quarterly earnings summary",
74 | "NVIDIA stock performance this month",
75 | ],
76 | QueryComplexity.MODERATE: [
77 | "Apple Inc comprehensive financial analysis and competitive position in smartphone market",
78 | "Tesla Inc market outlook considering EV competition and regulatory changes",
79 | "Microsoft Corp cloud business growth prospects and AI integration strategy",
80 | "NVIDIA competitive analysis in semiconductor and AI acceleration markets",
81 | ],
82 | QueryComplexity.COMPLEX: [
83 | "Apple Inc deep fundamental analysis including supply chain risks, product lifecycle assessment, regulatory challenges across global markets, competitive positioning against Samsung and Google, and 5-year growth trajectory considering AR/VR investments and services expansion",
84 | "Tesla Inc comprehensive investment thesis covering production scaling challenges, battery technology competitive advantages, autonomous driving timeline and regulatory risks, energy business growth potential, and Elon Musk leadership impact on stock volatility",
85 | "Microsoft Corp strategic analysis of cloud infrastructure competition with AWS and Google, AI monetization through Copilot integration, gaming division performance post-Activision acquisition, and enterprise software market share defense against Salesforce and Oracle",
86 | "NVIDIA Corp detailed semiconductor industry analysis covering data center growth drivers, gaming market maturity, automotive AI partnerships, geopolitical chip manufacturing risks, and competitive threats from AMD, Intel, and custom silicon development by major cloud providers",
87 | ],
88 | QueryComplexity.EMERGENCY: [
89 | "Quick Apple sentiment - bullish or bearish right now?",
90 | "Tesla stock - buy, hold, or sell this week?",
91 | "Microsoft earnings - beat or miss expectations?",
92 | "NVIDIA - momentum trade opportunity today?",
93 | ],
94 | }
95 |
96 | # Expected model selections for each scenario
97 | EXPECTED_MODEL_SELECTIONS = {
98 | QueryComplexity.SIMPLE: ["google/gemini-2.5-flash", "openai/gpt-4o-mini"],
99 | QueryComplexity.MODERATE: ["openai/gpt-4o-mini", "google/gemini-2.5-flash"],
100 | QueryComplexity.COMPLEX: [
101 | "anthropic/claude-sonnet-4",
102 | "google/gemini-2.5-pro",
103 | ],
104 | QueryComplexity.EMERGENCY: ["google/gemini-2.5-flash", "openai/gpt-4o-mini"],
105 | }
106 |
107 | # Token generation speeds (tokens/second) for validation
108 | MODEL_SPEED_BENCHMARKS = {
109 | "google/gemini-2.5-flash": 199,
110 | "openai/gpt-4o-mini": 126,
111 | "anthropic/claude-sonnet-4": 45,
112 | "google/gemini-2.5-pro": 25,
113 | "anthropic/claude-haiku": 89,
114 | }
115 |
116 |
117 | class SpeedTestMonitor:
118 | """Monitors speed optimization performance during test execution."""
119 |
120 | def __init__(self, test_name: str, complexity: QueryComplexity):
121 | self.test_name = test_name
122 | self.complexity = complexity
123 | self.start_time: float = 0
124 | self.end_time: float = 0
125 | self.phase_timings: dict[str, float] = {}
126 | self.model_selections: list[str] = []
127 | self.optimization_metrics: dict[str, Any] = {}
128 |
129 | def __enter__(self):
130 | """Start speed monitoring."""
131 | self.start_time = time.time()
132 | logger.info(f"Starting speed test: {self.test_name} ({self.complexity.value})")
133 | return self
134 |
135 | def __exit__(self, exc_type, exc_val, exc_tb):
136 | """Complete speed monitoring and log results."""
137 | self.end_time = time.time()
138 | total_time = self.end_time - self.start_time
139 |
140 | logger.info(
141 | f"Speed test completed: {self.test_name} - "
142 | f"Time: {total_time:.2f}s, "
143 | f"Complexity: {self.complexity.value}, "
144 | f"Models: {self.model_selections}"
145 | )
146 |
147 | def record_phase(self, phase_name: str, duration: float):
148 | """Record timing for a specific phase."""
149 | self.phase_timings[phase_name] = duration
150 |
151 | def record_model_selection(self, model_id: str):
152 | """Record which model was selected."""
153 | self.model_selections.append(model_id)
154 |
155 | def record_optimization_metric(self, metric_name: str, value: Any):
156 | """Record optimization-specific metrics."""
157 | self.optimization_metrics[metric_name] = value
158 |
159 | @property
160 | def total_execution_time(self) -> float:
161 | """Get total execution time."""
162 | return self.end_time - self.start_time if self.end_time > 0 else 0
163 |
164 |
165 | class MockOpenRouterProvider:
166 | """Mock OpenRouter provider that simulates realistic API response times."""
167 |
168 | def __init__(self, simulate_model_speeds: bool = True):
169 | self.simulate_model_speeds = simulate_model_speeds
170 | self.call_history: list[dict[str, Any]] = []
171 |
172 |     def get_llm(self, model_override: str | None = None, **kwargs):
173 | """Get mock LLM with realistic speed simulation."""
174 | model_id = model_override or "openai/gpt-4o-mini"
175 |
176 | mock_llm = AsyncMock()
177 | mock_llm.model_id = model_id
178 |
179 | # Simulate realistic response times based on model speed
180 | if self.simulate_model_speeds:
181 | speed = MODEL_SPEED_BENCHMARKS.get(model_id, 50)
182 | max_tokens = kwargs.get("max_tokens", 1000)
183 | # Calculate response time: (tokens / speed) + API overhead
184 | response_time = (max_tokens / speed) + 0.5 # 0.5s API overhead
185 | else:
186 | response_time = 0.1 # Fast mock response
187 |
188 | async def mock_ainvoke(messages):
189 | await asyncio.sleep(response_time)
190 | # Record the call
191 | self.call_history.append(
192 | {
193 | "model_id": model_id,
194 | "response_time": response_time,
195 | "max_tokens": kwargs.get("max_tokens", 1000),
196 | "timestamp": time.time(),
197 | "messages": len(messages),
198 | }
199 | )
200 |
201 | # Return mock response
202 | mock_response = MagicMock()
203 | mock_response.content = (
204 | f"Mock response from {model_id} (simulated {response_time:.2f}s)"
205 | )
206 | return mock_response
207 |
208 | mock_llm.ainvoke = mock_ainvoke
209 | return mock_llm
210 |
211 |
212 | class SpeedOptimizationValidator:
213 | """Validates speed optimization claims and performance improvements."""
214 |
215 | @staticmethod
216 | async def test_adaptive_model_selection(
217 | time_budget: float, complexity: float, expected_models: list[str]
218 | ) -> dict[str, Any]:
219 | """Test that adaptive model selection chooses appropriate fast models."""
220 | provider = MockOpenRouterProvider(simulate_model_speeds=True)
221 | selector = AdaptiveModelSelector(provider)
222 |
223 | # Test model selection for time budget
224 | model_config = selector.select_model_for_time_budget(
225 | task_type=TaskType.MARKET_ANALYSIS,
226 | time_remaining_seconds=time_budget,
227 | complexity_score=complexity,
228 | content_size_tokens=1000,
229 | )
230 |
231 | return {
232 | "selected_model": model_config.model_id,
233 | "max_tokens": model_config.max_tokens,
234 | "timeout_seconds": model_config.timeout_seconds,
235 | "expected_models": expected_models,
236 | "model_appropriate": model_config.model_id in expected_models,
237 | "speed_optimized": model_config.model_id
238 | in ["google/gemini-2.5-flash", "openai/gpt-4o-mini"],
239 | }
240 |
241 | @staticmethod
242 | async def test_emergency_mode_performance(query: str) -> dict[str, Any]:
243 | """Test emergency mode performance (< 30s completion)."""
244 | provider = MockOpenRouterProvider(simulate_model_speeds=True)
245 |
246 | # Create optimized research agent
247 | agent = OptimizedDeepResearchAgent(
248 | openrouter_provider=provider,
249 | persona="moderate",
250 | optimization_enabled=True,
251 | )
252 |
253 | # Mock the search providers to avoid actual API calls
254 | agent.search_providers = [MockSearchProvider()]
255 |
256 | start_time = time.time()
257 |
258 | try:
259 | # Test with strict emergency time budget
260 | result = await agent.research_comprehensive(
261 | topic=query,
262 | session_id="emergency_test",
263 | depth="basic",
264 | time_budget_seconds=25.0, # Strict emergency budget
265 | target_confidence=0.6, # Lower confidence for speed
266 | )
267 |
268 | execution_time = time.time() - start_time
269 |
270 | return {
271 | "success": True,
272 | "execution_time": execution_time,
273 | "within_budget": execution_time
274 | < SPEED_THRESHOLDS["emergency_mode_max_time"],
275 | "result_status": result.get("status", "unknown"),
276 | "emergency_mode_used": result.get("emergency_mode", False),
277 | "optimization_metrics": result.get("optimization_metrics", {}),
278 | }
279 |
280 | except Exception as e:
281 | execution_time = time.time() - start_time
282 | return {
283 | "success": False,
284 | "execution_time": execution_time,
285 | "error": str(e),
286 | "within_budget": execution_time
287 | < SPEED_THRESHOLDS["emergency_mode_max_time"],
288 | }
289 |
290 | @staticmethod
291 | async def test_baseline_vs_optimized_performance(
292 | query: str, complexity: QueryComplexity
293 | ) -> dict[str, Any]:
294 | """Compare baseline vs optimized agent performance."""
295 | provider = MockOpenRouterProvider(simulate_model_speeds=True)
296 |
297 | # Test baseline agent (non-optimized)
298 | baseline_agent = DeepResearchAgent(
299 | llm=provider.get_llm(),
300 | persona="moderate",
301 | enable_parallel_execution=False,
302 | )
303 | baseline_agent.search_providers = [MockSearchProvider()]
304 |
305 | # Test optimized agent
306 | optimized_agent = OptimizedDeepResearchAgent(
307 | openrouter_provider=provider,
308 | persona="moderate",
309 | optimization_enabled=True,
310 | )
311 | optimized_agent.search_providers = [MockSearchProvider()]
312 |
313 | # Run baseline test
314 | baseline_start = time.time()
315 | try:
316 | baseline_result = await baseline_agent.research_comprehensive(
317 | topic=query,
318 | session_id="baseline_test",
319 | depth="standard",
320 | )
321 | baseline_time = time.time() - baseline_start
322 | baseline_success = True
323 | except Exception as e:
324 | baseline_time = time.time() - baseline_start
325 | baseline_success = False
326 | baseline_result = {"error": str(e)}
327 |
328 | # Run optimized test
329 | optimized_start = time.time()
330 | try:
331 | optimized_result = await optimized_agent.research_comprehensive(
332 | topic=query,
333 | session_id="optimized_test",
334 | depth="standard",
335 | time_budget_seconds=60.0,
336 | )
337 | optimized_time = time.time() - optimized_start
338 | optimized_success = True
339 | except Exception as e:
340 | optimized_time = time.time() - optimized_start
341 | optimized_success = False
342 | optimized_result = {"error": str(e)}
343 |
344 | # Calculate performance metrics
345 | speedup_factor = (
346 | baseline_time / max(optimized_time, 0.001) if optimized_time > 0 else 0
347 | )
348 |
349 | return {
350 | "baseline_time": baseline_time,
351 | "optimized_time": optimized_time,
352 | "speedup_factor": speedup_factor,
353 | "baseline_success": baseline_success,
354 | "optimized_success": optimized_success,
355 | "meets_2x_target": speedup_factor
356 | >= SPEED_THRESHOLDS["minimum_speedup_factor"],
357 | "meets_3x_target": speedup_factor
358 | >= SPEED_THRESHOLDS["target_speedup_factor"],
359 | "baseline_result": baseline_result,
360 | "optimized_result": optimized_result,
361 | }
362 |
363 |
364 | class MockSearchProvider:
365 | """Mock search provider for testing without external API calls."""
366 |
367 | async def search(self, query: str, num_results: int = 5) -> list[dict[str, Any]]:
368 | """Return mock search results."""
369 | await asyncio.sleep(0.1) # Simulate API delay
370 |
371 | return [
372 | {
373 | "title": f"Mock search result {i + 1} for: {query[:30]}",
374 | "url": f"https://example.com/result{i + 1}",
375 |                 "content": f"Mock content for result {i + 1}. " * 50,  # ~250 words
376 | "published_date": datetime.now().isoformat(),
377 | "credibility_score": 0.8,
378 | "relevance_score": 0.9 - (i * 0.1),
379 | }
380 | for i in range(num_results)
381 | ]
382 |
383 |
384 | # Test fixtures (conditional on pytest availability)
385 | if pytest:
386 |
387 | @pytest.fixture
388 | def mock_openrouter_provider():
389 | """Provide mock OpenRouter provider."""
390 | return MockOpenRouterProvider(simulate_model_speeds=True)
391 |
392 | @pytest.fixture
393 | def speed_validator():
394 | """Provide speed optimization validator."""
395 | return SpeedOptimizationValidator()
396 |
397 | @pytest.fixture
398 | def speed_monitor_factory():
399 | """Factory for creating speed test monitors."""
400 |
401 | def _create_monitor(test_name: str, complexity: QueryComplexity):
402 | return SpeedTestMonitor(test_name, complexity)
403 |
404 | return _create_monitor
405 |
406 |
407 | # Core Speed Optimization Tests
408 | if pytest:
409 |
410 | @pytest.mark.unit
411 | class TestSpeedOptimizations:
412 | """Core tests for speed optimization functionality."""
413 |
414 | async def test_adaptive_model_selector_emergency_mode(
415 | self, mock_openrouter_provider
416 | ):
417 | """Test that emergency mode selects fastest models."""
418 | selector = AdaptiveModelSelector(mock_openrouter_provider)
419 |
420 | # Test ultra-emergency mode (< 10s)
421 | config = selector.select_model_for_time_budget(
422 | task_type=TaskType.QUICK_ANSWER,
423 | time_remaining_seconds=8.0,
424 | complexity_score=0.5,
425 | content_size_tokens=500,
426 | )
427 |
428 | # Should select fastest model
429 | assert config.model_id in ["google/gemini-2.5-flash", "openai/gpt-4o-mini"]
430 | assert config.timeout_seconds < 10
431 | assert config.max_tokens < 1000
432 |
433 | # Test moderate emergency (< 25s)
434 | config = selector.select_model_for_time_budget(
435 | task_type=TaskType.MARKET_ANALYSIS,
436 | time_remaining_seconds=20.0,
437 | complexity_score=0.7,
438 | content_size_tokens=1000,
439 | )
440 |
441 | # Should still prefer fast models
442 | assert config.model_id in ["google/gemini-2.5-flash", "openai/gpt-4o-mini"]
443 | assert config.timeout_seconds < 25
444 |
445 | async def test_progressive_token_budgeter_time_constraints(self):
446 | """Test progressive token budgeter adapts to time pressure."""
447 | # Test emergency budget
448 | emergency_budgeter = ProgressiveTokenBudgeter(
449 | total_time_budget_seconds=20.0, confidence_target=0.6
450 | )
451 |
452 | allocation = emergency_budgeter.allocate_tokens_for_phase(
453 | phase=emergency_budgeter.phase_budgets.__class__.CONTENT_ANALYSIS,
454 | sources_count=3,
455 | current_confidence=0.3,
456 | complexity_score=0.5,
457 | )
458 |
459 | # Emergency mode should have reduced tokens and shorter timeout
460 | assert allocation.output_tokens < 1000
461 | assert allocation.timeout_seconds < 15
462 |
463 | # Test standard budget
464 | standard_budgeter = ProgressiveTokenBudgeter(
465 | total_time_budget_seconds=120.0, confidence_target=0.75
466 | )
467 |
468 | allocation = standard_budgeter.allocate_tokens_for_phase(
469 | phase=standard_budgeter.phase_budgets.__class__.CONTENT_ANALYSIS,
470 | sources_count=3,
471 | current_confidence=0.3,
472 | complexity_score=0.5,
473 | )
474 |
475 | # Standard mode should allow more tokens and time
476 | assert allocation.output_tokens >= 1000
477 | assert allocation.timeout_seconds >= 15
478 |
479 | async def test_parallel_llm_processor_speed_optimization(
480 | self, mock_openrouter_provider
481 | ):
482 | """Test parallel LLM processor speed optimizations."""
483 | processor = ParallelLLMProcessor(mock_openrouter_provider, max_concurrent=4)
484 |
485 | # Create mock sources
486 | sources = [
487 | {
488 | "title": f"Source {i}",
489 |                     "content": f"Mock content {i} " * 100,  # ~300 words
490 | "url": f"https://example.com/{i}",
491 | }
492 | for i in range(6)
493 | ]
494 |
495 | start_time = time.time()
496 |
497 | results = await processor.parallel_content_analysis(
498 | sources=sources,
499 | analysis_type="sentiment",
500 | persona="moderate",
501 | time_budget_seconds=15.0, # Tight budget
502 | current_confidence=0.0,
503 | )
504 |
505 | execution_time = time.time() - start_time
506 |
507 | # Should complete within time budget
508 | assert execution_time < 20.0 # Some buffer for test environment
509 | assert len(results) > 0 # Should produce results
510 |
511 | # Verify all results have required analysis structure
512 | for result in results:
513 | assert "analysis" in result
514 | analysis = result["analysis"]
515 | assert "sentiment" in analysis
516 | assert "batch_processed" in analysis
517 |
518 | async def test_confidence_tracker_early_termination(self):
519 | """Test confidence tracker enables early termination."""
520 | tracker = ConfidenceTracker(
521 | target_confidence=0.8,
522 | min_sources=2,
523 | max_sources=10,
524 | )
525 |
526 | # Simulate high-confidence evidence
527 | high_confidence_evidence = {
528 | "sentiment": {"direction": "bullish", "confidence": 0.9},
529 | "insights": ["Strong positive insight", "Another strong insight"],
530 | "risk_factors": ["Minor risk"],
531 | "opportunities": ["Major opportunity", "Growth catalyst"],
532 | "relevance_score": 0.95,
533 | }
534 |
535 | # Process minimum sources first
536 | for _i in range(2):
537 | result = tracker.update_confidence(high_confidence_evidence, 0.9)
538 | if not result["should_continue"]:
539 | break
540 |
541 | # After high-confidence sources, should suggest early termination
542 | final_result = tracker.update_confidence(high_confidence_evidence, 0.9)
543 |
544 | assert final_result["current_confidence"] > 0.7
545 | # Early termination logic should trigger with high confidence
546 |
547 | async def test_intelligent_content_filter_speed_optimization(self):
548 | """Test intelligent content filtering reduces processing overhead."""
549 | filter = IntelligentContentFilter()
550 |
551 | # Create sources with varying relevance
552 | sources = [
553 | {
554 | "title": "Apple Inc Q4 Earnings Beat Expectations",
555 | "content": "Apple Inc reported strong Q4 earnings with revenue growth of 15%. "
556 | + "The company's iPhone sales exceeded analysts' expectations. "
557 | * 20,
558 | "url": "https://reuters.com/apple-earnings",
559 | "published_date": datetime.now().isoformat(),
560 | },
561 | {
562 | "title": "Random Tech News Not About Apple",
563 | "content": "Some unrelated tech news content. " * 50,
564 | "url": "https://example.com/random",
565 | "published_date": "2023-01-01T00:00:00",
566 | },
567 | {
568 | "title": "Apple Supply Chain Analysis",
569 | "content": "Apple's supply chain faces challenges but shows resilience. "
570 | + "Manufacturing partnerships in Asia remain strong. " * 15,
571 | "url": "https://wsj.com/apple-supply-chain",
572 | "published_date": datetime.now().isoformat(),
573 | },
574 | ]
575 |
576 | filtered_sources = await filter.filter_and_prioritize_sources(
577 | sources=sources,
578 | research_focus="fundamental",
579 | time_budget=20.0, # Tight budget
580 | current_confidence=0.0,
581 | )
582 |
583 | # Should prioritize relevant, high-quality sources
584 | assert len(filtered_sources) <= len(sources)
585 | if filtered_sources:
586 | # First source should be most relevant
587 | assert "apple" in filtered_sources[0]["title"].lower()
588 | # Should have preprocessing applied
589 | assert "original_length" in filtered_sources[0]
590 |
591 |
592 | # Speed Validation Tests by Query Complexity
593 | if pytest:
594 |
595 | @pytest.mark.integration
596 | class TestQueryComplexitySpeedValidation:
597 | """Test speed validation across different query complexities."""
598 |
599 | @pytest.mark.parametrize("complexity", list(QueryComplexity))
600 | async def test_query_completion_time_thresholds(
601 | self, complexity: QueryComplexity, speed_monitor_factory, speed_validator
602 | ):
603 | """Test queries complete within time thresholds by complexity."""
604 | queries = SPEED_TEST_QUERIES[complexity]
605 |
606 | results = []
607 |
608 | for query in queries[:2]: # Test 2 queries per complexity
609 | with speed_monitor_factory(
610 | f"complexity_test_{complexity.value}", complexity
611 | ) as monitor:
612 | if complexity == QueryComplexity.EMERGENCY:
613 | result = await speed_validator.test_emergency_mode_performance(
614 | query
615 | )
616 | else:
617 | # Use baseline vs optimized for other complexities
618 | result = await speed_validator.test_baseline_vs_optimized_performance(
619 | query, complexity
620 | )
621 |
622 | monitor.record_optimization_metric(
623 | "completion_time", monitor.total_execution_time
624 | )
625 | results.append(
626 | {
627 | "query": query,
628 | "execution_time": monitor.total_execution_time,
629 | "result": result,
630 | }
631 | )
632 |
633 | # Validate time thresholds based on complexity
634 | threshold_map = {
635 | QueryComplexity.SIMPLE: SPEED_THRESHOLDS["simple_query_max_time"],
636 | QueryComplexity.MODERATE: SPEED_THRESHOLDS["moderate_query_max_time"],
637 | QueryComplexity.COMPLEX: SPEED_THRESHOLDS["complex_query_max_time"],
638 | QueryComplexity.EMERGENCY: SPEED_THRESHOLDS["emergency_mode_max_time"],
639 | }
640 |
641 | max_allowed_time = threshold_map[complexity]
642 |
643 | for result in results:
644 | execution_time = result["execution_time"]
645 | assert execution_time < max_allowed_time, (
646 | f"{complexity.value} query exceeded time threshold: "
647 | f"{execution_time:.2f}s > {max_allowed_time}s"
648 | )
649 |
650 | # Log performance summary
651 | avg_time = statistics.mean([r["execution_time"] for r in results])
652 | logger.info(
653 | f"{complexity.value} queries - Avg time: {avg_time:.2f}s "
654 | f"(threshold: {max_allowed_time}s)"
655 | )
656 |
657 | async def test_emergency_mode_model_selection(self, mock_openrouter_provider):
658 | """Test emergency mode selects fastest models."""
659 | selector = AdaptiveModelSelector(mock_openrouter_provider)
660 |
661 | # Test various emergency time budgets
662 | emergency_scenarios = [5, 10, 15, 20, 25]
663 |
664 | for time_budget in emergency_scenarios:
665 | config = selector.select_model_for_time_budget(
666 | task_type=TaskType.QUICK_ANSWER,
667 | time_remaining_seconds=time_budget,
668 | complexity_score=0.3, # Low complexity for emergency
669 | content_size_tokens=200,
670 | )
671 |
672 | # Should always select fastest models in emergency scenarios
673 | expected_models = EXPECTED_MODEL_SELECTIONS[QueryComplexity.EMERGENCY]
674 | assert config.model_id in expected_models, (
675 | f"Emergency mode with {time_budget}s budget should select fast model, "
676 | f"got {config.model_id}"
677 | )
678 |
679 | # Timeout should be appropriate for time budget
680 | assert config.timeout_seconds < time_budget * 0.8, (
681 | f"Timeout too long for emergency budget: "
682 | f"{config.timeout_seconds}s for {time_budget}s budget"
683 | )
684 |
685 |
686 | # Performance Comparison Tests
687 | if pytest:
688 |
689 | @pytest.mark.integration
690 | class TestSpeedImprovementValidation:
691 | """Validate claimed speed improvements (2-3x faster)."""
692 |
693 | async def test_2x_minimum_speedup_validation(self, speed_validator):
694 | """Validate minimum 2x speedup is achieved."""
695 | moderate_queries = SPEED_TEST_QUERIES[QueryComplexity.MODERATE]
696 |
697 | speedup_results = []
698 |
699 | for query in moderate_queries[:2]: # Test subset for CI speed
700 | result = await speed_validator.test_baseline_vs_optimized_performance(
701 | query, QueryComplexity.MODERATE
702 | )
703 |
704 | if result["baseline_success"] and result["optimized_success"]:
705 | speedup_results.append(result["speedup_factor"])
706 |
707 | logger.info(
708 | f"Speedup test: {result['speedup_factor']:.2f}x "
709 | f"({result['baseline_time']:.2f}s -> {result['optimized_time']:.2f}s)"
710 | )
711 |
712 | # Validate minimum 2x speedup achieved
713 | if speedup_results:
714 | avg_speedup = statistics.mean(speedup_results)
715 |             min_speedup = min(speedup_results)
716 | 
717 |             assert avg_speedup >= SPEED_THRESHOLDS["minimum_speedup_factor"], (
718 |                 f"Average speedup {avg_speedup:.2f}x (min {min_speedup:.2f}x) below 2x threshold"
719 | )
720 |
721 | # At least 80% of tests should meet minimum speedup
722 | meeting_threshold = sum(
723 | 1
724 | for s in speedup_results
725 | if s >= SPEED_THRESHOLDS["minimum_speedup_factor"]
726 | )
727 | threshold_rate = meeting_threshold / len(speedup_results)
728 |
729 | assert threshold_rate >= 0.8, (
730 | f"Only {threshold_rate:.1%} of tests met 2x speedup threshold "
731 | f"(should be >= 80%)"
732 | )
733 | else:
734 | pytest.skip("No successful speedup comparisons completed")
735 |
736 | async def test_3x_target_speedup_aspiration(self, speed_validator):
737 | """Test aspirational 3x speedup target for simple queries."""
738 | simple_queries = SPEED_TEST_QUERIES[QueryComplexity.SIMPLE]
739 |
740 | speedup_results = []
741 |
742 | for query in simple_queries:
743 | result = await speed_validator.test_baseline_vs_optimized_performance(
744 | query, QueryComplexity.SIMPLE
745 | )
746 |
747 | if result["baseline_success"] and result["optimized_success"]:
748 | speedup_results.append(result["speedup_factor"])
749 |
750 | if speedup_results:
751 | avg_speedup = statistics.mean(speedup_results)
752 | max_speedup = max(speedup_results)
753 |
754 | logger.info(
755 | f"3x target test - Avg: {avg_speedup:.2f}x, Max: {max_speedup:.2f}x"
756 | )
757 |
758 | # This is aspirational - log results but don't fail
759 | target_met = avg_speedup >= SPEED_THRESHOLDS["target_speedup_factor"]
760 | if target_met:
761 | logger.info("🎉 3x speedup target achieved!")
762 | else:
763 | logger.info(f"3x target not yet achieved (current: {avg_speedup:.2f}x)")
764 |
765 | # Still assert we're making good progress toward 3x
766 | assert avg_speedup >= 1.5, (
767 | f"Should show significant speedup progress, got {avg_speedup:.2f}x"
768 | )
769 |
770 |
771 | # Timeout Resolution Tests
772 | if pytest:
773 |
774 | @pytest.mark.integration
775 | class TestTimeoutResolution:
776 | """Test resolution of previous timeout issues (138s, 129s failures)."""
777 |
778 | async def test_no_timeout_failures_in_emergency_mode(self, speed_validator):
779 | """Test emergency mode prevents timeout failures."""
780 | emergency_queries = SPEED_TEST_QUERIES[QueryComplexity.EMERGENCY]
781 |
782 | timeout_failures = 0
783 | total_tests = 0
784 |
785 | for query in emergency_queries:
786 | total_tests += 1
787 |
788 | result = await speed_validator.test_emergency_mode_performance(query)
789 |
790 | # Check if execution exceeded emergency time budget
791 | if result["execution_time"] >= SPEED_THRESHOLDS["emergency_mode_max_time"]:
792 | timeout_failures += 1
793 | logger.warning(
794 | f"Emergency mode timeout: {result['execution_time']:.2f}s "
795 | f"for query: {query[:50]}..."
796 | )
797 |
798 | # Calculate failure rate
799 | timeout_failure_rate = timeout_failures / max(total_tests, 1)
800 |
801 | # Should have very low timeout failure rate
802 | assert timeout_failure_rate <= SPEED_THRESHOLDS["timeout_failure_threshold"], (
803 | f"Timeout failure rate too high: {timeout_failure_rate:.1%} > "
804 | f"{SPEED_THRESHOLDS['timeout_failure_threshold']:.1%}"
805 | )
806 |
807 | logger.info(
808 | f"Timeout resolution test: {timeout_failure_rate:.1%} failure rate "
809 | f"({timeout_failures}/{total_tests} timeouts)"
810 | )
811 |
812 | async def test_graceful_degradation_under_time_pressure(self, speed_validator):
813 | """Test system degrades gracefully under extreme time pressure."""
814 | # Simulate very tight time budgets that previously caused 138s/129s failures
815 | tight_budgets = [10, 15, 20, 25] # Various emergency scenarios
816 |
817 | degradation_results = []
818 |
819 | for budget in tight_budgets:
820 | provider = MockOpenRouterProvider(simulate_model_speeds=True)
821 |
822 | agent = OptimizedDeepResearchAgent(
823 | openrouter_provider=provider,
824 | persona="moderate",
825 | optimization_enabled=True,
826 | )
827 | agent.search_providers = [MockSearchProvider()]
828 |
829 | start_time = time.time()
830 |
831 | try:
832 | result = await agent.research_comprehensive(
833 | topic="Apple Inc urgent analysis needed",
834 | session_id=f"degradation_test_{budget}s",
835 | depth="basic",
836 | time_budget_seconds=budget,
837 | target_confidence=0.5, # Lower expectations
838 | )
839 |
840 | execution_time = time.time() - start_time
841 |
842 | degradation_results.append(
843 | {
844 | "budget": budget,
845 | "execution_time": execution_time,
846 | "success": True,
847 | "within_budget": execution_time <= budget + 5, # 5s buffer
848 | "emergency_mode": result.get("emergency_mode", False),
849 | }
850 | )
851 |
852 | except Exception as e:
853 | execution_time = time.time() - start_time
854 | degradation_results.append(
855 | {
856 | "budget": budget,
857 | "execution_time": execution_time,
858 | "success": False,
859 | "error": str(e),
860 | "within_budget": execution_time <= budget + 5,
861 | }
862 | )
863 |
864 | # Validate graceful degradation
865 | successful_tests = [r for r in degradation_results if r["success"]]
866 | within_budget_tests = [r for r in degradation_results if r["within_budget"]]
867 |
868 | success_rate = len(successful_tests) / len(degradation_results)
869 | budget_compliance_rate = len(within_budget_tests) / len(degradation_results)
870 |
871 | # Should succeed most of the time and stay within budget
872 | assert success_rate >= 0.75, (
873 | f"Success rate too low under time pressure: {success_rate:.1%}"
874 | )
875 | assert budget_compliance_rate >= 0.80, (
876 | f"Budget compliance too low: {budget_compliance_rate:.1%}"
877 | )
878 |
879 | logger.info(
880 | f"Graceful degradation test: {success_rate:.1%} success rate, "
881 | f"{budget_compliance_rate:.1%} budget compliance"
882 | )
883 |
884 |
885 | if __name__ == "__main__":
886 | # Allow running specific test categories
887 | import sys
888 |
889 |     if pytest is None:
890 |         raise SystemExit("pytest is required to run this test suite")
891 |     elif len(sys.argv) > 1:
892 |         pytest.main([sys.argv[1], "-v", "-s", "--tb=short"])
893 |     else:
894 |         pytest.main([__file__, "-v", "-s", "--tb=short"])  # run all speed validation tests by default
```
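As a quick, illustrative sanity check on the numbers in this suite (a sketch, not code from the repository), the snippet below reproduces the latency model used by `MockOpenRouterProvider` (`max_tokens / tokens_per_second + 0.5s` API overhead) and the speedup comparison checked in `test_2x_minimum_speedup_validation`, using the per-model speeds from `MODEL_SPEED_BENCHMARKS`:

```python
# Illustrative sketch only -- mirrors the mock latency model and 2x speedup check
# from tests/test_speed_optimization_validation.py; not part of the repository.
MODEL_SPEED_BENCHMARKS = {
    "google/gemini-2.5-flash": 199,   # tokens/second
    "openai/gpt-4o-mini": 126,
    "anthropic/claude-sonnet-4": 45,
}
MINIMUM_SPEEDUP_FACTOR = 2.0  # SPEED_THRESHOLDS["minimum_speedup_factor"]


def simulated_latency(model_id: str, max_tokens: int = 1000) -> float:
    """Simulated response time: max_tokens / (tokens per second) + 0.5s API overhead."""
    speed = MODEL_SPEED_BENCHMARKS.get(model_id, 50)
    return max_tokens / speed + 0.5


baseline = simulated_latency("anthropic/claude-sonnet-4")  # ~22.7s
optimized = simulated_latency("google/gemini-2.5-flash")   # ~5.5s
speedup = baseline / max(optimized, 0.001)                 # ~4.1x
assert speedup >= MINIMUM_SPEEDUP_FACTOR
print(f"baseline={baseline:.1f}s, optimized={optimized:.1f}s, speedup={speedup:.1f}x")
```

Under this mock model, routing a query to a fast model instead of a slower reasoning model already clears the 2x minimum threshold, which is what the baseline-vs-optimized comparisons above exercise.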
--------------------------------------------------------------------------------
/maverick_mcp/config/settings.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Configuration settings for Maverick-MCP.
3 |
4 | This module provides configuration settings that can be customized
5 | through environment variables or a settings file.
6 | """
7 |
8 | import logging
9 | import os
10 | from decimal import Decimal
11 |
12 | from pydantic import BaseModel, Field
13 |
14 | from maverick_mcp.config.constants import CONFIG
15 |
16 | # Set up logging
17 | logging.basicConfig(
18 | level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
19 | )
20 | logger = logging.getLogger("maverick_mcp.config")
21 |
22 |
23 | class DatabaseSettings(BaseModel):
24 | """Database configuration settings."""
25 |
26 | host: str = Field(default="localhost", description="Database host")
27 | port: int = Field(default=5432, description="Database port")
28 | username: str = Field(default="postgres", description="Database username")
29 | password: str = Field(default="", description="Database password")
30 | database: str = Field(default="maverick_mcp", description="Database name")
31 | max_connections: int = Field(
32 | default=10, description="Maximum number of connections"
33 | )
34 |
35 | @property
36 | def url(self) -> str:
37 | """Get database URL string."""
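        # Note: when DATABASE_URL or POSTGRES_URL is set it is returned verbatim;
        # the host/port/credential fields above are not combined into a URL here.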
38 | # Check for environment variable first
39 | env_url = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_URL")
40 | if env_url:
41 | return env_url
42 | # Default to SQLite for development
43 | return "sqlite:///maverick_mcp.db"
44 |
45 |
46 | class APISettings(BaseModel):
47 | """API configuration settings."""
48 |
49 | host: str = Field(default="0.0.0.0", description="API host")
50 | port: int = Field(default=8000, description="API port")
51 | debug: bool = Field(default=False, description="Debug mode")
52 | log_level: str = Field(default="info", description="Log level")
53 | cache_timeout: int = Field(default=300, description="Cache timeout in seconds")
54 | cors_origins: list[str] = Field(
55 | default=["http://localhost:3000", "http://localhost:3001"],
56 | description="CORS allowed origins",
57 | )
58 |
59 | # Web search API key
60 | exa_api_key: str | None = Field(
61 | default_factory=lambda: os.getenv("EXA_API_KEY"),
62 | description="Exa API key",
63 | )
64 |
65 |
66 | class DataProviderSettings(BaseModel):
67 | """Data provider configuration settings."""
68 |
69 | api_key: str | None = Field(default=None, description="API key for data provider")
70 | use_cache: bool = Field(default=True, description="Use cache for data")
71 | cache_dir: str = Field(
72 | default="/tmp/maverick_mcp/cache", description="Cache directory"
73 | )
74 | cache_expiry: int = Field(default=86400, description="Cache expiry in seconds")
75 | rate_limit: int = Field(default=5, description="Rate limit per minute")
76 |
77 | # Research-specific settings
78 | max_search_results: int = Field(
79 | default=100, description="Max search results per query"
80 | )
81 | research_cache_ttl: int = Field(
82 | default=3600, description="Research cache TTL in seconds"
83 | )
84 | content_max_length: int = Field(
85 | default=2000, description="Max content length per source"
86 | )
87 |
88 |
89 | class RedisSettings(BaseModel):
90 | """Redis configuration settings."""
91 |
92 | host: str = Field(
93 | default_factory=lambda: CONFIG["redis"]["host"], description="Redis host"
94 | )
95 | port: int = Field(
96 | default_factory=lambda: CONFIG["redis"]["port"], description="Redis port"
97 | )
98 | db: int = Field(
99 | default_factory=lambda: CONFIG["redis"]["db"],
100 | description="Redis database number",
101 | )
102 | username: str | None = Field(
103 | default_factory=lambda: CONFIG["redis"]["username"],
104 | description="Redis username",
105 | )
106 | password: str | None = Field(
107 | default_factory=lambda: CONFIG["redis"]["password"],
108 | description="Redis password",
109 | )
110 | ssl: bool = Field(
111 | default_factory=lambda: CONFIG["redis"]["ssl"],
112 | description="Use SSL for Redis connection",
113 | )
114 |
115 | @property
116 | def url(self) -> str:
117 | """Get Redis URL string."""
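        # Example (illustrative values): host="localhost", port=6379, db=0,
        # password="secret", ssl=False -> "redis://:secret@localhost:6379/0"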
118 | scheme = "rediss" if self.ssl else "redis"
119 | auth = ""
120 | if self.username and self.password:
121 | auth = f"{self.username}:{self.password}@"
122 | elif self.password:
123 | auth = f":{self.password}@"
124 | return f"{scheme}://{auth}{self.host}:{self.port}/{self.db}"
125 |
126 |
127 | class ResearchSettings(BaseModel):
128 | """Research and web search configuration settings."""
129 |
130 | # API key for web search provider
131 | exa_api_key: str | None = Field(
132 | default_factory=lambda: os.getenv("EXA_API_KEY"),
133 | description="Exa API key for web search",
134 | )
135 |
136 | # Research parameters
137 | default_max_sources: int = Field(
138 | default=50, description="Default max sources per research"
139 | )
140 | default_research_depth: str = Field(
141 | default="comprehensive", description="Default research depth"
142 | )
143 | cache_ttl_hours: int = Field(default=4, description="Research cache TTL in hours")
144 |
145 | # Content analysis settings
146 | max_content_length: int = Field(
147 | default=2000, description="Max content length per source"
148 | )
149 | sentiment_confidence_threshold: float = Field(
150 | default=0.7, description="Sentiment confidence threshold"
151 | )
152 | credibility_score_threshold: float = Field(
153 | default=0.6, description="Source credibility threshold"
154 | )
155 |
156 | # Rate limiting
157 | search_rate_limit: int = Field(default=10, description="Search requests per minute")
158 | content_analysis_batch_size: int = Field(
159 | default=5, description="Content analysis batch size"
160 | )
161 |
162 | # Domain filtering
163 | trusted_domains: list[str] = Field(
164 | default=[
165 | "reuters.com",
166 | "bloomberg.com",
167 | "wsj.com",
168 | "ft.com",
169 | "marketwatch.com",
170 | "cnbc.com",
171 | "yahoo.com",
172 | "seekingalpha.com",
173 | ],
174 | description="Trusted news domains for research",
175 | )
176 | blocked_domains: list[str] = Field(
177 | default=[], description="Blocked domains for research"
178 | )
179 |
180 | @property
181 | def api_keys(self) -> dict[str, str | None]:
182 | """Get API keys as dictionary."""
183 | return {"exa_api_key": self.exa_api_key}
184 |
185 |
186 | class DataLimitsConfig(BaseModel):
187 | """Data limits and constraints configuration settings."""
188 |
189 | # API Rate limits
190 | max_api_requests_per_minute: int = Field(
191 | default_factory=lambda: int(os.getenv("MAX_API_REQUESTS_PER_MINUTE", "60")),
192 | description="Maximum API requests per minute",
193 | )
194 | max_api_requests_per_hour: int = Field(
195 | default_factory=lambda: int(os.getenv("MAX_API_REQUESTS_PER_HOUR", "1000")),
196 | description="Maximum API requests per hour",
197 | )
198 |
199 | # Data size limits
200 | max_data_rows_per_request: int = Field(
201 | default_factory=lambda: int(os.getenv("MAX_DATA_ROWS_PER_REQUEST", "10000")),
202 | description="Maximum data rows per request",
203 | )
204 | max_symbols_per_batch: int = Field(
205 | default_factory=lambda: int(os.getenv("MAX_SYMBOLS_PER_BATCH", "100")),
206 | description="Maximum symbols per batch request",
207 | )
208 | max_response_size_mb: int = Field(
209 | default_factory=lambda: int(os.getenv("MAX_RESPONSE_SIZE_MB", "50")),
210 | description="Maximum response size in MB",
211 | )
212 |
213 | # Research limits
214 | max_research_sources: int = Field(
215 | default_factory=lambda: int(os.getenv("MAX_RESEARCH_SOURCES", "100")),
216 | description="Maximum research sources per query",
217 | )
218 | max_research_depth_level: int = Field(
219 | default_factory=lambda: int(os.getenv("MAX_RESEARCH_DEPTH_LEVEL", "5")),
220 | description="Maximum research depth level",
221 | )
222 | max_content_analysis_items: int = Field(
223 | default_factory=lambda: int(os.getenv("MAX_CONTENT_ANALYSIS_ITEMS", "50")),
224 | description="Maximum content items for analysis",
225 | )
226 |
227 | # Agent limits
228 | max_agent_iterations: int = Field(
229 | default_factory=lambda: int(os.getenv("MAX_AGENT_ITERATIONS", "10")),
230 | description="Maximum agent workflow iterations",
231 | )
232 | max_parallel_agents: int = Field(
233 | default_factory=lambda: int(os.getenv("MAX_PARALLEL_AGENTS", "5")),
234 | description="Maximum parallel agents in orchestration",
235 | )
236 | max_agent_execution_time_seconds: int = Field(
237 | default_factory=lambda: int(
238 | os.getenv("MAX_AGENT_EXECUTION_TIME_SECONDS", "720")
239 | ),
240 | description="Maximum agent execution time in seconds",
241 | )
242 |
243 | # Cache limits
244 | max_cache_size_mb: int = Field(
245 | default_factory=lambda: int(os.getenv("MAX_CACHE_SIZE_MB", "500")),
246 | description="Maximum cache size in MB",
247 | )
248 | max_cached_items: int = Field(
249 | default_factory=lambda: int(os.getenv("MAX_CACHED_ITEMS", "10000")),
250 | description="Maximum number of cached items",
251 | )
252 |
253 | # Database limits
254 | max_db_connections: int = Field(
255 | default_factory=lambda: int(os.getenv("MAX_DB_CONNECTIONS", "100")),
256 | description="Maximum database connections",
257 | )
258 | max_query_results: int = Field(
259 | default_factory=lambda: int(os.getenv("MAX_QUERY_RESULTS", "50000")),
260 | description="Maximum query results",
261 | )
262 |
263 |
264 | class ExternalDataSettings(BaseModel):
265 | """External data API configuration settings."""
266 |
267 | api_key: str | None = Field(
268 | default_factory=lambda: os.getenv("EXTERNAL_DATA_API_KEY"),
269 | description="API key for external data API",
270 | )
271 | base_url: str = Field(
272 | default="https://external-data-api.com",
273 | description="Base URL for external data API",
274 | )
275 |
276 |
277 | class EmailSettings(BaseModel):
278 | """Email service configuration settings."""
279 |
280 | enabled: bool = Field(
281 | default_factory=lambda: os.getenv("EMAIL_ENABLED", "true").lower() == "true",
282 | description="Enable email sending",
283 | )
284 | mailgun_api_key: str = Field(
285 | default_factory=lambda: os.getenv("MAILGUN_API_KEY", ""),
286 | description="Mailgun API key",
287 | )
288 | mailgun_domain: str = Field(
289 | default_factory=lambda: os.getenv("MAILGUN_DOMAIN", ""),
290 | description="Mailgun sending domain",
291 | )
292 | from_address: str = Field(
293 | default_factory=lambda: os.getenv("EMAIL_FROM_ADDRESS", "noreply@localhost"),
294 | description="Default from email address",
295 | )
296 | from_name: str = Field(
297 | default_factory=lambda: os.getenv("EMAIL_FROM_NAME", "MaverickMCP"),
298 | description="Default from name",
299 | )
300 | support_email: str = Field(
301 | default_factory=lambda: os.getenv("EMAIL_SUPPORT", "support@localhost"),
302 | description="Support email address",
303 | )
304 |
305 |
306 | class FinancialConfig(BaseModel):
307 | """Financial calculations and portfolio management settings."""
308 |
309 | # Portfolio defaults
310 | default_account_size: Decimal = Field(
311 | default_factory=lambda: Decimal(os.getenv("DEFAULT_ACCOUNT_SIZE", "100000")),
312 | description="Default account size for calculations (USD)",
313 | )
314 |
315 | @property
316 | def api_keys(self) -> dict[str, str | None]:
317 | """Get API keys as dictionary (placeholder for financial data APIs)."""
318 | return {}
319 |
320 | # Risk management
321 | max_position_size_conservative: float = Field(
322 | default_factory=lambda: float(
323 | os.getenv("MAX_POSITION_SIZE_CONSERVATIVE", "0.05")
324 | ),
325 | description="Maximum position size for conservative investors (5%)",
326 | )
327 | max_position_size_moderate: float = Field(
328 | default_factory=lambda: float(os.getenv("MAX_POSITION_SIZE_MODERATE", "0.10")),
329 | description="Maximum position size for moderate investors (10%)",
330 | )
331 | max_position_size_aggressive: float = Field(
332 | default_factory=lambda: float(
333 | os.getenv("MAX_POSITION_SIZE_AGGRESSIVE", "0.20")
334 | ),
335 | description="Maximum position size for aggressive investors (20%)",
336 | )
337 | max_position_size_day_trader: float = Field(
338 | default_factory=lambda: float(
339 | os.getenv("MAX_POSITION_SIZE_DAY_TRADER", "0.25")
340 | ),
341 | description="Maximum position size for day traders (25%)",
342 | )
343 |
344 | # Stop loss multipliers
345 | stop_loss_multiplier_conservative: float = Field(
346 | default_factory=lambda: float(
347 | os.getenv("STOP_LOSS_MULTIPLIER_CONSERVATIVE", "1.5")
348 | ),
349 | description="Stop loss multiplier for conservative investors",
350 | )
351 | stop_loss_multiplier_moderate: float = Field(
352 | default_factory=lambda: float(
353 | os.getenv("STOP_LOSS_MULTIPLIER_MODERATE", "1.2")
354 | ),
355 | description="Stop loss multiplier for moderate investors",
356 | )
357 | stop_loss_multiplier_aggressive: float = Field(
358 | default_factory=lambda: float(
359 | os.getenv("STOP_LOSS_MULTIPLIER_AGGRESSIVE", "1.0")
360 | ),
361 | description="Stop loss multiplier for aggressive investors",
362 | )
363 | stop_loss_multiplier_day_trader: float = Field(
364 | default_factory=lambda: float(
365 | os.getenv("STOP_LOSS_MULTIPLIER_DAY_TRADER", "0.8")
366 | ),
367 | description="Stop loss multiplier for day traders",
368 | )
369 |
370 | # Risk tolerance ranges (0-100 scale)
371 | risk_tolerance_conservative_min: int = Field(
372 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_CONSERVATIVE_MIN", "10")),
373 | description="Minimum risk tolerance for conservative investors",
374 | )
375 | risk_tolerance_conservative_max: int = Field(
376 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_CONSERVATIVE_MAX", "30")),
377 | description="Maximum risk tolerance for conservative investors",
378 | )
379 | risk_tolerance_moderate_min: int = Field(
380 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_MODERATE_MIN", "30")),
381 | description="Minimum risk tolerance for moderate investors",
382 | )
383 | risk_tolerance_moderate_max: int = Field(
384 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_MODERATE_MAX", "60")),
385 | description="Maximum risk tolerance for moderate investors",
386 | )
387 | risk_tolerance_aggressive_min: int = Field(
388 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_AGGRESSIVE_MIN", "60")),
389 | description="Minimum risk tolerance for aggressive investors",
390 | )
391 | risk_tolerance_aggressive_max: int = Field(
392 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_AGGRESSIVE_MAX", "90")),
393 | description="Maximum risk tolerance for aggressive investors",
394 | )
395 | risk_tolerance_day_trader_min: int = Field(
396 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_DAY_TRADER_MIN", "70")),
397 | description="Minimum risk tolerance for day traders",
398 | )
399 | risk_tolerance_day_trader_max: int = Field(
400 | default_factory=lambda: int(os.getenv("RISK_TOLERANCE_DAY_TRADER_MAX", "95")),
401 | description="Maximum risk tolerance for day traders",
402 | )
403 |
404 | # Technical analysis weights
405 | rsi_weight: float = Field(
406 | default_factory=lambda: float(os.getenv("TECHNICAL_RSI_WEIGHT", "2.0")),
407 | description="Weight for RSI in technical analysis scoring",
408 | )
409 | macd_weight: float = Field(
410 | default_factory=lambda: float(os.getenv("TECHNICAL_MACD_WEIGHT", "1.5")),
411 | description="Weight for MACD in technical analysis scoring",
412 | )
413 | momentum_weight: float = Field(
414 | default_factory=lambda: float(os.getenv("TECHNICAL_MOMENTUM_WEIGHT", "1.0")),
415 | description="Weight for momentum indicators in technical analysis scoring",
416 | )
417 | volume_weight: float = Field(
418 | default_factory=lambda: float(os.getenv("TECHNICAL_VOLUME_WEIGHT", "1.0")),
419 | description="Weight for volume indicators in technical analysis scoring",
420 | )
421 |
422 | # Trend identification thresholds
423 | uptrend_threshold: float = Field(
424 | default_factory=lambda: float(os.getenv("UPTREND_THRESHOLD", "1.2")),
425 | description="Threshold multiplier for identifying uptrends",
426 | )
427 | downtrend_threshold: float = Field(
428 | default_factory=lambda: float(os.getenv("DOWNTREND_THRESHOLD", "0.8")),
429 | description="Threshold multiplier for identifying downtrends",
430 | )
431 |
432 |
433 | class PerformanceConfig(BaseModel):
434 | """Performance settings for timeouts, retries, batch sizes, and cache TTLs."""
435 |
436 | # Timeout settings
437 | api_request_timeout: int = Field(
438 | default_factory=lambda: int(os.getenv("API_REQUEST_TIMEOUT", "120")),
439 | description="Default API request timeout in seconds",
440 | )
441 | yfinance_timeout: int = Field(
442 | default_factory=lambda: int(os.getenv("YFINANCE_TIMEOUT_SECONDS", "60")),
443 | description="yfinance API timeout in seconds",
444 | )
445 | database_timeout: int = Field(
446 | default_factory=lambda: int(os.getenv("DATABASE_TIMEOUT", "60")),
447 | description="Database operation timeout in seconds",
448 | )
449 |
450 | # Search provider timeouts
451 | search_timeout_base: int = Field(
452 | default_factory=lambda: int(os.getenv("SEARCH_TIMEOUT_BASE", "60")),
453 | description="Base search timeout in seconds for simple queries",
454 | )
455 | search_timeout_complex: int = Field(
456 | default_factory=lambda: int(os.getenv("SEARCH_TIMEOUT_COMPLEX", "120")),
457 | description="Search timeout in seconds for complex queries",
458 | )
459 | search_timeout_max: int = Field(
460 | default_factory=lambda: int(os.getenv("SEARCH_TIMEOUT_MAX", "180")),
461 | description="Maximum search timeout in seconds",
462 | )
463 |
464 | # Retry settings
465 | max_retry_attempts: int = Field(
466 | default_factory=lambda: int(os.getenv("MAX_RETRY_ATTEMPTS", "3")),
467 | description="Maximum number of retry attempts for failed operations",
468 | )
469 | retry_backoff_factor: float = Field(
470 | default_factory=lambda: float(os.getenv("RETRY_BACKOFF_FACTOR", "2.0")),
471 | description="Exponential backoff factor for retries",
472 | )
473 |
474 | # Batch processing
475 | default_batch_size: int = Field(
476 | default_factory=lambda: int(os.getenv("DEFAULT_BATCH_SIZE", "50")),
477 | description="Default batch size for processing operations",
478 | )
479 | max_batch_size: int = Field(
480 | default_factory=lambda: int(os.getenv("MAX_BATCH_SIZE", "1000")),
481 | description="Maximum batch size allowed",
482 | )
483 | parallel_screening_workers: int = Field(
484 | default_factory=lambda: int(os.getenv("PARALLEL_SCREENING_WORKERS", "4")),
485 | description="Number of worker processes for parallel screening",
486 | )
487 |
488 | # Cache settings
489 | cache_ttl_seconds: int = Field(
490 | default_factory=lambda: int(os.getenv("CACHE_TTL_SECONDS", "604800")), # 7 days
491 | description="Default cache TTL in seconds",
492 | )
493 | quick_cache_ttl: int = Field(
494 | default_factory=lambda: int(os.getenv("QUICK_CACHE_TTL", "300")), # 5 minutes
495 | description="Quick cache TTL for frequently accessed data",
496 | )
497 | agent_cache_ttl: int = Field(
498 | default_factory=lambda: int(os.getenv("AGENT_CACHE_TTL", "3600")), # 1 hour
499 | description="Agent state cache TTL in seconds",
500 | )
501 |
502 | # Rate limiting
503 | api_rate_limit_per_minute: int = Field(
504 | default_factory=lambda: int(os.getenv("API_RATE_LIMIT_PER_MINUTE", "60")),
505 | description="API rate limit requests per minute",
506 | )
507 | data_provider_rate_limit: int = Field(
508 | default_factory=lambda: int(os.getenv("DATA_PROVIDER_RATE_LIMIT", "5")),
509 | description="Data provider rate limit per minute",
510 | )
511 |
512 |
513 | class UIConfig(BaseModel):
514 | """UI and user experience configuration settings."""
515 |
516 | # Pagination defaults
517 | default_page_size: int = Field(
518 | default_factory=lambda: int(os.getenv("DEFAULT_PAGE_SIZE", "20")),
519 | description="Default number of items per page",
520 | )
521 | max_page_size: int = Field(
522 | default_factory=lambda: int(os.getenv("MAX_PAGE_SIZE", "100")),
523 | description="Maximum number of items per page",
524 | )
525 |
526 | # Data display limits
527 | max_stocks_per_screening: int = Field(
528 | default_factory=lambda: int(os.getenv("MAX_STOCKS_PER_SCREENING", "100")),
529 | description="Maximum number of stocks returned in screening results",
530 | )
531 | default_screening_limit: int = Field(
532 | default_factory=lambda: int(os.getenv("DEFAULT_SCREENING_LIMIT", "20")),
533 | description="Default number of stocks in screening results",
534 | )
535 | max_portfolio_stocks: int = Field(
536 | default_factory=lambda: int(os.getenv("MAX_PORTFOLIO_STOCKS", "30")),
537 | description="Maximum number of stocks in portfolio analysis",
538 | )
539 | default_portfolio_stocks: int = Field(
540 | default_factory=lambda: int(os.getenv("DEFAULT_PORTFOLIO_STOCKS", "10")),
541 | description="Default number of stocks in portfolio analysis",
542 | )
543 |
544 | # Historical data defaults
545 | default_history_days: int = Field(
546 | default_factory=lambda: int(os.getenv("DEFAULT_HISTORY_DAYS", "365")),
547 | description="Default number of days of historical data",
548 | )
549 | min_history_days: int = Field(
550 | default_factory=lambda: int(os.getenv("MIN_HISTORY_DAYS", "30")),
551 | description="Minimum number of days of historical data",
552 | )
553 | max_history_days: int = Field(
554 | default_factory=lambda: int(os.getenv("MAX_HISTORY_DAYS", "1825")), # 5 years
555 | description="Maximum number of days of historical data",
556 | )
557 |
558 | # Technical analysis periods
559 | default_rsi_period: int = Field(
560 | default_factory=lambda: int(os.getenv("DEFAULT_RSI_PERIOD", "14")),
561 | description="Default RSI calculation period",
562 | )
563 | default_sma_period: int = Field(
564 | default_factory=lambda: int(os.getenv("DEFAULT_SMA_PERIOD", "20")),
565 | description="Default SMA calculation period",
566 | )
567 | default_trend_period: int = Field(
568 | default_factory=lambda: int(os.getenv("DEFAULT_TREND_PERIOD", "50")),
569 | description="Default trend identification period",
570 | )
571 |
572 | # Symbol validation
573 | min_symbol_length: int = Field(
574 | default_factory=lambda: int(os.getenv("MIN_SYMBOL_LENGTH", "1")),
575 | description="Minimum stock symbol length",
576 | )
577 | max_symbol_length: int = Field(
578 | default_factory=lambda: int(os.getenv("MAX_SYMBOL_LENGTH", "10")),
579 | description="Maximum stock symbol length",
580 | )
581 |
582 |
583 | class ProviderConfig(BaseModel):
584 | """Data provider API limits and configuration settings."""
585 |
586 | # External data API limits
587 | external_data_requests_per_minute: int = Field(
588 | default_factory=lambda: int(
589 | os.getenv("EXTERNAL_DATA_REQUESTS_PER_MINUTE", "60")
590 | ),
591 | description="External data API requests per minute",
592 | )
593 | external_data_timeout: int = Field(
594 | default_factory=lambda: int(os.getenv("EXTERNAL_DATA_TIMEOUT", "120")),
595 | description="External data API timeout in seconds",
596 | )
597 |
598 | # Yahoo Finance limits
599 | yfinance_requests_per_minute: int = Field(
600 | default_factory=lambda: int(os.getenv("YFINANCE_REQUESTS_PER_MINUTE", "120")),
601 | description="Yahoo Finance requests per minute",
602 | )
603 | yfinance_max_symbols_per_request: int = Field(
604 | default_factory=lambda: int(
605 | os.getenv("YFINANCE_MAX_SYMBOLS_PER_REQUEST", "50")
606 | ),
607 | description="Maximum symbols per Yahoo Finance request",
608 | )
609 |
610 | # Finviz limits
611 | finviz_requests_per_minute: int = Field(
612 | default_factory=lambda: int(os.getenv("FINVIZ_REQUESTS_PER_MINUTE", "30")),
613 | description="Finviz requests per minute",
614 | )
615 | finviz_timeout: int = Field(
616 | default_factory=lambda: int(os.getenv("FINVIZ_TIMEOUT", "60")),
617 | description="Finviz timeout in seconds",
618 | )
619 |
620 | # News API limits
621 | news_api_requests_per_day: int = Field(
622 | default_factory=lambda: int(os.getenv("NEWS_API_REQUESTS_PER_DAY", "1000")),
623 | description="News API requests per day",
624 | )
625 | max_news_articles: int = Field(
626 | default_factory=lambda: int(os.getenv("MAX_NEWS_ARTICLES", "50")),
627 | description="Maximum news articles to fetch",
628 | )
629 | default_news_limit: int = Field(
630 | default_factory=lambda: int(os.getenv("DEFAULT_NEWS_LIMIT", "5")),
631 | description="Default number of news articles to return",
632 | )
633 |
634 | # Cache configuration per provider
635 | stock_data_cache_hours: int = Field(
636 | default_factory=lambda: int(os.getenv("STOCK_DATA_CACHE_HOURS", "4")),
637 | description="Stock data cache duration in hours",
638 | )
639 | market_data_cache_minutes: int = Field(
640 | default_factory=lambda: int(os.getenv("MARKET_DATA_CACHE_MINUTES", "15")),
641 | description="Market data cache duration in minutes",
642 | )
643 | news_cache_hours: int = Field(
644 | default_factory=lambda: int(os.getenv("NEWS_CACHE_HOURS", "2")),
645 | description="News data cache duration in hours",
646 | )
647 |
648 |
649 | class AgentConfig(BaseModel):
650 | """Agent and AI workflow configuration settings."""
651 |
652 | # Cache settings
653 | agent_cache_ttl_seconds: int = Field(
654 | default_factory=lambda: int(os.getenv("AGENT_CACHE_TTL_SECONDS", "300")),
655 | description="Agent cache TTL in seconds (5 minutes default)",
656 | )
657 | conversation_cache_ttl_hours: int = Field(
658 | default_factory=lambda: int(os.getenv("CONVERSATION_CACHE_TTL_HOURS", "1")),
659 | description="Conversation cache TTL in hours",
660 | )
661 |
662 | # Circuit breaker settings
663 | circuit_breaker_failure_threshold: int = Field(
664 | default_factory=lambda: int(
665 | os.getenv("CIRCUIT_BREAKER_FAILURE_THRESHOLD", "5")
666 | ),
667 | description="Number of failures before opening circuit",
668 | )
669 | circuit_breaker_recovery_timeout: int = Field(
670 | default_factory=lambda: int(
671 | os.getenv("CIRCUIT_BREAKER_RECOVERY_TIMEOUT", "60")
672 | ),
673 | description="Seconds to wait before testing recovery",
674 | )
675 |
676 | # Search-specific circuit breaker settings (more tolerant)
677 | search_circuit_breaker_failure_threshold: int = Field(
678 | default_factory=lambda: int(
679 | os.getenv("SEARCH_CIRCUIT_BREAKER_FAILURE_THRESHOLD", "8")
680 | ),
681 | description="Number of failures before opening search circuit (more tolerant)",
682 | )
683 | search_circuit_breaker_recovery_timeout: int = Field(
684 | default_factory=lambda: int(
685 | os.getenv("SEARCH_CIRCUIT_BREAKER_RECOVERY_TIMEOUT", "30")
686 | ),
687 | description="Seconds to wait before testing search recovery (faster recovery)",
688 | )
689 | search_timeout_failure_threshold: int = Field(
690 | default_factory=lambda: int(
691 | os.getenv("SEARCH_TIMEOUT_FAILURE_THRESHOLD", "12")
692 | ),
693 | description="Number of timeout failures before disabling search provider",
694 | )
695 |
696 | # Market data limits for sentiment analysis
697 | sentiment_news_limit: int = Field(
698 | default_factory=lambda: int(os.getenv("SENTIMENT_NEWS_LIMIT", "50")),
699 | description="Maximum news articles for sentiment analysis",
700 | )
701 | market_movers_gainers_limit: int = Field(
702 | default_factory=lambda: int(os.getenv("MARKET_MOVERS_GAINERS_LIMIT", "50")),
703 | description="Maximum gainers to fetch for market analysis",
704 | )
705 | market_movers_losers_limit: int = Field(
706 | default_factory=lambda: int(os.getenv("MARKET_MOVERS_LOSERS_LIMIT", "50")),
707 | description="Maximum losers to fetch for market analysis",
708 | )
709 | market_movers_active_limit: int = Field(
710 | default_factory=lambda: int(os.getenv("MARKET_MOVERS_ACTIVE_LIMIT", "20")),
711 | description="Maximum most active stocks to fetch",
712 | )
713 |
714 | # Screening limits
715 | screening_limit_default: int = Field(
716 | default_factory=lambda: int(os.getenv("SCREENING_LIMIT_DEFAULT", "20")),
717 | description="Default limit for screening results",
718 | )
719 | screening_limit_max: int = Field(
720 | default_factory=lambda: int(os.getenv("SCREENING_LIMIT_MAX", "100")),
721 | description="Maximum limit for screening results",
722 | )
723 | screening_min_volume_default: int = Field(
724 | default_factory=lambda: int(
725 | os.getenv("SCREENING_MIN_VOLUME_DEFAULT", "1000000")
726 | ),
727 | description="Default minimum volume filter for screening",
728 | )
729 |
730 |
731 | class DatabaseConfig(BaseModel):
732 | """Database connection and pooling configuration settings."""
733 |
734 | # Connection pool settings
735 | pool_size: int = Field(
736 | default_factory=lambda: int(os.getenv("DB_POOL_SIZE", "20")),
737 | description="Database connection pool size",
738 | )
739 | pool_max_overflow: int = Field(
740 | default_factory=lambda: int(os.getenv("DB_POOL_MAX_OVERFLOW", "10")),
741 | description="Maximum overflow connections above pool size",
742 | )
743 | pool_timeout: int = Field(
744 | default_factory=lambda: int(os.getenv("DB_POOL_TIMEOUT", "30")),
745 | description="Pool connection timeout in seconds",
746 | )
747 | statement_timeout: int = Field(
748 | default_factory=lambda: int(os.getenv("DB_STATEMENT_TIMEOUT", "30000")),
749 | description="Database statement timeout in milliseconds",
750 | )
751 |
752 | # Redis connection settings
753 | redis_max_connections: int = Field(
754 | default_factory=lambda: int(os.getenv("REDIS_MAX_CONNECTIONS", "50")),
755 | description="Maximum Redis connections in pool",
756 | )
757 | redis_socket_timeout: int = Field(
758 | default_factory=lambda: int(os.getenv("REDIS_SOCKET_TIMEOUT", "5")),
759 | description="Redis socket timeout in seconds",
760 | )
761 | redis_socket_connect_timeout: int = Field(
762 | default_factory=lambda: int(os.getenv("REDIS_SOCKET_CONNECT_TIMEOUT", "5")),
763 | description="Redis socket connection timeout in seconds",
764 | )
765 | redis_retry_on_timeout: bool = Field(
766 | default_factory=lambda: os.getenv("REDIS_RETRY_ON_TIMEOUT", "true").lower()
767 | == "true",
768 | description="Retry Redis operations on timeout",
769 | )
770 |
771 |
772 | class MiddlewareConfig(BaseModel):
773 | """Middleware and request handling configuration settings."""
774 |
775 | # Rate limiting
776 | api_rate_limit_per_minute: int = Field(
777 | default_factory=lambda: int(os.getenv("API_RATE_LIMIT_PER_MINUTE", "60")),
778 | description="API rate limit per minute",
779 | )
780 |
781 | # Security headers
782 | security_header_max_age: int = Field(
783 | default_factory=lambda: int(os.getenv("SECURITY_HEADER_MAX_AGE", "86400")),
784 | description="Security header max age in seconds (24 hours default)",
785 | )
786 |
787 | # Request handling
788 | sse_queue_timeout: int = Field(
789 | default_factory=lambda: int(os.getenv("SSE_QUEUE_TIMEOUT", "30")),
790 | description="SSE message queue timeout in seconds",
791 | )
792 | api_request_timeout_default: int = Field(
793 | default_factory=lambda: int(os.getenv("API_REQUEST_TIMEOUT_DEFAULT", "10")),
794 | description="Default API request timeout in seconds",
795 | )
796 |
797 | # Thread pool settings
798 | thread_pool_max_workers: int = Field(
799 | default_factory=lambda: int(os.getenv("THREAD_POOL_MAX_WORKERS", "10")),
800 | description="Maximum workers in thread pool executor",
801 | )
802 |
803 |
804 | class ValidationConfig(BaseModel):
805 | """Input validation configuration settings."""
806 |
807 | # String length constraints
808 | min_symbol_length: int = Field(
809 | default_factory=lambda: int(os.getenv("MIN_SYMBOL_LENGTH", "1")),
810 | description="Minimum stock symbol length",
811 | )
812 | max_symbol_length: int = Field(
813 | default_factory=lambda: int(os.getenv("MAX_SYMBOL_LENGTH", "10")),
814 | description="Maximum stock symbol length",
815 | )
816 | min_portfolio_name_length: int = Field(
817 | default_factory=lambda: int(os.getenv("MIN_PORTFOLIO_NAME_LENGTH", "2")),
818 | description="Minimum portfolio name length",
819 | )
820 | max_portfolio_name_length: int = Field(
821 | default_factory=lambda: int(os.getenv("MAX_PORTFOLIO_NAME_LENGTH", "20")),
822 | description="Maximum portfolio name length",
823 | )
824 | min_screening_name_length: int = Field(
825 | default_factory=lambda: int(os.getenv("MIN_SCREENING_NAME_LENGTH", "2")),
826 | description="Minimum screening strategy name length",
827 | )
828 | max_screening_name_length: int = Field(
829 | default_factory=lambda: int(os.getenv("MAX_SCREENING_NAME_LENGTH", "30")),
830 | description="Maximum screening strategy name length",
831 | )
832 |
833 | # General text validation
834 | min_text_field_length: int = Field(
835 | default_factory=lambda: int(os.getenv("MIN_TEXT_FIELD_LENGTH", "1")),
836 | description="Minimum length for general text fields",
837 | )
838 | max_text_field_length: int = Field(
839 | default_factory=lambda: int(os.getenv("MAX_TEXT_FIELD_LENGTH", "100")),
840 | description="Maximum length for general text fields",
841 | )
842 | max_description_length: int = Field(
843 | default_factory=lambda: int(os.getenv("MAX_DESCRIPTION_LENGTH", "500")),
844 | description="Maximum length for description fields",
845 | )
846 |
847 |
848 | class Settings(BaseModel):
849 | """Main application settings."""
850 |
851 | app_name: str = Field(default="MaverickMCP", description="Application name")
852 | environment: str = Field(
853 | default_factory=lambda: os.getenv("ENVIRONMENT", "development"),
854 | description="Environment (development, production)",
855 | )
856 | api: APISettings = Field(default_factory=APISettings, description="API settings")
857 | database: DatabaseSettings = Field(
858 | default_factory=DatabaseSettings, description="Database settings"
859 | )
860 | data_provider: DataProviderSettings = Field(
861 | default_factory=DataProviderSettings, description="Data provider settings"
862 | )
863 | redis: RedisSettings = Field(
864 | default_factory=RedisSettings, description="Redis settings"
865 | )
866 | external_data: ExternalDataSettings = Field(
867 | default_factory=ExternalDataSettings,
868 | description="External data API settings",
869 | )
870 | email: EmailSettings = Field(
871 | default_factory=EmailSettings, description="Email service configuration"
872 | )
873 | financial: FinancialConfig = Field(
874 | default_factory=FinancialConfig, description="Financial settings"
875 | )
876 | research: ResearchSettings = Field(
877 | default_factory=ResearchSettings, description="Research settings"
878 | )
879 | data_limits: DataLimitsConfig = Field(
880 | default_factory=DataLimitsConfig, description="Data limits settings"
881 | )
882 | agent: AgentConfig = Field(
883 | default_factory=AgentConfig, description="Agent settings"
884 | )
885 | validation: ValidationConfig = Field(
886 |         default_factory=ValidationConfig, description="Validation settings"
887 | )
888 | performance: PerformanceConfig = Field(
889 | default_factory=PerformanceConfig, description="Performance settings"
890 | )
891 | ui: UIConfig = Field(default_factory=UIConfig, description="UI configuration")
892 | provider: ProviderConfig = Field(
893 | default_factory=ProviderConfig, description="Provider configuration"
894 | )
895 | agent: AgentConfig = Field(
896 | default_factory=AgentConfig, description="Agent configuration"
897 | )
898 | db: DatabaseConfig = Field(
899 | default_factory=DatabaseConfig, description="Database connection settings"
900 | )
901 | middleware: MiddlewareConfig = Field(
902 | default_factory=MiddlewareConfig, description="Middleware settings"
903 | )
904 | validation: ValidationConfig = Field(
905 | default_factory=ValidationConfig, description="Validation settings"
906 | )
907 |
908 |
909 | def load_settings_from_environment() -> Settings:
910 | """
911 | Load settings from environment variables.
912 |
913 |     Each nested settings model reads its values directly from plain
914 |     environment variables, e.g. ENVIRONMENT=production or MAX_RETRY_ATTEMPTS=5.
915 |
916 | Returns:
917 | Settings object with values loaded from environment
918 | """
919 | return Settings()
920 |
921 |
922 | def get_settings() -> Settings:
923 | """
924 | Get application settings.
925 |
926 | This function loads settings from environment variables and
927 | any custom overrides specified in the constants.
928 |
929 | Returns:
930 | Settings object with all configured values
931 | """
932 | settings = load_settings_from_environment()
933 |
934 | # Apply any overrides from constants
935 | if hasattr(CONFIG, "SETTINGS"):
936 | # This would update settings with values from CONFIG.SETTINGS
937 | pass
938 |
939 | # Override with environment-specific settings if needed
940 | if settings.environment == "production":
941 | # Apply production-specific settings
942 | # e.g., disable debug mode, set higher rate limits, etc.
943 | settings.api.debug = False
944 | settings.api.log_level = "warning"
945 | settings.data_provider.rate_limit = 20
946 |
947 | return settings
948 |
949 |
950 | # Create a singleton instance of settings
951 | settings = get_settings()
952 |
```
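For orientation, here is a minimal usage sketch of the settings tree defined above. The import path is an assumption (this excerpt does not show the module name); the attribute names come directly from the classes in this file, and values are read from plain, unprefixed environment variables when `Settings()` is instantiated.

```python
import os

# Assumed import path -- point this at the module that defines get_settings().
from maverick_mcp.config.settings import get_settings

# Plain environment variables override the defaults baked into each Field.
os.environ["ENVIRONMENT"] = "production"
os.environ["MAX_RETRY_ATTEMPTS"] = "5"

# get_settings() builds a fresh Settings object, so it sees the env vars set above
# and applies the production overrides (debug off, lower log level, etc.).
settings = get_settings()

print(settings.environment)                               # "production"
print(settings.performance.max_retry_attempts)            # 5 (from the env var)
print(settings.performance.api_request_timeout)           # 120 (default)
print(settings.financial.max_position_size_moderate)      # 0.1
print(settings.validation.max_symbol_length)              # 10
```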
--------------------------------------------------------------------------------
/tests/integration/test_chaos_engineering.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Chaos Engineering Tests for Resilience Testing.
3 |
4 | This test suite covers:
5 | - API failures and recovery mechanisms
6 | - Database connection drops and reconnection
7 | - Cache failures and fallback behavior
8 | - Circuit breaker behavior under load
9 | - Network timeouts and retries
10 | - Memory pressure scenarios
11 | - CPU overload situations
12 | - External service outages
13 | """
14 |
15 | import asyncio
16 | import logging
17 | import random
18 | import threading
19 | import time
20 | from contextlib import ExitStack, contextmanager
21 | from unittest.mock import MagicMock, Mock, patch
22 |
23 | import numpy as np
24 | import pandas as pd
25 | import pytest
26 |
27 | from maverick_mcp.backtesting import VectorBTEngine
28 | from maverick_mcp.backtesting.persistence import BacktestPersistenceManager
29 | from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES
30 |
31 | logger = logging.getLogger(__name__)
32 |
33 |
34 | class ChaosInjector:
35 | """Utility class for injecting various types of failures."""
36 |
37 | @staticmethod
38 | @contextmanager
39 | def api_failure_injection(failure_rate: float = 0.3):
40 | """Inject API failures at specified rate."""
41 | original_get_stock_data = None
42 |
43 | def failing_get_stock_data(*args, **kwargs):
44 | if random.random() < failure_rate:
45 | if random.random() < 0.5:
46 | raise ConnectionError("Simulated API connection failure")
47 | else:
48 | raise TimeoutError("Simulated API timeout")
49 | return (
50 | original_get_stock_data(*args, **kwargs)
51 | if original_get_stock_data
52 | else Mock()
53 | )
54 |
55 | try:
56 |             # Patch the engine's data fetch so it fails at the configured rate
57 | with patch.object(
58 | VectorBTEngine,
59 | "get_historical_data",
60 | side_effect=failing_get_stock_data,
61 | ):
62 | yield
63 | finally:
64 | pass
65 |
66 | @staticmethod
67 | @contextmanager
68 | def database_failure_injection(failure_rate: float = 0.2):
69 | """Inject database failures at specified rate."""
70 |
71 | def failing_db_operation(*args, **kwargs):
72 | if random.random() < failure_rate:
73 | if random.random() < 0.33:
74 | raise ConnectionError("Database connection lost")
75 | elif random.random() < 0.66:
76 | raise Exception("Database query timeout")
77 | else:
78 | raise Exception("Database lock timeout")
79 | return MagicMock() # Return mock successful result
80 |
81 | try:
82 | with patch.object(
83 | BacktestPersistenceManager,
84 | "save_backtest_result",
85 | side_effect=failing_db_operation,
86 | ):
87 | yield
88 | finally:
89 | pass
90 |
91 | @staticmethod
92 | @contextmanager
93 | def memory_pressure_injection(pressure_mb: int = 500):
94 | """Inject memory pressure by allocating large arrays."""
95 | pressure_arrays = []
96 | try:
97 | # Create memory pressure
98 | for _ in range(pressure_mb // 10):
99 | arr = np.random.random((1280, 1000)) # ~10MB each
100 | pressure_arrays.append(arr)
101 | yield
102 | finally:
103 | # Clean up memory pressure
104 | del pressure_arrays
105 |
106 | @staticmethod
107 | @contextmanager
108 | def cpu_load_injection(load_intensity: float = 0.8, duration: float = 5.0):
109 | """Inject CPU load using background threads."""
110 | stop_event = threading.Event()
111 | load_threads = []
112 |
113 | def cpu_intensive_task():
114 | """CPU-intensive task for load injection."""
115 | while not stop_event.is_set():
116 | # Perform CPU-intensive computation
117 | for _ in range(int(100000 * load_intensity)):
118 | _ = sum(i**2 for i in range(100))
119 | time.sleep(0.01) # Brief pause
120 |
121 | try:
122 | # Start CPU load threads
123 | num_threads = max(1, int(4 * load_intensity)) # Scale with intensity
124 | for _ in range(num_threads):
125 | thread = threading.Thread(target=cpu_intensive_task)
126 | thread.daemon = True
127 | thread.start()
128 | load_threads.append(thread)
129 |
130 | yield
131 | finally:
132 | # Stop CPU load
133 | stop_event.set()
134 | for thread in load_threads:
135 | thread.join(timeout=1.0)
136 |
137 | @staticmethod
138 | @contextmanager
139 | def network_instability_injection(
140 | delay_range: tuple = (0.1, 2.0), timeout_rate: float = 0.1
141 | ):
142 | """Inject network instability with delays and timeouts."""
143 |
144 | async def unstable_network_call(original_func, *args, **kwargs):
145 | # Random delay
146 | delay = random.uniform(*delay_range)
147 | await asyncio.sleep(delay)
148 |
149 | # Random timeout
150 | if random.random() < timeout_rate:
151 | raise TimeoutError("Simulated network timeout")
152 |
153 | return await original_func(*args, **kwargs)
154 |
155 | # This is a simplified version - real implementation would patch actual network calls
156 | yield
157 |
158 |
159 | class TestChaosEngineering:
160 | """Chaos engineering tests for system resilience."""
161 |
162 | @pytest.fixture
163 | async def resilient_data_provider(self):
164 | """Create data provider with built-in resilience patterns."""
165 | provider = Mock()
166 |
167 | async def resilient_get_data(symbol: str, *args, **kwargs):
168 | """Data provider with retry logic and fallback."""
169 | max_retries = 3
170 | retry_delay = 0.1
171 |
172 | for attempt in range(max_retries):
173 | try:
174 | # Simulate data generation (can fail randomly)
175 | if random.random() < 0.1: # 10% failure rate
176 | raise ConnectionError(f"API failure for {symbol}")
177 |
178 | # Generate mock data
179 | dates = pd.date_range(
180 | start="2023-01-01", end="2023-12-31", freq="D"
181 | )
182 | returns = np.random.normal(0.0008, 0.02, len(dates))
183 | prices = 100 * np.cumprod(1 + returns)
184 |
185 | return pd.DataFrame(
186 | {
187 | "Open": prices * np.random.uniform(0.99, 1.01, len(dates)),
188 | "High": prices * np.random.uniform(1.00, 1.03, len(dates)),
189 | "Low": prices * np.random.uniform(0.97, 1.00, len(dates)),
190 | "Close": prices,
191 | "Volume": np.random.randint(100000, 5000000, len(dates)),
192 | "Adj Close": prices,
193 | },
194 | index=dates,
195 | )
196 |
197 | except Exception:
198 | if attempt == max_retries - 1:
199 | # Final attempt failed, return minimal fallback data
200 | logger.warning(
201 | f"All retries failed for {symbol}, using fallback data"
202 | )
203 | dates = pd.date_range(start="2023-01-01", periods=10, freq="D")
204 | prices = np.full(len(dates), 100.0)
205 | return pd.DataFrame(
206 | {
207 | "Open": prices,
208 | "High": prices * 1.01,
209 | "Low": prices * 0.99,
210 | "Close": prices,
211 | "Volume": np.full(len(dates), 1000000),
212 | "Adj Close": prices,
213 | },
214 | index=dates,
215 | )
216 |
217 | await asyncio.sleep(retry_delay)
218 | retry_delay *= 2 # Exponential backoff
219 |
220 | provider.get_stock_data.side_effect = resilient_get_data
221 | return provider
222 |
223 | async def test_api_failures_and_recovery(
224 | self, resilient_data_provider, benchmark_timer
225 | ):
226 | """Test API failure scenarios and recovery mechanisms."""
227 | symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
228 | strategy = "sma_cross"
229 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
230 |
231 | # Test with different failure rates
232 | failure_scenarios = [
233 | {"name": "low_failure", "rate": 0.1},
234 | {"name": "moderate_failure", "rate": 0.3},
235 | {"name": "high_failure", "rate": 0.6},
236 | ]
237 |
238 | scenario_results = {}
239 |
240 | for scenario in failure_scenarios:
241 | with ChaosInjector.api_failure_injection(failure_rate=scenario["rate"]):
242 | with benchmark_timer() as timer:
243 | results = []
244 | failures = []
245 |
246 | engine = VectorBTEngine(data_provider=resilient_data_provider)
247 |
248 | for symbol in symbols:
249 | try:
250 | result = await engine.run_backtest(
251 | symbol=symbol,
252 | strategy_type=strategy,
253 | parameters=parameters,
254 | start_date="2023-01-01",
255 | end_date="2023-12-31",
256 | )
257 | results.append(result)
258 | logger.info(
259 | f"✓ {symbol} succeeded under {scenario['name']} conditions"
260 | )
261 |
262 | except Exception as e:
263 | failures.append({"symbol": symbol, "error": str(e)})
264 | logger.error(
265 | f"✗ {symbol} failed under {scenario['name']} conditions: {e}"
266 | )
267 |
268 | execution_time = timer.elapsed
269 | success_rate = len(results) / len(symbols)
270 | recovery_rate = 1 - (
271 | scenario["rate"] * (1 - success_rate)
272 | ) # Account for injected failures
273 |
274 | scenario_results[scenario["name"]] = {
275 | "failure_rate_injected": scenario["rate"],
276 | "success_rate_achieved": success_rate,
277 | "recovery_effectiveness": recovery_rate,
278 | "execution_time": execution_time,
279 | "successful_backtests": len(results),
280 | "failed_backtests": len(failures),
281 | }
282 |
283 | logger.info(
284 | f"{scenario['name'].upper()} Failure Scenario:\n"
285 | f" • Injected Failure Rate: {scenario['rate']:.1%}\n"
286 | f" • Achieved Success Rate: {success_rate:.1%}\n"
287 | f" • Recovery Effectiveness: {recovery_rate:.1%}\n"
288 | f" • Execution Time: {execution_time:.2f}s"
289 | )
290 |
291 | # Assert minimum recovery effectiveness
292 | assert success_rate >= 0.5, (
293 | f"Success rate too low for {scenario['name']}: {success_rate:.1%}"
294 | )
295 |
296 | return scenario_results
297 |
298 | async def test_database_connection_drops(
299 | self, resilient_data_provider, db_session, benchmark_timer
300 | ):
301 | """Test database connection drops and reconnection logic."""
302 | symbols = ["AAPL", "GOOGL", "MSFT"]
303 | strategy = "rsi"
304 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
305 |
306 | engine = VectorBTEngine(data_provider=resilient_data_provider)
307 |
308 | # Generate backtest results first
309 | backtest_results = []
310 | for symbol in symbols:
311 | result = await engine.run_backtest(
312 | symbol=symbol,
313 | strategy_type=strategy,
314 | parameters=parameters,
315 | start_date="2023-01-01",
316 | end_date="2023-12-31",
317 | )
318 | backtest_results.append(result)
319 |
320 | # Test database operations under chaos
321 | with ChaosInjector.database_failure_injection(failure_rate=0.3):
322 | with benchmark_timer() as timer:
323 | persistence_results = []
324 | persistence_failures = []
325 |
326 | # Attempt to save results with intermittent database failures
327 | for result in backtest_results:
328 | retry_count = 0
329 | max_retries = 3
330 |
331 | while retry_count < max_retries:
332 | try:
333 | with BacktestPersistenceManager(
334 | session=db_session
335 | ) as persistence:
336 | backtest_id = persistence.save_backtest_result(
337 | vectorbt_results=result,
338 | execution_time=2.0,
339 | notes=f"Chaos test - {result['symbol']}",
340 | )
341 | persistence_results.append(
342 | {
343 | "symbol": result["symbol"],
344 | "backtest_id": backtest_id,
345 | "retry_count": retry_count,
346 | }
347 | )
348 | break # Success, break retry loop
349 |
350 | except Exception as e:
351 | retry_count += 1
352 | if retry_count >= max_retries:
353 | persistence_failures.append(
354 | {
355 | "symbol": result["symbol"],
356 | "error": str(e),
357 | "retry_count": retry_count,
358 | }
359 | )
360 | else:
361 | await asyncio.sleep(
362 | 0.1 * retry_count
363 | ) # Exponential backoff
364 |
365 | persistence_time = timer.elapsed
366 |
367 | # Analyze results
368 | persistence_success_rate = len(persistence_results) / len(backtest_results)
369 | avg_retries = (
370 | np.mean([r["retry_count"] for r in persistence_results])
371 | if persistence_results
372 | else 0
373 | )
374 |
375 | # Test recovery by attempting to retrieve saved data
376 | retrieval_successes = 0
377 | if persistence_results:
378 | for saved_result in persistence_results:
379 | try:
380 | with BacktestPersistenceManager(session=db_session) as persistence:
381 | retrieved = persistence.get_backtest_by_id(
382 | saved_result["backtest_id"]
383 | )
384 | if retrieved:
385 | retrieval_successes += 1
386 | except Exception as e:
387 | logger.error(f"Retrieval failed for {saved_result['symbol']}: {e}")
388 |
389 | retrieval_success_rate = (
390 | retrieval_successes / len(persistence_results) if persistence_results else 0
391 | )
392 |
393 | logger.info(
394 | f"Database Connection Drops Test Results:\n"
395 | f" • Backtest Results: {len(backtest_results)}\n"
396 | f" • Persistence Successes: {len(persistence_results)}\n"
397 | f" • Persistence Failures: {len(persistence_failures)}\n"
398 | f" • Persistence Success Rate: {persistence_success_rate:.1%}\n"
399 | f" • Average Retries: {avg_retries:.1f}\n"
400 | f" • Retrieval Success Rate: {retrieval_success_rate:.1%}\n"
401 | f" • Total Time: {persistence_time:.2f}s"
402 | )
403 |
404 | # Assert resilience requirements
405 | assert persistence_success_rate >= 0.7, (
406 | f"Persistence success rate too low: {persistence_success_rate:.1%}"
407 | )
408 | assert retrieval_success_rate >= 0.9, (
409 | f"Retrieval success rate too low: {retrieval_success_rate:.1%}"
410 | )
411 |
412 | return {
413 | "persistence_success_rate": persistence_success_rate,
414 | "retrieval_success_rate": retrieval_success_rate,
415 | "avg_retries": avg_retries,
416 | }
417 |
418 | async def test_cache_failures_and_fallback(
419 | self, resilient_data_provider, benchmark_timer
420 | ):
421 | """Test cache failures and fallback behavior."""
422 | symbols = ["CACHE_TEST_1", "CACHE_TEST_2", "CACHE_TEST_3"]
423 | strategy = "macd"
424 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
425 |
426 | engine = VectorBTEngine(data_provider=resilient_data_provider)
427 |
428 | # Test cache behavior under failures
429 | cache_scenarios = [
430 | {"name": "normal_cache", "inject_failure": False},
431 | {"name": "cache_failures", "inject_failure": True},
432 | ]
433 |
434 | scenario_results = {}
435 |
436 | for scenario in cache_scenarios:
437 | if scenario["inject_failure"]:
438 | # Mock cache to randomly fail
439 | def failing_cache_get(key):
440 | if random.random() < 0.4: # 40% cache failure rate
441 | raise ConnectionError("Cache connection failed")
442 | return None # Cache miss
443 |
444 | def failing_cache_set(key, value, ttl=None):
445 | if random.random() < 0.3: # 30% cache set failure rate
446 | raise ConnectionError("Cache set operation failed")
447 | return True
448 |
449 | cache_patches = [
450 | patch(
451 | "maverick_mcp.core.cache.CacheManager.get",
452 | side_effect=failing_cache_get,
453 | ),
454 | patch(
455 | "maverick_mcp.core.cache.CacheManager.set",
456 | side_effect=failing_cache_set,
457 | ),
458 | ]
459 | else:
460 | cache_patches = []
461 |
462 | with benchmark_timer() as timer:
463 | results = []
464 | cache_errors = []
465 |
466 | # Apply cache patches if needed
467 | with ExitStack() as stack:
468 | for patch_context in cache_patches:
469 | stack.enter_context(patch_context)
470 |
471 | # Run backtests - should fallback gracefully on cache failures
472 | for symbol in symbols:
473 | try:
474 | result = await engine.run_backtest(
475 | symbol=symbol,
476 | strategy_type=strategy,
477 | parameters=parameters,
478 | start_date="2023-01-01",
479 | end_date="2023-12-31",
480 | )
481 | results.append(result)
482 |
483 | except Exception as e:
484 | cache_errors.append({"symbol": symbol, "error": str(e)})
485 | logger.error(
486 | f"Backtest failed for {symbol} under {scenario['name']}: {e}"
487 | )
488 |
489 | execution_time = timer.elapsed
490 | success_rate = len(results) / len(symbols)
491 |
492 | scenario_results[scenario["name"]] = {
493 | "execution_time": execution_time,
494 | "success_rate": success_rate,
495 | "cache_errors": len(cache_errors),
496 | }
497 |
498 | logger.info(
499 | f"{scenario['name'].upper()} Cache Scenario:\n"
500 | f" • Execution Time: {execution_time:.2f}s\n"
501 | f" • Success Rate: {success_rate:.1%}\n"
502 | f" • Cache Errors: {len(cache_errors)}"
503 | )
504 |
505 | # Cache failures should not prevent backtests from completing
506 | assert success_rate >= 0.8, (
507 | f"Success rate too low with cache issues: {success_rate:.1%}"
508 | )
509 |
510 | # Cache failures might slightly increase execution time but shouldn't break functionality
511 | time_ratio = (
512 | scenario_results["cache_failures"]["execution_time"]
513 | / scenario_results["normal_cache"]["execution_time"]
514 | )
515 | assert time_ratio < 3.0, (
516 | f"Cache failure time penalty too high: {time_ratio:.1f}x"
517 | )
518 |
519 | return scenario_results
520 |
521 | async def test_circuit_breaker_behavior(
522 | self, resilient_data_provider, benchmark_timer
523 | ):
524 | """Test circuit breaker behavior under load and failures."""
525 | symbols = ["CB_TEST_1", "CB_TEST_2", "CB_TEST_3", "CB_TEST_4", "CB_TEST_5"]
526 | strategy = "sma_cross"
527 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
528 |
529 | # Mock circuit breaker states
530 | circuit_breaker_state = {"failures": 0, "state": "CLOSED", "last_failure": 0}
531 | failure_threshold = 3
532 | recovery_timeout = 2.0
533 |
534 | def circuit_breaker_wrapper(func):
535 | """Simple circuit breaker implementation."""
536 |
537 | async def wrapper(*args, **kwargs):
538 | current_time = time.time()
539 |
540 | # Check if circuit should reset
541 | if (
542 | circuit_breaker_state["state"] == "OPEN"
543 | and current_time - circuit_breaker_state["last_failure"]
544 | > recovery_timeout
545 | ):
546 | circuit_breaker_state["state"] = "HALF_OPEN"
547 | logger.info("Circuit breaker moved to HALF_OPEN state")
548 |
549 | # Circuit is open, reject immediately
550 | if circuit_breaker_state["state"] == "OPEN":
551 | raise Exception("Circuit breaker is OPEN")
552 |
553 | try:
554 | # Inject failures for testing
555 | if random.random() < 0.4: # 40% failure rate
556 | raise ConnectionError("Simulated service failure")
557 |
558 | result = await func(*args, **kwargs)
559 |
560 | # Success - reset failure count if in HALF_OPEN state
561 | if circuit_breaker_state["state"] == "HALF_OPEN":
562 | circuit_breaker_state["state"] = "CLOSED"
563 | circuit_breaker_state["failures"] = 0
564 | logger.info("Circuit breaker CLOSED after successful recovery")
565 |
566 | return result
567 |
568 | except Exception as e:
569 | circuit_breaker_state["failures"] += 1
570 | circuit_breaker_state["last_failure"] = current_time
571 |
572 | if circuit_breaker_state["failures"] >= failure_threshold:
573 | circuit_breaker_state["state"] = "OPEN"
574 | logger.warning(
575 | f"Circuit breaker OPENED after {circuit_breaker_state['failures']} failures"
576 | )
577 |
578 | raise e
579 |
580 | return wrapper
581 |
582 | # Apply circuit breaker to engine operations
583 | engine = VectorBTEngine(data_provider=resilient_data_provider)
584 |
585 | with benchmark_timer() as timer:
586 | results = []
587 | circuit_breaker_trips = 0
588 | recovery_attempts = 0
589 |
590 | for _i, symbol in enumerate(symbols):
591 | try:
592 | # Simulate circuit breaker behavior
593 | current_symbol = symbol
594 |
595 | @circuit_breaker_wrapper
596 | async def protected_backtest(symbol_to_use=current_symbol):
597 | return await engine.run_backtest(
598 | symbol=symbol_to_use,
599 | strategy_type=strategy,
600 | parameters=parameters,
601 | start_date="2023-01-01",
602 | end_date="2023-12-31",
603 | )
604 |
605 | result = await protected_backtest()
606 | results.append(result)
607 | logger.info(
608 | f"✓ {symbol} succeeded (CB state: {circuit_breaker_state['state']})"
609 | )
610 |
611 | except Exception as e:
612 | if "Circuit breaker is OPEN" in str(e):
613 | circuit_breaker_trips += 1
614 | logger.warning(f"⚡ {symbol} blocked by circuit breaker")
615 |
616 | # Wait for potential recovery
617 | await asyncio.sleep(recovery_timeout + 0.1)
618 | recovery_attempts += 1
619 |
620 | # Try once more after recovery timeout
621 | try:
622 | recovery_symbol = symbol
623 |
624 | @circuit_breaker_wrapper
625 | async def recovery_backtest(symbol_to_use=recovery_symbol):
626 | return await engine.run_backtest(
627 | symbol=symbol_to_use,
628 | strategy_type=strategy,
629 | parameters=parameters,
630 | start_date="2023-01-01",
631 | end_date="2023-12-31",
632 | )
633 |
634 | result = await recovery_backtest()
635 | results.append(result)
636 | logger.info(
637 | f"✓ {symbol} succeeded after circuit breaker recovery"
638 | )
639 |
640 | except Exception as recovery_error:
641 | logger.error(
642 | f"✗ {symbol} failed even after recovery: {recovery_error}"
643 | )
644 | else:
645 | logger.error(f"✗ {symbol} failed: {e}")
646 |
647 | execution_time = timer.elapsed
648 | success_rate = len(results) / len(symbols)
649 | circuit_breaker_effectiveness = (
650 | circuit_breaker_trips > 0
651 | ) # Circuit breaker activated
652 |
653 | logger.info(
654 | f"Circuit Breaker Behavior Test Results:\n"
655 | f" • Symbols Tested: {len(symbols)}\n"
656 | f" • Successful Results: {len(results)}\n"
657 | f" • Success Rate: {success_rate:.1%}\n"
658 | f" • Circuit Breaker Trips: {circuit_breaker_trips}\n"
659 | f" • Recovery Attempts: {recovery_attempts}\n"
660 | f" • Circuit Breaker Effectiveness: {circuit_breaker_effectiveness}\n"
661 | f" • Final CB State: {circuit_breaker_state['state']}\n"
662 | f" • Execution Time: {execution_time:.2f}s"
663 | )
664 |
665 | # Circuit breaker should provide some protection
666 | assert circuit_breaker_effectiveness, "Circuit breaker should have activated"
667 | assert success_rate >= 0.4, (
668 | f"Success rate too low even with circuit breaker: {success_rate:.1%}"
669 | )
670 |
671 | return {
672 | "success_rate": success_rate,
673 | "circuit_breaker_trips": circuit_breaker_trips,
674 | "recovery_attempts": recovery_attempts,
675 | "final_state": circuit_breaker_state["state"],
676 | }
677 |
678 | async def test_memory_pressure_resilience(
679 | self, resilient_data_provider, benchmark_timer
680 | ):
681 | """Test system resilience under memory pressure."""
682 | symbols = ["MEM_TEST_1", "MEM_TEST_2", "MEM_TEST_3"]
683 | strategy = "bollinger"
684 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
685 |
686 | # Test under different memory pressure levels
687 | pressure_levels = [0, 500, 1000] # MB of memory pressure
688 | pressure_results = {}
689 |
690 | for pressure_mb in pressure_levels:
691 | with ChaosInjector.memory_pressure_injection(pressure_mb=pressure_mb):
692 | with benchmark_timer() as timer:
693 | results = []
694 | memory_errors = []
695 |
696 | engine = VectorBTEngine(data_provider=resilient_data_provider)
697 |
698 | for symbol in symbols:
699 | try:
700 | result = await engine.run_backtest(
701 | symbol=symbol,
702 | strategy_type=strategy,
703 | parameters=parameters,
704 | start_date="2023-01-01",
705 | end_date="2023-12-31",
706 | )
707 | results.append(result)
708 |
709 | except (MemoryError, Exception) as e:
710 | memory_errors.append({"symbol": symbol, "error": str(e)})
711 | logger.error(
712 | f"Memory pressure caused failure for {symbol}: {e}"
713 | )
714 |
715 | execution_time = timer.elapsed
716 | success_rate = len(results) / len(symbols)
717 |
718 | pressure_results[f"{pressure_mb}mb"] = {
719 | "pressure_mb": pressure_mb,
720 | "execution_time": execution_time,
721 | "success_rate": success_rate,
722 | "memory_errors": len(memory_errors),
723 | }
724 |
725 | logger.info(
726 | f"Memory Pressure {pressure_mb}MB Results:\n"
727 | f" • Execution Time: {execution_time:.2f}s\n"
728 | f" • Success Rate: {success_rate:.1%}\n"
729 | f" • Memory Errors: {len(memory_errors)}"
730 | )
731 |
732 | # System should be resilient to moderate memory pressure
733 | moderate_pressure_result = pressure_results["500mb"]
734 | high_pressure_result = pressure_results["1000mb"]
735 |
736 | assert moderate_pressure_result["success_rate"] >= 0.8, (
737 | "Should handle moderate memory pressure"
738 | )
739 | assert high_pressure_result["success_rate"] >= 0.5, (
740 | "Should partially handle high memory pressure"
741 | )
742 |
743 | return pressure_results
744 |
745 | async def test_cpu_overload_resilience(
746 | self, resilient_data_provider, benchmark_timer
747 | ):
748 | """Test system resilience under CPU overload."""
749 | symbols = ["CPU_TEST_1", "CPU_TEST_2"]
750 | strategy = "momentum"
751 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
752 |
753 | # Test under CPU load
754 | with ChaosInjector.cpu_load_injection(load_intensity=0.8, duration=10.0):
755 | with benchmark_timer() as timer:
756 | results = []
757 | timeout_errors = []
758 |
759 | engine = VectorBTEngine(data_provider=resilient_data_provider)
760 |
761 | for symbol in symbols:
762 | try:
763 | # Add timeout to prevent hanging under CPU load
764 | result = await asyncio.wait_for(
765 | engine.run_backtest(
766 | symbol=symbol,
767 | strategy_type=strategy,
768 | parameters=parameters,
769 | start_date="2023-01-01",
770 | end_date="2023-12-31",
771 | ),
772 | timeout=30.0, # 30 second timeout
773 | )
774 | results.append(result)
775 |
776 | except TimeoutError:
777 | timeout_errors.append(
778 | {"symbol": symbol, "error": "CPU overload timeout"}
779 | )
780 | logger.error(f"CPU overload caused timeout for {symbol}")
781 |
782 | except Exception as e:
783 | timeout_errors.append({"symbol": symbol, "error": str(e)})
784 | logger.error(f"CPU overload caused failure for {symbol}: {e}")
785 |
786 | execution_time = timer.elapsed
787 |
788 | success_rate = len(results) / len(symbols)
789 | timeout_rate = len(
790 | [e for e in timeout_errors if "timeout" in e["error"]]
791 | ) / len(symbols)
792 |
793 | logger.info(
794 | f"CPU Overload Resilience Results:\n"
795 | f" • Symbols Tested: {len(symbols)}\n"
796 | f" • Successful Results: {len(results)}\n"
797 | f" • Success Rate: {success_rate:.1%}\n"
798 | f" • Timeout Rate: {timeout_rate:.1%}\n"
799 | f" • Execution Time: {execution_time:.2f}s"
800 | )
801 |
802 | # System should handle some CPU pressure, though performance may degrade
803 | assert success_rate >= 0.5, (
804 | f"Success rate too low under CPU load: {success_rate:.1%}"
805 | )
806 | assert execution_time < 60.0, (
807 | f"Execution time too long under CPU load: {execution_time:.1f}s"
808 | )
809 |
810 | return {
811 | "success_rate": success_rate,
812 | "timeout_rate": timeout_rate,
813 | "execution_time": execution_time,
814 | }
815 |
816 | async def test_cascading_failure_recovery(
817 | self, resilient_data_provider, benchmark_timer
818 | ):
819 | """Test recovery from cascading failures across multiple components."""
820 | symbols = ["CASCADE_1", "CASCADE_2", "CASCADE_3"]
821 | strategy = "rsi"
822 | parameters = STRATEGY_TEMPLATES[strategy]["parameters"]
823 |
824 | # Simulate cascading failures: API -> Cache -> Database
825 | with ChaosInjector.api_failure_injection(failure_rate=0.5):
826 | with ChaosInjector.memory_pressure_injection(pressure_mb=300):
827 | with benchmark_timer() as timer:
828 | results = []
829 | cascading_failures = []
830 |
831 | engine = VectorBTEngine(data_provider=resilient_data_provider)
832 |
833 | for symbol in symbols:
834 | failure_chain = []
835 | final_result = None
836 |
837 | # Multiple recovery attempts with different strategies
838 | for attempt in range(3):
839 | try:
840 | result = await engine.run_backtest(
841 | symbol=symbol,
842 | strategy_type=strategy,
843 | parameters=parameters,
844 | start_date="2023-01-01",
845 | end_date="2023-12-31",
846 | )
847 | final_result = result
848 | break # Success, exit retry loop
849 |
850 | except Exception as e:
851 | failure_chain.append(
852 | f"Attempt {attempt + 1}: {str(e)[:50]}"
853 | )
854 | if attempt < 2:
855 | # Progressive backoff and different strategies
856 | await asyncio.sleep(0.5 * (attempt + 1))
857 |
858 | if final_result:
859 | results.append(final_result)
860 | logger.info(
861 | f"✓ {symbol} recovered after {len(failure_chain)} failures"
862 | )
863 | else:
864 | cascading_failures.append(
865 | {"symbol": symbol, "failure_chain": failure_chain}
866 | )
867 | logger.error(
868 | f"✗ {symbol} failed completely: {failure_chain}"
869 | )
870 |
871 | execution_time = timer.elapsed
872 |
873 | recovery_rate = len(results) / len(symbols)
874 | avg_failures_before_recovery = (
875 | np.mean([len(cf["failure_chain"]) for cf in cascading_failures])
876 | if cascading_failures
877 | else 0
878 | )
879 |
880 | logger.info(
881 | f"Cascading Failure Recovery Results:\n"
882 | f" • Symbols Tested: {len(symbols)}\n"
883 | f" • Successfully Recovered: {len(results)}\n"
884 | f" • Complete Failures: {len(cascading_failures)}\n"
885 | f" • Recovery Rate: {recovery_rate:.1%}\n"
886 | f" • Avg Failures Before Recovery: {avg_failures_before_recovery:.1f}\n"
887 | f" • Execution Time: {execution_time:.2f}s"
888 | )
889 |
890 | # System should show some recovery capability even under cascading failures
891 | assert recovery_rate >= 0.3, (
892 | f"Recovery rate too low for cascading failures: {recovery_rate:.1%}"
893 | )
894 |
895 | return {
896 | "recovery_rate": recovery_rate,
897 | "cascading_failures": len(cascading_failures),
898 | "avg_failures_before_recovery": avg_failures_before_recovery,
899 | }
900 |
901 |
902 | if __name__ == "__main__":
903 | # Run chaos engineering tests
904 | pytest.main(
905 | [
906 | __file__,
907 | "-v",
908 | "--tb=short",
909 | "--asyncio-mode=auto",
910 | "--timeout=900", # 15 minute timeout for chaos tests
911 | "--durations=15", # Show 15 slowest tests
912 | ]
913 | )
914 |
```
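The `ChaosInjector` context managers compose outside of pytest as well, which is useful for ad-hoc resilience experiments. Below is a minimal sketch, assuming the test module is importable at the path shown and that the engine awaits `get_stock_data` the way the `resilient_data_provider` fixture implies; the synthetic data provider is a stand-in and not part of the suite.

```python
import asyncio
from unittest.mock import Mock

import numpy as np
import pandas as pd

from maverick_mcp.backtesting import VectorBTEngine
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES

# Assumed import path for the injector defined in this test module.
from tests.integration.test_chaos_engineering import ChaosInjector


def make_stub_provider() -> Mock:
    """Build a provider returning synthetic OHLCV data, mirroring the fixture above."""
    dates = pd.date_range("2023-01-01", "2023-12-31", freq="D")
    prices = 100 * np.cumprod(1 + np.random.normal(0.0005, 0.02, len(dates)))
    frame = pd.DataFrame(
        {
            "Open": prices,
            "High": prices * 1.01,
            "Low": prices * 0.99,
            "Close": prices,
            "Volume": np.random.randint(100_000, 5_000_000, len(dates)),
            "Adj Close": prices,
        },
        index=dates,
    )

    async def fake_get_stock_data(symbol: str, *args, **kwargs) -> pd.DataFrame:
        return frame

    provider = Mock()
    provider.get_stock_data.side_effect = fake_get_stock_data
    return provider


async def run_under_chaos() -> dict:
    engine = VectorBTEngine(data_provider=make_stub_provider())
    parameters = STRATEGY_TEMPLATES["sma_cross"]["parameters"]

    # Stack failure modes, as test_cascading_failure_recovery does.
    with ChaosInjector.api_failure_injection(failure_rate=0.3):
        with ChaosInjector.memory_pressure_injection(pressure_mb=200):
            try:
                return await engine.run_backtest(
                    symbol="AAPL",
                    strategy_type="sma_cross",
                    parameters=parameters,
                    start_date="2023-01-01",
                    end_date="2023-12-31",
                )
            except Exception as exc:  # chaos injection is expected to cause some failures
                print(f"Backtest failed under chaos: {exc}")
                return {}


if __name__ == "__main__":
    asyncio.run(run_under_chaos())
```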
--------------------------------------------------------------------------------
/tests/test_parallel_research_integration.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive integration tests for parallel research functionality.
3 |
4 | This test suite covers:
5 | - End-to-end parallel research workflows
6 | - Integration between all parallel research components
7 | - Performance characteristics under realistic conditions
8 | - Error scenarios and recovery mechanisms
9 | - Logging integration across all components
10 | - Resource usage and scalability testing
11 | """
12 |
13 | import asyncio
14 | import time
15 | from datetime import datetime
16 | from typing import Any
17 | from unittest.mock import Mock, patch
18 |
19 | import pytest
20 | from langchain_core.language_models import BaseChatModel
21 | from langchain_core.messages import AIMessage
22 | from langgraph.checkpoint.memory import MemorySaver
23 |
24 | from maverick_mcp.agents.deep_research import DeepResearchAgent
25 | from maverick_mcp.utils.parallel_research import (
26 | ParallelResearchConfig,
27 | )
28 |
29 |
30 | class MockSearchProvider:
31 | """Mock search provider for integration testing."""
32 |
33 | def __init__(self, provider_name: str, fail_rate: float = 0.0):
34 | self.provider_name = provider_name
35 | self.fail_rate = fail_rate
36 | self.call_count = 0
37 |
38 | async def search(self, query: str, num_results: int = 10) -> list[dict[str, Any]]:
39 | """Mock search with configurable failure rate."""
40 | self.call_count += 1
41 |
42 | # Simulate failures based on fail_rate
43 | import random
44 |
45 | if random.random() < self.fail_rate:
46 | raise RuntimeError(f"{self.provider_name} search failed")
47 |
48 | await asyncio.sleep(0.02) # Simulate network latency
49 |
50 | # Generate mock search results
51 | results = []
52 | for i in range(min(num_results, 3)): # Return up to 3 results
53 | results.append(
54 | {
55 | "url": f"https://{self.provider_name.lower()}.example.com/article_{i}_{self.call_count}",
56 | "title": f"{query} - Article {i + 1} from {self.provider_name}",
57 | "content": f"This is detailed content about {query} from {self.provider_name}. "
58 | f"It contains valuable insights and analysis relevant to the research topic. "
59 | f"Provider: {self.provider_name}, Call: {self.call_count}",
60 | "published_date": datetime.now().isoformat(),
61 | "author": f"Expert Analyst {i + 1}",
62 | "score": 0.8 - (i * 0.1),
63 | "provider": self.provider_name.lower(),
64 | }
65 | )
66 |
67 | return results
68 |
69 |
70 | class MockContentAnalyzer:
71 | """Mock content analyzer for integration testing."""
72 |
73 | def __init__(self, analysis_delay: float = 0.01):
74 | self.analysis_delay = analysis_delay
75 | self.analysis_count = 0
76 |
77 | async def analyze_content(
78 | self, content: str, persona: str, analysis_focus: str = "general"
79 | ) -> dict[str, Any]:
80 | """Mock content analysis."""
81 | self.analysis_count += 1
82 | await asyncio.sleep(self.analysis_delay)
83 |
84 | # Generate realistic analysis based on content keywords
85 | insights = []
86 | risk_factors = []
87 | opportunities = []
88 |
89 | content_lower = content.lower()
90 |
91 | if "earnings" in content_lower or "revenue" in content_lower:
92 | insights.append("Strong earnings performance indicated")
93 | opportunities.append("Potential for continued revenue growth")
94 |
95 | if "technical" in content_lower or "chart" in content_lower:
96 | insights.append("Technical indicators suggest trend continuation")
97 | risk_factors.append("Support level break could trigger selling")
98 |
99 | if "sentiment" in content_lower or "analyst" in content_lower:
100 | insights.append("Market sentiment appears positive")
101 | opportunities.append("Analyst upgrades possible")
102 |
103 | if "competitive" in content_lower or "market share" in content_lower:
104 | insights.append("Competitive position remains strong")
105 | risk_factors.append("Increased competitive pressure in market")
106 |
107 | # Default insights if no specific keywords found
108 | if not insights:
109 | insights = [
110 | f"General analysis insight {self.analysis_count} for {persona} investor"
111 | ]
112 |
113 | sentiment_mapping = {
114 | "conservative": {"direction": "neutral", "confidence": 0.6},
115 | "moderate": {"direction": "bullish", "confidence": 0.7},
116 | "aggressive": {"direction": "bullish", "confidence": 0.8},
117 | }
118 |
119 | return {
120 | "insights": insights,
121 | "sentiment": sentiment_mapping.get(
122 | persona, {"direction": "neutral", "confidence": 0.5}
123 | ),
124 | "risk_factors": risk_factors or ["Standard market risks apply"],
125 | "opportunities": opportunities or ["Monitor for opportunities"],
126 | "credibility_score": 0.8,
127 | "relevance_score": 0.75,
128 | "summary": f"Analysis for {persona} investor from {analysis_focus} perspective",
129 | "analysis_timestamp": datetime.now(),
130 | }
131 |
132 |
133 | class MockLLM(BaseChatModel):
134 | """Mock LLM for integration testing."""
135 |
136 | def __init__(self, response_delay: float = 0.05, fail_rate: float = 0.0):
137 | super().__init__()
138 | self.response_delay = response_delay
139 | self.fail_rate = fail_rate
140 | self.invocation_count = 0
141 |
142 | async def ainvoke(self, messages, config=None, **kwargs):
143 | """Mock async LLM invocation."""
144 | self.invocation_count += 1
145 |
146 | # Simulate failures
147 | import random
148 |
149 | if random.random() < self.fail_rate:
150 | raise RuntimeError("LLM service unavailable")
151 |
152 | await asyncio.sleep(self.response_delay)
153 |
154 | # Generate contextual response based on message content
155 | message_content = str(messages[-1].content).lower()
156 |
157 | if "synthesis" in message_content:
158 | response = """
159 | Based on the comprehensive research from multiple specialized agents, this analysis provides
160 | a well-rounded view of the investment opportunity. The fundamental analysis shows strong
161 | financial metrics, while sentiment analysis indicates positive market reception. Technical
162 | analysis suggests favorable entry points, and competitive analysis reveals sustainable
163 | advantages. Overall, this presents a compelling investment case for the specified investor persona.
164 | """
165 | else:
166 | response = '{"KEY_INSIGHTS": ["AI-generated insight"], "SENTIMENT": {"direction": "bullish", "confidence": 0.75}, "CREDIBILITY": 0.8}'
167 |
168 | return AIMessage(content=response)
169 |
170 | def _generate(self, messages, stop=None, **kwargs):
171 | raise NotImplementedError("Use ainvoke for async tests")
172 |
173 | @property
174 | def _llm_type(self) -> str:
175 | return "mock_llm"
176 |
177 |
178 | @pytest.mark.integration
179 | class TestParallelResearchEndToEnd:
180 | """Test complete end-to-end parallel research workflows."""
181 |
182 | @pytest.fixture
183 | def integration_config(self):
184 | """Configuration for integration testing."""
185 | return ParallelResearchConfig(
186 | max_concurrent_agents=3,
187 | timeout_per_agent=10,
188 | enable_fallbacks=True,
189 | rate_limit_delay=0.1,
190 | )
191 |
192 | @pytest.fixture
193 | def mock_search_providers(self):
194 | """Create mock search providers."""
195 | return [
196 | MockSearchProvider("Exa", fail_rate=0.1),
197 | MockSearchProvider("Tavily", fail_rate=0.1),
198 | ]
199 |
200 | @pytest.fixture
201 | def integration_agent(self, integration_config, mock_search_providers):
202 | """Create DeepResearchAgent for integration testing."""
203 | llm = MockLLM(response_delay=0.05, fail_rate=0.05)
204 |
205 | agent = DeepResearchAgent(
206 | llm=llm,
207 | persona="moderate",
208 | checkpointer=MemorySaver(),
209 | enable_parallel_execution=True,
210 | parallel_config=integration_config,
211 | )
212 |
213 | # Replace search providers with mocks
214 | agent.search_providers = mock_search_providers
215 |
216 | # Replace content analyzer with mock
217 | agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.02)
218 |
219 | return agent
220 |
221 | @pytest.mark.asyncio
222 | async def test_complete_parallel_research_workflow(self, integration_agent):
223 | """Test complete parallel research workflow from start to finish."""
224 | start_time = time.time()
225 |
226 | result = await integration_agent.research_comprehensive(
227 | topic="Apple Inc comprehensive investment analysis for Q4 2024",
228 | session_id="integration_test_001",
229 | depth="comprehensive",
230 | focus_areas=["fundamentals", "technical_analysis", "market_sentiment"],
231 | timeframe="30d",
232 | )
233 |
234 | execution_time = time.time() - start_time
235 |
236 | # Verify successful execution
237 | assert result["status"] == "success"
238 | assert result["agent_type"] == "deep_research"
239 | assert result["execution_mode"] == "parallel"
240 | assert (
241 | result["research_topic"]
242 | == "Apple Inc comprehensive investment analysis for Q4 2024"
243 | )
244 |
245 | # Verify research quality
246 | assert result["confidence_score"] > 0.5
247 | assert result["sources_analyzed"] > 0
248 | assert len(result["citations"]) > 0
249 |
250 | # Verify parallel execution stats
251 | assert "parallel_execution_stats" in result
252 | stats = result["parallel_execution_stats"]
253 | assert stats["total_tasks"] > 0
254 | assert stats["successful_tasks"] >= 0
255 | assert stats["parallel_efficiency"] > 0
256 |
257 | # Verify findings structure
258 | assert "findings" in result
259 | findings = result["findings"]
260 | assert "synthesis" in findings
261 | assert "confidence_score" in findings
262 |
263 | # Verify performance characteristics
264 | assert execution_time < 15 # Should complete within reasonable time
265 | assert result["execution_time_ms"] > 0
266 |
267 | @pytest.mark.asyncio
268 | async def test_parallel_vs_sequential_performance_comparison(
269 | self, integration_agent
270 | ):
271 | """Compare parallel vs sequential execution performance."""
272 | topic = "Tesla Inc strategic analysis and market position"
273 |
274 | # Test parallel execution
275 | start_parallel = time.time()
276 | parallel_result = await integration_agent.research_comprehensive(
277 | topic=topic,
278 | session_id="perf_test_parallel",
279 | use_parallel_execution=True,
280 | depth="standard",
281 | )
282 | parallel_time = time.time() - start_parallel
283 |
284 | # Test sequential execution
285 | start_sequential = time.time()
286 | sequential_result = await integration_agent.research_comprehensive(
287 | topic=topic,
288 | session_id="perf_test_sequential",
289 | use_parallel_execution=False,
290 | depth="standard",
291 | )
292 | sequential_time = time.time() - start_sequential
293 |
294 | # Both should succeed
295 | assert parallel_result["status"] == "success"
296 | assert sequential_result["status"] == "success"
297 |
298 | # Verify execution modes
299 | assert parallel_result["execution_mode"] == "parallel"
300 | # Sequential won't have execution_mode in result
301 |
302 | # Parallel should show efficiency metrics
303 | if "parallel_execution_stats" in parallel_result:
304 | stats = parallel_result["parallel_execution_stats"]
305 | assert stats["parallel_efficiency"] > 0
306 |
307 | # If multiple tasks were executed in parallel, should show some efficiency gain
308 | if stats["total_tasks"] > 1:
309 | print(
310 | f"Parallel time: {parallel_time:.3f}s, Sequential time: {sequential_time:.3f}s"
311 | )
312 | print(f"Parallel efficiency: {stats['parallel_efficiency']:.2f}x")
313 |
314 | @pytest.mark.asyncio
315 | async def test_error_resilience_and_fallback(self, integration_config):
316 | """Test error resilience and fallback mechanisms."""
317 | # Create agent with higher failure rates to test resilience
318 | failing_llm = MockLLM(fail_rate=0.3) # 30% failure rate
319 | failing_providers = [
320 | MockSearchProvider("FailingProvider", fail_rate=0.5) # 50% failure rate
321 | ]
322 |
323 | agent = DeepResearchAgent(
324 | llm=failing_llm,
325 | persona="moderate",
326 | enable_parallel_execution=True,
327 | parallel_config=integration_config,
328 | )
329 |
330 | agent.search_providers = failing_providers
331 | agent.content_analyzer = MockContentAnalyzer()
332 |
333 | # Should handle failures gracefully
334 | result = await agent.research_comprehensive(
335 | topic="Resilience test analysis",
336 | session_id="resilience_test_001",
337 | depth="basic",
338 | )
339 |
340 | # Should complete successfully despite some failures
341 | assert result["status"] == "success"
342 |
343 | # May have lower confidence due to failures
344 | assert result["confidence_score"] >= 0.0
345 |
346 | # Should have some parallel execution stats even if tasks failed
347 | if result.get("execution_mode") == "parallel":
348 | stats = result["parallel_execution_stats"]
349 | # Total tasks should be >= failed tasks (some tasks were attempted)
350 | assert stats["total_tasks"] >= stats.get("failed_tasks", 0)
351 |
352 | @pytest.mark.asyncio
353 | async def test_different_research_depths(self, integration_agent):
354 | """Test parallel research with different depth configurations."""
355 | depths_to_test = ["basic", "standard", "comprehensive"]
356 | results = {}
357 |
358 | for depth in depths_to_test:
359 | result = await integration_agent.research_comprehensive(
360 | topic=f"Microsoft Corp analysis - {depth} depth",
361 | session_id=f"depth_test_{depth}",
362 | depth=depth,
363 | use_parallel_execution=True,
364 | )
365 |
366 | results[depth] = result
367 |
368 | # All should succeed
369 | assert result["status"] == "success"
370 | assert result["research_depth"] == depth
371 |
372 | # Comprehensive should generally have more sources and higher confidence
373 | if all(r["status"] == "success" for r in results.values()):
374 | basic_sources = results["basic"]["sources_analyzed"]
375 | comprehensive_sources = results["comprehensive"]["sources_analyzed"]
376 |
377 | # More comprehensive research should analyze more sources (when successful)
378 | if basic_sources > 0 and comprehensive_sources > 0:
379 | assert comprehensive_sources >= basic_sources
380 |
381 | @pytest.mark.asyncio
382 | async def test_persona_specific_research(self, integration_config):
383 | """Test parallel research with different investor personas."""
384 | personas_to_test = ["conservative", "moderate", "aggressive"]
385 | topic = "Amazon Inc investment opportunity analysis"
386 |
387 | for persona in personas_to_test:
388 | llm = MockLLM(response_delay=0.03)
389 | agent = DeepResearchAgent(
390 | llm=llm,
391 | persona=persona,
392 | enable_parallel_execution=True,
393 | parallel_config=integration_config,
394 | )
395 |
396 | # Mock components
397 | agent.search_providers = [MockSearchProvider("TestProvider")]
398 | agent.content_analyzer = MockContentAnalyzer()
399 |
400 | result = await agent.research_comprehensive(
401 | topic=topic,
402 | session_id=f"persona_test_{persona}",
403 | use_parallel_execution=True,
404 | )
405 |
406 | assert result["status"] == "success"
407 | assert result["persona"] == persona
408 |
409 | # Should have findings tailored to persona
410 | assert "findings" in result
411 |
412 | @pytest.mark.asyncio
413 | async def test_concurrent_research_sessions(self, integration_agent):
414 | """Test multiple concurrent research sessions."""
415 | topics = [
416 | "Google Alphabet strategic analysis",
417 | "Meta Platforms competitive position",
418 | "Netflix content strategy evaluation",
419 | ]
420 |
421 | # Run multiple research sessions concurrently
422 | tasks = [
423 | integration_agent.research_comprehensive(
424 | topic=topic,
425 | session_id=f"concurrent_test_{i}",
426 | use_parallel_execution=True,
427 | depth="standard",
428 | )
429 | for i, topic in enumerate(topics)
430 | ]
431 |
432 | start_time = time.time()
433 | results = await asyncio.gather(*tasks, return_exceptions=True)
434 | execution_time = time.time() - start_time
435 |
436 | # All should succeed (or be exceptions we can handle)
437 | successful_results = [
438 | r for r in results if isinstance(r, dict) and r.get("status") == "success"
439 | ]
440 |
441 | assert (
442 | len(successful_results) >= len(topics) // 2
443 | ) # At least half should succeed
444 |
445 | # Should complete in reasonable time despite concurrency
446 | assert execution_time < 30
447 |
448 | # Verify each result has proper session isolation
449 | for _i, result in enumerate(successful_results):
450 | if "findings" in result:
451 | # Each should have distinct research content
452 | assert result["research_topic"] in topics
453 |
454 |
455 | @pytest.mark.integration
456 | class TestParallelResearchScalability:
457 | """Test scalability characteristics of parallel research."""
458 |
459 | @pytest.fixture
460 | def scalability_config(self):
461 | """Configuration for scalability testing."""
462 | return ParallelResearchConfig(
463 | max_concurrent_agents=4,
464 | timeout_per_agent=8,
465 | enable_fallbacks=True,
466 | rate_limit_delay=0.05,
467 | )
468 |
469 | @pytest.mark.asyncio
470 | async def test_agent_limit_enforcement(self, scalability_config):
471 | """Test that concurrent agent limits are properly enforced."""
472 | llm = MockLLM(response_delay=0.1) # Slower to see concurrency effects
473 |
474 | agent = DeepResearchAgent(
475 | llm=llm,
476 | persona="moderate",
477 | enable_parallel_execution=True,
478 | parallel_config=scalability_config,
479 | )
480 |
481 | # Mock components with tracking
482 | call_tracker = {"max_concurrent": 0, "current_concurrent": 0}
483 |
484 | class TrackingProvider(MockSearchProvider):
485 | async def search(self, query: str, num_results: int = 10):
486 | call_tracker["current_concurrent"] += 1
487 | call_tracker["max_concurrent"] = max(
488 | call_tracker["max_concurrent"], call_tracker["current_concurrent"]
489 | )
490 | try:
491 | return await super().search(query, num_results)
492 | finally:
493 | call_tracker["current_concurrent"] -= 1
494 |
495 | agent.search_providers = [TrackingProvider("Tracker")]
496 | agent.content_analyzer = MockContentAnalyzer()
497 |
498 | result = await agent.research_comprehensive(
499 | topic="Scalability test with many potential subtasks",
500 | session_id="scalability_test_001",
501 | focus_areas=[
502 | "fundamentals",
503 | "technical",
504 | "sentiment",
505 | "competitive",
506 | "extra1",
507 | "extra2",
508 | ],
509 | use_parallel_execution=True,
510 | )
511 |
512 | assert result["status"] == "success"
513 |
514 | # Should not exceed configured max concurrent agents
515 | assert (
516 | call_tracker["max_concurrent"] <= scalability_config.max_concurrent_agents
517 | )
518 |
519 | @pytest.mark.asyncio
520 | async def test_memory_usage_under_load(self, scalability_config):
521 | """Test memory usage characteristics under load."""
522 | import gc
523 | import os
524 |
525 | import psutil
526 |
527 | # Get initial memory usage
528 | process = psutil.Process(os.getpid())
529 | initial_memory = process.memory_info().rss / 1024 / 1024 # MB
530 |
531 | llm = MockLLM(response_delay=0.02)
532 | agent = DeepResearchAgent(
533 | llm=llm,
534 | persona="moderate",
535 | enable_parallel_execution=True,
536 | parallel_config=scalability_config,
537 | )
538 |
539 | agent.search_providers = [MockSearchProvider("MemoryTest")]
540 | agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.01)
541 |
542 | # Perform multiple research operations
543 | for i in range(10): # 10 operations to test memory accumulation
544 | result = await agent.research_comprehensive(
545 | topic=f"Memory test analysis {i}",
546 | session_id=f"memory_test_{i}",
547 | use_parallel_execution=True,
548 | depth="basic",
549 | )
550 |
551 | assert result["status"] == "success"
552 |
553 | # Force garbage collection
554 | gc.collect()
555 |
556 | # Check final memory usage
557 | final_memory = process.memory_info().rss / 1024 / 1024 # MB
558 | memory_growth = final_memory - initial_memory
559 |
560 | # Memory growth should be reasonable (not indicative of leaks)
561 | assert memory_growth < 100 # Less than 100MB growth for 10 operations
562 |
563 | @pytest.mark.asyncio
564 | async def test_large_scale_task_distribution(self, scalability_config):
565 | """Test task distribution with many potential research areas."""
566 | llm = MockLLM()
567 | agent = DeepResearchAgent(
568 | llm=llm,
569 | persona="moderate",
570 | enable_parallel_execution=True,
571 | parallel_config=scalability_config,
572 | )
573 |
574 | agent.search_providers = [MockSearchProvider("LargeScale")]
575 | agent.content_analyzer = MockContentAnalyzer()
576 |
577 | # Test with many focus areas (more than max concurrent agents)
578 | many_focus_areas = [
579 | "earnings",
580 | "revenue",
581 | "profit_margins",
582 | "debt_analysis",
583 | "cash_flow",
584 | "technical_indicators",
585 | "chart_patterns",
586 | "support_levels",
587 | "momentum",
588 | "analyst_ratings",
589 | "news_sentiment",
590 | "social_sentiment",
591 | "institutional_sentiment",
592 | "market_share",
593 | "competitive_position",
594 | "industry_trends",
595 | "regulatory_environment",
596 | ]
597 |
598 | result = await agent.research_comprehensive(
599 | topic="Large scale comprehensive analysis with many research dimensions",
600 | session_id="large_scale_test_001",
601 | focus_areas=many_focus_areas,
602 | use_parallel_execution=True,
603 | depth="comprehensive",
604 | )
605 |
606 | assert result["status"] == "success"
607 |
608 | # Should handle large number of focus areas efficiently
609 | if "parallel_execution_stats" in result:
610 | stats = result["parallel_execution_stats"]
611 | # Should not create more tasks than max concurrent agents allows
612 | assert stats["total_tasks"] <= scalability_config.max_concurrent_agents
613 |
614 | # Should achieve some parallel efficiency
615 | if stats["successful_tasks"] > 1:
616 | assert stats["parallel_efficiency"] > 1.0
617 |
618 |
619 | @pytest.mark.integration
620 | class TestParallelResearchLoggingIntegration:
621 | """Test integration of logging throughout parallel research workflow."""
622 |
623 | @pytest.fixture
624 | def logged_agent(self):
625 | """Create agent with comprehensive logging."""
626 | llm = MockLLM(response_delay=0.02)
627 | config = ParallelResearchConfig(
628 | max_concurrent_agents=2,
629 | timeout_per_agent=5,
630 | enable_fallbacks=True,
631 | rate_limit_delay=0.05,
632 | )
633 |
634 | agent = DeepResearchAgent(
635 | llm=llm,
636 | persona="moderate",
637 | enable_parallel_execution=True,
638 | parallel_config=config,
639 | )
640 |
641 | agent.search_providers = [MockSearchProvider("LoggedProvider")]
642 | agent.content_analyzer = MockContentAnalyzer()
643 |
644 | return agent
645 |
646 | @pytest.mark.asyncio
647 | async def test_comprehensive_logging_workflow(self, logged_agent):
648 | """Test that comprehensive logging occurs throughout workflow."""
649 | with patch(
650 | "maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
651 | ) as mock_get_logger:
652 | mock_logger = Mock()
653 | mock_get_logger.return_value = mock_logger
654 |
655 | result = await logged_agent.research_comprehensive(
656 | topic="Comprehensive logging test analysis",
657 | session_id="logging_test_001",
658 | use_parallel_execution=True,
659 | )
660 |
661 | assert result["status"] == "success"
662 |
663 | # Should have multiple logging calls
664 | assert mock_logger.info.call_count >= 10 # Multiple stages should log
665 |
666 | # Verify different types of log messages occurred
667 | all_log_calls = [call[0][0] for call in mock_logger.info.call_args_list]
668 | " ".join(all_log_calls)
669 |
670 | # Should contain various logging elements
671 | assert any("RESEARCH_START" in call for call in all_log_calls)
672 | assert any("PARALLEL" in call for call in all_log_calls)
673 |
674 | @pytest.mark.asyncio
675 | async def test_error_logging_integration(self, logged_agent):
676 | """Test error logging integration in parallel workflow."""
677 | # Create a scenario that will cause some errors
678 | failing_llm = MockLLM(fail_rate=0.5) # High failure rate
679 | logged_agent.llm = failing_llm
680 |
681 | with patch(
682 | "maverick_mcp.utils.orchestration_logging.get_orchestration_logger"
683 | ) as mock_get_logger:
684 | mock_logger = Mock()
685 | mock_get_logger.return_value = mock_logger
686 |
687 | # This may succeed or fail, but should log appropriately
688 | try:
689 | result = await logged_agent.research_comprehensive(
690 | topic="Error logging test",
691 | session_id="error_logging_test_001",
692 | use_parallel_execution=True,
693 | )
694 |
695 | # If it succeeds, should still have logged errors from failed components
696 | assert result["status"] == "success" or result["status"] == "error"
697 |
698 | except Exception:
699 | # If it fails completely, that's also acceptable for this test
700 | pass
701 |
702 | # Should have some error or warning logs due to high failure rate
703 | has_error_logs = (
704 | mock_logger.error.call_count > 0 or mock_logger.warning.call_count > 0
705 | )
706 | assert has_error_logs
707 |
708 | @pytest.mark.asyncio
709 | async def test_performance_metrics_logging(self, logged_agent):
710 | """Test that performance metrics are properly logged."""
711 | with patch(
712 | "maverick_mcp.utils.orchestration_logging.log_performance_metrics"
713 | ) as mock_perf_log:
714 | result = await logged_agent.research_comprehensive(
715 | topic="Performance metrics test",
716 | session_id="perf_metrics_test_001",
717 | use_parallel_execution=True,
718 | )
719 |
720 | assert result["status"] == "success"
721 |
722 | # Should have logged performance metrics
723 | assert mock_perf_log.call_count >= 1
724 |
725 | # Verify metrics content
726 | perf_call = mock_perf_log.call_args_list[0]
727 | perf_call[0][0]
728 |             # first positional arg is unused here; the metrics dict is the second
729 |
730 | assert isinstance(metrics, dict)
731 | # Should contain relevant performance metrics
732 | expected_metrics = [
733 | "total_tasks",
734 | "successful_tasks",
735 | "failed_tasks",
736 | "parallel_efficiency",
737 | ]
738 | assert any(metric in metrics for metric in expected_metrics)
739 |
740 |
741 | @pytest.mark.integration
742 | class TestParallelResearchErrorRecovery:
743 | """Test error recovery and resilience in parallel research."""
744 |
745 | @pytest.mark.asyncio
746 | async def test_partial_failure_recovery(self):
747 | """Test recovery when some parallel tasks fail."""
748 | config = ParallelResearchConfig(
749 | max_concurrent_agents=3,
750 | timeout_per_agent=5,
751 | enable_fallbacks=True,
752 | rate_limit_delay=0.05,
753 | )
754 |
755 | # Create agent with mixed success/failure providers
756 | llm = MockLLM(response_delay=0.03)
757 | agent = DeepResearchAgent(
758 | llm=llm,
759 | persona="moderate",
760 | enable_parallel_execution=True,
761 | parallel_config=config,
762 | )
763 |
764 | # Mix of failing and working providers
765 | agent.search_providers = [
766 | MockSearchProvider("WorkingProvider", fail_rate=0.0),
767 | MockSearchProvider("FailingProvider", fail_rate=0.8), # 80% failure rate
768 | ]
769 | agent.content_analyzer = MockContentAnalyzer()
770 |
771 | result = await agent.research_comprehensive(
772 | topic="Partial failure recovery test",
773 | session_id="partial_failure_test_001",
774 | use_parallel_execution=True,
775 | )
776 |
777 | # Should complete successfully despite some failures
778 | assert result["status"] == "success"
779 |
780 | # Should have parallel execution stats showing mixed results
781 | if "parallel_execution_stats" in result:
782 | stats = result["parallel_execution_stats"]
783 | # Should have attempted multiple tasks
784 | assert stats["total_tasks"] >= 1
785 | # May have some failures but should have some successes
786 | if stats["total_tasks"] > 1:
787 | assert (
788 | stats["successful_tasks"] + stats["failed_tasks"]
789 | == stats["total_tasks"]
790 | )
791 |
792 | @pytest.mark.asyncio
793 | async def test_complete_failure_fallback(self):
794 | """Test fallback to sequential when parallel execution completely fails."""
795 | config = ParallelResearchConfig(
796 | max_concurrent_agents=2,
797 | timeout_per_agent=3,
798 | enable_fallbacks=True,
799 | )
800 |
801 | # Create agent that will fail in parallel mode
802 | failing_llm = MockLLM(fail_rate=0.9) # Very high failure rate
803 | agent = DeepResearchAgent(
804 | llm=failing_llm,
805 | persona="moderate",
806 | enable_parallel_execution=True,
807 | parallel_config=config,
808 | )
809 |
810 | agent.search_providers = [MockSearchProvider("FailingProvider", fail_rate=0.9)]
811 | agent.content_analyzer = MockContentAnalyzer()
812 |
813 | # Mock the sequential execution to succeed
814 | with patch.object(agent.graph, "ainvoke") as mock_sequential:
815 | mock_sequential.return_value = {
816 | "status": "success",
817 | "persona": "moderate",
818 | "research_confidence": 0.6,
819 | "research_findings": {"synthesis": "Fallback analysis"},
820 | }
821 |
822 | result = await agent.research_comprehensive(
823 | topic="Complete failure fallback test",
824 | session_id="complete_failure_test_001",
825 | use_parallel_execution=True,
826 | )
827 |
828 | # Should fall back to sequential and succeed
829 | assert result["status"] == "success"
830 |
831 | # Sequential execution should have been called due to parallel failure
832 | mock_sequential.assert_called_once()
833 |
834 | @pytest.mark.asyncio
835 | async def test_timeout_handling_in_parallel_execution(self):
836 | """Test handling of timeouts in parallel execution."""
837 | config = ParallelResearchConfig(
838 | max_concurrent_agents=2,
839 | timeout_per_agent=1, # Very short timeout
840 | enable_fallbacks=True,
841 | )
842 |
843 | # Create components with delays longer than timeout
844 | slow_llm = MockLLM(response_delay=2.0) # Slower than timeout
845 | agent = DeepResearchAgent(
846 | llm=slow_llm,
847 | persona="moderate",
848 | enable_parallel_execution=True,
849 | parallel_config=config,
850 | )
851 |
852 | agent.search_providers = [MockSearchProvider("SlowProvider")]
853 | agent.content_analyzer = MockContentAnalyzer(analysis_delay=0.5)
854 |
855 | # Should handle timeouts gracefully
856 | result = await agent.research_comprehensive(
857 | topic="Timeout handling test",
858 | session_id="timeout_test_001",
859 | use_parallel_execution=True,
860 | )
861 |
862 | # Should complete with some status (success or error)
863 | assert result["status"] in ["success", "error"]
864 |
865 | # If parallel execution stats are available, should show timeout effects
866 | if (
867 | "parallel_execution_stats" in result
868 | and result["parallel_execution_stats"]["total_tasks"] > 0
869 | ):
870 | stats = result["parallel_execution_stats"]
871 | # Timeouts should result in failed tasks
872 | assert stats["failed_tasks"] >= 0
873 |
874 |
875 | @pytest.mark.integration
876 | class TestParallelResearchDataFlow:
877 | """Test data flow and consistency in parallel research."""
878 |
879 | @pytest.mark.asyncio
880 | async def test_data_consistency_across_parallel_tasks(self):
881 | """Test that data remains consistent across parallel task execution."""
882 | config = ParallelResearchConfig(
883 | max_concurrent_agents=3,
884 | timeout_per_agent=5,
885 | )
886 |
887 | llm = MockLLM()
888 | agent = DeepResearchAgent(
889 | llm=llm,
890 | persona="moderate",
891 | enable_parallel_execution=True,
892 | parallel_config=config,
893 | )
894 |
895 | # Create providers that return consistent data
896 | consistent_provider = MockSearchProvider("ConsistentProvider")
897 | agent.search_providers = [consistent_provider]
898 | agent.content_analyzer = MockContentAnalyzer()
899 |
900 | result = await agent.research_comprehensive(
901 | topic="Data consistency test for Apple Inc",
902 | session_id="consistency_test_001",
903 | use_parallel_execution=True,
904 | )
905 |
906 | assert result["status"] == "success"
907 |
908 | # Verify data structure consistency
909 | assert "research_topic" in result
910 | assert "confidence_score" in result
911 | assert "citations" in result
912 | assert isinstance(result["citations"], list)
913 |
914 | # If parallel execution occurred, verify stats structure
915 | if "parallel_execution_stats" in result:
916 | stats = result["parallel_execution_stats"]
917 | required_stats = [
918 | "total_tasks",
919 | "successful_tasks",
920 | "failed_tasks",
921 | "parallel_efficiency",
922 | ]
923 | for stat in required_stats:
924 | assert stat in stats
925 | assert isinstance(stats[stat], int | float)
926 |
927 | @pytest.mark.asyncio
928 | async def test_citation_aggregation_across_tasks(self):
929 | """Test that citations are properly aggregated from parallel tasks."""
930 | config = ParallelResearchConfig(max_concurrent_agents=2)
931 |
932 | llm = MockLLM()
933 | agent = DeepResearchAgent(
934 | llm=llm,
935 | persona="moderate",
936 | enable_parallel_execution=True,
937 | parallel_config=config,
938 | )
939 |
940 | # Multiple providers to generate multiple sources
941 | agent.search_providers = [
942 | MockSearchProvider("Provider1"),
943 | MockSearchProvider("Provider2"),
944 | ]
945 | agent.content_analyzer = MockContentAnalyzer()
946 |
947 | result = await agent.research_comprehensive(
948 | topic="Citation aggregation test",
949 | session_id="citation_test_001",
950 | use_parallel_execution=True,
951 | )
952 |
953 | assert result["status"] == "success"
954 |
955 | # Should have citations from multiple sources
956 | citations = result.get("citations", [])
957 | if len(citations) > 0:
958 | # Citations should have required fields
959 | for citation in citations:
960 | assert "id" in citation
961 | assert "title" in citation
962 | assert "url" in citation
963 | assert "credibility_score" in citation
964 |
965 | # Should have unique citation IDs
966 | citation_ids = [c["id"] for c in citations]
967 | assert len(citation_ids) == len(set(citation_ids))
968 |
969 | @pytest.mark.asyncio
970 | async def test_research_quality_metrics(self):
971 | """Test research quality metrics in parallel execution."""
972 | config = ParallelResearchConfig(max_concurrent_agents=2)
973 |
974 | llm = MockLLM()
975 | agent = DeepResearchAgent(
976 | llm=llm,
977 | persona="moderate",
978 | enable_parallel_execution=True,
979 | parallel_config=config,
980 | )
981 |
982 | agent.search_providers = [MockSearchProvider("QualityProvider")]
983 | agent.content_analyzer = MockContentAnalyzer()
984 |
985 | result = await agent.research_comprehensive(
986 | topic="Research quality metrics test",
987 | session_id="quality_test_001",
988 | use_parallel_execution=True,
989 | )
990 |
991 | assert result["status"] == "success"
992 |
993 | # Verify quality metrics
994 | assert "confidence_score" in result
995 | assert 0.0 <= result["confidence_score"] <= 1.0
996 |
997 | assert "sources_analyzed" in result
998 | assert isinstance(result["sources_analyzed"], int)
999 | assert result["sources_analyzed"] >= 0
1000 |
1001 | if "source_diversity" in result:
1002 | assert 0.0 <= result["source_diversity"] <= 1.0
1003 |
```
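For readers skimming the fixtures above, the wiring they perform reduces to the following standalone sketch. Constructor and method arguments mirror the calls made in these tests; the import path for the mock classes and the `run_mocked_research` helper name are assumptions for illustration, not a verified public API.

```python
import asyncio

from langgraph.checkpoint.memory import MemorySaver

from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.utils.parallel_research import ParallelResearchConfig

# The mock classes are the ones defined in the test module above; this import
# path assumes the tests package is importable from the project root.
from tests.test_parallel_research_integration import (
    MockContentAnalyzer,
    MockLLM,
    MockSearchProvider,
)


async def run_mocked_research() -> dict:
    """Wire the mock LLM, search providers, and analyzer into the research agent."""
    agent = DeepResearchAgent(
        llm=MockLLM(response_delay=0.05),
        persona="moderate",
        checkpointer=MemorySaver(),
        enable_parallel_execution=True,
        parallel_config=ParallelResearchConfig(
            max_concurrent_agents=3,
            timeout_per_agent=10,
            enable_fallbacks=True,
            rate_limit_delay=0.1,
        ),
    )
    # Replace the real search providers and content analyzer with the mocks,
    # exactly as the integration fixtures do.
    agent.search_providers = [MockSearchProvider("Exa"), MockSearchProvider("Tavily")]
    agent.content_analyzer = MockContentAnalyzer()

    return await agent.research_comprehensive(
        topic="Apple Inc comprehensive investment analysis",
        session_id="standalone_demo_001",
        depth="standard",
        use_parallel_execution=True,
    )


if __name__ == "__main__":
    result = asyncio.run(run_mocked_research())
    print(result["status"], result.get("confidence_score"))
```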