This is page 37 of 39. Use http://codebase.md/wshobson/maverick-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_exa_research_integration.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Comprehensive test suite for ExaSearch integration with research agents.
3 |
4 | This test suite validates the complete research agent architecture with the ExaSearch provider,
5 | including timeout handling, parallel execution, specialized subagents, and performance
6 | benchmarking across all research depths and focus areas.
7 | """
8 |
9 | import asyncio
10 | import time
11 | from unittest.mock import AsyncMock, MagicMock, patch
12 |
13 | import pytest
14 | from exa_py import Exa
15 |
16 | from maverick_mcp.agents.deep_research import (
17 | RESEARCH_DEPTH_LEVELS,
18 | CompetitiveResearchAgent,
19 | ContentAnalyzer,
20 | DeepResearchAgent,
21 | ExaSearchProvider,
22 | FundamentalResearchAgent,
23 | SentimentResearchAgent,
24 | TechnicalResearchAgent,
25 | )
26 | from maverick_mcp.api.routers.research import (
27 | ResearchRequest,
28 | comprehensive_research,
29 | get_research_agent,
30 | )
31 | from maverick_mcp.exceptions import WebSearchError
32 | from maverick_mcp.utils.parallel_research import (
33 | ParallelResearchConfig,
34 | ParallelResearchOrchestrator,
35 | ResearchResult,
36 | ResearchTask,
37 | TaskDistributionEngine,
38 | )
39 |
40 | # Test Data Factories and Fixtures
41 |
42 |
43 | @pytest.fixture
44 | def mock_llm():
45 | """Mock LLM with realistic response patterns for research scenarios."""
46 | llm = MagicMock()
47 | llm.ainvoke = AsyncMock()
48 |
49 | # Mock different response types for different research phases
50 | def mock_response_content(messages):
51 | """Generate realistic mock responses based on message content."""
52 | content = str(messages[-1].content).lower()
53 |
54 | if "synthesis" in content:
55 | return MagicMock(
56 | content='{"synthesis": "Comprehensive analysis shows positive outlook", "confidence": 0.8}'
57 | )
58 | elif "analyze" in content or "financial" in content:
59 | return MagicMock(
60 | content='{"KEY_INSIGHTS": ["Strong earnings growth", "Market share expansion"], "SENTIMENT": {"direction": "bullish", "confidence": 0.75}, "RISK_FACTORS": ["Interest rate sensitivity"], "OPPORTUNITIES": ["Market expansion"], "CREDIBILITY": 0.8, "RELEVANCE": 0.9, "SUMMARY": "Positive financial outlook"}'
61 | )
62 | else:
63 | return MagicMock(content="Analysis completed successfully")
64 |
65 | llm.ainvoke.side_effect = lambda messages, **kwargs: mock_response_content(messages)
66 | return llm
67 |
68 |
69 | @pytest.fixture
70 | def mock_exa_client():
71 | """Mock Exa client with realistic search responses."""
72 | mock_client = MagicMock(spec=Exa)
73 |
74 | def create_mock_result(title, text, url_suffix=""):
75 | """Create mock Exa result object."""
76 | result = MagicMock()
77 | result.url = f"https://example.com/{url_suffix}"
78 | result.title = title
79 | result.text = text
80 | result.published_date = "2024-01-15T10:00:00Z"
81 | result.score = 0.85
82 | result.author = "Financial Analyst"
83 | return result
84 |
85 | def mock_search_and_contents(query, num_results=5, **kwargs):
86 | """Generate mock search results based on query content."""
87 | response = MagicMock()
88 | results = []
89 |
90 | query_lower = query.lower()
91 |
92 | if "aapl" in query_lower or "apple" in query_lower:
93 | results.extend(
94 | [
95 | create_mock_result(
96 | "Apple Q4 Earnings Beat Expectations",
97 | "Apple reported strong quarterly earnings with iPhone sales growth of 15% and services revenue reaching new highs. The company's financial position remains robust with strong cash flow.",
98 | "apple-earnings",
99 | ),
100 | create_mock_result(
101 | "Apple Stock Technical Analysis",
102 | "Apple stock shows bullish technical patterns with support at $180 and resistance at $200. RSI indicates oversold conditions presenting buying opportunity.",
103 | "apple-technical",
104 | ),
105 | ]
106 | )
107 | elif "sentiment" in query_lower:
108 | results.extend(
109 | [
110 | create_mock_result(
111 | "Market Sentiment Turns Positive",
112 | "Investor sentiment shows improvement with increased confidence in tech sector. Analyst upgrades and positive earnings surprises drive optimism.",
113 | "market-sentiment",
114 | ),
115 | ]
116 | )
117 | elif "competitive" in query_lower or "industry" in query_lower:
118 | results.extend(
119 | [
120 | create_mock_result(
121 | "Tech Industry Competitive Landscape",
122 | "The technology sector shows fierce competition with market leaders maintaining strong positions. Innovation and market share battles intensify.",
123 | "competitive-analysis",
124 | ),
125 | ]
126 | )
127 | else:
128 | # Default financial research results
129 | results.extend(
130 | [
131 | create_mock_result(
132 | "Financial Market Analysis",
133 | "Current market conditions show mixed signals with growth prospects balanced against economic uncertainties. Investors remain cautiously optimistic.",
134 | "market-analysis",
135 | ),
136 | create_mock_result(
137 | "Investment Outlook 2024",
138 | "Investment opportunities emerge in technology and healthcare sectors despite ongoing market volatility. Diversification remains key strategy.",
139 | "investment-outlook",
140 | ),
141 | ]
142 | )
143 |
144 | # Limit results to requested number
145 | response.results = results[:num_results]
146 | return response
147 |
148 | mock_client.search_and_contents.side_effect = mock_search_and_contents
149 | return mock_client
150 |
151 |
152 | @pytest.fixture
153 | def sample_research_tasks():
154 | """Sample research tasks for parallel execution testing."""
155 | return [
156 | ResearchTask(
157 | task_id="session_123_fundamental",
158 | task_type="fundamental",
159 | target_topic="AAPL financial analysis",
160 | focus_areas=["earnings", "valuation", "growth"],
161 | priority=8,
162 | timeout=20,
163 | ),
164 | ResearchTask(
165 | task_id="session_123_technical",
166 | task_type="technical",
167 | target_topic="AAPL technical analysis",
168 | focus_areas=["chart_patterns", "support_resistance"],
169 | priority=7,
170 | timeout=15,
171 | ),
172 | ResearchTask(
173 | task_id="session_123_sentiment",
174 | task_type="sentiment",
175 | target_topic="AAPL market sentiment",
176 | focus_areas=["news_sentiment", "analyst_ratings"],
177 | priority=6,
178 | timeout=15,
179 | ),
180 | ]
181 |
182 |
183 | @pytest.fixture
184 | def mock_settings():
185 | """Mock settings with ExaSearch configuration."""
186 | settings = MagicMock()
187 | settings.research.exa_api_key = "test_exa_api_key"
188 | settings.data_limits.max_parallel_agents = 4
189 | settings.performance.search_timeout_failure_threshold = 12
190 | settings.performance.search_circuit_breaker_failure_threshold = 8
191 | settings.performance.search_circuit_breaker_recovery_timeout = 30
192 | return settings
193 |
194 |
195 | # ExaSearchProvider Tests
196 |
197 |
198 | class TestExaSearchProvider:
199 | """Test ExaSearch provider integration and functionality."""
200 |
201 | @pytest.mark.unit
202 | def test_exa_provider_initialization(self):
203 | """Test ExaSearchProvider initialization."""
204 | api_key = "test_api_key_123"
205 | provider = ExaSearchProvider(api_key)
206 |
207 | assert provider.api_key == api_key
208 | assert provider._api_key_verified is True
209 | assert provider.is_healthy() is True
210 | assert provider._failure_count == 0
211 |
212 | @pytest.mark.unit
213 | def test_exa_provider_initialization_without_key(self):
214 | """Test ExaSearchProvider initialization without API key."""
215 | provider = ExaSearchProvider("")
216 |
217 | assert provider.api_key == ""
218 | assert provider._api_key_verified is False
219 | assert provider.is_healthy() is True # Still healthy, but searches will fail
220 |
221 | @pytest.mark.unit
222 | def test_timeout_calculation(self):
223 | """Test adaptive timeout calculation for different query complexities."""
224 | provider = ExaSearchProvider("test_key")
225 |
226 | # Simple query
227 | timeout = provider._calculate_timeout("AAPL", None)
228 | assert timeout >= 4.0 # Minimum for Exa reliability
229 |
230 | # Complex query
231 | complex_query = "comprehensive analysis of Apple Inc financial performance and market position with competitive analysis"
232 | timeout_complex = provider._calculate_timeout(complex_query, None)
233 | assert timeout_complex >= timeout
234 |
235 | # Budget constrained query
236 | timeout_budget = provider._calculate_timeout("AAPL", 8.0)
237 | assert 4.0 <= timeout_budget <= 8.0
238 |
239 | @pytest.mark.unit
240 | def test_failure_recording_and_health_status(self):
241 | """Test failure recording and health status management."""
242 | provider = ExaSearchProvider("test_key")
243 |
244 | # Initially healthy
245 | assert provider.is_healthy() is True
246 |
247 | # Record several timeout failures
248 | for _ in range(5):
249 | provider._record_failure("timeout")
250 |
251 | assert provider._failure_count == 5
252 | assert provider.is_healthy() is True # Still healthy, threshold not reached
253 |
254 | # Exceed timeout threshold (default 12)
255 | for _ in range(8):
256 | provider._record_failure("timeout")
257 |
258 | assert provider._failure_count == 13
259 | assert provider.is_healthy() is False # Now unhealthy
260 |
261 | # Test recovery
262 | provider._record_success()
263 | assert provider.is_healthy() is True
264 | assert provider._failure_count == 0
265 |
266 | @pytest.mark.unit
267 | @patch("exa_py.Exa")
268 | async def test_exa_search_success(self, mock_exa_class, mock_exa_client):
269 | """Test successful ExaSearch operation."""
270 | mock_exa_class.return_value = mock_exa_client
271 | provider = ExaSearchProvider("test_key")
272 |
273 | results = await provider.search("AAPL financial analysis", num_results=3)
274 |
275 | assert len(results) >= 1
276 | assert all("url" in result for result in results)
277 | assert all("title" in result for result in results)
278 | assert all("content" in result for result in results)
279 | assert all(result["provider"] == "exa" for result in results)
280 |
281 | @pytest.mark.unit
282 | @patch("exa_py.Exa")
283 | async def test_exa_search_timeout(self, mock_exa_class):
284 | """Test ExaSearch timeout handling."""
285 | # Mock Exa client that takes too long
286 | mock_client = MagicMock()
287 |
288 | def slow_search(*args, **kwargs):
289 | import time
290 |
291 | time.sleep(10) # Simulate slow synchronous response
292 |
293 | mock_client.search_and_contents.side_effect = slow_search
294 | mock_exa_class.return_value = mock_client
295 |
296 | provider = ExaSearchProvider("test_key")
297 |
298 | with pytest.raises(WebSearchError, match="timed out"):
299 | await provider.search("test query", timeout_budget=2.0)
300 |
301 | # Check that failure was recorded
302 | assert not provider.is_healthy() or provider._failure_count > 0
303 |
304 | @pytest.mark.unit
305 | @patch("exa_py.Exa")
306 | async def test_exa_search_unhealthy_provider(self, mock_exa_class):
307 | """Test behavior when provider is marked as unhealthy."""
308 | provider = ExaSearchProvider("test_key")
309 | provider._is_healthy = False
310 |
311 | with pytest.raises(WebSearchError, match="disabled due to repeated failures"):
312 | await provider.search("test query")
313 |
314 |
315 | # DeepResearchAgent Tests
316 |
317 |
318 | class TestDeepResearchAgent:
319 | """Test DeepResearchAgent with ExaSearch integration."""
320 |
321 | @pytest.mark.unit
322 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
323 | async def test_agent_initialization_with_exa(self, mock_provider, mock_llm):
324 | """Test DeepResearchAgent initialization with ExaSearch provider."""
325 | mock_exa_provider = MagicMock(spec=ExaSearchProvider)
326 | mock_provider.return_value = mock_exa_provider
327 |
328 | agent = DeepResearchAgent(
329 | llm=mock_llm,
330 | persona="moderate",
331 | exa_api_key="test_key",
332 | research_depth="standard",
333 | )
334 |
335 | await agent.initialize()
336 |
337 | assert agent.search_providers == [mock_exa_provider]
338 | assert agent._search_providers_loaded is True
339 | assert agent.default_depth == "standard"
340 |
341 | @pytest.mark.unit
342 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
343 | async def test_agent_initialization_without_providers(
344 | self, mock_provider, mock_llm
345 | ):
346 | """Test agent behavior when no search providers are available."""
347 | mock_provider.return_value = None
348 |
349 | agent = DeepResearchAgent(
350 | llm=mock_llm,
351 | persona="moderate",
352 | exa_api_key=None,
353 | )
354 |
355 | await agent.initialize()
356 |
357 | assert agent.search_providers == []
358 | assert agent._search_providers_loaded is True
359 |
360 | @pytest.mark.unit
361 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
362 | async def test_research_comprehensive_no_providers(self, mock_provider, mock_llm):
363 | """Test research behavior when no search providers are configured."""
364 | mock_provider.return_value = None
365 |
366 | agent = DeepResearchAgent(llm=mock_llm, exa_api_key=None)
367 |
368 | result = await agent.research_comprehensive(
369 | topic="AAPL analysis", session_id="test_session", depth="basic"
370 | )
371 |
372 | assert "error" in result
373 | assert "no search providers configured" in result["error"]
374 | assert result["topic"] == "AAPL analysis"
375 |
376 | @pytest.mark.integration
377 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
378 | @patch("exa_py.Exa")
379 | async def test_research_comprehensive_success(
380 | self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
381 | ):
382 | """Test successful comprehensive research with ExaSearch."""
383 | # Setup mocks
384 | mock_exa_provider = ExaSearchProvider("test_key")
385 | mock_provider.return_value = mock_exa_provider
386 | mock_exa_class.return_value = mock_exa_client
387 |
388 | agent = DeepResearchAgent(
389 | llm=mock_llm,
390 | persona="moderate",
391 | exa_api_key="test_key",
392 | research_depth="basic",
393 | )
394 |
395 | # Execute research
396 | result = await agent.research_comprehensive(
397 | topic="AAPL financial analysis",
398 | session_id="test_session_123",
399 | depth="basic",
400 | timeout_budget=15.0,
401 | )
402 |
403 | # Verify result structure
404 | assert result["status"] == "success"
405 | assert result["agent_type"] == "deep_research"
406 | assert result["research_topic"] == "AAPL financial analysis"
407 | assert result["research_depth"] == "basic"
408 | assert "findings" in result
409 | assert "confidence_score" in result
410 | assert "execution_time_ms" in result
411 |
412 | @pytest.mark.unit
413 | def test_research_depth_levels(self):
414 | """Test research depth level configurations."""
415 | assert "basic" in RESEARCH_DEPTH_LEVELS
416 | assert "standard" in RESEARCH_DEPTH_LEVELS
417 | assert "comprehensive" in RESEARCH_DEPTH_LEVELS
418 | assert "exhaustive" in RESEARCH_DEPTH_LEVELS
419 |
420 | # Verify basic level has minimal settings for speed
421 | basic = RESEARCH_DEPTH_LEVELS["basic"]
422 | assert basic["max_sources"] <= 5
423 | assert basic["max_searches"] <= 2
424 | assert basic["validation_required"] is False
425 |
426 | # Verify exhaustive has maximum settings
427 | exhaustive = RESEARCH_DEPTH_LEVELS["exhaustive"]
428 | assert exhaustive["max_sources"] >= 10
429 | assert exhaustive["validation_required"] is True
430 |
431 |
432 | # Specialized Subagent Tests
433 |
434 |
435 | class TestSpecializedSubagents:
436 | """Test specialized research subagents."""
437 |
438 | @pytest.fixture
439 | def mock_parent_agent(self, mock_llm):
440 | """Mock parent DeepResearchAgent for subagent testing."""
441 | agent = MagicMock()
442 | agent.llm = mock_llm
443 | agent.search_providers = [MagicMock(spec=ExaSearchProvider)]
444 | agent.content_analyzer = MagicMock(spec=ContentAnalyzer)
445 | agent.persona = MagicMock()
446 | agent.persona.name = "moderate"
447 | agent._calculate_source_credibility = MagicMock(return_value=0.8)
448 | return agent
449 |
450 | @pytest.mark.unit
451 | async def test_fundamental_research_agent(
452 | self, mock_parent_agent, sample_research_tasks
453 | ):
454 | """Test FundamentalResearchAgent execution."""
455 | task = sample_research_tasks[0] # fundamental task
456 | agent = FundamentalResearchAgent(mock_parent_agent)
457 |
458 | # Mock search results
459 | mock_search_results = [
460 | {
461 | "title": "AAPL Q4 Earnings Report",
462 | "url": "https://example.com/earnings",
463 | "content": "Apple reported strong quarterly earnings with revenue growth of 12%...",
464 | "published_date": "2024-01-15",
465 | }
466 | ]
467 |
468 | agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
469 | agent._analyze_search_results = AsyncMock(
470 | return_value=[
471 | {
472 | **mock_search_results[0],
473 | "analysis": {
474 | "insights": [
475 | "Strong earnings growth",
476 | "Revenue diversification",
477 | ],
478 | "risk_factors": ["Market competition"],
479 | "opportunities": ["Market expansion"],
480 | "sentiment": {"direction": "bullish", "confidence": 0.8},
481 | },
482 | "credibility_score": 0.8,
483 | }
484 | ]
485 | )
486 |
487 | result = await agent.execute_research(task)
488 |
489 | assert result["research_type"] == "fundamental"
490 | assert "insights" in result
491 | assert "risk_factors" in result
492 | assert "opportunities" in result
493 | assert "sentiment" in result
494 | assert "sources" in result
495 | assert len(result["focus_areas"]) > 0
496 | assert "earnings" in result["focus_areas"]
497 |
498 | @pytest.mark.unit
499 | async def test_technical_research_agent(
500 | self, mock_parent_agent, sample_research_tasks
501 | ):
502 | """Test TechnicalResearchAgent execution."""
503 | task = sample_research_tasks[1] # technical task
504 | agent = TechnicalResearchAgent(mock_parent_agent)
505 |
506 | # Mock search results with technical analysis
507 | mock_search_results = [
508 | {
509 | "title": "AAPL Technical Analysis",
510 | "url": "https://example.com/technical",
511 | "content": "AAPL shows bullish chart patterns with support at $180 and resistance at $200...",
512 | "published_date": "2024-01-15",
513 | }
514 | ]
515 |
516 | agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
517 | agent._analyze_search_results = AsyncMock(
518 | return_value=[
519 | {
520 | **mock_search_results[0],
521 | "analysis": {
522 | "insights": [
523 | "Bullish breakout pattern",
524 | "Strong support levels",
525 | ],
526 | "risk_factors": ["Overbought conditions"],
527 | "opportunities": ["Momentum continuation"],
528 | "sentiment": {"direction": "bullish", "confidence": 0.7},
529 | },
530 | "credibility_score": 0.7,
531 | }
532 | ]
533 | )
534 |
535 | result = await agent.execute_research(task)
536 |
537 | assert result["research_type"] == "technical"
538 | assert "price_action" in result["focus_areas"]
539 | assert "chart_patterns" in result["focus_areas"]
540 |
541 | @pytest.mark.unit
542 | async def test_sentiment_research_agent(
543 | self, mock_parent_agent, sample_research_tasks
544 | ):
545 | """Test SentimentResearchAgent execution."""
546 | task = sample_research_tasks[2] # sentiment task
547 | agent = SentimentResearchAgent(mock_parent_agent)
548 |
549 | # Mock search results with sentiment data
550 | mock_search_results = [
551 | {
552 | "title": "Apple Stock Sentiment Analysis",
553 | "url": "https://example.com/sentiment",
554 | "content": "Analyst sentiment remains positive on Apple with multiple upgrades...",
555 | "published_date": "2024-01-15",
556 | }
557 | ]
558 |
559 | agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
560 | agent._analyze_search_results = AsyncMock(
561 | return_value=[
562 | {
563 | **mock_search_results[0],
564 | "analysis": {
565 | "insights": ["Positive analyst sentiment", "Upgrade momentum"],
566 | "risk_factors": ["Market volatility concerns"],
567 | "opportunities": ["Institutional accumulation"],
568 | "sentiment": {"direction": "bullish", "confidence": 0.85},
569 | },
570 | "credibility_score": 0.9,
571 | }
572 | ]
573 | )
574 |
575 | result = await agent.execute_research(task)
576 |
577 | assert result["research_type"] == "sentiment"
578 | assert "market_sentiment" in result["focus_areas"]
579 | assert result["sentiment"]["direction"] == "bullish"
580 |
581 | @pytest.mark.unit
582 | async def test_competitive_research_agent(self, mock_parent_agent):
583 | """Test CompetitiveResearchAgent execution."""
584 | task = ResearchTask(
585 | task_id="test_competitive",
586 | task_type="competitive",
587 | target_topic="AAPL competitive analysis",
588 | focus_areas=["competitive_position", "market_share"],
589 | )
590 |
591 | agent = CompetitiveResearchAgent(mock_parent_agent)
592 |
593 | # Mock search results with competitive data
594 | mock_search_results = [
595 | {
596 | "title": "Apple vs Samsung Market Share",
597 | "url": "https://example.com/competitive",
598 | "content": "Apple maintains strong competitive position in premium smartphone market...",
599 | "published_date": "2024-01-15",
600 | }
601 | ]
602 |
603 | agent._perform_specialized_search = AsyncMock(return_value=mock_search_results)
604 | agent._analyze_search_results = AsyncMock(
605 | return_value=[
606 | {
607 | **mock_search_results[0],
608 | "analysis": {
609 | "insights": [
610 | "Strong market position",
611 | "Premium segment dominance",
612 | ],
613 | "risk_factors": ["Android competition"],
614 | "opportunities": ["Emerging markets"],
615 | "sentiment": {"direction": "bullish", "confidence": 0.75},
616 | },
617 | "credibility_score": 0.8,
618 | }
619 | ]
620 | )
621 |
622 | result = await agent.execute_research(task)
623 |
624 | assert result["research_type"] == "competitive"
625 | assert "competitive_position" in result["focus_areas"]
626 | assert "industry_trends" in result["focus_areas"]
627 |
628 |
629 | # Parallel Research Tests
630 |
631 |
632 | class TestParallelResearchOrchestrator:
633 | """Test parallel research execution and orchestration."""
634 |
635 | @pytest.mark.unit
636 | def test_orchestrator_initialization(self):
637 | """Test ParallelResearchOrchestrator initialization."""
638 | config = ParallelResearchConfig(max_concurrent_agents=6, timeout_per_agent=20)
639 | orchestrator = ParallelResearchOrchestrator(config)
640 |
641 | assert orchestrator.config.max_concurrent_agents == 6
642 | assert orchestrator.config.timeout_per_agent == 20
643 | assert orchestrator._semaphore._value == 6 # Semaphore initialized correctly
644 |
645 | @pytest.mark.unit
646 | async def test_task_preparation(self, sample_research_tasks):
647 | """Test task preparation and prioritization."""
648 | orchestrator = ParallelResearchOrchestrator()
649 |
650 | prepared_tasks = await orchestrator._prepare_tasks(sample_research_tasks)
651 |
652 | # Should be sorted by priority (descending)
653 | assert prepared_tasks[0].priority >= prepared_tasks[1].priority
654 |
655 | # All tasks should have timeouts set
656 | for task in prepared_tasks:
657 | assert task.timeout is not None
658 | assert task.status == "pending"
659 | assert task.task_id in orchestrator.active_tasks
660 |
661 | @pytest.mark.integration
662 | async def test_parallel_execution_success(self, sample_research_tasks):
663 | """Test successful parallel execution of research tasks."""
664 | orchestrator = ParallelResearchOrchestrator(
665 | ParallelResearchConfig(max_concurrent_agents=3, timeout_per_agent=10)
666 | )
667 |
668 | # Mock research executor
669 | async def mock_executor(task):
670 | """Mock research executor that simulates successful execution."""
671 | await asyncio.sleep(0.1) # Simulate work
672 | return {
673 | "research_type": task.task_type,
674 | "insights": [
675 | f"{task.task_type} insight 1",
676 | f"{task.task_type} insight 2",
677 | ],
678 | "sentiment": {"direction": "bullish", "confidence": 0.8},
679 | "sources": [
680 | {"title": f"{task.task_type} source", "url": "https://example.com"}
681 | ],
682 | }
683 |
684 | # Mock synthesis callback
685 | async def mock_synthesis(task_results):
686 | return {
687 | "synthesis": f"Synthesized results from {len(task_results)} tasks",
688 | "confidence_score": 0.8,
689 | }
690 |
691 | result = await orchestrator.execute_parallel_research(
692 | tasks=sample_research_tasks,
693 | research_executor=mock_executor,
694 | synthesis_callback=mock_synthesis,
695 | )
696 |
697 | assert isinstance(result, ResearchResult)
698 | assert result.successful_tasks == len(sample_research_tasks)
699 | assert result.failed_tasks == 0
700 | assert result.parallel_efficiency > 1.0 # Should be faster than sequential
701 | assert result.synthesis is not None
702 | assert "synthesis" in result.synthesis
703 |
704 | @pytest.mark.unit
705 | async def test_parallel_execution_with_failures(self, sample_research_tasks):
706 | """Test parallel execution with some task failures."""
707 | orchestrator = ParallelResearchOrchestrator()
708 |
709 | # Mock research executor that fails for certain task types
710 | async def mock_executor_with_failures(task):
711 | if task.task_type == "technical":
712 | raise TimeoutError("Task timed out")
713 | elif task.task_type == "sentiment":
714 | raise Exception("Network error")
715 | else:
716 | return {"research_type": task.task_type, "insights": ["Success"]}
717 |
718 | result = await orchestrator.execute_parallel_research(
719 | tasks=sample_research_tasks,
720 | research_executor=mock_executor_with_failures,
721 | )
722 |
723 | assert result.successful_tasks == 1 # Only fundamental should succeed
724 | assert result.failed_tasks == 2
725 |
726 | # Check that failed tasks have error information
727 | failed_tasks = [
728 | task for task in result.task_results.values() if task.status == "failed"
729 | ]
730 | assert len(failed_tasks) == 2
731 | for task in failed_tasks:
732 | assert task.error is not None
733 |
734 | @pytest.mark.unit
735 | async def test_circuit_breaker_integration(self, sample_research_tasks):
736 | """Test circuit breaker integration in parallel execution."""
737 | orchestrator = ParallelResearchOrchestrator()
738 |
739 | # Mock executor that consistently fails
740 | failure_count = 0
741 |
742 | async def failing_executor(task):
743 | nonlocal failure_count
744 | failure_count += 1
745 | raise Exception(f"Failure {failure_count}")
746 |
747 | result = await orchestrator.execute_parallel_research(
748 | tasks=sample_research_tasks,
749 | research_executor=failing_executor,
750 | )
751 |
752 | # All tasks should fail
753 | assert result.failed_tasks == len(sample_research_tasks)
754 | assert result.successful_tasks == 0
755 |
756 |
757 | class TestTaskDistributionEngine:
758 | """Test intelligent task distribution for research topics."""
759 |
760 | @pytest.mark.unit
761 | def test_topic_relevance_analysis(self):
762 | """Test topic relevance analysis for different task types."""
763 | engine = TaskDistributionEngine()
764 |
765 | # Test financial topic
766 | relevance = engine._analyze_topic_relevance(
767 | "apple earnings financial performance",
768 | focus_areas=["fundamentals", "financials"],
769 | )
770 |
771 | assert "fundamental" in relevance
772 | assert "technical" in relevance
773 | assert "sentiment" in relevance
774 | assert "competitive" in relevance
775 |
776 | # Fundamental should have highest relevance for earnings query
777 | assert relevance["fundamental"] > relevance["technical"]
778 | assert relevance["fundamental"] > relevance["competitive"]
779 |
780 | @pytest.mark.unit
781 | def test_task_distribution_basic(self):
782 | """Test basic task distribution for a research topic."""
783 | engine = TaskDistributionEngine()
784 |
785 | tasks = engine.distribute_research_tasks(
786 | topic="AAPL financial analysis and market outlook",
787 | session_id="test_session",
788 | focus_areas=["fundamentals", "technical_analysis"],
789 | )
790 |
791 | assert len(tasks) > 0
792 |
793 | # Should have variety of task types
794 | task_types = {task.task_type for task in tasks}
795 | assert "fundamental" in task_types # High relevance for financial analysis
796 |
797 | # Tasks should be properly configured
798 | for task in tasks:
799 | assert task.session_id == "test_session"
800 | assert task.target_topic == "AAPL financial analysis and market outlook"
801 | assert task.priority > 0
802 | assert len(task.focus_areas) > 0
803 |
804 | @pytest.mark.unit
805 | def test_task_distribution_fallback(self):
806 | """Test task distribution fallback when no relevant tasks found."""
807 | engine = TaskDistributionEngine()
808 |
809 | # Mock the relevance analysis to return very low scores
810 | with patch.object(
811 | engine,
812 | "_analyze_topic_relevance",
813 | return_value={
814 | "fundamental": 0.1,
815 | "technical": 0.1,
816 | "sentiment": 0.1,
817 | "competitive": 0.1,
818 | },
819 | ):
820 | tasks = engine.distribute_research_tasks(
821 | topic="obscure topic with no clear relevance",
822 | session_id="test_session",
823 | )
824 |
825 | # Should still create at least one task (fallback)
826 | assert len(tasks) >= 1
827 |
828 | # Fallback should be fundamental analysis
829 | assert any(task.task_type == "fundamental" for task in tasks)
830 |
831 | @pytest.mark.unit
832 | def test_task_priority_assignment(self):
833 | """Test priority assignment based on relevance scores."""
834 | engine = TaskDistributionEngine()
835 |
836 | tasks = engine.distribute_research_tasks(
837 | topic="AAPL fundamental analysis earnings valuation",
838 | session_id="test_session",
839 | )
840 |
841 | # Find fundamental task (should have high priority)
842 | fundamental_tasks = [t for t in tasks if t.task_type == "fundamental"]
843 | if fundamental_tasks:
844 | fundamental_task = fundamental_tasks[0]
845 | assert fundamental_task.priority >= 7 # Should be high priority
846 |
847 |
848 | # Timeout and Circuit Breaker Tests
849 |
850 |
851 | class TestTimeoutAndCircuitBreaker:
852 | """Test timeout handling and circuit breaker patterns."""
853 |
854 | @pytest.mark.unit
855 | async def test_timeout_budget_allocation(self, mock_llm):
856 | """Test timeout budget allocation across research phases."""
857 | agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key")
858 |
859 | # Test basic timeout allocation
860 | timeout_budget = 20.0
861 | result = await agent.research_comprehensive(
862 | topic="test topic",
863 | session_id="test_session",
864 | depth="basic",
865 | timeout_budget=timeout_budget,
866 | )
867 |
868 | # Should either complete or timeout gracefully
869 | assert "status" in result or "error" in result
870 |
871 | # If timeout occurred, should have appropriate error structure
872 | if result.get("status") == "error" or "error" in result:
873 | # Error should relate to a timeout or to missing search providers
874 | assert (
875 | "timeout" in str(result).lower()
876 | or "search providers" in str(result).lower()
877 | )
878 |
879 | @pytest.mark.unit
880 | def test_provider_health_monitoring(self):
881 | """Test search provider health monitoring and recovery."""
882 | provider = ExaSearchProvider("test_key")
883 |
884 | # Initially healthy
885 | assert provider.is_healthy()
886 |
887 | # Simulate multiple timeout failures
888 | for _i in range(15): # Exceed default threshold of 12
889 | provider._record_failure("timeout")
890 |
891 | # Should be marked unhealthy
892 | assert not provider.is_healthy()
893 |
894 | # Recovery after success
895 | provider._record_success()
896 | assert provider.is_healthy()
897 | assert provider._failure_count == 0
898 |
899 | @pytest.mark.integration
900 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
901 | async def test_research_with_provider_failures(self, mock_provider, mock_llm):
902 | """Test research behavior when provider failures occur."""
903 | # Create a provider that will fail
904 | failing_provider = MagicMock(spec=ExaSearchProvider)
905 | failing_provider.is_healthy.return_value = True
906 | failing_provider.search = AsyncMock(side_effect=WebSearchError("Search failed"))
907 |
908 | mock_provider.return_value = failing_provider
909 |
910 | agent = DeepResearchAgent(llm=mock_llm, exa_api_key="test_key")
911 |
912 | result = await agent.research_comprehensive(
913 | topic="test topic",
914 | session_id="test_session",
915 | depth="basic",
916 | )
917 |
918 | # Should handle provider failure gracefully
919 | assert "status" in result
920 | # May succeed with fallback or fail gracefully
921 |
922 |
923 | # Performance and Benchmarking Tests
924 |
925 |
926 | class TestPerformanceBenchmarks:
927 | """Test performance across different research depths and configurations."""
928 |
929 | @pytest.mark.slow
930 | @pytest.mark.integration
931 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
932 | @patch("exa_py.Exa")
933 | async def test_research_depth_performance(
934 | self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
935 | ):
936 | """Benchmark performance across different research depths."""
937 | mock_provider.return_value = ExaSearchProvider("test_key")
938 | mock_exa_class.return_value = mock_exa_client
939 |
940 | performance_results = {}
941 |
942 | for depth in ["basic", "standard", "comprehensive"]:
943 | agent = DeepResearchAgent(
944 | llm=mock_llm,
945 | exa_api_key="test_key",
946 | research_depth=depth,
947 | )
948 |
949 | start_time = time.time()
950 |
951 | result = await agent.research_comprehensive(
952 | topic="AAPL financial analysis",
953 | session_id=f"perf_test_{depth}",
954 | depth=depth,
955 | timeout_budget=30.0,
956 | )
957 |
958 | execution_time = time.time() - start_time
959 | performance_results[depth] = {
960 | "execution_time": execution_time,
961 | "success": result.get("status") == "success",
962 | "sources_analyzed": result.get("sources_analyzed", 0),
963 | }
964 |
965 | # Verify performance characteristics
966 | assert (
967 | performance_results["basic"]["execution_time"]
968 | <= performance_results["comprehensive"]["execution_time"]
969 | )
970 |
971 | # Basic should be fastest
972 | if performance_results["basic"]["success"]:
973 | assert (
974 | performance_results["basic"]["execution_time"] < 15.0
975 | ) # Should be fast
976 |
977 | @pytest.mark.slow
978 | async def test_parallel_vs_sequential_performance(self, sample_research_tasks):
979 | """Compare parallel vs sequential execution performance."""
980 | config = ParallelResearchConfig(max_concurrent_agents=4, timeout_per_agent=10)
981 | orchestrator = ParallelResearchOrchestrator(config)
982 |
983 | async def mock_executor(task):
984 | await asyncio.sleep(1) # Simulate 1 second work per task
985 | return {"research_type": task.task_type, "insights": ["Mock insight"]}
986 |
987 | # Parallel execution
988 | start_time = time.time()
989 | parallel_result = await orchestrator.execute_parallel_research(
990 | tasks=sample_research_tasks,
991 | research_executor=mock_executor,
992 | )
993 | parallel_time = time.time() - start_time
994 |
995 | # Sequential simulation
996 | start_time = time.time()
997 | for task in sample_research_tasks:
998 | await mock_executor(task)
999 | sequential_time = time.time() - start_time
1000 |
1001 | # Parallel should be significantly faster
1002 | assert parallel_result.parallel_efficiency > 1.5 # At least 50% improvement
1003 | assert parallel_time < sequential_time * 0.7 # Should be at least 30% faster
1004 |
1005 | @pytest.mark.unit
1006 | async def test_memory_usage_monitoring(self, sample_research_tasks):
1007 | """Test memory usage stays reasonable during parallel execution."""
1008 | import os
1009 |
1010 | import psutil
1011 |
1012 | process = psutil.Process(os.getpid())
1013 | initial_memory = process.memory_info().rss / 1024 / 1024 # MB
1014 |
1015 | config = ParallelResearchConfig(max_concurrent_agents=4)
1016 | orchestrator = ParallelResearchOrchestrator(config)
1017 |
1018 | async def mock_executor(task):
1019 | # Create some data but not excessive
1020 | data = {"results": ["data"] * 1000} # Small amount of data
1021 | await asyncio.sleep(0.1)
1022 | return data
1023 |
1024 | await orchestrator.execute_parallel_research(
1025 | tasks=sample_research_tasks * 5, # More tasks to test scaling
1026 | research_executor=mock_executor,
1027 | )
1028 |
1029 | final_memory = process.memory_info().rss / 1024 / 1024 # MB
1030 | memory_growth = final_memory - initial_memory
1031 |
1032 | # Memory growth should be reasonable (less than 100MB for test)
1033 | assert memory_growth < 100, f"Memory grew by {memory_growth:.1f}MB"
1034 |
1035 |
1036 | # MCP Integration Tests
1037 |
1038 |
1039 | class TestMCPIntegration:
1040 | """Test MCP tool endpoints and research router integration."""
1041 |
1042 | @pytest.mark.integration
1043 | @patch("maverick_mcp.api.routers.research.get_settings")
1044 | async def test_comprehensive_research_mcp_tool(self, mock_settings):
1045 | """Test the comprehensive research MCP tool endpoint."""
1046 | mock_settings.return_value.research.exa_api_key = "test_key"
1047 |
1048 | result = await comprehensive_research(
1049 | query="AAPL financial analysis",
1050 | persona="moderate",
1051 | research_scope="basic",
1052 | max_sources=5,
1053 | timeframe="1m",
1054 | )
1055 |
1056 | # Should return structured response
1057 | assert isinstance(result, dict)
1058 | assert "success" in result
1059 |
1060 | # If successful, should have proper structure
1061 | if result.get("success"):
1062 | assert "research_results" in result
1063 | assert "research_metadata" in result
1064 | assert "request_id" in result
1065 | assert "timestamp" in result
1066 |
1067 | @pytest.mark.unit
1068 | @patch("maverick_mcp.api.routers.research.get_settings")
1069 | async def test_research_without_exa_key(self, mock_settings):
1070 | """Test research behavior without ExaSearch API key."""
1071 | mock_settings.return_value.research.exa_api_key = None
1072 |
1073 | result = await comprehensive_research(
1074 | query="test query",
1075 | persona="moderate",
1076 | research_scope="basic",
1077 | )
1078 |
1079 | assert result["success"] is False
1080 | assert "Exa search provider not configured" in result["error"]
1081 | assert "setup_instructions" in result["details"]
1082 |
1083 | @pytest.mark.unit
1084 | def test_research_request_validation(self):
1085 | """Test ResearchRequest model validation."""
1086 | # Valid request
1087 | request = ResearchRequest(
1088 | query="AAPL analysis",
1089 | persona="moderate",
1090 | research_scope="standard",
1091 | max_sources=15,
1092 | timeframe="1m",
1093 | )
1094 |
1095 | assert request.query == "AAPL analysis"
1096 | assert request.persona == "moderate"
1097 | assert request.research_scope == "standard"
1098 | assert request.max_sources == 15
1099 | assert request.timeframe == "1m"
1100 |
1101 | # Test defaults
1102 | minimal_request = ResearchRequest(query="test")
1103 | assert minimal_request.persona == "moderate"
1104 | assert minimal_request.research_scope == "standard"
1105 | assert minimal_request.max_sources == 10
1106 | assert minimal_request.timeframe == "1m"
1107 |
1108 | @pytest.mark.unit
1109 | def test_get_research_agent_optimization(self):
1110 | """Test research agent creation with optimization parameters."""
1111 | # Test optimized agent creation
1112 | agent = get_research_agent(
1113 | query="complex financial analysis of multiple companies",
1114 | research_scope="comprehensive",
1115 | timeout_budget=25.0,
1116 | max_sources=20,
1117 | )
1118 |
1119 | assert isinstance(agent, DeepResearchAgent)
1120 | assert agent.max_sources <= 20 # Should respect or optimize max sources
1121 | assert agent.default_depth in [
1122 | "basic",
1123 | "standard",
1124 | "comprehensive",
1125 | "exhaustive",
1126 | ]
1127 |
1128 | # Test standard agent creation
1129 | standard_agent = get_research_agent()
1130 | assert isinstance(standard_agent, DeepResearchAgent)
1131 |
1132 |
1133 | # Content Analysis Tests
1134 |
1135 |
1136 | class TestContentAnalyzer:
1137 | """Test AI-powered content analysis functionality."""
1138 |
1139 | @pytest.mark.unit
1140 | async def test_content_analysis_success(self, mock_llm):
1141 | """Test successful content analysis."""
1142 | analyzer = ContentAnalyzer(mock_llm)
1143 |
1144 | content = "Apple reported strong quarterly earnings with revenue growth of 12% and expanding market share in the services segment."
1145 |
1146 | result = await analyzer.analyze_content(
1147 | content=content, persona="moderate", analysis_focus="financial"
1148 | )
1149 |
1150 | assert "insights" in result
1151 | assert "sentiment" in result
1152 | assert "risk_factors" in result
1153 | assert "opportunities" in result
1154 | assert "credibility_score" in result
1155 | assert "relevance_score" in result
1156 | assert "summary" in result
1157 | assert "analysis_timestamp" in result
1158 |
1159 | @pytest.mark.unit
1160 | async def test_content_analysis_fallback(self, mock_llm):
1161 | """Test content analysis fallback when AI analysis fails."""
1162 | analyzer = ContentAnalyzer(mock_llm)
1163 |
1164 | # Make LLM fail
1165 | mock_llm.ainvoke.side_effect = Exception("LLM error")
1166 |
1167 | result = await analyzer.analyze_content(
1168 | content="Test content", persona="moderate"
1169 | )
1170 |
1171 | # Should fall back to keyword-based analysis
1172 | assert result["fallback_used"] is True
1173 | assert "sentiment" in result
1174 | assert result["sentiment"]["direction"] in ["bullish", "bearish", "neutral"]
1175 |
1176 | @pytest.mark.unit
1177 | async def test_batch_content_analysis(self, mock_llm):
1178 | """Test batch content analysis functionality."""
1179 | analyzer = ContentAnalyzer(mock_llm)
1180 |
1181 | content_items = [
1182 | ("Apple shows strong growth", "source1"),
1183 | ("Market conditions remain volatile", "source2"),
1184 | ("Tech sector outlook positive", "source3"),
1185 | ]
1186 |
1187 | results = await analyzer.analyze_content_batch(
1188 | content_items=content_items, persona="moderate", analysis_focus="general"
1189 | )
1190 |
1191 | assert len(results) == len(content_items)
1192 | for i, result in enumerate(results):
1193 | assert result["source_identifier"] == f"source{i + 1}"
1194 | assert result["batch_processed"] is True
1195 | assert "sentiment" in result
1196 |
1197 |
1198 | # Error Handling and Edge Cases
1199 |
1200 |
1201 | class TestErrorHandlingAndEdgeCases:
1202 | """Test comprehensive error handling and edge cases."""
1203 |
1204 | @pytest.mark.unit
1205 | async def test_empty_search_results(self, mock_llm):
1206 | """Test behavior when search returns no results."""
1207 | provider = ExaSearchProvider("test_key")
1208 |
1209 | with patch("exa_py.Exa") as mock_exa:
1210 | # Mock empty results
1211 | mock_client = MagicMock()
1212 | mock_response = MagicMock()
1213 | mock_response.results = []
1214 | mock_client.search_and_contents.return_value = mock_response
1215 | mock_exa.return_value = mock_client
1216 |
1217 | results = await provider.search("nonexistent topic", num_results=5)
1218 |
1219 | assert results == []
1220 |
1221 | @pytest.mark.unit
1222 | async def test_malformed_search_response(self, mock_llm):
1223 | """Test handling of malformed search responses."""
1224 | provider = ExaSearchProvider("test_key")
1225 |
1226 | with patch("exa_py.Exa") as mock_exa:
1227 | # Mock malformed response
1228 | mock_client = MagicMock()
1229 | mock_client.search_and_contents.side_effect = Exception(
1230 | "Invalid response format"
1231 | )
1232 | mock_exa.return_value = mock_client
1233 |
1234 | with pytest.raises(WebSearchError):
1235 | await provider.search("test query")
1236 |
1237 | @pytest.mark.unit
1238 | async def test_network_timeout_recovery(self):
1239 | """Test network timeout recovery mechanisms."""
1240 | provider = ExaSearchProvider("test_key")
1241 |
1242 | # Simulate multiple timeouts followed by success
1243 | with patch("exa_py.Exa") as mock_exa:
1244 | call_count = 0
1245 |
1246 | async def mock_search_with_recovery(*args, **kwargs):
1247 | nonlocal call_count
1248 | call_count += 1
1249 | if call_count <= 2:
1250 | raise TimeoutError("Network timeout")
1251 | else:
1252 | # Success on third try
1253 | mock_response = MagicMock()
1254 | mock_result = MagicMock()
1255 | mock_result.url = "https://example.com"
1256 | mock_result.title = "Test Result"
1257 | mock_result.text = "Test content"
1258 | mock_result.published_date = "2024-01-15"
1259 | mock_result.score = 0.8
1260 | mock_response.results = [mock_result]
1261 | return mock_response
1262 |
1263 | mock_client = MagicMock()
1264 | mock_client.search_and_contents.side_effect = mock_search_with_recovery
1265 | mock_exa.return_value = mock_client
1266 |
1267 | # First two calls should fail and record failures
1268 | for _ in range(2):
1269 | with pytest.raises(WebSearchError):
1270 | await provider.search("test query", timeout_budget=1.0)
1271 |
1272 |             # Provider should still be healthy (failures recorded but threshold not exceeded)
1273 | assert provider._failure_count == 2
1274 |
1275 | # Third call should succeed and reset failure count
1276 | results = await provider.search("test query")
1277 | assert len(results) > 0
1278 | assert provider._failure_count == 0 # Reset on success
1279 |
1280 | @pytest.mark.unit
1281 | async def test_concurrent_request_limits(self, sample_research_tasks):
1282 | """Test that concurrent request limits are respected."""
1283 | config = ParallelResearchConfig(max_concurrent_agents=2) # Very low limit
1284 | orchestrator = ParallelResearchOrchestrator(config)
1285 |
1286 | execution_times = []
1287 |
1288 | async def tracking_executor(task):
1289 | start = time.time()
1290 | await asyncio.sleep(0.5) # Simulate work
1291 | end = time.time()
1292 | execution_times.append((start, end))
1293 | return {"result": "success"}
1294 |
1295 | await orchestrator.execute_parallel_research(
1296 | tasks=sample_research_tasks, # 3 tasks
1297 | research_executor=tracking_executor,
1298 | )
1299 |
1300 | # With max_concurrent_agents=2, the third task should start after one of the first two finishes
1301 | # This means there should be overlap but not all three running simultaneously
1302 | assert len(execution_times) == 3
1303 |
1304 | # Sort by start time
1305 | execution_times.sort()
1306 |
1307 | # The third task should start after the first task finishes
1308 | # (allowing for some timing tolerance)
1309 | third_start = execution_times[2][0]
1310 | first_end = execution_times[0][1]
1311 |
1312 | # Third should start after first ends (with small tolerance for async timing)
1313 | assert third_start >= (first_end - 0.1)
1314 |
1315 |
1316 | # Integration Test Suite
1317 |
1318 |
1319 | class TestFullIntegrationScenarios:
1320 | """End-to-end integration tests for complete research workflows."""
1321 |
1322 | @pytest.mark.integration
1323 | @pytest.mark.slow
1324 | @patch("maverick_mcp.agents.deep_research.get_cached_search_provider")
1325 | @patch("exa_py.Exa")
1326 | async def test_complete_research_workflow(
1327 | self, mock_exa_class, mock_provider, mock_llm, mock_exa_client
1328 | ):
1329 | """Test complete research workflow from query to final report."""
1330 | # Setup comprehensive mocks
1331 | mock_provider.return_value = ExaSearchProvider("test_key")
1332 | mock_exa_class.return_value = mock_exa_client
1333 |
1334 | agent = DeepResearchAgent(
1335 | llm=mock_llm,
1336 | persona="moderate",
1337 | exa_api_key="test_key",
1338 | research_depth="standard",
1339 | enable_parallel_execution=True,
1340 | )
1341 |
1342 | # Execute complete research workflow
1343 | result = await agent.research_comprehensive(
1344 | topic="Apple Inc (AAPL) investment analysis with market sentiment and competitive position",
1345 | session_id="integration_test_session",
1346 | depth="standard",
1347 | focus_areas=["fundamentals", "market_sentiment", "competitive_analysis"],
1348 | timeframe="1m",
1349 | use_parallel_execution=True,
1350 | )
1351 |
1352 | # Verify comprehensive result structure
1353 | if result.get("status") == "success":
1354 | assert "findings" in result
1355 | assert "confidence_score" in result
1356 | assert isinstance(result["confidence_score"], int | float)
1357 | assert 0.0 <= result["confidence_score"] <= 1.0
1358 | assert "citations" in result
1359 | assert "execution_time_ms" in result
1360 |
1361 | # Check for parallel execution indicators
1362 | if "parallel_execution_stats" in result:
1363 | assert "successful_tasks" in result["parallel_execution_stats"]
1364 | assert "parallel_efficiency" in result["parallel_execution_stats"]
1365 |
1366 | # Should handle both success and controlled failure scenarios
1367 | assert "status" in result or "error" in result
1368 |
1369 | @pytest.mark.integration
1370 | async def test_multi_persona_consistency(self, mock_llm, mock_exa_client):
1371 | """Test research consistency across different investor personas."""
1372 | personas = ["conservative", "moderate", "aggressive", "day_trader"]
1373 | results = {}
1374 |
1375 | for persona in personas:
1376 | with (
1377 | patch(
1378 | "maverick_mcp.agents.deep_research.get_cached_search_provider"
1379 | ) as mock_provider,
1380 | patch("exa_py.Exa") as mock_exa_class,
1381 | ):
1382 | mock_provider.return_value = ExaSearchProvider("test_key")
1383 | mock_exa_class.return_value = mock_exa_client
1384 |
1385 | agent = DeepResearchAgent(
1386 | llm=mock_llm,
1387 | persona=persona,
1388 | exa_api_key="test_key",
1389 | research_depth="basic",
1390 | )
1391 |
1392 | result = await agent.research_comprehensive(
1393 | topic="AAPL investment outlook",
1394 | session_id=f"persona_test_{persona}",
1395 | depth="basic",
1396 | )
1397 |
1398 | results[persona] = result
1399 |
1400 | # All personas should provide valid responses
1401 | for persona, result in results.items():
1402 | assert isinstance(result, dict), f"Invalid result for {persona}"
1403 | # Should have some form of result (success or controlled failure)
1404 | assert "status" in result or "error" in result or "success" in result
1405 |
1406 |
1407 | if __name__ == "__main__":
1408 | # Run specific test categories based on markers
1409 | pytest.main(
1410 | [
1411 | __file__,
1412 | "-v",
1413 | "--tb=short",
1414 | "-m",
1415 | "unit", # Run unit tests by default
1416 | ]
1417 | )
1418 |
```
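
The `__main__` guard above defaults to running the `unit` marker. As a rough, hypothetical sketch (not part of the repository), the other marker groups used in this module (`integration` and `slow`) can be selected the same way; the test module path is intentionally left out and pytest discovery from the repository root is assumed:

```python
# Hypothetical invocation sketch: selecting other pytest marker groups.
# Assumes pytest discovers this test module from the current working directory.
import pytest

# End-to-end integration scenarios (includes the test also marked "slow").
pytest.main(["-v", "--tb=short", "-m", "integration"])

# Everything except the slow tests.
pytest.main(["-v", "--tb=short", "-m", "not slow"])
```
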
--------------------------------------------------------------------------------
/maverick_mcp/utils/llm_optimization.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | LLM-side optimizations for research agents to prevent timeouts.
3 |
4 | This module provides comprehensive optimization strategies including:
5 | - Adaptive model selection based on time constraints
6 | - Progressive token budgeting with confidence tracking
7 | - Parallel LLM processing with intelligent load balancing
8 | - Optimized prompt engineering for speed
9 | - Early termination based on confidence thresholds
10 | - Content filtering to reduce processing overhead
11 | """
12 |
13 | import asyncio
14 | import logging
15 | import re
16 | import time
17 | from datetime import datetime
18 | from enum import Enum
19 | from typing import Any
20 |
21 | from langchain_core.messages import HumanMessage, SystemMessage
22 | from pydantic import BaseModel, Field
23 |
24 | from maverick_mcp.providers.openrouter_provider import (
25 | OpenRouterProvider,
26 | TaskType,
27 | )
28 | from maverick_mcp.utils.orchestration_logging import (
29 | get_orchestration_logger,
30 | log_method_call,
31 | )
32 |
33 | logger = logging.getLogger(__name__)
34 |
35 |
36 | class ResearchPhase(str, Enum):
37 | """Research phases for token allocation."""
38 |
39 | SEARCH = "search"
40 | CONTENT_ANALYSIS = "content_analysis"
41 | SYNTHESIS = "synthesis"
42 | VALIDATION = "validation"
43 |
44 |
45 | class ModelConfiguration(BaseModel):
46 | """Configuration for model selection with time optimization."""
47 |
48 | model_id: str = Field(description="OpenRouter model identifier")
49 | max_tokens: int = Field(description="Maximum output tokens")
50 | temperature: float = Field(description="Model temperature")
51 | timeout_seconds: float = Field(description="Request timeout")
52 | parallel_batch_size: int = Field(
53 | default=1, description="Sources per batch for this model"
54 | )
55 |
56 |
57 | class TokenAllocation(BaseModel):
58 | """Token allocation for a research phase."""
59 |
60 | input_tokens: int = Field(description="Maximum input tokens")
61 | output_tokens: int = Field(description="Maximum output tokens")
62 | per_source_tokens: int = Field(description="Tokens per source")
63 | emergency_reserve: int = Field(description="Emergency reserve tokens")
64 | timeout_seconds: float = Field(description="Processing timeout")
65 |
66 |
67 | class AdaptiveModelSelector:
68 | """Intelligent model selection based on time budgets and task complexity."""
69 |
70 | def __init__(self, openrouter_provider: OpenRouterProvider):
71 | self.provider = openrouter_provider
72 | self.performance_cache = {} # Cache model performance metrics
73 |
74 | def select_model_for_time_budget(
75 | self,
76 | task_type: TaskType,
77 | time_remaining_seconds: float,
78 | complexity_score: float,
79 | content_size_tokens: int,
80 | confidence_threshold: float = 0.8,
81 | current_confidence: float = 0.0,
82 | ) -> ModelConfiguration:
83 | """Select optimal model based on available time and requirements."""
84 |
85 | # Time pressure categories with adaptive thresholds
86 | if time_remaining_seconds < 10:
87 | return self._select_emergency_model(task_type, content_size_tokens)
88 | elif time_remaining_seconds < 25:
89 | return self._select_fast_quality_model(task_type, complexity_score)
90 | elif time_remaining_seconds < 45:
91 | return self._select_balanced_model(
92 | task_type, complexity_score, current_confidence
93 | )
94 | else:
95 | return self._select_optimal_model(
96 | task_type, complexity_score, confidence_threshold
97 | )
98 |
99 | def _select_emergency_model(
100 | self, task_type: TaskType, content_size: int
101 | ) -> ModelConfiguration:
102 | """Ultra-fast models for time-critical situations."""
103 | # OPTIMIZATION: Prioritize speed with increased batch sizes
104 | if content_size > 20000: # Large content needs fast + capable models
105 | return ModelConfiguration(
106 | model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest available
107 | max_tokens=min(800, content_size // 25), # Adaptive token limit
108 | temperature=0.05, # OPTIMIZATION: Minimal temp for deterministic fast response
109 | timeout_seconds=5, # OPTIMIZATION: Reduced from 8s
110 | parallel_batch_size=8, # OPTIMIZATION: Doubled for faster processing
111 | )
112 | else:
113 | return ModelConfiguration(
114 | model_id="openai/gpt-4o-mini", # 126 tokens/sec - excellent speed/cost balance
115 | max_tokens=min(500, content_size // 20),
116 | temperature=0.03, # OPTIMIZATION: Near-zero for fastest response
117 | timeout_seconds=4, # OPTIMIZATION: Reduced from 6s
118 | parallel_batch_size=10, # OPTIMIZATION: Doubled for maximum parallelism
119 | )
120 |
121 | def _select_fast_quality_model(
122 | self, task_type: TaskType, complexity_score: float
123 | ) -> ModelConfiguration:
124 | """Balance speed and quality for time-constrained situations."""
125 | if complexity_score > 0.7 or task_type == TaskType.COMPLEX_REASONING:
126 | # Complex tasks - use fast model with good quality
127 | return ModelConfiguration(
128 | model_id="openai/gpt-4o-mini", # 126 tokens/sec + good quality
129 | max_tokens=1200,
130 | temperature=0.1, # OPTIMIZATION: Reduced for faster response
131 | timeout_seconds=10, # OPTIMIZATION: Reduced from 18s
132 | parallel_batch_size=6, # OPTIMIZATION: Doubled for better parallelism
133 | )
134 | else:
135 | # Simple tasks - use the fastest model available
136 | return ModelConfiguration(
137 | model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest
138 | max_tokens=1000,
139 | temperature=0.1, # OPTIMIZATION: Reduced for faster response
140 | timeout_seconds=8, # OPTIMIZATION: Reduced from 12s
141 | parallel_batch_size=8, # OPTIMIZATION: Doubled for maximum speed
142 | )
143 |
144 | def _select_balanced_model(
145 | self, task_type: TaskType, complexity_score: float, current_confidence: float
146 | ) -> ModelConfiguration:
147 | """Standard mode with cost-effectiveness focus."""
148 | # If confidence is already high, use fastest models for validation
149 | if current_confidence > 0.7:
150 | return ModelConfiguration(
151 | model_id="google/gemini-2.5-flash", # 199 tokens/sec - fastest validation
152 | max_tokens=1500,
153 | temperature=0.25,
154 | timeout_seconds=20, # Reduced for fastest model
155 | parallel_batch_size=4, # Increased for speed
156 | )
157 |
158 | # Standard balanced approach - prioritize speed-optimized models
159 | if task_type in [TaskType.DEEP_RESEARCH, TaskType.RESULT_SYNTHESIS]:
160 | return ModelConfiguration(
161 | model_id="openai/gpt-4o-mini", # Speed + quality balance for research
162 | max_tokens=2000,
163 | temperature=0.3,
164 | timeout_seconds=25, # Reduced for faster model
165 | parallel_batch_size=3, # Increased for speed
166 | )
167 | else:
168 | return ModelConfiguration(
169 | model_id="google/gemini-2.5-flash", # Fastest for general tasks
170 | max_tokens=1500,
171 | temperature=0.25,
172 | timeout_seconds=20, # Reduced for fastest model
173 | parallel_batch_size=4, # Increased for speed
174 | )
175 |
176 | def _select_optimal_model(
177 | self, task_type: TaskType, complexity_score: float, confidence_threshold: float
178 | ) -> ModelConfiguration:
179 | """Comprehensive mode for complex analysis."""
180 | # Use premium models for the most complex tasks when time allows
181 | if complexity_score > 0.8 and task_type == TaskType.DEEP_RESEARCH:
182 | return ModelConfiguration(
183 | model_id="google/gemini-2.5-pro",
184 | max_tokens=3000,
185 | temperature=0.3,
186 | timeout_seconds=45,
187 | parallel_batch_size=1, # Deep thinking models work better individually
188 | )
189 |
190 | # High-quality cost-effective models for standard comprehensive analysis
191 | return ModelConfiguration(
192 | model_id="anthropic/claude-sonnet-4",
193 | max_tokens=2500,
194 | temperature=0.3,
195 | timeout_seconds=40,
196 | parallel_batch_size=2,
197 | )
198 |
199 | def calculate_task_complexity(
200 | self, content: str, task_type: TaskType, focus_areas: list[str] | None = None
201 | ) -> float:
202 | """Calculate complexity score based on content and task requirements."""
203 | if not content:
204 | return 0.3 # Default low complexity
205 |
206 | content_lower = content.lower()
207 |
208 | # Financial complexity indicators
209 | complexity_indicators = {
210 | "financial_jargon": len(
211 | re.findall(
212 | r"\b(?:ebitda|dcf|roic?|wacc|beta|volatility|sharpe)\b",
213 | content_lower,
214 | )
215 | ),
216 | "numerical_data": len(re.findall(r"\$?[\d,]+\.?\d*[%kmbKMB]?", content)),
217 | "comparative_analysis": len(
218 | re.findall(
219 | r"\b(?:versus|compared|relative|outperform|underperform)\b",
220 | content_lower,
221 | )
222 | ),
223 | "temporal_analysis": len(
224 | re.findall(r"\b(?:quarterly|q[1-4]|fy|yoy|qoq|annual)\b", content_lower)
225 | ),
226 | "market_terms": len(
227 | re.findall(
228 | r"\b(?:bullish|bearish|catalyst|headwind|tailwind)\b", content_lower
229 | )
230 | ),
231 | "technical_terms": len(
232 | re.findall(
233 | r"\b(?:support|resistance|breakout|rsi|macd|sma|ema)\b",
234 | content_lower,
235 | )
236 | ),
237 | }
238 |
239 | # Calculate base complexity
240 | total_indicators = sum(complexity_indicators.values())
241 | content_length = len(content.split())
242 | base_complexity = min(total_indicators / max(content_length / 100, 1), 1.0)
243 |
244 | # Task-specific complexity adjustments
245 | task_multipliers = {
246 | TaskType.DEEP_RESEARCH: 1.4,
247 | TaskType.COMPLEX_REASONING: 1.6,
248 | TaskType.RESULT_SYNTHESIS: 1.2,
249 | TaskType.TECHNICAL_ANALYSIS: 1.3,
250 | TaskType.SENTIMENT_ANALYSIS: 0.8,
251 | TaskType.QUICK_ANSWER: 0.5,
252 | }
253 |
254 | # Focus area adjustments
255 | focus_multiplier = 1.0
256 | if focus_areas:
257 | complex_focus_areas = [
258 | "competitive_analysis",
259 | "fundamental_analysis",
260 | "complex_reasoning",
261 | ]
262 | if any(area in focus_areas for area in complex_focus_areas):
263 | focus_multiplier = 1.2
264 |
265 | final_complexity = (
266 | base_complexity * task_multipliers.get(task_type, 1.0) * focus_multiplier
267 | )
268 | return min(final_complexity, 1.0)
269 |
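# --- Illustrative usage sketch (not part of the original module) ---
# A hypothetical caller could chain the two public methods above; the names
# "openrouter_provider" and "combined_text" are assumed inputs, not defined here.
#
#     selector = AdaptiveModelSelector(openrouter_provider)
#     complexity = selector.calculate_task_complexity(
#         content=combined_text,
#         task_type=TaskType.DEEP_RESEARCH,
#         focus_areas=["fundamental_analysis"],
#     )
#     config = selector.select_model_for_time_budget(
#         task_type=TaskType.DEEP_RESEARCH,
#         time_remaining_seconds=30.0,
#         complexity_score=complexity,
#         content_size_tokens=len(combined_text) // 4,
#     )
#     llm = openrouter_provider.get_llm(
#         model_override=config.model_id,
#         temperature=config.temperature,
#         max_tokens=config.max_tokens,
#     )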
270 |
271 | class ProgressiveTokenBudgeter:
272 | """Manages token budgets across research phases with time awareness."""
273 |
274 | def __init__(
275 | self, total_time_budget_seconds: float, confidence_target: float = 0.75
276 | ):
277 | self.total_time_budget = total_time_budget_seconds
278 | self.confidence_target = confidence_target
279 | self.phase_budgets = self._calculate_base_phase_budgets()
280 | self.time_started = time.time()
281 |
282 | def _calculate_base_phase_budgets(self) -> dict[ResearchPhase, int]:
283 | """Calculate base token budgets for each research phase."""
284 | # Allocate tokens based on typical phase requirements
285 | if self.total_time_budget < 30:
286 | # Emergency mode - minimal tokens
287 | return {
288 | ResearchPhase.SEARCH: 500,
289 | ResearchPhase.CONTENT_ANALYSIS: 2000,
290 | ResearchPhase.SYNTHESIS: 800,
291 | ResearchPhase.VALIDATION: 300,
292 | }
293 | elif self.total_time_budget < 60:
294 | # Fast mode
295 | return {
296 | ResearchPhase.SEARCH: 1000,
297 | ResearchPhase.CONTENT_ANALYSIS: 4000,
298 | ResearchPhase.SYNTHESIS: 1500,
299 | ResearchPhase.VALIDATION: 500,
300 | }
301 | else:
302 | # Standard mode
303 | return {
304 | ResearchPhase.SEARCH: 1500,
305 | ResearchPhase.CONTENT_ANALYSIS: 6000,
306 | ResearchPhase.SYNTHESIS: 2500,
307 | ResearchPhase.VALIDATION: 1000,
308 | }
309 |
310 | def allocate_tokens_for_phase(
311 | self,
312 | phase: ResearchPhase,
313 | sources_count: int,
314 | current_confidence: float,
315 | complexity_score: float = 0.5,
316 | ) -> TokenAllocation:
317 | """Allocate tokens for a research phase based on current state."""
318 |
319 | time_elapsed = time.time() - self.time_started
320 | time_remaining = max(0, self.total_time_budget - time_elapsed)
321 |
322 | base_budget = self.phase_budgets[phase]
323 |
324 | # Confidence-based scaling
325 | if current_confidence > self.confidence_target:
326 | # High confidence - focus on validation with fewer tokens
327 | confidence_multiplier = 0.7
328 | elif current_confidence < 0.4:
329 | # Low confidence - increase token usage if time allows
330 | confidence_multiplier = 1.3 if time_remaining > 30 else 0.9
331 | else:
332 | confidence_multiplier = 1.0
333 |
334 | # Time pressure scaling
335 | time_multiplier = self._calculate_time_multiplier(time_remaining)
336 |
337 | # Complexity scaling
338 | complexity_multiplier = 0.8 + (complexity_score * 0.4) # Range: 0.8 to 1.2
339 |
340 | # Source count scaling (diminishing returns)
341 | if sources_count > 0:
342 | source_multiplier = min(1.0 + (sources_count - 3) * 0.05, 1.3)
343 | else:
344 | source_multiplier = 1.0
345 |
346 | # Calculate final budget
347 | final_budget = int(
348 | base_budget
349 | * confidence_multiplier
350 | * time_multiplier
351 | * complexity_multiplier
352 | * source_multiplier
353 | )
354 |
355 | # Calculate timeout based on available time and token budget
356 | base_timeout = min(time_remaining * 0.8, 45) # Max 45 seconds per phase
357 | adjusted_timeout = base_timeout * (final_budget / base_budget) ** 0.5
358 |
359 | return TokenAllocation(
360 | input_tokens=min(int(final_budget * 0.75), 15000), # Cap input tokens
361 | output_tokens=min(int(final_budget * 0.25), 3000), # Cap output tokens
362 | per_source_tokens=final_budget // max(sources_count, 1)
363 | if sources_count > 0
364 | else final_budget,
365 | emergency_reserve=200, # Always keep emergency reserve
366 | timeout_seconds=max(adjusted_timeout, 5), # Minimum 5 seconds
367 | )
368 |
369 | def get_next_allocation(
370 | self,
371 | sources_remaining: int,
372 | current_confidence: float,
373 | time_elapsed_seconds: float,
374 | ) -> dict[str, Any]:
375 | """Get the next token allocation for processing sources."""
376 | time_remaining = max(0, self.total_time_budget - time_elapsed_seconds)
377 |
378 | # Determine priority based on confidence and time pressure
379 | if current_confidence < 0.4 and time_remaining > 30:
380 | priority = "high"
381 | elif current_confidence < 0.6 and time_remaining > 15:
382 | priority = "medium"
383 | else:
384 | priority = "low"
385 |
386 | # Calculate time budget per remaining source
387 | if sources_remaining > 0:
388 | time_per_source = time_remaining / sources_remaining
389 | else:
390 | time_per_source = 0
391 |
392 | # Calculate token budget
393 | base_tokens = self.phase_budgets.get(ResearchPhase.CONTENT_ANALYSIS, 2000)
394 |
395 | if priority == "high":
396 | max_tokens = min(int(base_tokens * 1.2), 4000)
397 | elif priority == "medium":
398 | max_tokens = base_tokens
399 | else:
400 | max_tokens = int(base_tokens * 0.8)
401 |
402 | return {
403 | "time_budget": min(time_per_source, 30.0), # Cap at 30 seconds
404 | "max_tokens": max_tokens,
405 | "priority": priority,
406 | "sources_remaining": sources_remaining,
407 | }
408 |
409 | def _calculate_time_multiplier(self, time_remaining: float) -> float:
410 | """Scale token budget based on time pressure."""
411 | if time_remaining < 5:
412 | return 0.2 # Extreme emergency mode
413 | elif time_remaining < 15:
414 | return 0.4 # Emergency mode
415 | elif time_remaining < 30:
416 | return 0.7 # Time-constrained
417 | elif time_remaining < 60:
418 | return 0.9 # Slightly reduced
419 | else:
420 | return 1.0 # Full budget available
421 |
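# --- Illustrative usage sketch (not part of the original module) ---
# A hypothetical caller could translate an allocation into LLM call limits;
# the surrounding research loop is assumed, not shown.
#
#     budgeter = ProgressiveTokenBudgeter(total_time_budget_seconds=120.0)
#     allocation = budgeter.allocate_tokens_for_phase(
#         phase=ResearchPhase.CONTENT_ANALYSIS,
#         sources_count=8,
#         current_confidence=0.45,
#         complexity_score=0.6,
#     )
#     # allocation.output_tokens and allocation.timeout_seconds would then bound
#     # the model call for this phase.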
422 |
423 | class ParallelLLMProcessor:
424 | """Handles parallel LLM operations with intelligent load balancing."""
425 |
426 | def __init__(
427 | self,
428 | openrouter_provider: OpenRouterProvider,
429 | max_concurrent: int = 5, # OPTIMIZATION: Increased from 3
430 | ):
431 | self.provider = openrouter_provider
432 | self.max_concurrent = max_concurrent
433 | self.semaphore = asyncio.BoundedSemaphore(
434 | max_concurrent
435 | ) # OPTIMIZATION: Use BoundedSemaphore
436 | self.model_selector = AdaptiveModelSelector(openrouter_provider)
437 | self.orchestration_logger = get_orchestration_logger("ParallelLLMProcessor")
438 | # OPTIMIZATION: Track active requests for better coordination
439 | self._active_requests = 0
440 | self._request_lock = asyncio.Lock()
441 |
442 | @log_method_call(component="ParallelLLMProcessor", include_timing=True)
443 | async def parallel_content_analysis(
444 | self,
445 | sources: list[dict],
446 | analysis_type: str,
447 | persona: str,
448 | time_budget_seconds: float,
449 | current_confidence: float = 0.0,
450 | ) -> list[dict]:
451 | """Analyze multiple sources in parallel with adaptive optimization."""
452 |
453 | if not sources:
454 | return []
455 |
456 | self.orchestration_logger.set_request_context(
457 | analysis_type=analysis_type,
458 | source_count=len(sources),
459 | time_budget=time_budget_seconds,
460 | )
461 |
462 | # Calculate complexity for all sources
463 | combined_content = "\n".join(
464 | [source.get("content", "")[:1000] for source in sources[:5]]
465 | )
466 | overall_complexity = self.model_selector.calculate_task_complexity(
467 | combined_content,
468 | TaskType.SENTIMENT_ANALYSIS
469 | if analysis_type == "sentiment"
470 | else TaskType.MARKET_ANALYSIS,
471 | )
472 |
473 | # Determine optimal batching strategy
474 | model_config = self.model_selector.select_model_for_time_budget(
475 | task_type=TaskType.SENTIMENT_ANALYSIS
476 | if analysis_type == "sentiment"
477 | else TaskType.MARKET_ANALYSIS,
478 | time_remaining_seconds=time_budget_seconds,
479 | complexity_score=overall_complexity,
480 | content_size_tokens=len(combined_content) // 4,
481 | current_confidence=current_confidence,
482 | )
483 |
484 | # Create batches based on model configuration
485 | batches = self._create_optimal_batches(
486 | sources, model_config.parallel_batch_size
487 | )
488 |
489 | self.orchestration_logger.info(
490 | "🔄 PARALLEL_ANALYSIS_START",
491 | total_sources=len(sources),
492 | batch_count=len(batches),
493 | )
494 |
495 | # OPTIMIZATION: Process batches using create_task for immediate parallelism
496 | running_tasks = []
497 | for i, batch in enumerate(batches):
498 | # Create task immediately without awaiting
499 | task_future = asyncio.create_task(
500 | self._analyze_source_batch(
501 | batch=batch,
502 | batch_id=i,
503 | analysis_type=analysis_type,
504 | persona=persona,
505 | model_config=model_config,
506 | overall_complexity=overall_complexity,
507 | )
508 | )
509 | running_tasks.append((i, task_future)) # Track batch ID with future
510 |
511 | # OPTIMIZATION: Minimal stagger to prevent API overload
512 | if i < len(batches) - 1: # Don't delay after last batch
513 | await asyncio.sleep(0.01) # 10ms micro-delay
514 |
515 |         # OPTIMIZATION: Await each batch task against the shared deadline so results are handled progressively
516 | batch_results = [None] * len(batches) # Pre-allocate results list
517 | timeout_at = time.time() + (time_budget_seconds * 0.9)
518 |
519 | try:
520 | for batch_id, task_future in running_tasks:
521 | remaining_time = timeout_at - time.time()
522 | if remaining_time <= 0:
523 | raise TimeoutError()
524 |
525 | try:
526 | result = await asyncio.wait_for(task_future, timeout=remaining_time)
527 | batch_results[batch_id] = result
528 | except Exception as e:
529 | batch_results[batch_id] = e
530 | except TimeoutError:
531 | self.orchestration_logger.warning(
532 | "⏰ PARALLEL_ANALYSIS_TIMEOUT", timeout=time_budget_seconds
533 | )
534 | return self._create_fallback_results(sources)
535 |
536 | # Flatten and process results
537 | final_results = []
538 | successful_batches = 0
539 | for i, batch_result in enumerate(batch_results):
540 | if isinstance(batch_result, Exception):
541 | self.orchestration_logger.warning(
542 | "⚠️ BATCH_FAILED", batch_id=i, error=str(batch_result)
543 | )
544 | # Add fallback results for failed batch
545 | final_results.extend(self._create_fallback_results(batches[i]))
546 | else:
547 | final_results.extend(batch_result)
548 | successful_batches += 1
549 |
550 | self.orchestration_logger.info(
551 | "✅ PARALLEL_ANALYSIS_COMPLETE",
552 | successful_batches=successful_batches,
553 | results_count=len(final_results),
554 | )
555 |
556 | return final_results
557 |
558 | def _create_optimal_batches(
559 | self, sources: list[dict], batch_size: int
560 | ) -> list[list[dict]]:
561 | """Create optimal batches for parallel processing."""
562 | if batch_size <= 1:
563 | return [[source] for source in sources]
564 |
565 | batches = []
566 | for i in range(0, len(sources), batch_size):
567 | batch = sources[i : i + batch_size]
568 | batches.append(batch)
569 |
570 | return batches
571 |
572 | async def _analyze_source_batch(
573 | self,
574 | batch: list[dict],
575 | batch_id: int,
576 | analysis_type: str,
577 | persona: str,
578 | model_config: ModelConfiguration,
579 | overall_complexity: float,
580 | ) -> list[dict]:
581 | """Analyze a batch of sources with optimized LLM call."""
582 |
583 | # OPTIMIZATION: Track active requests for better coordination
584 | async with self._request_lock:
585 | self._active_requests += 1
586 |
587 | try:
588 | # OPTIMIZATION: Acquire semaphore without blocking other task creation
589 | await self.semaphore.acquire()
590 | try:
591 | # Create batch analysis prompt
592 | batch_prompt = self._create_batch_analysis_prompt(
593 | batch, analysis_type, persona, model_config.max_tokens
594 | )
595 |
596 | # Get LLM instance
597 | llm = self.provider.get_llm(
598 | model_override=model_config.model_id,
599 | temperature=model_config.temperature,
600 | max_tokens=model_config.max_tokens,
601 | )
602 |
603 | # Execute with timeout
604 | start_time = time.time()
605 | result = await asyncio.wait_for(
606 | llm.ainvoke(
607 | [
608 | SystemMessage(
609 | content="You are a financial analyst. Provide structured, concise analysis."
610 | ),
611 | HumanMessage(content=batch_prompt),
612 | ]
613 | ),
614 | timeout=model_config.timeout_seconds,
615 | )
616 |
617 | execution_time = time.time() - start_time
618 |
619 | # Parse batch results
620 | parsed_results = self._parse_batch_analysis_result(
621 | result.content, batch
622 | )
623 |
624 | self.orchestration_logger.debug(
625 | "✨ BATCH_SUCCESS",
626 | batch_id=batch_id,
627 | duration=f"{execution_time:.2f}s",
628 | )
629 |
630 | return parsed_results
631 |
632 | except TimeoutError:
633 | self.orchestration_logger.warning(
634 | "⏰ BATCH_TIMEOUT",
635 | batch_id=batch_id,
636 | timeout=model_config.timeout_seconds,
637 | )
638 | return self._create_fallback_results(batch)
639 | except Exception as e:
640 | self.orchestration_logger.error(
641 | "💥 BATCH_ERROR", batch_id=batch_id, error=str(e)
642 | )
643 | return self._create_fallback_results(batch)
644 | finally:
645 | # OPTIMIZATION: Always release semaphore
646 | self.semaphore.release()
647 | finally:
648 | # OPTIMIZATION: Track active requests
649 | async with self._request_lock:
650 | self._active_requests -= 1
651 |
652 | def _create_batch_analysis_prompt(
653 | self, batch: list[dict], analysis_type: str, persona: str, max_tokens: int
654 | ) -> str:
655 | """Create optimized prompt for batch analysis."""
656 |
657 | # Determine prompt style based on token budget
658 | if max_tokens < 800:
659 | style = "ultra_concise"
660 | elif max_tokens < 1500:
661 | style = "concise"
662 | else:
663 | style = "detailed"
664 |
665 | prompt_templates = {
666 | "ultra_concise": """URGENT BATCH ANALYSIS - {analysis_type} for {persona} investor.
667 |
668 | Analyze {source_count} sources. For EACH source, provide:
669 | SOURCE_N: SENTIMENT:Bull/Bear/Neutral|CONFIDENCE:0-1|INSIGHT:one key point|RISK:main risk
670 |
671 | {sources}
672 |
673 | Keep total response under 500 words.""",
674 | "concise": """BATCH ANALYSIS - {analysis_type} for {persona} investor perspective.
675 |
676 | Analyze these {source_count} sources. For each source provide:
677 | - Sentiment: Bull/Bear/Neutral + confidence (0-1)
678 | - Key insight (1 sentence)
679 | - Main risk (1 sentence)
680 | - Relevance score (0-1)
681 |
682 | {sources}
683 |
684 | Format consistently. Target ~100 words per source.""",
685 | "detailed": """Comprehensive {analysis_type} analysis for {persona} investor.
686 |
687 | Analyze these {source_count} sources with structured output for each:
688 |
689 | {sources}
690 |
691 | For each source provide:
692 | 1. Sentiment (direction, confidence 0-1, brief reasoning)
693 | 2. Key insights (2-3 main points)
694 | 3. Risk factors (1-2 key risks)
695 | 4. Opportunities (1-2 opportunities if any)
696 | 5. Credibility assessment (0-1 score)
697 | 6. Relevance score (0-1)
698 |
699 | Maintain {persona} investor perspective throughout.""",
700 | }
701 |
702 | # Format sources for prompt
703 | sources_text = ""
704 | for i, source in enumerate(batch, 1):
705 | content = source.get("content", "")[:1500] # Limit content length
706 | title = source.get("title", f"Source {i}")
707 | sources_text += f"\nSOURCE {i} - {title}:\n{content}\n{'---' * 20}\n"
708 |
709 | template = prompt_templates[style]
710 | return template.format(
711 | analysis_type=analysis_type,
712 | persona=persona,
713 | source_count=len(batch),
714 | sources=sources_text.strip(),
715 | )
716 |
717 | def _parse_batch_analysis_result(
718 | self, result_content: str, batch: list[dict]
719 | ) -> list[dict]:
720 | """Parse LLM batch analysis result into structured data."""
721 |
722 | results = []
723 |
724 | # Try structured parsing first
725 | source_sections = re.split(r"\n(?:SOURCE\s+\d+|---+)", result_content)
726 |
727 | if len(source_sections) >= len(batch):
728 | # Structured parsing successful
729 | for _i, (source, section) in enumerate(
730 | zip(batch, source_sections[1 : len(batch) + 1], strict=False)
731 | ):
732 | parsed = self._parse_source_analysis(section, source)
733 | results.append(parsed)
734 | else:
735 | # Fallback to simple parsing
736 | for i, source in enumerate(batch):
737 | fallback_analysis = self._create_simple_fallback_analysis(
738 | result_content, source, i
739 | )
740 | results.append(fallback_analysis)
741 |
742 | return results
743 |
744 | def _parse_source_analysis(self, analysis_text: str, source: dict) -> dict:
745 | """Parse analysis text for a single source."""
746 |
747 | # Extract sentiment
748 | sentiment_match = re.search(
749 | r"sentiment:?\s*(\w+)[,\s]*(?:confidence:?\s*([\d.]+))?",
750 | analysis_text.lower(),
751 | )
752 | if sentiment_match:
753 | direction = sentiment_match.group(1).lower()
754 | confidence = float(sentiment_match.group(2) or 0.5)
755 |
756 | # Map common sentiment terms
757 | if direction in ["bull", "bullish", "positive"]:
758 | direction = "bullish"
759 | elif direction in ["bear", "bearish", "negative"]:
760 | direction = "bearish"
761 | else:
762 | direction = "neutral"
763 | else:
764 | direction = "neutral"
765 | confidence = 0.5
766 |
767 | # Extract other information
768 | insights = self._extract_insights(analysis_text)
769 | risks = self._extract_risks(analysis_text)
770 | opportunities = self._extract_opportunities(analysis_text)
771 |
772 | # Extract scores
773 | relevance_match = re.search(r"relevance:?\s*([\d.]+)", analysis_text.lower())
774 | relevance_score = float(relevance_match.group(1)) if relevance_match else 0.6
775 |
776 | credibility_match = re.search(
777 | r"credibility:?\s*([\d.]+)", analysis_text.lower()
778 | )
779 | credibility_score = (
780 | float(credibility_match.group(1)) if credibility_match else 0.7
781 | )
782 |
783 | return {
784 | **source,
785 | "analysis": {
786 | "insights": insights,
787 | "sentiment": {"direction": direction, "confidence": confidence},
788 | "risk_factors": risks,
789 | "opportunities": opportunities,
790 | "credibility_score": credibility_score,
791 | "relevance_score": relevance_score,
792 | "analysis_timestamp": datetime.now(),
793 | "batch_processed": True,
794 | },
795 | }
796 |
797 | def _extract_insights(self, text: str) -> list[str]:
798 | """Extract insights from analysis text."""
799 | insights = []
800 |
801 | # Look for insight patterns
802 | insight_patterns = [
803 | r"insight:?\s*([^.\n]+)",
804 | r"key point:?\s*([^.\n]+)",
805 | r"main finding:?\s*([^.\n]+)",
806 | ]
807 |
808 | for pattern in insight_patterns:
809 | matches = re.findall(pattern, text, re.IGNORECASE)
810 | insights.extend([m.strip() for m in matches if m.strip()])
811 |
812 | # If no structured insights found, extract bullet points
813 | if not insights:
814 | bullet_matches = re.findall(r"[•\-\*]\s*([^.\n]+)", text)
815 | insights.extend([m.strip() for m in bullet_matches if m.strip()][:3])
816 |
817 | return insights[:5] # Limit to 5 insights
818 |
819 | def _extract_risks(self, text: str) -> list[str]:
820 | """Extract risk factors from analysis text."""
821 | risk_patterns = [
822 | r"risk:?\s*([^.\n]+)",
823 | r"concern:?\s*([^.\n]+)",
824 | r"headwind:?\s*([^.\n]+)",
825 | ]
826 |
827 | risks = []
828 | for pattern in risk_patterns:
829 | matches = re.findall(pattern, text, re.IGNORECASE)
830 | risks.extend([m.strip() for m in matches if m.strip()])
831 |
832 | return risks[:3]
833 |
834 | def _extract_opportunities(self, text: str) -> list[str]:
835 | """Extract opportunities from analysis text."""
836 | opp_patterns = [
837 | r"opportunit(?:y|ies):?\s*([^.\n]+)",
838 | r"catalyst:?\s*([^.\n]+)",
839 | r"tailwind:?\s*([^.\n]+)",
840 | ]
841 |
842 | opportunities = []
843 | for pattern in opp_patterns:
844 | matches = re.findall(pattern, text, re.IGNORECASE)
845 | opportunities.extend([m.strip() for m in matches if m.strip()])
846 |
847 | return opportunities[:3]
848 |
849 | def _create_simple_fallback_analysis(
850 | self, full_analysis: str, source: dict, index: int
851 | ) -> dict:
852 | """Create simple fallback analysis when parsing fails."""
853 |
854 | # Basic sentiment analysis from text
855 | analysis_lower = full_analysis.lower()
856 |
857 | positive_words = ["positive", "bullish", "strong", "growth", "opportunity"]
858 | negative_words = ["negative", "bearish", "weak", "decline", "risk"]
859 |
860 | pos_count = sum(1 for word in positive_words if word in analysis_lower)
861 | neg_count = sum(1 for word in negative_words if word in analysis_lower)
862 |
863 | if pos_count > neg_count:
864 | sentiment = "bullish"
865 | confidence = 0.6
866 | elif neg_count > pos_count:
867 | sentiment = "bearish"
868 | confidence = 0.6
869 | else:
870 | sentiment = "neutral"
871 | confidence = 0.5
872 |
873 | return {
874 | **source,
875 | "analysis": {
876 | "insights": [f"Analysis based on source content (index {index})"],
877 | "sentiment": {"direction": sentiment, "confidence": confidence},
878 | "risk_factors": ["Unable to extract specific risks"],
879 | "opportunities": ["Unable to extract specific opportunities"],
880 | "credibility_score": 0.5,
881 | "relevance_score": 0.5,
882 | "analysis_timestamp": datetime.now(),
883 | "fallback_used": True,
884 | "batch_processed": True,
885 | },
886 | }
887 |
888 | def _create_fallback_results(self, sources: list[dict]) -> list[dict]:
889 | """Create fallback results when batch processing fails."""
890 | results = []
891 | for source in sources:
892 | fallback_result = {
893 | **source,
894 | "analysis": {
895 | "insights": ["Analysis failed - using fallback"],
896 | "sentiment": {"direction": "neutral", "confidence": 0.3},
897 | "risk_factors": ["Analysis timeout - unable to assess risks"],
898 | "opportunities": [],
899 | "credibility_score": 0.5,
900 | "relevance_score": 0.5,
901 | "analysis_timestamp": datetime.now(),
902 | "fallback_used": True,
903 | "batch_timeout": True,
904 | },
905 | }
906 | results.append(fallback_result)
907 | return results
908 |
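# --- Illustrative usage sketch (not part of the original module) ---
# Inside an async context, a hypothetical caller might analyze pre-filtered
# sources in parallel; "filtered_sources" is an assumed list of source dicts.
#
#     processor = ParallelLLMProcessor(openrouter_provider, max_concurrent=5)
#     analyzed = await processor.parallel_content_analysis(
#         sources=filtered_sources,
#         analysis_type="sentiment",
#         persona="moderate",
#         time_budget_seconds=60.0,
#     )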
909 |
910 | class OptimizedPromptEngine:
911 | """Creates optimized prompts for different time constraints and confidence levels."""
912 |
913 | def __init__(self):
914 | self.prompt_cache = {} # Cache for generated prompts
915 |
916 | self.prompt_templates = {
917 | "emergency": {
918 | "content_analysis": """URGENT: Quick 3-point analysis of financial content for {persona} investor.
919 |
920 | Content: {content}
921 |
922 | Provide ONLY:
923 | 1. SENTIMENT: Bull/Bear/Neutral + confidence (0-1)
924 | 2. KEY_RISK: Primary risk factor
925 | 3. KEY_OPPORTUNITY: Main opportunity (if any)
926 |
927 | Format: SENTIMENT:Bull|0.8 KEY_RISK:Market volatility KEY_OPPORTUNITY:Earnings growth
928 | Max 50 words total. No explanations.""",
929 | "synthesis": """URGENT: 2-sentence summary from {source_count} sources for {persona} investor.
930 |
931 | Key findings: {key_points}
932 |
933 | Provide: 1) Overall sentiment direction 2) Primary investment implication
934 | Max 40 words total.""",
935 | },
936 | "fast": {
937 | "content_analysis": """Quick financial analysis for {persona} investor - 5 points max.
938 |
939 | Content: {content}
940 |
941 | Provide concisely:
942 | • Sentiment: Bull/Bear/Neutral (confidence 0-1)
943 | • Key insight (1 sentence)
944 | • Main risk (1 sentence)
945 | • Main opportunity (1 sentence)
946 | • Relevance score (0-1)
947 |
948 | Target: Under 150 words total.""",
949 | "synthesis": """Synthesize research findings for {persona} investor.
950 |
951 | Sources: {source_count} | Key insights: {insights}
952 |
953 | 4-part summary:
954 | 1. Overall sentiment + confidence
955 | 2. Top 2 opportunities
956 | 3. Top 2 risks
957 | 4. Recommended action
958 |
959 | Limit: 200 words max.""",
960 | },
961 | "standard": {
962 | "content_analysis": """Financial content analysis for {persona} investor.
963 |
964 | Content: {content}
965 | Focus areas: {focus_areas}
966 |
967 | Structured analysis:
968 | - Sentiment (direction, confidence 0-1, brief reasoning)
969 | - Key insights (3-5 bullet points)
970 | - Risk factors (2-3 main risks)
971 | - Opportunities (2-3 opportunities)
972 | - Credibility assessment (0-1)
973 | - Relevance score (0-1)
974 |
975 | Target: 300-500 words.""",
976 | "synthesis": """Comprehensive research synthesis for {persona} investor.
977 |
978 | Research Summary:
979 | - Sources analyzed: {source_count}
980 | - Key insights: {insights}
981 | - Time horizon: {time_horizon}
982 |
983 | Provide detailed analysis:
984 | 1. Executive Summary (2-3 sentences)
985 | 2. Key Findings (5-7 bullet points)
986 | 3. Investment Implications
987 | 4. Risk Assessment
988 | 5. Recommended Actions
989 | 6. Confidence Level + reasoning
990 |
991 | Tailor specifically for {persona} investment characteristics.""",
992 | },
993 | }
994 |
995 | def get_optimized_prompt(
996 | self,
997 | prompt_type: str,
998 | time_remaining: float,
999 | confidence_level: float,
1000 | **context,
1001 | ) -> str:
1002 | """Generate optimized prompt based on time constraints and confidence."""
1003 |
1004 | # Create cache key
1005 | cache_key = f"{prompt_type}_{time_remaining:.0f}_{confidence_level:.1f}_{hash(str(sorted(context.items())))}"
1006 |
1007 | if cache_key in self.prompt_cache:
1008 | return self.prompt_cache[cache_key]
1009 |
1010 | # Select template based on time pressure
1011 | if time_remaining < 15:
1012 | template_category = "emergency"
1013 | elif time_remaining < 45:
1014 | template_category = "fast"
1015 | else:
1016 | template_category = "standard"
1017 |
1018 | template = self.prompt_templates[template_category].get(prompt_type)
1019 |
1020 | if not template:
1021 | # Fallback to fast template
1022 | template = self.prompt_templates["fast"].get(
1023 | prompt_type, "Analyze the content quickly and provide key insights."
1024 | )
1025 |
1026 | # Add confidence-based instructions
1027 | confidence_instructions = ""
1028 | if confidence_level > 0.7:
1029 | confidence_instructions = "\n\nNOTE: High confidence already achieved. Focus on validation and contradictory evidence."
1030 | elif confidence_level < 0.4:
1031 | confidence_instructions = "\n\nNOTE: Low confidence. Look for strong supporting evidence to build confidence."
1032 |
1033 | # Format template with context
1034 | formatted_prompt = template.format(**context) + confidence_instructions
1035 |
1036 | # Cache the result
1037 | self.prompt_cache[cache_key] = formatted_prompt
1038 |
1039 | return formatted_prompt
1040 |
1041 | def create_time_optimized_synthesis_prompt(
1042 | self,
1043 | sources: list[dict],
1044 | persona: str,
1045 | time_remaining: float,
1046 | current_confidence: float,
1047 | ) -> str:
1048 | """Create synthesis prompt optimized for available time."""
1049 |
1050 | # Extract key information from sources
1051 | insights = []
1052 | sentiments = []
1053 | for source in sources:
1054 | analysis = source.get("analysis", {})
1055 | insights.extend(analysis.get("insights", [])[:2]) # Limit per source
1056 | sentiment = analysis.get("sentiment", {})
1057 | if sentiment:
1058 | sentiments.append(sentiment.get("direction", "neutral"))
1059 |
1060 | # Prepare context
1061 | context = {
1062 | "persona": persona,
1063 | "source_count": len(sources),
1064 | "insights": "; ".join(insights[:8]), # Top 8 insights
1065 | "key_points": "; ".join(insights[:8]), # For backward compatibility
1066 | "time_horizon": "short-term" if time_remaining < 30 else "medium-term",
1067 | }
1068 |
1069 | return self.get_optimized_prompt(
1070 | "synthesis", time_remaining, current_confidence, **context
1071 | )
1072 |
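# --- Illustrative usage sketch (not part of the original module) ---
# A hypothetical content-analysis call; "article_text" is an assumed string.
# With roughly 40 seconds remaining the engine selects the "fast" template,
# which only consumes the {persona} and {content} placeholders.
#
#     engine = OptimizedPromptEngine()
#     prompt = engine.get_optimized_prompt(
#         "content_analysis",
#         time_remaining=40.0,
#         confidence_level=0.5,
#         persona="moderate",
#         content=article_text,
#     )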
1073 |
1074 | class ConfidenceTracker:
1075 | """Tracks research confidence and triggers early termination when appropriate."""
1076 |
1077 | def __init__(
1078 | self,
1079 | target_confidence: float = 0.75,
1080 | min_sources: int = 3,
1081 | max_sources: int = 15,
1082 | ):
1083 | self.target_confidence = target_confidence
1084 | self.min_sources = min_sources
1085 | self.max_sources = max_sources
1086 | self.confidence_history = []
1087 | self.evidence_history = []
1088 | self.source_count = 0
1089 | self.sources_analyzed = 0 # For backward compatibility
1090 | self.last_significant_improvement = 0
1091 | self.sentiment_votes = {"bullish": 0, "bearish": 0, "neutral": 0}
1092 |
1093 | def update_confidence(
1094 | self,
1095 | new_evidence: dict,
1096 | source_credibility: float | None = None,
1097 | credibility_score: float | None = None,
1098 | ) -> dict[str, Any]:
1099 | """Update confidence based on new evidence and return continuation decision."""
1100 |
1101 | # Handle both parameter names for backward compatibility
1102 | if source_credibility is None and credibility_score is not None:
1103 | source_credibility = credibility_score
1104 | elif source_credibility is None and credibility_score is None:
1105 | source_credibility = 0.5 # Default value
1106 |
1107 | self.source_count += 1
1108 | self.sources_analyzed += 1 # Keep both for compatibility
1109 |
1110 | # Store evidence
1111 | self.evidence_history.append(
1112 | {
1113 | "evidence": new_evidence,
1114 | "credibility": source_credibility,
1115 | "timestamp": datetime.now(),
1116 | }
1117 | )
1118 |
1119 | # Update sentiment voting
1120 | sentiment = new_evidence.get("sentiment", {})
1121 | direction = sentiment.get("direction", "neutral")
1122 | confidence = sentiment.get("confidence", 0.5)
1123 |
1124 | # Weight vote by source credibility and sentiment confidence
1125 | vote_weight = source_credibility * confidence
1126 | self.sentiment_votes[direction] += vote_weight
1127 |
1128 | # Calculate evidence strength
1129 | evidence_strength = self._calculate_evidence_strength(
1130 | new_evidence, source_credibility
1131 | )
1132 |
1133 | # Update confidence using Bayesian-style updating
1134 | current_confidence = self._update_bayesian_confidence(evidence_strength)
1135 | self.confidence_history.append(current_confidence)
1136 |
1137 | # Check for significant improvement
1138 | if len(self.confidence_history) >= 2:
1139 | improvement = current_confidence - self.confidence_history[-2]
1140 | if improvement > 0.1: # 10% improvement
1141 | self.last_significant_improvement = self.source_count
1142 |
1143 | # Make continuation decision
1144 | should_continue = self._should_continue_research(current_confidence)
1145 |
1146 | return {
1147 | "current_confidence": current_confidence,
1148 | "should_continue": should_continue,
1149 | "sources_processed": self.source_count,
1150 | "sources_analyzed": self.source_count, # For backward compatibility
1151 | "confidence_trend": self._calculate_confidence_trend(),
1152 | "early_termination_reason": None
1153 | if should_continue
1154 | else self._get_termination_reason(current_confidence),
1155 | "sentiment_consensus": self._calculate_sentiment_consensus(),
1156 | }
1157 |
1158 | def _calculate_evidence_strength(self, evidence: dict, credibility: float) -> float:
1159 | """Calculate the strength of new evidence."""
1160 |
1161 | # Base strength from sentiment confidence
1162 | sentiment = evidence.get("sentiment", {})
1163 | sentiment_confidence = sentiment.get("confidence", 0.5)
1164 |
1165 | # Adjust for source credibility
1166 | credibility_adjusted = sentiment_confidence * credibility
1167 |
1168 | # Factor in evidence richness
1169 | insights_count = len(evidence.get("insights", []))
1170 | risk_factors_count = len(evidence.get("risk_factors", []))
1171 | opportunities_count = len(evidence.get("opportunities", []))
1172 |
1173 | # Evidence richness score (0-1)
1174 | evidence_richness = min(
1175 | (insights_count + risk_factors_count + opportunities_count) / 12, 1.0
1176 | )
1177 |
1178 | # Relevance factor
1179 | relevance_score = evidence.get("relevance_score", 0.5)
1180 |
1181 | # Final evidence strength calculation
1182 | final_strength = credibility_adjusted * (
1183 | 0.5 + 0.3 * evidence_richness + 0.2 * relevance_score
1184 | )
1185 |
1186 | return min(final_strength, 1.0)
1187 |
1188 | def _update_bayesian_confidence(self, evidence_strength: float) -> float:
1189 | """Update confidence using Bayesian approach."""
1190 |
1191 | if not self.confidence_history:
1192 | # First evidence - base confidence
1193 | return evidence_strength
1194 |
1195 | # Current prior
1196 | prior = self.confidence_history[-1]
1197 |
1198 | # Bayesian update with evidence strength as likelihood
1199 | # Simple approximation: weighted average with decay
1200 | decay_factor = 0.9 ** (self.source_count - 1) # Diminishing returns
1201 |
1202 | updated = prior * decay_factor + evidence_strength * (1 - decay_factor)
1203 |
1204 | # Ensure within bounds
1205 | return max(0.1, min(updated, 0.95))
1206 |
1207 | def _should_continue_research(self, current_confidence: float) -> bool:
1208 | """Determine if research should continue based on multiple factors."""
1209 |
1210 | # Always process minimum sources
1211 | if self.source_count < self.min_sources:
1212 | return True
1213 |
1214 | # Stop at maximum sources
1215 | if self.source_count >= self.max_sources:
1216 | return False
1217 |
1218 | # High confidence reached
1219 | if current_confidence >= self.target_confidence:
1220 | return False
1221 |
1222 | # Check for diminishing returns
1223 | if self.source_count - self.last_significant_improvement > 4:
1224 | # No significant improvement in last 4 sources
1225 | return False
1226 |
1227 | # Check sentiment consensus
1228 | consensus_score = self._calculate_sentiment_consensus()
1229 | if consensus_score > 0.8 and self.source_count >= 5:
1230 | # Strong consensus with adequate sample
1231 | return False
1232 |
1233 | # Check confidence plateau
1234 | if len(self.confidence_history) >= 3:
1235 | recent_change = abs(current_confidence - self.confidence_history[-3])
1236 | if recent_change < 0.03: # Less than 3% change in last 3 sources
1237 | return False
1238 |
1239 | return True
1240 |
1241 | def _calculate_confidence_trend(self) -> str:
1242 | """Calculate the trend in confidence over recent sources."""
1243 |
1244 | if len(self.confidence_history) < 3:
1245 | return "insufficient_data"
1246 |
1247 | recent = self.confidence_history[-3:]
1248 |
1249 | # Calculate trend
1250 | if recent[-1] > recent[0] + 0.05:
1251 | return "increasing"
1252 | elif recent[-1] < recent[0] - 0.05:
1253 | return "decreasing"
1254 | else:
1255 | return "stable"
1256 |
1257 | def _calculate_sentiment_consensus(self) -> float:
1258 | """Calculate how much sources agree on sentiment."""
1259 |
1260 | total_votes = sum(self.sentiment_votes.values())
1261 | if total_votes == 0:
1262 | return 0.0
1263 |
1264 | # Calculate consensus as max vote share
1265 | max_votes = max(self.sentiment_votes.values())
1266 | consensus = max_votes / total_votes
1267 |
1268 | return consensus
1269 |
1270 | def _get_termination_reason(self, current_confidence: float) -> str:
1271 | """Get reason for early termination."""
1272 |
1273 | if current_confidence >= self.target_confidence:
1274 | return "target_confidence_reached"
1275 | elif self.source_count >= self.max_sources:
1276 | return "max_sources_reached"
1277 | elif self._calculate_sentiment_consensus() > 0.8:
1278 | return "strong_consensus"
1279 | elif self.source_count - self.last_significant_improvement > 4:
1280 | return "diminishing_returns"
1281 | else:
1282 | return "confidence_plateau"
1283 |
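# --- Illustrative usage sketch (not part of the original module) ---
# A hypothetical research loop could let the tracker decide when to stop early;
# "analyzed_sources" is an assumed list carrying the "analysis" dicts produced
# by ParallelLLMProcessor above.
#
#     tracker = ConfidenceTracker(target_confidence=0.75, min_sources=3, max_sources=15)
#     for source in analyzed_sources:
#         analysis = source.get("analysis", {})
#         decision = tracker.update_confidence(
#             new_evidence=analysis,
#             source_credibility=analysis.get("credibility_score", 0.5),
#         )
#         if not decision["should_continue"]:
#             break  # e.g. "target_confidence_reached" or "strong_consensus"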
1284 |
1285 | class IntelligentContentFilter:
1286 | """Pre-filters and prioritizes content to reduce LLM processing overhead."""
1287 |
1288 | def __init__(self):
1289 | self.relevance_keywords = {
1290 | "fundamental": {
1291 | "high": [
1292 | "earnings",
1293 | "revenue",
1294 | "profit",
1295 | "ebitda",
1296 | "cash flow",
1297 | "debt",
1298 | "valuation",
1299 | ],
1300 | "medium": [
1301 | "balance sheet",
1302 | "income statement",
1303 | "financial",
1304 | "quarterly",
1305 | "annual",
1306 | ],
1307 | "context": ["company", "business", "financial results", "guidance"],
1308 | },
1309 | "technical": {
1310 | "high": [
1311 | "price",
1312 | "chart",
1313 | "trend",
1314 | "support",
1315 | "resistance",
1316 | "breakout",
1317 | ],
1318 | "medium": ["volume", "rsi", "macd", "moving average", "pattern"],
1319 | "context": ["technical analysis", "trading", "momentum"],
1320 | },
1321 | "sentiment": {
1322 | "high": ["rating", "upgrade", "downgrade", "buy", "sell", "hold"],
1323 | "medium": ["analyst", "recommendation", "target price", "outlook"],
1324 | "context": ["opinion", "sentiment", "market mood"],
1325 | },
1326 | "competitive": {
1327 | "high": [
1328 | "market share",
1329 | "competitor",
1330 | "competitive advantage",
1331 | "industry",
1332 | ],
1333 | "medium": ["peer", "comparison", "market position", "sector"],
1334 | "context": ["competitive landscape", "industry analysis"],
1335 | },
1336 | }
1337 |
1338 | self.domain_credibility_scores = {
1339 | "reuters.com": 0.95,
1340 | "bloomberg.com": 0.95,
1341 | "wsj.com": 0.90,
1342 | "ft.com": 0.90,
1343 | "marketwatch.com": 0.85,
1344 | "cnbc.com": 0.80,
1345 | "yahoo.com": 0.75,
1346 | "seekingalpha.com": 0.80,
1347 | "fool.com": 0.70,
1348 | "investing.com": 0.75,
1349 | }
1350 |
1351 | async def filter_and_prioritize_sources(
1352 | self,
1353 | sources: list[dict],
1354 | research_focus: str,
1355 | time_budget: float,
1356 | target_source_count: int | None = None,
1357 | current_confidence: float = 0.0,
1358 | ) -> list[dict]:
1359 | """Filter and prioritize sources based on relevance, quality, and time constraints."""
1360 |
1361 | if not sources:
1362 | return []
1363 |
1364 | # Determine target count based on time budget and confidence
1365 | if target_source_count is None:
1366 | target_source_count = self._calculate_optimal_source_count(
1367 | time_budget, current_confidence, len(sources)
1368 | )
1369 |
1370 | # Quick relevance scoring without LLM
1371 | scored_sources = []
1372 | for source in sources:
1373 | relevance_score = self._calculate_relevance_score(source, research_focus)
1374 | credibility_score = self._get_source_credibility(source)
1375 | recency_score = self._calculate_recency_score(source.get("published_date"))
1376 |
1377 | # Combined score with weights
1378 | combined_score = (
1379 | relevance_score * 0.5 + credibility_score * 0.3 + recency_score * 0.2
1380 | )
1381 |
1382 | if combined_score > 0.3: # Relevance threshold
1383 | scored_sources.append((combined_score, source))
1384 |
1385 | # Sort by combined score
1386 | scored_sources.sort(key=lambda x: x[0], reverse=True)
1387 |
1388 | # Select diverse sources
1389 | selected_sources = self._select_diverse_sources(
1390 | scored_sources, target_source_count, research_focus
1391 | )
1392 |
1393 | # Pre-process content for faster LLM processing
1394 | processed_sources = []
1395 | for score, source in selected_sources:
1396 | processed_source = self._preprocess_content(
1397 | source, research_focus, time_budget
1398 | )
1399 | processed_source["relevance_score"] = score
1400 | processed_sources.append(processed_source)
1401 |
1402 | return processed_sources
1403 |
1404 | def _calculate_optimal_source_count(
1405 | self, time_budget: float, current_confidence: float, available_sources: int
1406 | ) -> int:
1407 | """Calculate optimal number of sources to process given constraints."""
1408 |
1409 | # Base count from time budget
1410 | if time_budget < 20:
1411 | base_count = 3
1412 | elif time_budget < 40:
1413 | base_count = 6
1414 | elif time_budget < 80:
1415 | base_count = 10
1416 | else:
1417 | base_count = 15
1418 |
1419 | # Adjust for confidence level
1420 | if current_confidence > 0.7:
1421 | # High confidence - fewer sources needed
1422 | confidence_multiplier = 0.7
1423 | elif current_confidence < 0.4:
1424 | # Low confidence - more sources helpful
1425 | confidence_multiplier = 1.2
1426 | else:
1427 | confidence_multiplier = 1.0
1428 |
1429 | # Final calculation
1430 | target_count = int(base_count * confidence_multiplier)
1431 |
1432 | # Ensure we don't exceed available sources
1433 | return min(target_count, available_sources, 20) # Cap at 20
1434 |
1435 | def _calculate_relevance_score(self, source: dict, research_focus: str) -> float:
1436 | """Calculate relevance score using keyword matching and heuristics."""
1437 |
1438 | content = source.get("content", "").lower()
1439 | title = source.get("title", "").lower()
1440 |
1441 | if not content and not title:
1442 | return 0.0
1443 |
1444 | focus_keywords = self.relevance_keywords.get(research_focus, {})
1445 |
1446 | # High-value keywords
1447 | high_keywords = focus_keywords.get("high", [])
1448 | high_score = sum(1 for keyword in high_keywords if keyword in content) / max(
1449 | len(high_keywords), 1
1450 | )
1451 |
1452 | # Medium-value keywords
1453 | medium_keywords = focus_keywords.get("medium", [])
1454 | medium_score = sum(
1455 | 1 for keyword in medium_keywords if keyword in content
1456 | ) / max(len(medium_keywords), 1)
1457 |
1458 | # Context keywords
1459 | context_keywords = focus_keywords.get("context", [])
1460 | context_score = sum(
1461 | 1 for keyword in context_keywords if keyword in content
1462 | ) / max(len(context_keywords), 1)
1463 |
1464 | # Title relevance (titles are more focused)
1465 | title_high_score = sum(
1466 | 1 for keyword in high_keywords if keyword in title
1467 | ) / max(len(high_keywords), 1)
1468 |
1469 | # Combine scores with weights
1470 | relevance_score = (
1471 | high_score * 0.4
1472 | + medium_score * 0.25
1473 | + context_score * 0.15
1474 | + title_high_score * 0.2
1475 | )
1476 |
1477 | # Boost for very relevant titles
1478 | if any(keyword in title for keyword in high_keywords):
1479 | relevance_score *= 1.2
1480 |
1481 | return min(relevance_score, 1.0)
1482 |
1483 | def _get_source_credibility(self, source: dict) -> float:
1484 | """Calculate source credibility based on domain and other factors."""
1485 |
1486 | url = source.get("url", "").lower()
1487 |
1488 | # Domain-based credibility
1489 | domain_score = 0.5 # Default
1490 | for domain, score in self.domain_credibility_scores.items():
1491 | if domain in url:
1492 | domain_score = score
1493 | break
1494 |
1495 | # Boost for specific high-quality indicators
1496 | if any(indicator in url for indicator in [".gov", ".edu", "sec.gov"]):
1497 | domain_score = min(domain_score + 0.2, 1.0)
1498 |
1499 | # Penalty for low-quality indicators
1500 | if any(indicator in url for indicator in ["blog", "forum", "reddit"]):
1501 | domain_score *= 0.8
1502 |
1503 | return domain_score
1504 |
1505 |     def _calculate_recency_score(self, published_date: str | None) -> float:
1506 | """Calculate recency score based on publication date."""
1507 |
1508 | if not published_date:
1509 | return 0.5 # Default for unknown dates
1510 |
1511 | try:
1512 | # Parse date (handle various formats)
1513 | if "T" in published_date:
1514 | pub_date = datetime.fromisoformat(published_date.replace("Z", "+00:00"))
1515 | else:
1516 | pub_date = datetime.strptime(published_date, "%Y-%m-%d")
1517 |
1518 | # Calculate days old
1519 | days_old = (datetime.now() - pub_date.replace(tzinfo=None)).days
1520 |
1521 | # Scoring based on age
1522 | if days_old <= 1:
1523 | return 1.0 # Very recent
1524 | elif days_old <= 7:
1525 | return 0.9 # Recent
1526 | elif days_old <= 30:
1527 | return 0.7 # Fairly recent
1528 | elif days_old <= 90:
1529 | return 0.5 # Moderately old
1530 | else:
1531 | return 0.3 # Old
1532 |
1533 | except (ValueError, TypeError):
1534 | return 0.5 # Default for unparseable dates
1535 |
1536 | def _select_diverse_sources(
1537 | self,
1538 | scored_sources: list[tuple[float, dict]],
1539 | target_count: int,
1540 | research_focus: str,
1541 | ) -> list[tuple[float, dict]]:
1542 | """Select diverse sources to avoid redundancy."""
1543 |
1544 | if len(scored_sources) <= target_count:
1545 | return scored_sources
1546 |
1547 | selected = []
1548 | used_domains = set()
1549 |
1550 | # First pass: select high-scoring diverse sources
1551 | for score, source in scored_sources:
1552 | if len(selected) >= target_count:
1553 | break
1554 |
1555 | url = source.get("url", "")
1556 | domain = self._extract_domain(url)
1557 |
1558 | # Ensure diversity by domain (max 2 from same domain initially)
1559 | domain_count = sum(
1560 | 1
1561 | for _, s in selected
1562 | if self._extract_domain(s.get("url", "")) == domain
1563 | )
1564 |
1565 | if domain_count < 2 or len(selected) < target_count // 2:
1566 | selected.append((score, source))
1567 | used_domains.add(domain)
1568 |
1569 |         # Second pass: fill remaining slots with the best not-yet-selected sources
1570 |         if len(selected) < target_count:
1571 |             chosen_ids = {id(s) for _, s in selected}
1572 |             remaining = [p for p in scored_sources if id(p[1]) not in chosen_ids]
1573 |             selected.extend(remaining[: target_count - len(selected)])
1574 |
1575 | return selected[:target_count]
1576 |
1577 | def _extract_domain(self, url: str) -> str:
1578 | """Extract domain from URL."""
1579 | try:
1580 | if "//" in url:
1581 | domain = url.split("//")[1].split("/")[0]
1582 | return domain.replace("www.", "")
1583 | return url
1584 | except Exception:
1585 | return url
1586 |
1587 | def _preprocess_content(
1588 | self, source: dict, research_focus: str, time_budget: float
1589 | ) -> dict:
1590 | """Pre-process content to optimize for LLM analysis."""
1591 |
1592 | content = source.get("content", "")
1593 | if not content:
1594 | return source
1595 |
1596 | # Determine content length limit based on time budget
1597 | if time_budget < 30:
1598 | max_length = 800 # Emergency mode
1599 | elif time_budget < 60:
1600 | max_length = 1200 # Fast mode
1601 | else:
1602 | max_length = 2000 # Standard mode
1603 |
1604 | # If content is already short enough, return as-is
1605 | if len(content) <= max_length:
1606 | source_copy = source.copy()
1607 | source_copy["original_length"] = len(content)
1608 | source_copy["filtered"] = False
1609 | return source_copy
1610 |
1611 | # Extract most relevant sentences/paragraphs
1612 | sentences = re.split(r"[.!?]+", content)
1613 | focus_keywords = self.relevance_keywords.get(research_focus, {})
1614 | all_keywords = (
1615 | focus_keywords.get("high", [])
1616 | + focus_keywords.get("medium", [])
1617 | + focus_keywords.get("context", [])
1618 | )
1619 |
1620 | # Score sentences by keyword relevance
1621 | scored_sentences = []
1622 | for sentence in sentences:
1623 | if len(sentence.strip()) < 20: # Skip very short sentences
1624 | continue
1625 |
1626 | sentence_lower = sentence.lower()
1627 | keyword_count = sum(
1628 | 1 for keyword in all_keywords if keyword in sentence_lower
1629 | )
1630 |
1631 | # Boost for financial numbers and percentages
1632 | has_numbers = bool(re.search(r"\$?[\d,]+\.?\d*[%kmbKMB]?", sentence))
1633 | number_boost = 0.5 if has_numbers else 0
1634 |
1635 | sentence_score = keyword_count + number_boost
1636 | if sentence_score > 0:
1637 | scored_sentences.append((sentence_score, sentence.strip()))
1638 |
1639 | # Sort by relevance and select top sentences
1640 | scored_sentences.sort(key=lambda x: x[0], reverse=True)
1641 |
1642 | # Build filtered content
1643 | filtered_content = ""
1644 | for _score, sentence in scored_sentences:
1645 | if len(filtered_content) + len(sentence) > max_length:
1646 | break
1647 | filtered_content += sentence + ". "
1648 |
1649 | # If no relevant sentences found, take first part of original content
1650 | if not filtered_content:
1651 | filtered_content = content[:max_length]
1652 |
1653 | # Create processed source
1654 | source_copy = source.copy()
1655 | source_copy["content"] = filtered_content.strip()
1656 | source_copy["original_length"] = len(content)
1657 | source_copy["filtered_length"] = len(filtered_content)
1658 | source_copy["filtered"] = True
1659 | source_copy["compression_ratio"] = len(filtered_content) / len(content)
1660 |
1661 | return source_copy
1662 |
1663 |
1664 | # Export main classes for integration
1665 | __all__ = [
1666 | "AdaptiveModelSelector",
1667 | "ProgressiveTokenBudgeter",
1668 | "ParallelLLMProcessor",
1669 | "OptimizedPromptEngine",
1670 | "ConfidenceTracker",
1671 | "IntelligentContentFilter",
1672 | "ModelConfiguration",
1673 | "TokenAllocation",
1674 | "ResearchPhase",
1675 | ]
1676 |
```
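
The filtering pipeline above can be exercised end to end. Below is a minimal usage sketch, not part of the module: it assumes `IntelligentContentFilter()` can be constructed with its default keyword and credibility tables (the constructor is not shown in this excerpt), that `"fundamentals"` is one of the configured `research_focus` keys, and that sources follow the `{"url", "title", "content", "published_date"}` shape the scoring helpers expect. As a worked example of the weighting in `filter_and_prioritize_sources`, a wsj.com article with relevance 0.4, credibility 0.90, and recency 0.9 would score 0.4*0.5 + 0.90*0.3 + 0.9*0.2 = 0.65 and clear the 0.3 threshold.

```python
# Hypothetical usage sketch; assumes IntelligentContentFilter is importable
# from the module shown above and that "fundamentals" is a valid focus key.
import asyncio


async def demo() -> None:
    content_filter = IntelligentContentFilter()
    raw_sources = [
        {
            "url": "https://www.wsj.com/articles/example-earnings-report",
            "title": "Company X beats earnings expectations",
            "content": "Revenue grew 12% year over year to $4.2B ...",
            "published_date": "2025-01-10",
        },
        {
            "url": "https://someblog.example.com/post/123",
            "title": "Random musings on the market",
            "content": "I think stocks only go up ...",
            "published_date": None,
        },
    ]

    prioritized = await content_filter.filter_and_prioritize_sources(
        sources=raw_sources,
        research_focus="fundamentals",
        time_budget=45.0,  # falls in the "fast mode" (1200-char) preprocessing tier
    )

    # Each surviving source carries the combined relevance_score and a
    # "filtered" flag set by the content preprocessing step.
    for src in prioritized:
        print(src["url"], round(src["relevance_score"], 2), src.get("filtered"))


if __name__ == "__main__":
    asyncio.run(demo())
```

With a 45-second budget and no prior confidence, `_calculate_optimal_source_count` targets roughly a dozen sources, capped by however many are available, so both inputs above are scored but only those clearing the 0.3 combined-score threshold are returned.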