#
tokens: 42029/50000 1/435 files (page 39/39)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 39 of 39. Use http://codebase.md/wshobson/maverick-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── FUNDING.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── feature_request.md
│   │   ├── question.md
│   │   └── security_report.md
│   ├── pull_request_template.md
│   └── workflows
│       ├── claude-code-review.yml
│       └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│   ├── launch.json
│   └── settings.json
├── alembic
│   ├── env.py
│   ├── script.py.mako
│   └── versions
│       ├── 001_initial_schema.py
│       ├── 003_add_performance_indexes.py
│       ├── 006_rename_metadata_columns.py
│       ├── 008_performance_optimization_indexes.py
│       ├── 009_rename_to_supply_demand.py
│       ├── 010_self_contained_schema.py
│       ├── 011_remove_proprietary_terms.py
│       ├── 013_add_backtest_persistence_models.py
│       ├── 014_add_portfolio_models.py
│       ├── 08e3945a0c93_merge_heads.py
│       ├── 9374a5c9b679_merge_heads_for_testing.py
│       ├── abf9b9afb134_merge_multiple_heads.py
│       ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│       ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│       ├── f0696e2cac15_add_essential_performance_indexes.py
│       └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── api
│   │   └── backtesting.md
│   ├── BACKTESTING.md
│   ├── COST_BASIS_SPECIFICATION.md
│   ├── deep_research_agent.md
│   ├── exa_research_testing_strategy.md
│   ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│   ├── PORTFOLIO.md
│   ├── SETUP_SELF_CONTAINED.md
│   └── speed_testing_framework.md
├── examples
│   ├── complete_speed_validation.py
│   ├── deep_research_integration.py
│   ├── llm_optimization_example.py
│   ├── llm_speed_demo.py
│   ├── monitoring_example.py
│   ├── parallel_research_example.py
│   ├── speed_optimization_demo.py
│   └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│   ├── __init__.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── circuit_breaker.py
│   │   ├── deep_research.py
│   │   ├── market_analysis.py
│   │   ├── optimized_research.py
│   │   ├── supervisor.py
│   │   └── technical_analysis.py
│   ├── api
│   │   ├── __init__.py
│   │   ├── api_server.py
│   │   ├── connection_manager.py
│   │   ├── dependencies
│   │   │   ├── __init__.py
│   │   │   ├── stock_analysis.py
│   │   │   └── technical_analysis.py
│   │   ├── error_handling.py
│   │   ├── inspector_compatible_sse.py
│   │   ├── inspector_sse.py
│   │   ├── middleware
│   │   │   ├── error_handling.py
│   │   │   ├── mcp_logging.py
│   │   │   ├── rate_limiting_enhanced.py
│   │   │   └── security.py
│   │   ├── openapi_config.py
│   │   ├── routers
│   │   │   ├── __init__.py
│   │   │   ├── agents.py
│   │   │   ├── backtesting.py
│   │   │   ├── data_enhanced.py
│   │   │   ├── data.py
│   │   │   ├── health_enhanced.py
│   │   │   ├── health_tools.py
│   │   │   ├── health.py
│   │   │   ├── intelligent_backtesting.py
│   │   │   ├── introspection.py
│   │   │   ├── mcp_prompts.py
│   │   │   ├── monitoring.py
│   │   │   ├── news_sentiment_enhanced.py
│   │   │   ├── performance.py
│   │   │   ├── portfolio.py
│   │   │   ├── research.py
│   │   │   ├── screening_ddd.py
│   │   │   ├── screening_parallel.py
│   │   │   ├── screening.py
│   │   │   ├── technical_ddd.py
│   │   │   ├── technical_enhanced.py
│   │   │   ├── technical.py
│   │   │   └── tool_registry.py
│   │   ├── server.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   ├── base_service.py
│   │   │   ├── market_service.py
│   │   │   ├── portfolio_service.py
│   │   │   ├── prompt_service.py
│   │   │   └── resource_service.py
│   │   ├── simple_sse.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── insomnia_export.py
│   │       └── postman_export.py
│   ├── application
│   │   ├── __init__.py
│   │   ├── commands
│   │   │   └── __init__.py
│   │   ├── dto
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_dto.py
│   │   ├── queries
│   │   │   ├── __init__.py
│   │   │   └── get_technical_analysis.py
│   │   └── screening
│   │       ├── __init__.py
│   │       ├── dtos.py
│   │       └── queries.py
│   ├── backtesting
│   │   ├── __init__.py
│   │   ├── ab_testing.py
│   │   ├── analysis.py
│   │   ├── batch_processing_stub.py
│   │   ├── batch_processing.py
│   │   ├── model_manager.py
│   │   ├── optimization.py
│   │   ├── persistence.py
│   │   ├── retraining_pipeline.py
│   │   ├── strategies
│   │   │   ├── __init__.py
│   │   │   ├── base.py
│   │   │   ├── ml
│   │   │   │   ├── __init__.py
│   │   │   │   ├── adaptive.py
│   │   │   │   ├── ensemble.py
│   │   │   │   ├── feature_engineering.py
│   │   │   │   └── regime_aware.py
│   │   │   ├── ml_strategies.py
│   │   │   ├── parser.py
│   │   │   └── templates.py
│   │   ├── strategy_executor.py
│   │   ├── vectorbt_engine.py
│   │   └── visualization.py
│   ├── config
│   │   ├── __init__.py
│   │   ├── constants.py
│   │   ├── database_self_contained.py
│   │   ├── database.py
│   │   ├── llm_optimization_config.py
│   │   ├── logging_settings.py
│   │   ├── plotly_config.py
│   │   ├── security_utils.py
│   │   ├── security.py
│   │   ├── settings.py
│   │   ├── technical_constants.py
│   │   ├── tool_estimation.py
│   │   └── validation.py
│   ├── core
│   │   ├── __init__.py
│   │   ├── technical_analysis.py
│   │   └── visualization.py
│   ├── data
│   │   ├── __init__.py
│   │   ├── cache_manager.py
│   │   ├── cache.py
│   │   ├── django_adapter.py
│   │   ├── health.py
│   │   ├── models.py
│   │   ├── performance.py
│   │   ├── session_management.py
│   │   └── validation.py
│   ├── database
│   │   ├── __init__.py
│   │   ├── base.py
│   │   └── optimization.py
│   ├── dependencies.py
│   ├── domain
│   │   ├── __init__.py
│   │   ├── entities
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis.py
│   │   ├── events
│   │   │   └── __init__.py
│   │   ├── portfolio.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   ├── entities.py
│   │   │   ├── services.py
│   │   │   └── value_objects.py
│   │   ├── services
│   │   │   ├── __init__.py
│   │   │   └── technical_analysis_service.py
│   │   ├── stock_analysis
│   │   │   ├── __init__.py
│   │   │   └── stock_analysis_service.py
│   │   └── value_objects
│   │       ├── __init__.py
│   │       └── technical_indicators.py
│   ├── exceptions.py
│   ├── infrastructure
│   │   ├── __init__.py
│   │   ├── cache
│   │   │   └── __init__.py
│   │   ├── caching
│   │   │   ├── __init__.py
│   │   │   └── cache_management_service.py
│   │   ├── connection_manager.py
│   │   ├── data_fetching
│   │   │   ├── __init__.py
│   │   │   └── stock_data_service.py
│   │   ├── health
│   │   │   ├── __init__.py
│   │   │   └── health_checker.py
│   │   ├── persistence
│   │   │   ├── __init__.py
│   │   │   └── stock_repository.py
│   │   ├── providers
│   │   │   └── __init__.py
│   │   ├── screening
│   │   │   ├── __init__.py
│   │   │   └── repositories.py
│   │   └── sse_optimizer.py
│   ├── langchain_tools
│   │   ├── __init__.py
│   │   ├── adapters.py
│   │   └── registry.py
│   ├── logging_config.py
│   ├── memory
│   │   ├── __init__.py
│   │   └── stores.py
│   ├── monitoring
│   │   ├── __init__.py
│   │   ├── health_check.py
│   │   ├── health_monitor.py
│   │   ├── integration_example.py
│   │   ├── metrics.py
│   │   ├── middleware.py
│   │   └── status_dashboard.py
│   ├── providers
│   │   ├── __init__.py
│   │   ├── dependencies.py
│   │   ├── factories
│   │   │   ├── __init__.py
│   │   │   ├── config_factory.py
│   │   │   └── provider_factory.py
│   │   ├── implementations
│   │   │   ├── __init__.py
│   │   │   ├── cache_adapter.py
│   │   │   ├── macro_data_adapter.py
│   │   │   ├── market_data_adapter.py
│   │   │   ├── persistence_adapter.py
│   │   │   └── stock_data_adapter.py
│   │   ├── interfaces
│   │   │   ├── __init__.py
│   │   │   ├── cache.py
│   │   │   ├── config.py
│   │   │   ├── macro_data.py
│   │   │   ├── market_data.py
│   │   │   ├── persistence.py
│   │   │   └── stock_data.py
│   │   ├── llm_factory.py
│   │   ├── macro_data.py
│   │   ├── market_data.py
│   │   ├── mocks
│   │   │   ├── __init__.py
│   │   │   ├── mock_cache.py
│   │   │   ├── mock_config.py
│   │   │   ├── mock_macro_data.py
│   │   │   ├── mock_market_data.py
│   │   │   ├── mock_persistence.py
│   │   │   └── mock_stock_data.py
│   │   ├── openrouter_provider.py
│   │   ├── optimized_screening.py
│   │   ├── optimized_stock_data.py
│   │   └── stock_data.py
│   ├── README.md
│   ├── tests
│   │   ├── __init__.py
│   │   ├── README_INMEMORY_TESTS.md
│   │   ├── test_cache_debug.py
│   │   ├── test_fixes_validation.py
│   │   ├── test_in_memory_routers.py
│   │   ├── test_in_memory_server.py
│   │   ├── test_macro_data_provider.py
│   │   ├── test_mailgun_email.py
│   │   ├── test_market_calendar_caching.py
│   │   ├── test_mcp_tool_fixes_pytest.py
│   │   ├── test_mcp_tool_fixes.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_models_functional.py
│   │   ├── test_server.py
│   │   ├── test_stock_data_enhanced.py
│   │   ├── test_stock_data_provider.py
│   │   └── test_technical_analysis.py
│   ├── tools
│   │   ├── __init__.py
│   │   ├── performance_monitoring.py
│   │   ├── portfolio_manager.py
│   │   ├── risk_management.py
│   │   └── sentiment_analysis.py
│   ├── utils
│   │   ├── __init__.py
│   │   ├── agent_errors.py
│   │   ├── batch_processing.py
│   │   ├── cache_warmer.py
│   │   ├── circuit_breaker_decorators.py
│   │   ├── circuit_breaker_services.py
│   │   ├── circuit_breaker.py
│   │   ├── data_chunking.py
│   │   ├── database_monitoring.py
│   │   ├── debug_utils.py
│   │   ├── fallback_strategies.py
│   │   ├── llm_optimization.py
│   │   ├── logging_example.py
│   │   ├── logging_init.py
│   │   ├── logging.py
│   │   ├── mcp_logging.py
│   │   ├── memory_profiler.py
│   │   ├── monitoring_middleware.py
│   │   ├── monitoring.py
│   │   ├── orchestration_logging.py
│   │   ├── parallel_research.py
│   │   ├── parallel_screening.py
│   │   ├── quick_cache.py
│   │   ├── resource_manager.py
│   │   ├── shutdown.py
│   │   ├── stock_helpers.py
│   │   ├── structured_logger.py
│   │   ├── tool_monitoring.py
│   │   ├── tracing.py
│   │   └── yfinance_pool.py
│   ├── validation
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── data.py
│   │   ├── middleware.py
│   │   ├── portfolio.py
│   │   ├── responses.py
│   │   ├── screening.py
│   │   └── technical.py
│   └── workflows
│       ├── __init__.py
│       ├── agents
│       │   ├── __init__.py
│       │   ├── market_analyzer.py
│       │   ├── optimizer_agent.py
│       │   ├── strategy_selector.py
│       │   └── validator_agent.py
│       ├── backtesting_workflow.py
│       └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│   ├── dev.sh
│   ├── INSTALLATION_GUIDE.md
│   ├── load_example.py
│   ├── load_market_data.py
│   ├── load_tiingo_data.py
│   ├── migrate_db.py
│   ├── README_TIINGO_LOADER.md
│   ├── requirements_tiingo.txt
│   ├── run_stock_screening.py
│   ├── run-migrations.sh
│   ├── seed_db.py
│   ├── seed_sp500.py
│   ├── setup_database.sh
│   ├── setup_self_contained.py
│   ├── setup_sp500_database.sh
│   ├── test_seeded_data.py
│   ├── test_tiingo_loader.py
│   ├── tiingo_config.py
│   └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── core
│   │   └── test_technical_analysis.py
│   ├── data
│   │   └── test_portfolio_models.py
│   ├── domain
│   │   ├── conftest.py
│   │   ├── test_portfolio_entities.py
│   │   └── test_technical_analysis_service.py
│   ├── fixtures
│   │   └── orchestration_fixtures.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── README.md
│   │   ├── run_integration_tests.sh
│   │   ├── test_api_technical.py
│   │   ├── test_chaos_engineering.py
│   │   ├── test_config_management.py
│   │   ├── test_full_backtest_workflow_advanced.py
│   │   ├── test_full_backtest_workflow.py
│   │   ├── test_high_volume.py
│   │   ├── test_mcp_tools.py
│   │   ├── test_orchestration_complete.py
│   │   ├── test_portfolio_persistence.py
│   │   ├── test_redis_cache.py
│   │   ├── test_security_integration.py.disabled
│   │   └── vcr_setup.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── test_benchmarks.py
│   │   ├── test_load.py
│   │   ├── test_profiling.py
│   │   └── test_stress.py
│   ├── providers
│   │   └── test_stock_data_simple.py
│   ├── README.md
│   ├── test_agents_router_mcp.py
│   ├── test_backtest_persistence.py
│   ├── test_cache_management_service.py
│   ├── test_cache_serialization.py
│   ├── test_circuit_breaker.py
│   ├── test_database_pool_config_simple.py
│   ├── test_database_pool_config.py
│   ├── test_deep_research_functional.py
│   ├── test_deep_research_integration.py
│   ├── test_deep_research_parallel_execution.py
│   ├── test_error_handling.py
│   ├── test_event_loop_integrity.py
│   ├── test_exa_research_integration.py
│   ├── test_exception_hierarchy.py
│   ├── test_financial_search.py
│   ├── test_graceful_shutdown.py
│   ├── test_integration_simple.py
│   ├── test_langgraph_workflow.py
│   ├── test_market_data_async.py
│   ├── test_market_data_simple.py
│   ├── test_mcp_orchestration_functional.py
│   ├── test_ml_strategies.py
│   ├── test_optimized_research_agent.py
│   ├── test_orchestration_integration.py
│   ├── test_orchestration_logging.py
│   ├── test_orchestration_tools_simple.py
│   ├── test_parallel_research_integration.py
│   ├── test_parallel_research_orchestrator.py
│   ├── test_parallel_research_performance.py
│   ├── test_performance_optimizations.py
│   ├── test_production_validation.py
│   ├── test_provider_architecture.py
│   ├── test_rate_limiting_enhanced.py
│   ├── test_runner_validation.py
│   ├── test_security_comprehensive.py.disabled
│   ├── test_security_cors.py
│   ├── test_security_enhancements.py.disabled
│   ├── test_security_headers.py
│   ├── test_security_penetration.py
│   ├── test_session_management.py
│   ├── test_speed_optimization_validation.py
│   ├── test_stock_analysis_dependencies.py
│   ├── test_stock_analysis_service.py
│   ├── test_stock_data_fetching_service.py
│   ├── test_supervisor_agent.py
│   ├── test_supervisor_functional.py
│   ├── test_tool_estimation_config.py
│   ├── test_visualization.py
│   └── utils
│       ├── test_agent_errors.py
│       ├── test_logging.py
│       ├── test_parallel_screening.py
│       └── test_quick_cache.py
├── tools
│   ├── check_orchestration_config.py
│   ├── experiments
│   │   ├── validation_examples.py
│   │   └── validation_fixed.py
│   ├── fast_dev.sh
│   ├── hot_reload.py
│   ├── quick_test.py
│   └── templates
│       ├── new_router_template.py
│       ├── new_tool_template.py
│       ├── screening_strategy_template.py
│       └── test_template.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/maverick_mcp/agents/deep_research.py:
--------------------------------------------------------------------------------

```python
   1 | """
   2 | DeepResearchAgent implementation using 2025 LangGraph patterns.
   3 | 
   4 | Provides comprehensive financial research capabilities with web search,
   5 | content analysis, sentiment detection, and source validation.
   6 | """
   7 | 
   8 | from __future__ import annotations
   9 | 
  10 | import asyncio
  11 | import json
  12 | import logging
  13 | from collections.abc import Iterable
  14 | from datetime import UTC, datetime
  15 | from typing import Any
  16 | from uuid import uuid4
  17 | 
  18 | from langchain_core.language_models import BaseChatModel
  19 | from langchain_core.messages import HumanMessage, SystemMessage
  20 | from langchain_core.tools import BaseTool, tool
  21 | from langgraph.checkpoint.memory import MemorySaver
  22 | from langgraph.graph import END, START, StateGraph  # type: ignore[import-untyped]
  23 | from langgraph.types import Command  # type: ignore[import-untyped]
  24 | 
  25 | from maverick_mcp.agents.base import PersonaAwareAgent
  26 | from maverick_mcp.agents.circuit_breaker import circuit_manager
  27 | from maverick_mcp.config.settings import get_settings
  28 | from maverick_mcp.exceptions import (
  29 |     WebSearchError,
  30 | )
  31 | from maverick_mcp.memory.stores import ConversationStore
  32 | from maverick_mcp.utils.orchestration_logging import (
  33 |     get_orchestration_logger,
  34 |     log_agent_execution,
  35 |     log_method_call,
  36 |     log_performance_metrics,
  37 |     log_synthesis_operation,
  38 | )
  39 | 
  40 | try:  # pragma: no cover - optional dependency
  41 |     from tavily import TavilyClient  # type: ignore[import-not-found]
  42 | except ImportError:  # pragma: no cover
  43 |     TavilyClient = None  # type: ignore[assignment]
  44 | 
  45 | # Import moved to avoid circular dependency - will import where needed
  46 | from maverick_mcp.workflows.state import DeepResearchState
  47 | 
  48 | logger = logging.getLogger(__name__)
  49 | settings = get_settings()
  50 | 
  51 | # Global search provider cache and connection manager
  52 | _search_provider_cache: dict[str, Any] = {}
  53 | 
  54 | 
  55 | async def get_cached_search_provider(exa_api_key: str | None = None) -> Any | None:
  56 |     """Get cached Exa search provider to avoid repeated initialization delays."""
  57 |     cache_key = f"exa:{exa_api_key is not None}"
  58 | 
  59 |     if cache_key in _search_provider_cache:
  60 |         return _search_provider_cache[cache_key]
  61 | 
  62 |     logger.info("Initializing Exa search provider")
  63 |     provider = None
  64 | 
  65 |     # Initialize Exa provider with caching
  66 |     if exa_api_key:
  67 |         try:
  68 |             provider = ExaSearchProvider(exa_api_key)
  69 |             logger.info("Initialized Exa search provider")
  70 |             # Cache the provider
  71 |             _search_provider_cache[cache_key] = provider
  72 |         except ImportError as e:
  73 |             logger.warning(f"Failed to initialize Exa provider: {e}")
  74 | 
  75 |     return provider
  76 | 
  77 | 
  78 | # Research depth levels optimized for quick searches
  79 | RESEARCH_DEPTH_LEVELS = {
  80 |     "basic": {
  81 |         "max_sources": 3,
  82 |         "max_searches": 1,  # Reduced for speed
  83 |         "analysis_depth": "summary",
  84 |         "validation_required": False,
  85 |     },
  86 |     "standard": {
  87 |         "max_sources": 5,  # Reduced from 8
  88 |         "max_searches": 2,  # Reduced from 4
  89 |         "analysis_depth": "detailed",
  90 |         "validation_required": False,  # Disabled for speed
  91 |     },
  92 |     "comprehensive": {
  93 |         "max_sources": 10,  # Reduced from 15
  94 |         "max_searches": 3,  # Reduced from 6
  95 |         "analysis_depth": "comprehensive",
  96 |         "validation_required": False,  # Disabled for speed
  97 |     },
  98 |     "exhaustive": {
  99 |         "max_sources": 15,  # Reduced from 25
 100 |         "max_searches": 5,  # Reduced from 10
 101 |         "analysis_depth": "exhaustive",
 102 |         "validation_required": True,
 103 |     },
 104 | }
 105 | 
 106 | # Persona-specific research focus areas
 107 | PERSONA_RESEARCH_FOCUS = {
 108 |     "conservative": {
 109 |         "keywords": [
 110 |             "dividend",
 111 |             "stability",
 112 |             "risk",
 113 |             "debt",
 114 |             "cash flow",
 115 |             "established",
 116 |         ],
 117 |         "sources": [
 118 |             "sec filings",
 119 |             "annual reports",
 120 |             "rating agencies",
 121 |             "dividend history",
 122 |         ],
 123 |         "risk_focus": "downside protection",
 124 |         "time_horizon": "long-term",
 125 |     },
 126 |     "moderate": {
 127 |         "keywords": ["growth", "value", "balance", "diversification", "fundamentals"],
 128 |         "sources": ["financial statements", "analyst reports", "industry analysis"],
 129 |         "risk_focus": "risk-adjusted returns",
 130 |         "time_horizon": "medium-term",
 131 |     },
 132 |     "aggressive": {
 133 |         "keywords": ["growth", "momentum", "opportunity", "innovation", "expansion"],
 134 |         "sources": [
 135 |             "news",
 136 |             "earnings calls",
 137 |             "industry trends",
 138 |             "competitive analysis",
 139 |         ],
 140 |         "risk_focus": "upside potential",
 141 |         "time_horizon": "short to medium-term",
 142 |     },
 143 |     "day_trader": {
 144 |         "keywords": [
 145 |             "catalysts",
 146 |             "earnings",
 147 |             "news",
 148 |             "volume",
 149 |             "volatility",
 150 |             "momentum",
 151 |         ],
 152 |         "sources": ["breaking news", "social sentiment", "earnings announcements"],
 153 |         "risk_focus": "short-term risks",
 154 |         "time_horizon": "intraday to weekly",
 155 |     },
 156 | }
 157 | 
 158 | 
 159 | class WebSearchProvider:
 160 |     """Base class for web search providers with early abort mechanism."""
 161 | 
 162 |     def __init__(self, api_key: str):
 163 |         self.api_key = api_key
 164 |         self.rate_limiter = None  # Implement rate limiting
 165 |         self._failure_count = 0
 166 |         self._max_failures = 3  # Abort after 3 consecutive failures
 167 |         self._is_healthy = True
 168 |         self.settings = get_settings()
 169 |         self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
 170 | 
 171 |     def _calculate_timeout(
 172 |         self, query: str, timeout_budget: float | None = None
 173 |     ) -> float:
 174 |         """Calculate generous timeout for thorough research operations."""
 175 |         query_words = len(query.split())
 176 | 
 177 |         # Generous timeout calculation for thorough search operations
 178 |         if query_words <= 3:
 179 |             base_timeout = 30.0  # Simple queries - 30s for thorough results
 180 |         elif query_words <= 8:
 181 |             base_timeout = 45.0  # Standard queries - 45s for comprehensive search
 182 |         else:
 183 |             base_timeout = 60.0  # Complex queries - 60s for exhaustive search
 184 | 
 185 |         # Apply budget constraints if available
 186 |         if timeout_budget and timeout_budget > 0:
 187 |             # Use generous portion of available budget per search operation
 188 |             budget_timeout = max(
 189 |                 timeout_budget * 0.6, 30.0
 190 |             )  # At least 30s, use 60% of budget
 191 |             calculated_timeout = min(base_timeout, budget_timeout)
 192 | 
 193 |             # Ensure minimum timeout (at least 30s for thorough search)
 194 |             calculated_timeout = max(calculated_timeout, 30.0)
 195 |         else:
 196 |             calculated_timeout = base_timeout
 197 | 
 198 |         # Final timeout with generous minimum for thorough search
 199 |         final_timeout = max(calculated_timeout, 30.0)
 200 | 
 201 |         return final_timeout
 202 | 
 203 |     def _record_failure(self, error_type: str = "unknown") -> None:
 204 |         """Record a search failure and check if provider should be disabled."""
 205 |         self._failure_count += 1
 206 | 
 207 |         # Use separate thresholds for timeout vs other failures
 208 |         timeout_threshold = getattr(
 209 |             self.settings.performance, "search_timeout_failure_threshold", 12
 210 |         )
 211 | 
 212 |         # Much more tolerant of timeout failures - they may be due to network/complexity
 213 |         if error_type == "timeout" and self._failure_count >= timeout_threshold:
 214 |             self._is_healthy = False
 215 |             logger.warning(
 216 |                 f"Search provider {self.__class__.__name__} disabled after "
 217 |                 f"{self._failure_count} consecutive timeout failures (threshold: {timeout_threshold})"
 218 |             )
 219 |         elif error_type != "timeout" and self._failure_count >= self._max_failures * 2:
 220 |             # Be more lenient for non-timeout failures (2x threshold)
 221 |             self._is_healthy = False
 222 |             logger.warning(
 223 |                 f"Search provider {self.__class__.__name__} disabled after "
 224 |                 f"{self._failure_count} total non-timeout failures"
 225 |             )
 226 | 
 227 |         logger.debug(
 228 |             f"Provider {self.__class__.__name__} failure recorded: "
 229 |             f"type={error_type}, count={self._failure_count}, healthy={self._is_healthy}"
 230 |         )
 231 | 
 232 |     def _record_success(self) -> None:
 233 |         """Record a successful search and reset failure count."""
 234 |         if self._failure_count > 0:
 235 |             logger.info(
 236 |                 f"Search provider {self.__class__.__name__} recovered after "
 237 |                 f"{self._failure_count} failures"
 238 |             )
 239 |         self._failure_count = 0
 240 |         self._is_healthy = True
 241 | 
 242 |     def is_healthy(self) -> bool:
 243 |         """Check if provider is healthy and should be used."""
 244 |         return self._is_healthy
 245 | 
 246 |     async def search(
 247 |         self, query: str, num_results: int = 10, timeout_budget: float | None = None
 248 |     ) -> list[dict[str, Any]]:
 249 |         """Perform web search and return results."""
 250 |         raise NotImplementedError
 251 | 
 252 |     async def get_content(self, url: str) -> dict[str, Any]:
 253 |         """Extract content from URL."""
 254 |         raise NotImplementedError
 255 | 
 256 |     async def search_multiple_providers(
 257 |         self,
 258 |         queries: list[str],
 259 |         providers: list[str] | None = None,
 260 |         max_results_per_query: int = 5,
 261 |     ) -> dict[str, list[dict[str, Any]]]:
 262 |         """Search using multiple providers and return aggregated results."""
 263 |         providers = providers or ["exa"]  # Default to available providers
 264 |         results = {}
 265 | 
 266 |         for provider_name in providers:
 267 |             provider_results = []
 268 |             for query in queries:
 269 |                 try:
 270 |                     query_results = await self.search(query, max_results_per_query)
 271 | 
 272 |                     provider_results.extend(query_results or [])
 273 |                 except Exception as e:
 274 |                     self.logger.warning(
 275 |                         f"Search failed for provider {provider_name}, query '{query}': {e}"
 276 |                     )
 277 |                     continue
 278 | 
 279 |             results[provider_name] = provider_results
 280 | 
 281 |         return results
 282 | 
 283 |     def _timeframe_to_date(self, timeframe: str) -> str | None:
 284 |         """Convert timeframe string to date string."""
 285 |         from datetime import datetime, timedelta
 286 | 
 287 |         now = datetime.now()
 288 | 
 289 |         if timeframe == "1d":
 290 |             date = now - timedelta(days=1)
 291 |         elif timeframe == "1w":
 292 |             date = now - timedelta(weeks=1)
 293 |         elif timeframe == "1m":
 294 |             date = now - timedelta(days=30)
 295 |         else:
 296 |             # Invalid or unsupported timeframe, return None
 297 |             return None
 298 | 
 299 |         return date.strftime("%Y-%m-%d")
 300 | 
 301 | 
 302 | class ExaSearchProvider(WebSearchProvider):
 303 |     """Exa search provider for comprehensive web search using MCP tools with financial optimization."""
 304 | 
 305 |     def __init__(self, api_key: str):
 306 |         super().__init__(api_key)
 307 |         # Store the API key for verification
 308 |         self._api_key_verified = bool(api_key)
 309 | 
 310 |         # Financial-specific domain preferences for better results
 311 |         self.financial_domains = [
 312 |             "sec.gov",
 313 |             "edgar.sec.gov",
 314 |             "investor.gov",
 315 |             "bloomberg.com",
 316 |             "reuters.com",
 317 |             "wsj.com",
 318 |             "ft.com",
 319 |             "marketwatch.com",
 320 |             "yahoo.com/finance",
 321 |             "finance.yahoo.com",
 322 |             "morningstar.com",
 323 |             "fool.com",
 324 |             "seekingalpha.com",
 325 |             "investopedia.com",
 326 |             "barrons.com",
 327 |             "cnbc.com",
 328 |             "nasdaq.com",
 329 |             "nyse.com",
 330 |             "finra.org",
 331 |             "federalreserve.gov",
 332 |             "treasury.gov",
 333 |             "bls.gov",
 334 |         ]
 335 | 
 336 |         # Domains to exclude for financial searches
 337 |         self.excluded_domains = [
 338 |             "facebook.com",
 339 |             "twitter.com",
 340 |             "x.com",
 341 |             "instagram.com",
 342 |             "tiktok.com",
 343 |             "reddit.com",
 344 |             "pinterest.com",
 345 |             "linkedin.com",
 346 |             "youtube.com",
 347 |             "wikipedia.org",
 348 |         ]
 349 | 
 350 |         logger.info("Initialized ExaSearchProvider with financial optimization")
 351 | 
 352 |     async def search(
 353 |         self, query: str, num_results: int = 10, timeout_budget: float | None = None
 354 |     ) -> list[dict[str, Any]]:
 355 |         """Search using Exa via async client for comprehensive web results with adaptive timeout."""
 356 |         return await self._search_with_strategy(
 357 |             query, num_results, timeout_budget, "auto"
 358 |         )
 359 | 
 360 |     async def search_financial(
 361 |         self,
 362 |         query: str,
 363 |         num_results: int = 10,
 364 |         timeout_budget: float | None = None,
 365 |         strategy: str = "hybrid",
 366 |     ) -> list[dict[str, Any]]:
 367 |         """
 368 |         Enhanced financial search with optimized queries and domain targeting.
 369 | 
 370 |         Args:
 371 |             query: Search query
 372 |             num_results: Number of results to return
 373 |             timeout_budget: Timeout budget in seconds
 374 |             strategy: Search strategy - 'hybrid', 'authoritative', 'comprehensive', or 'auto'
 375 |         """
 376 |         return await self._search_with_strategy(
 377 |             query, num_results, timeout_budget, strategy
 378 |         )
 379 | 
 380 |     async def _search_with_strategy(
 381 |         self, query: str, num_results: int, timeout_budget: float | None, strategy: str
 382 |     ) -> list[dict[str, Any]]:
 383 |         """Internal method to handle different search strategies."""
 384 | 
 385 |         # Check provider health before attempting search
 386 |         if not self.is_healthy():
 387 |             logger.warning("Exa provider is unhealthy - skipping search")
 388 |             raise WebSearchError("Exa provider disabled due to repeated failures")
 389 | 
 390 |         # Calculate adaptive timeout
 391 |         search_timeout = self._calculate_timeout(query, timeout_budget)
 392 | 
 393 |         try:
 394 |             # Use search-specific circuit breaker settings (more tolerant)
 395 |             circuit_breaker = await circuit_manager.get_or_create(
 396 |                 "exa_search",
 397 |                 failure_threshold=getattr(
 398 |                     self.settings.performance,
 399 |                     "search_circuit_breaker_failure_threshold",
 400 |                     8,
 401 |                 ),
 402 |                 recovery_timeout=getattr(
 403 |                     self.settings.performance,
 404 |                     "search_circuit_breaker_recovery_timeout",
 405 |                     30,
 406 |                 ),
 407 |             )
 408 | 
 409 |             async def _search():
 410 |                 # Use the async exa-py library for web search
 411 |                 try:
 412 |                     from exa_py import AsyncExa
 413 | 
 414 |                     # Initialize AsyncExa client with API key
 415 |                     async_exa_client = AsyncExa(api_key=self.api_key)
 416 | 
 417 |                     # Configure search parameters based on strategy
 418 |                     search_params = self._get_search_params(
 419 |                         query, num_results, strategy
 420 |                     )
 421 | 
 422 |                     # Call Exa search with optimized parameters
 423 |                     exa_response = await async_exa_client.search_and_contents(
 424 |                         **search_params
 425 |                     )
 426 | 
 427 |                     # Convert Exa response to standard format with enhanced metadata
 428 |                     results = []
 429 |                     if exa_response and hasattr(exa_response, "results"):
 430 |                         for result in exa_response.results:
 431 |                             # Enhanced result processing with financial relevance scoring
 432 |                             financial_relevance = self._calculate_financial_relevance(
 433 |                                 result
 434 |                             )
 435 | 
 436 |                             results.append(
 437 |                                 {
 438 |                                     "url": result.url or "",
 439 |                                     "title": result.title or "No Title",
 440 |                                     "content": (result.text or "")[:2000],
 441 |                                     "raw_content": (result.text or "")[
 442 |                                         :5000
 443 |                                     ],  # Increased for financial content
 444 |                                     "published_date": result.published_date or "",
 445 |                                     "score": result.score
 446 |                                     if hasattr(result, "score")
 447 |                                     and result.score is not None
 448 |                                     else 0.7,
 449 |                                     "financial_relevance": financial_relevance,
 450 |                                     "provider": "exa",
 451 |                                     "author": result.author
 452 |                                     if hasattr(result, "author")
 453 |                                     and result.author is not None
 454 |                                     else "",
 455 |                                     "domain": self._extract_domain(result.url or ""),
 456 |                                     "is_authoritative": self._is_authoritative_source(
 457 |                                         result.url or ""
 458 |                                     ),
 459 |                                 }
 460 |                             )
 461 | 
 462 |                     # Sort results by financial relevance and score
 463 |                     results.sort(
 464 |                         key=lambda x: (x["financial_relevance"], x["score"]),
 465 |                         reverse=True,
 466 |                     )
 467 |                     return results
 468 | 
 469 |                 except ImportError:
 470 |                     logger.error("exa-py library not available - cannot perform search")
 471 |                     raise WebSearchError(
 472 |                         "exa-py library required for ExaSearchProvider"
 473 |                     )
 474 |                 except Exception as e:
 475 |                     logger.error(f"Error calling Exa API: {e}")
 476 |                     raise e
 477 | 
 478 |             # Use adaptive timeout based on query complexity and budget
 479 |             result = await asyncio.wait_for(
 480 |                 circuit_breaker.call(_search), timeout=search_timeout
 481 |             )
 482 |             self._record_success()  # Record successful search
 483 |             logger.debug(
 484 |                 f"Exa search completed in {search_timeout:.1f}s timeout window"
 485 |             )
 486 |             return result
 487 | 
 488 |         except TimeoutError:
 489 |             self._record_failure("timeout")  # Record timeout as specific failure type
 490 |             query_snippet = query[:100] + ("..." if len(query) > 100 else "")
 491 |             logger.error(
 492 |                 f"Exa search timeout after {search_timeout:.1f} seconds (failure #{self._failure_count}) "
 493 |                 f"for query: '{query_snippet}'"
 494 |             )
 495 |             raise WebSearchError(
 496 |                 f"Exa search timed out after {search_timeout:.1f} seconds"
 497 |             )
 498 |         except Exception as e:
 499 |             self._record_failure("error")  # Record non-timeout failure
 500 |             logger.error(f"Exa search error (failure #{self._failure_count}): {e}")
 501 |             raise WebSearchError(f"Exa search failed: {str(e)}")
 502 | 
 503 |     def _get_search_params(
 504 |         self, query: str, num_results: int, strategy: str
 505 |     ) -> dict[str, Any]:
 506 |         """
 507 |         Generate optimized search parameters based on strategy and query type.
 508 | 
 509 |         Args:
 510 |             query: Search query
 511 |             num_results: Number of results
 512 |             strategy: Search strategy
 513 | 
 514 |         Returns:
 515 |             Dictionary of search parameters for Exa API
 516 |         """
 517 |         # Base parameters
 518 |         params = {
 519 |             "query": query,
 520 |             "num_results": num_results,
 521 |             "text": {"max_characters": 5000},  # Increased for financial content
 522 |         }
 523 | 
 524 |         # Strategy-specific optimizations
 525 |         if strategy == "authoritative":
 526 |             # Focus on authoritative financial sources
 527 |             # Note: Exa API doesn't allow both include_domains and exclude_domains with content
 528 |             params.update(
 529 |                 {
 530 |                     "include_domains": self.financial_domains[
 531 |                         :10
 532 |                     ],  # Top authoritative sources
 533 |                     "type": "auto",  # Let Exa decide neural vs keyword
 534 |                     "start_published_date": "2020-01-01",  # Recent financial data
 535 |                 }
 536 |             )
 537 | 
 538 |         elif strategy == "comprehensive":
 539 |             # Broad search across all financial sources
 540 |             params.update(
 541 |                 {
 542 |                     "exclude_domains": self.excluded_domains,
 543 |                     "type": "neural",  # Better for comprehensive understanding
 544 |                     "start_published_date": "2018-01-01",  # Broader historical context
 545 |                 }
 546 |             )
 547 | 
 548 |         elif strategy == "hybrid":
 549 |             # Balanced approach with domain preferences
 550 |             params.update(
 551 |                 {
 552 |                     "exclude_domains": self.excluded_domains,
 553 |                     "type": "auto",  # Hybrid neural/keyword approach
 554 |                     "start_published_date": "2019-01-01",
 555 |                     # Use domain weighting rather than strict inclusion
 556 |                 }
 557 |             )
 558 | 
 559 |         else:  # "auto" or default
 560 |             # Standard search with basic optimizations
 561 |             params.update(
 562 |                 {
 563 |                     "exclude_domains": self.excluded_domains[:5],  # Basic exclusions
 564 |                     "type": "auto",
 565 |                 }
 566 |             )
 567 | 
 568 |         # Add financial-specific query enhancements
 569 |         enhanced_query = self._enhance_financial_query(query)
 570 |         if enhanced_query != query:
 571 |             params["query"] = enhanced_query
 572 | 
 573 |         return params
 574 | 
 575 |     def _enhance_financial_query(self, query: str) -> str:
 576 |         """
 577 |         Enhance queries with financial context and terminology.
 578 | 
 579 |         Args:
 580 |             query: Original search query
 581 | 
 582 |         Returns:
 583 |             Enhanced query with financial context
 584 |         """
 585 |         # Financial keywords that improve search quality
 586 |         financial_terms = {
 587 |             "earnings",
 588 |             "revenue",
 589 |             "profit",
 590 |             "loss",
 591 |             "financial",
 592 |             "quarterly",
 593 |             "annual",
 594 |             "SEC",
 595 |             "10-K",
 596 |             "10-Q",
 597 |             "balance sheet",
 598 |             "income statement",
 599 |             "cash flow",
 600 |             "dividend",
 601 |             "stock",
 602 |             "share",
 603 |             "market cap",
 604 |             "valuation",
 605 |         }
 606 | 
 607 |         query_lower = query.lower()
 608 | 
 609 |         # Check if query already contains financial terms
 610 |         has_financial_context = any(term in query_lower for term in financial_terms)
 611 | 
 612 |         # Add context for company/stock queries
 613 |         if not has_financial_context:
 614 |             # Detect if it's a company or stock symbol query
 615 |             if any(
 616 |                 indicator in query_lower
 617 |                 for indicator in ["company", "corp", "inc", "$", "stock"]
 618 |             ):
 619 |                 return f"{query} financial analysis earnings revenue"
 620 |             elif len(query.split()) <= 3 and query.isupper():  # Likely stock symbol
 621 |                 return f"{query} stock financial performance earnings"
 622 |             elif "analysis" in query_lower or "research" in query_lower:
 623 |                 return f"{query} financial data SEC filings"
 624 | 
 625 |         return query
 626 | 
 627 |     def _calculate_financial_relevance(self, result) -> float:
 628 |         """
 629 |         Calculate financial relevance score for a search result.
 630 | 
 631 |         Args:
 632 |             result: Exa search result object
 633 | 
 634 |         Returns:
 635 |             Financial relevance score (0.0 to 1.0)
 636 |         """
 637 |         score = 0.0
 638 | 
 639 |         # Domain-based scoring
 640 |         domain = self._extract_domain(result.url)
 641 |         if domain in self.financial_domains:
 642 |             if domain in ["sec.gov", "edgar.sec.gov", "federalreserve.gov"]:
 643 |                 score += 0.4  # Highest authority
 644 |             elif domain in ["bloomberg.com", "reuters.com", "wsj.com", "ft.com"]:
 645 |                 score += 0.3  # High-quality financial news
 646 |             else:
 647 |                 score += 0.2  # Other financial sources
 648 | 
 649 |         # Content-based scoring
 650 |         if hasattr(result, "text") and result.text:
 651 |             text_lower = result.text.lower()
 652 | 
 653 |             # Financial terminology scoring
 654 |             financial_keywords = [
 655 |                 "earnings",
 656 |                 "revenue",
 657 |                 "profit",
 658 |                 "financial",
 659 |                 "quarterly",
 660 |                 "annual",
 661 |                 "sec filing",
 662 |                 "10-k",
 663 |                 "10-q",
 664 |                 "balance sheet",
 665 |                 "income statement",
 666 |                 "cash flow",
 667 |                 "dividend",
 668 |                 "market cap",
 669 |                 "valuation",
 670 |                 "analyst",
 671 |                 "forecast",
 672 |                 "guidance",
 673 |                 "ebitda",
 674 |                 "eps",
 675 |                 "pe ratio",
 676 |             ]
 677 | 
 678 |             keyword_matches = sum(
 679 |                 1 for keyword in financial_keywords if keyword in text_lower
 680 |             )
 681 |             score += min(keyword_matches * 0.05, 0.3)  # Max 0.3 from keywords
 682 | 
 683 |         # Title-based scoring
 684 |         if hasattr(result, "title") and result.title:
 685 |             title_lower = result.title.lower()
 686 |             if any(
 687 |                 term in title_lower
 688 |                 for term in ["financial", "earnings", "quarterly", "annual", "sec"]
 689 |             ):
 690 |                 score += 0.1
 691 | 
 692 |         # Recency bonus for financial data
 693 |         if hasattr(result, "published_date") and result.published_date:
 694 |             try:
 695 |                 from datetime import datetime
 696 | 
 697 |                 # Handle different date formats
 698 |                 date_str = str(result.published_date)
 699 |                 if date_str and date_str != "":
 700 |                     # Handle ISO format with Z
 701 |                     if date_str.endswith("Z"):
 702 |                         date_str = date_str.replace("Z", "+00:00")
 703 | 
 704 |                     pub_date = datetime.fromisoformat(date_str)
 705 |                     days_old = (datetime.now(UTC) - pub_date).days
 706 | 
 707 |                     if days_old <= 30:
 708 |                         score += 0.1  # Recent data bonus
 709 |                     elif days_old <= 90:
 710 |                         score += 0.05  # Somewhat recent bonus
 711 |             except (ValueError, AttributeError, TypeError):
 712 |                 pass  # Skip if date parsing fails
 713 | 
 714 |         return min(score, 1.0)  # Cap at 1.0
 715 | 
 716 |     def _extract_domain(self, url: str) -> str:
 717 |         """Extract domain from URL."""
 718 |         try:
 719 |             from urllib.parse import urlparse
 720 | 
 721 |             return urlparse(url).netloc.lower().replace("www.", "")
 722 |         except Exception:
 723 |             return ""
 724 | 
 725 |     def _is_authoritative_source(self, url: str) -> bool:
 726 |         """Check if URL is from an authoritative financial source."""
 727 |         domain = self._extract_domain(url)
 728 |         authoritative_domains = [
 729 |             "sec.gov",
 730 |             "edgar.sec.gov",
 731 |             "federalreserve.gov",
 732 |             "treasury.gov",
 733 |             "bloomberg.com",
 734 |             "reuters.com",
 735 |             "wsj.com",
 736 |             "ft.com",
 737 |         ]
 738 |         return domain in authoritative_domains
 739 | 
 740 | 
 741 | class TavilySearchProvider(WebSearchProvider):
 742 |     """Tavily search provider with sensible filtering for financial research."""
 743 | 
 744 |     def __init__(self, api_key: str):
 745 |         super().__init__(api_key)
 746 |         self.excluded_domains = {
 747 |             "facebook.com",
 748 |             "twitter.com",
 749 |             "x.com",
 750 |             "instagram.com",
 751 |             "reddit.com",
 752 |         }
 753 | 
 754 |     async def search(
 755 |         self, query: str, num_results: int = 10, timeout_budget: float | None = None
 756 |     ) -> list[dict[str, Any]]:
 757 |         if not self.is_healthy():
 758 |             raise WebSearchError("Tavily provider disabled due to repeated failures")
 759 | 
 760 |         timeout = self._calculate_timeout(query, timeout_budget)
 761 |         circuit_breaker = await circuit_manager.get_or_create(
 762 |             "tavily_search",
 763 |             failure_threshold=8,
 764 |             recovery_timeout=30,
 765 |         )
 766 | 
 767 |         async def _search() -> list[dict[str, Any]]:
 768 |             if TavilyClient is None:
 769 |                 raise ImportError("tavily package is required for TavilySearchProvider")
 770 | 
 771 |             client = TavilyClient(api_key=self.api_key)
 772 |             response = await asyncio.get_event_loop().run_in_executor(
 773 |                 None,
 774 |                 lambda: client.search(query=query, max_results=num_results),
 775 |             )
 776 |             return self._process_results(response.get("results", []))
 777 | 
 778 |         return await circuit_breaker.call(_search, timeout=timeout)
 779 | 
 780 |     def _process_results(
 781 |         self, results: Iterable[dict[str, Any]]
 782 |     ) -> list[dict[str, Any]]:
 783 |         processed: list[dict[str, Any]] = []
 784 |         for item in results:
 785 |             url = item.get("url", "")
 786 |             if any(domain in url for domain in self.excluded_domains):
 787 |                 continue
 788 |             processed.append(
 789 |                 {
 790 |                     "url": url,
 791 |                     "title": item.get("title"),
 792 |                     "content": item.get("content") or item.get("raw_content", ""),
 793 |                     "raw_content": item.get("raw_content"),
 794 |                     "published_date": item.get("published_date"),
 795 |                     "score": item.get("score", 0.0),
 796 |                     "provider": "tavily",
 797 |                 }
 798 |             )
 799 |         return processed
 800 | 
 801 | 
 802 | class ContentAnalyzer:
 803 |     """AI-powered content analysis for research results with batch processing capability."""
 804 | 
 805 |     def __init__(self, llm: BaseChatModel):
 806 |         self.llm = llm
 807 |         self._batch_size = 4  # Process up to 4 sources concurrently
 808 | 
 809 |     @staticmethod
 810 |     def _coerce_message_content(raw_content: Any) -> str:
 811 |         """Convert LLM response content to a string for JSON parsing."""
 812 |         if isinstance(raw_content, str):
 813 |             return raw_content
 814 | 
 815 |         if isinstance(raw_content, list):
 816 |             parts: list[str] = []
 817 |             for item in raw_content:
 818 |                 if isinstance(item, dict):
 819 |                     text_value = item.get("text")
 820 |                     if isinstance(text_value, str):
 821 |                         parts.append(text_value)
 822 |                     else:
 823 |                         parts.append(str(text_value))
 824 |                 else:
 825 |                     parts.append(str(item))
 826 |             return "".join(parts)
 827 | 
 828 |         return str(raw_content)
 829 | 
 830 |     async def analyze_content(
 831 |         self, content: str, persona: str, analysis_focus: str = "general"
 832 |     ) -> dict[str, Any]:
 833 |         """Analyze content with AI for insights, sentiment, and relevance."""
 834 | 
 835 |         persona_focus = PERSONA_RESEARCH_FOCUS.get(
 836 |             persona, PERSONA_RESEARCH_FOCUS["moderate"]
 837 |         )
 838 | 
 839 |         analysis_prompt = f"""
 840 |         Analyze this financial content from the perspective of a {persona} investor.
 841 | 
 842 |         Content to analyze:
 843 |         {content[:3000]}  # Limit content length
 844 | 
 845 |         Focus Areas: {", ".join(persona_focus["keywords"])}
 846 |         Risk Focus: {persona_focus["risk_focus"]}
 847 |         Time Horizon: {persona_focus["time_horizon"]}
 848 | 
 849 |         Provide analysis in the following structure:
 850 | 
 851 |         1. KEY_INSIGHTS: 3-5 bullet points of most important insights
 852 |         2. SENTIMENT: Overall sentiment (bullish/bearish/neutral) with confidence (0-1)
 853 |         3. RISK_FACTORS: Key risks identified relevant to {persona} investors
 854 |         4. OPPORTUNITIES: Investment opportunities or catalysts identified
 855 |         5. CREDIBILITY: Assessment of source credibility (0-1 score)
 856 |         6. RELEVANCE: How relevant is this to {persona} investment strategy (0-1 score)
 857 |         7. SUMMARY: 2-3 sentence summary for {persona} investors
 858 | 
 859 |         Format as JSON with clear structure.
 860 |         """
 861 | 
 862 |         try:
 863 |             response = await self.llm.ainvoke(
 864 |                 [
 865 |                     SystemMessage(
 866 |                         content="You are a financial content analyst. Return only valid JSON."
 867 |                     ),
 868 |                     HumanMessage(content=analysis_prompt),
 869 |                 ]
 870 |             )
 871 | 
 872 |             raw_content = self._coerce_message_content(response.content).strip()
 873 |             analysis = json.loads(raw_content)
 874 | 
 875 |             return {
 876 |                 "insights": analysis.get("KEY_INSIGHTS", []),
 877 |                 "sentiment": {
 878 |                     "direction": analysis.get("SENTIMENT", {}).get(
 879 |                         "direction", "neutral"
 880 |                     ),
 881 |                     "confidence": analysis.get("SENTIMENT", {}).get("confidence", 0.5),
 882 |                 },
 883 |                 "risk_factors": analysis.get("RISK_FACTORS", []),
 884 |                 "opportunities": analysis.get("OPPORTUNITIES", []),
 885 |                 "credibility_score": analysis.get("CREDIBILITY", 0.5),
 886 |                 "relevance_score": analysis.get("RELEVANCE", 0.5),
 887 |                 "summary": analysis.get("SUMMARY", ""),
 888 |                 "analysis_timestamp": datetime.now(),
 889 |             }
 890 | 
 891 |         except Exception as e:
 892 |             logger.warning(f"AI content analysis failed: {e}, using fallback")
 893 |             return self._fallback_analysis(content, persona)
 894 | 
 895 |     def _fallback_analysis(self, content: str, persona: str) -> dict[str, Any]:
 896 |         """Fallback analysis using keyword matching."""
 897 |         persona_focus = PERSONA_RESEARCH_FOCUS.get(
 898 |             persona, PERSONA_RESEARCH_FOCUS["moderate"]
 899 |         )
 900 | 
 901 |         content_lower = content.lower()
 902 | 
 903 |         # Simple sentiment analysis
 904 |         positive_words = [
 905 |             "growth",
 906 |             "increase",
 907 |             "profit",
 908 |             "success",
 909 |             "opportunity",
 910 |             "strong",
 911 |         ]
 912 |         negative_words = ["decline", "loss", "risk", "problem", "concern", "weak"]
 913 | 
 914 |         positive_count = sum(1 for word in positive_words if word in content_lower)
 915 |         negative_count = sum(1 for word in negative_words if word in content_lower)
 916 | 
 917 |         if positive_count > negative_count:
 918 |             sentiment = "bullish"
 919 |             confidence = 0.6
 920 |         elif negative_count > positive_count:
 921 |             sentiment = "bearish"
 922 |             confidence = 0.6
 923 |         else:
 924 |             sentiment = "neutral"
 925 |             confidence = 0.5
 926 | 
 927 |         # Relevance scoring based on keywords
 928 |         keyword_matches = sum(
 929 |             1 for keyword in persona_focus["keywords"] if keyword in content_lower
 930 |         )
 931 |         relevance_score = min(keyword_matches / len(persona_focus["keywords"]), 1.0)
 932 | 
 933 |         return {
 934 |             "insights": [f"Fallback analysis for {persona} investor perspective"],
 935 |             "sentiment": {"direction": sentiment, "confidence": confidence},
 936 |             "risk_factors": ["Unable to perform detailed risk analysis"],
 937 |             "opportunities": ["Unable to identify specific opportunities"],
 938 |             "credibility_score": 0.5,
 939 |             "relevance_score": relevance_score,
 940 |             "summary": f"Content analysis for {persona} investor using fallback method",
 941 |             "analysis_timestamp": datetime.now(),
 942 |             "fallback_used": True,
 943 |         }
 944 | 
 945 |     async def analyze_content_batch(
 946 |         self,
 947 |         content_items: list[tuple[str, str]],
 948 |         persona: str,
 949 |         analysis_focus: str = "general",
 950 |     ) -> list[dict[str, Any]]:
 951 |         """
 952 |         Analyze multiple content items in parallel batches for improved performance.
 953 | 
 954 |         Args:
 955 |             content_items: List of (content, source_identifier) tuples
 956 |             persona: Investor persona for analysis perspective
 957 |             analysis_focus: Focus area for analysis
 958 | 
 959 |         Returns:
 960 |             List of analysis results in same order as input
 961 |         """
 962 |         if not content_items:
 963 |             return []
 964 | 
 965 |         # Process items in batches to avoid overwhelming the LLM
 966 |         results = []
 967 |         for i in range(0, len(content_items), self._batch_size):
 968 |             batch = content_items[i : i + self._batch_size]
 969 | 
 970 |             # Create concurrent tasks for this batch
 971 |             tasks = [
 972 |                 self.analyze_content(content, persona, analysis_focus)
 973 |                 for content, _ in batch
 974 |             ]
 975 | 
 976 |             # Wait for all tasks in this batch to complete
 977 |             try:
 978 |                 batch_results = await asyncio.gather(*tasks, return_exceptions=True)
 979 | 
 980 |                 # Process results and handle exceptions
 981 |                 for j, result in enumerate(batch_results):
 982 |                     if isinstance(result, Exception):
 983 |                         logger.warning(
 984 |                             f"Batch analysis failed for item {i + j}: {result}"
 985 |                         )
 986 |                         # Use fallback for failed items
 987 |                         content, source_id = batch[j]
 988 |                         fallback_result = self._fallback_analysis(content, persona)
 989 |                         fallback_result["source_identifier"] = source_id
 990 |                         fallback_result["batch_processed"] = True
 991 |                         results.append(fallback_result)
 992 |                     elif isinstance(result, dict):
 993 |                         enriched_result = dict(result)
 994 |                         enriched_result["source_identifier"] = batch[j][1]
 995 |                         enriched_result["batch_processed"] = True
 996 |                         results.append(enriched_result)
 997 |                     else:
 998 |                         content, source_id = batch[j]
 999 |                         fallback_result = self._fallback_analysis(content, persona)
1000 |                         fallback_result["source_identifier"] = source_id
1001 |                         fallback_result["batch_processed"] = True
1002 |                         results.append(fallback_result)
1003 | 
1004 |             except Exception as e:
1005 |                 logger.error(f"Batch analysis completely failed: {e}")
1006 |                 # Fallback for entire batch
1007 |                 for content, source_id in batch:
1008 |                     fallback_result = self._fallback_analysis(content, persona)
1009 |                     fallback_result["source_identifier"] = source_id
1010 |                     fallback_result["batch_processed"] = True
1011 |                     fallback_result["batch_error"] = str(e)
1012 |                     results.append(fallback_result)
1013 | 
1014 |         logger.info(
1015 |             f"Batch content analysis completed: {len(content_items)} items processed "
1016 |             f"in {(len(content_items) + self._batch_size - 1) // self._batch_size} batches"
1017 |         )
1018 | 
1019 |         return results
1020 | 
1021 |     async def analyze_content_items(
1022 |         self,
1023 |         content_items: list[dict[str, Any]],
1024 |         focus_areas: list[str],
1025 |     ) -> dict[str, Any]:
1026 |         """
1027 |         Analyze content items for test compatibility.
1028 | 
1029 |         Args:
1030 |             content_items: List of search result dictionaries with content/text field
1031 |             focus_areas: List of focus areas for analysis
1032 | 
1033 |         Returns:
1034 |             Dictionary with aggregated analysis results
1035 |         """
1036 |         if not content_items:
1037 |             return {
1038 |                 "insights": [],
1039 |                 "sentiment_scores": [],
1040 |                 "credibility_scores": [],
1041 |             }
1042 | 
1043 |         # For test compatibility, directly use LLM with test-compatible format
1044 |         analyzed_results = []
1045 |         for item in content_items:
1046 |             content = item.get("text") or item.get("content") or ""
1047 |             if content:
1048 |                 try:
1049 |                     # Direct LLM call for test compatibility
1050 |                     prompt = f"Analyze: {content[:500]}"
1051 |                     response = await self.llm.ainvoke(
1052 |                         [
1053 |                             SystemMessage(
1054 |                                 content="You are a financial content analyst. Return only valid JSON."
1055 |                             ),
1056 |                             HumanMessage(content=prompt),
1057 |                         ]
1058 |                     )
1059 | 
1060 |                     coerced_content = self._coerce_message_content(
1061 |                         response.content
1062 |                     ).strip()
1063 |                     analysis = json.loads(coerced_content)
1064 |                     analyzed_results.append(analysis)
1065 |                 except Exception as e:
1066 |                     logger.warning(f"Content analysis failed: {e}")
1067 |                     # Add fallback analysis
1068 |                     analyzed_results.append(
1069 |                         {
1070 |                             "insights": [
1071 |                                 {"insight": "Analysis failed", "confidence": 0.1}
1072 |                             ],
1073 |                             "sentiment": {"direction": "neutral", "confidence": 0.5},
1074 |                             "credibility": 0.5,
1075 |                         }
1076 |                     )
1077 | 
1078 |         # Aggregate results
1079 |         all_insights = []
1080 |         sentiment_scores = []
1081 |         credibility_scores = []
1082 | 
1083 |         for result in analyzed_results:
1084 |             # Handle test format with nested insight objects
1085 |             insights = result.get("insights", [])
1086 |             if isinstance(insights, list):
1087 |                 for insight in insights:
1088 |                     if isinstance(insight, dict) and "insight" in insight:
1089 |                         all_insights.append(insight["insight"])
1090 |                     elif isinstance(insight, str):
1091 |                         all_insights.append(insight)
1092 |                     else:
1093 |                         all_insights.append(str(insight))
1094 | 
1095 |             sentiment = result.get("sentiment", {})
1096 |             if sentiment:
1097 |                 sentiment_scores.append(sentiment)
1098 | 
1099 |             credibility = result.get(
1100 |                 "credibility_score", result.get("credibility", 0.5)
1101 |             )
1102 |             credibility_scores.append(credibility)
1103 | 
1104 |         return {
1105 |             "insights": all_insights,
1106 |             "sentiment_scores": sentiment_scores,
1107 |             "credibility_scores": credibility_scores,
1108 |         }
1109 | 
1110 |     async def _analyze_single_content(
1111 |         self, content_item: dict[str, Any] | str, focus_areas: list[str] | None = None
1112 |     ) -> dict[str, Any]:
1113 |         """Analyze single content item - used by tests."""
1114 |         if isinstance(content_item, dict):
1115 |             content = content_item.get("text") or content_item.get("content") or ""
1116 |         else:
1117 |             content = content_item
1118 | 
1119 |         try:
1120 |             result = await self.analyze_content(content, "moderate")
1121 |             # Ensure test-compatible format
1122 |             if "credibility_score" in result and "credibility" not in result:
1123 |                 result["credibility"] = result["credibility_score"]
1124 |             return result
1125 |         except Exception as e:
1126 |             logger.warning(f"Single content analysis failed: {e}")
1127 |             # Return fallback result
1128 |             return {
1129 |                 "sentiment": {"direction": "neutral", "confidence": 0.5},
1130 |                 "credibility": 0.5,
1131 |                 "credibility_score": 0.5,
1132 |                 "insights": [],
1133 |                 "risk_factors": [],
1134 |                 "opportunities": [],
1135 |             }
1136 | 
1137 |     async def _extract_themes(
1138 |         self, content_items: list[dict[str, Any]]
1139 |     ) -> list[dict[str, Any]]:
1140 |         """Extract themes from content items - used by tests."""
1141 |         if not content_items:
1142 |             return []
1143 | 
1144 |         # Use LLM to extract structured themes
1145 |         try:
1146 |             content_text = "\n".join(
1147 |                 [item.get("text", item.get("content", "")) for item in content_items]
1148 |             )
1149 | 
1150 |             prompt = f"""
1151 |             Extract key themes from the following content and return as JSON:
1152 | 
1153 |             {content_text[:2000]}
1154 | 
1155 |             Return format: {{"themes": [{{"theme": "theme_name", "relevance": 0.9, "mentions": 10}}]}}
1156 |             """
1157 | 
1158 |             response = await self.llm.ainvoke(
1159 |                 [
1160 |                     SystemMessage(
1161 |                         content="You are a theme extraction AI. Return only valid JSON."
1162 |                     ),
1163 |                     HumanMessage(content=prompt),
1164 |                 ]
1165 |             )
1166 | 
1167 |             result = json.loads(
1168 |                 ContentAnalyzer._coerce_message_content(response.content)
1169 |             )
1170 |             return result.get("themes", [])
1171 | 
1172 |         except Exception as e:
1173 |             logger.warning(f"Theme extraction failed: {e}")
1174 |             # Fallback to simple keyword-based themes
1175 |             themes = []
1176 |             for item in content_items:
1177 |                 content = item.get("text") or item.get("content") or ""
1178 |                 if content:
1179 |                     content_lower = content.lower()
1180 |                     if "growth" in content_lower:
1181 |                         themes.append(
1182 |                             {"theme": "Growth", "relevance": 0.8, "mentions": 1}
1183 |                         )
1184 |                     if "earnings" in content_lower:
1185 |                         themes.append(
1186 |                             {"theme": "Earnings", "relevance": 0.7, "mentions": 1}
1187 |                         )
1188 |                     if "technology" in content_lower:
1189 |                         themes.append(
1190 |                             {"theme": "Technology", "relevance": 0.6, "mentions": 1}
1191 |                         )
1192 | 
1193 |             return themes
1194 | 
1195 | 
1196 | class DeepResearchAgent(PersonaAwareAgent):
1197 |     """
1198 |     Deep research agent using 2025 LangGraph patterns.
1199 | 
1200 |     Provides comprehensive financial research with web search, content analysis,
1201 |     sentiment detection, and source validation.
1202 |     """
1203 | 
1204 |     def __init__(
1205 |         self,
1206 |         llm: BaseChatModel,
1207 |         persona: str = "moderate",
1208 |         checkpointer: MemorySaver | None = None,
1209 |         ttl_hours: int = 24,  # Research results cached longer
1210 |         exa_api_key: str | None = None,
1211 |         default_depth: str = "standard",
1212 |         max_sources: int | None = None,
1213 |         research_depth: str | None = None,
1214 |         enable_parallel_execution: bool = True,
1215 |         parallel_config=None,  # Type: ParallelResearchConfig | None
1216 |     ):
1217 |         """Initialize deep research agent."""
1218 | 
1219 |         # Import here to avoid circular dependency
1220 |         from maverick_mcp.utils.parallel_research import (
1221 |             ParallelResearchConfig,
1222 |             ParallelResearchOrchestrator,
1223 |             TaskDistributionEngine,
1224 |         )
1225 | 
1226 |         # Store API key for immediate loading of search provider (pre-initialization)
1227 |         self._exa_api_key = exa_api_key
1228 |         self._search_providers_loaded = False
1229 |         self.search_providers = []
1230 | 
1231 |         # Pre-initialize search providers immediately (async init will be called separately)
1232 |         self._initialization_pending = True
1233 | 
1234 |         # Configuration
1235 |         self.default_depth = research_depth or default_depth
1236 |         self.max_sources = max_sources or RESEARCH_DEPTH_LEVELS.get(
1237 |             self.default_depth, {}
1238 |         ).get("max_sources", 10)
1239 |         self.content_analyzer = ContentAnalyzer(llm)
1240 | 
1241 |         # Parallel execution configuration
1242 |         self.enable_parallel_execution = enable_parallel_execution
1243 |         self.parallel_config = parallel_config or ParallelResearchConfig(
1244 |             max_concurrent_agents=settings.data_limits.max_parallel_agents,
1245 |             timeout_per_agent=180,  # 3 minutes per agent for thorough research
1246 |             enable_fallbacks=False,  # Disable fallbacks for speed
1247 |             rate_limit_delay=0.5,  # Reduced delay for faster execution
1248 |         )
1249 |         self.parallel_orchestrator = ParallelResearchOrchestrator(self.parallel_config)
1250 |         self.task_distributor = TaskDistributionEngine()
1251 | 
1252 |         # Get research-specific tools
1253 |         research_tools = self._get_research_tools()
1254 | 
1255 |         # Initialize base class
1256 |         super().__init__(
1257 |             llm=llm,
1258 |             tools=research_tools,
1259 |             persona=persona,
1260 |             checkpointer=checkpointer or MemorySaver(),
1261 |             ttl_hours=ttl_hours,
1262 |         )
1263 | 
1264 |         # Initialize components
1265 |         self.conversation_store = ConversationStore(ttl_hours=ttl_hours)
1266 | 
1267 |     @property
1268 |     def web_search_provider(self):
1269 |         """Compatibility property for tests - returns first search provider."""
1270 |         return self.search_providers[0] if self.search_providers else None
1271 | 
1272 |     def _is_insight_relevant_for_persona(
1273 |         self, insight: dict[str, Any], characteristics: dict[str, Any]
1274 |     ) -> bool:
1275 |         """Check if an insight is relevant for a given persona - used by tests."""
1276 |         # Simple implementation for test compatibility
1277 |         # In a real implementation, this would analyze the insight against persona characteristics
1278 |         return True  # Default permissive approach as mentioned in test comments
1279 | 
1280 |     async def initialize(self) -> None:
1281 |         """Pre-initialize Exa search provider to eliminate lazy loading overhead during research."""
1282 |         if not self._initialization_pending:
1283 |             return
1284 | 
1285 |         try:
1286 |             provider = await get_cached_search_provider(self._exa_api_key)
1287 |             self.search_providers = [provider] if provider else []
1288 |             self._search_providers_loaded = True
1289 |             self._initialization_pending = False
1290 | 
1291 |             if not self.search_providers:
1292 |                 logger.warning(
1293 |                     "Exa search provider not available - research capabilities will be limited"
1294 |                 )
1295 |             else:
1296 |                 logger.info("Pre-initialized Exa search provider")
1297 | 
1298 |         except Exception as e:
1299 |             logger.error(f"Failed to pre-initialize Exa search provider: {e}")
1300 |             self.search_providers = []
1301 |             self._search_providers_loaded = True
1302 |             self._initialization_pending = False
1303 | 
1304 |         logger.info(
1305 |             f"DeepResearchAgent pre-initialized with {len(self.search_providers)} search providers, "
1306 |             f"parallel execution: {self.enable_parallel_execution}"
1307 |         )
1308 | 
1309 |     async def _ensure_search_providers_loaded(self) -> None:
1310 |         """Ensure search providers are loaded - fallback to initialization if not pre-initialized."""
1311 |         if self._search_providers_loaded:
1312 |             return
1313 | 
1314 |         # Check if initialization was marked as needed
1315 |         if hasattr(self, "_needs_initialization") and self._needs_initialization:
1316 |             logger.info("Performing deferred initialization of search providers")
1317 |             await self.initialize()
1318 |             self._needs_initialization = False
1319 |         else:
1320 |             # Fallback to pre-initialization if not done during agent creation
1321 |             logger.warning(
1322 |                 "Search providers not pre-initialized - falling back to lazy loading"
1323 |             )
1324 |             await self.initialize()
1325 | 
1326 |     def get_state_schema(self) -> type:
1327 |         """Return DeepResearchState schema."""
1328 |         return DeepResearchState
1329 | 
1330 |     def _get_research_tools(self) -> list[BaseTool]:
1331 |         """Get tools specific to research capabilities."""
1332 |         tools = []
1333 | 
1334 |         @tool
1335 |         async def web_search_financial(
1336 |             query: str,
1337 |             num_results: int = 10,
1338 |             provider: str = "auto",
1339 |             strategy: str = "hybrid",
1340 |         ) -> dict[str, Any]:
1341 |             """
1342 |             Search the web for financial information using optimized providers and strategies.
1343 | 
1344 |             Args:
1345 |                 query: Search query for financial information
1346 |                 num_results: Number of results to return (default: 10)
1347 |                 provider: Search provider to use ('auto', 'exa', 'tavily')
1348 |                 strategy: Search strategy ('hybrid', 'authoritative', 'comprehensive', 'auto')
1349 |             """
1350 |             return await self._perform_financial_search(
1351 |                 query, num_results, provider, strategy
1352 |             )
1353 | 
1354 |         @tool
1355 |         async def analyze_company_fundamentals(
1356 |             symbol: str, depth: str = "standard"
1357 |         ) -> dict[str, Any]:
1358 |             """Research company fundamentals including financials, competitive position, and outlook."""
1359 |             return await self._research_company_fundamentals(symbol, depth)
1360 | 
1361 |         @tool
1362 |         async def analyze_market_sentiment(
1363 |             topic: str, timeframe: str = "7d"
1364 |         ) -> dict[str, Any]:
1365 |             """Analyze market sentiment around a topic using news and social signals."""
1366 |             return await self._analyze_market_sentiment_tool(topic, timeframe)
1367 | 
1368 |         @tool
1369 |         async def validate_research_claims(
1370 |             claims: list[str], sources: list[str]
1371 |         ) -> dict[str, Any]:
1372 |             """Validate research claims against multiple sources for fact-checking."""
1373 |             return await self._validate_claims(claims, sources)
1374 | 
1375 |         tools.extend(
1376 |             [
1377 |                 web_search_financial,
1378 |                 analyze_company_fundamentals,
1379 |                 analyze_market_sentiment,
1380 |                 validate_research_claims,
1381 |             ]
1382 |         )
1383 | 
1384 |         return tools
1385 | 
1386 |     async def _perform_web_search(
1387 |         self, query: str, num_results: int, provider: str = "auto"
1388 |     ) -> dict[str, Any]:
1389 |         """Fallback web search across configured providers."""
1390 |         await self._ensure_search_providers_loaded()
1391 | 
1392 |         if not self.search_providers:
1393 |             return {
1394 |                 "error": "No search providers available",
1395 |                 "results": [],
1396 |                 "total_results": 0,
1397 |             }
1398 | 
1399 |         aggregated_results: list[dict[str, Any]] = []
1400 |         target = provider.lower()
1401 | 
1402 |         for provider_obj in self.search_providers:
1403 |             provider_name = provider_obj.__class__.__name__.lower()
1404 |             if target != "auto" and target not in provider_name:
1405 |                 continue
1406 | 
1407 |             try:
1408 |                 results = await provider_obj.search(query, num_results)
1409 |                 aggregated_results.extend(results)
1410 |                 if target != "auto":
1411 |                     break
1412 |             except Exception as error:  # pragma: no cover - fallback logging
1413 |                 logger.warning(
1414 |                     "Fallback web search failed for provider %s: %s",
1415 |                     provider_obj.__class__.__name__,
1416 |                     error,
1417 |                 )
1418 | 
1419 |         if not aggregated_results:
1420 |             return {
1421 |                 "error": "Search failed",
1422 |                 "results": [],
1423 |                 "total_results": 0,
1424 |             }
1425 | 
1426 |         truncated_results = aggregated_results[:num_results]
1427 |         return {
1428 |             "results": truncated_results,
1429 |             "total_results": len(truncated_results),
1430 |             "search_duration": 0.0,
1431 |             "search_strategy": "fallback",
1432 |         }
1433 | 
1434 |     async def _research_company_fundamentals(
1435 |         self, symbol: str, depth: str = "standard"
1436 |     ) -> dict[str, Any]:
1437 |         """Convenience wrapper for company fundamental research used by tools."""
1438 | 
1439 |         session_id = f"fundamentals-{symbol}-{uuid4().hex}"
1440 |         focus_areas = [
1441 |             "fundamentals",
1442 |             "financials",
1443 |             "valuation",
1444 |             "risk_management",
1445 |             "growth_drivers",
1446 |         ]
1447 | 
1448 |         return await self.research_comprehensive(
1449 |             topic=f"{symbol} company fundamentals analysis",
1450 |             session_id=session_id,
1451 |             depth=depth,
1452 |             focus_areas=focus_areas,
1453 |             timeframe="180d",
1454 |             use_parallel_execution=False,
1455 |         )
1456 | 
1457 |     async def _analyze_market_sentiment_tool(
1458 |         self, topic: str, timeframe: str = "7d"
1459 |     ) -> dict[str, Any]:
1460 |         """Wrapper used by the sentiment analysis tool."""
1461 | 
1462 |         session_id = f"sentiment-{uuid4().hex}"
1463 |         return await self.analyze_market_sentiment(
1464 |             topic=topic,
1465 |             session_id=session_id,
1466 |             timeframe=timeframe,
1467 |             use_parallel_execution=False,
1468 |         )
1469 | 
1470 |     async def _validate_claims(
1471 |         self, claims: list[str], sources: list[str]
1472 |     ) -> dict[str, Any]:
1473 |         """Lightweight claim validation used for tool compatibility."""
1474 | 
1475 |         validation_results: list[dict[str, Any]] = []
1476 | 
1477 |         for claim in claims:
1478 |             source_checks = []
1479 |             for source in sources:
1480 |                 source_checks.append(
1481 |                     {
1482 |                         "source": source,
1483 |                         "status": "not_verified",
1484 |                         "confidence": 0.0,
1485 |                         "notes": "Automatic validation not available in fallback mode",
1486 |                     }
1487 |                 )
1488 | 
1489 |             validation_results.append(
1490 |                 {
1491 |                     "claim": claim,
1492 |                     "validated": False,
1493 |                     "confidence": 0.0,
1494 |                     "evidence": [],
1495 |                     "source_checks": source_checks,
1496 |                 }
1497 |             )
1498 | 
1499 |         return {
1500 |             "results": validation_results,
1501 |             "summary": "Claim validation is currently using fallback heuristics.",
1502 |         }
1503 | 
1504 |     async def _perform_financial_search(
1505 |         self, query: str, num_results: int, provider: str, strategy: str
1506 |     ) -> dict[str, Any]:
1507 |         """
1508 |         Perform optimized financial search with enhanced strategies.
1509 | 
1510 |         Args:
1511 |             query: Search query
1512 |             num_results: Number of results
1513 |             provider: Search provider preference
1514 |             strategy: Search strategy
1515 | 
1516 |         Returns:
1517 |             Dictionary with search results and metadata
1518 |         """
1519 |         if not self.search_providers:
1520 |             return {
1521 |                 "error": "No search providers available",
1522 |                 "results": [],
1523 |                 "total_results": 0,
1524 |             }
1525 | 
1526 |         start_time = datetime.now()
1527 |         all_results = []
1528 | 
1529 |         # Use Exa provider with financial optimization if available
1530 |         exa_provider = None
1531 |         for p in self.search_providers:
1532 |             if isinstance(p, ExaSearchProvider):
1533 |                 exa_provider = p
1534 |                 break
1535 | 
1536 |         if exa_provider and (provider == "auto" or provider == "exa"):
1537 |             try:
1538 |                 # Use the enhanced financial search method
1539 |                 results = await exa_provider.search_financial(
1540 |                     query, num_results, strategy=strategy
1541 |                 )
1542 | 
1543 |                 # Add search metadata
1544 |                 for result in results:
1545 |                     result.update(
1546 |                         {
1547 |                             "search_strategy": strategy,
1548 |                             "search_timestamp": start_time.isoformat(),
1549 |                             "enhanced_query": query,
1550 |                         }
1551 |                     )
1552 | 
1553 |                 all_results.extend(results)
1554 | 
1555 |                 logger.info(
1556 |                     f"Financial search completed: {len(results)} results "
1557 |                     f"using strategy '{strategy}' in {(datetime.now() - start_time).total_seconds():.2f}s"
1558 |                 )
1559 | 
1560 |             except Exception as e:
1561 |                 logger.error(f"Enhanced financial search failed: {e}")
1562 |                 # Fallback to regular search if available
1563 |                 if hasattr(self, "_perform_web_search"):
1564 |                     return await self._perform_web_search(query, num_results, provider)
1565 |                 else:
1566 |                     return {
1567 |                         "error": f"Financial search failed: {str(e)}",
1568 |                         "results": [],
1569 |                         "total_results": 0,
1570 |                     }
1571 |         else:
1572 |             # Use regular search providers
1573 |             try:
1574 |                 for provider_obj in self.search_providers:
1575 |                     if (
1576 |                         provider == "auto"
1577 |                         or provider.lower() in str(type(provider_obj)).lower()
1578 |                     ):
1579 |                         results = await provider_obj.search(query, num_results)
1580 |                         all_results.extend(results)
1581 |                         break
1582 |             except Exception as e:
1583 |                 logger.error(f"Fallback search failed: {e}")
1584 |                 return {
1585 |                     "error": f"Search failed: {str(e)}",
1586 |                     "results": [],
1587 |                     "total_results": 0,
1588 |                 }
1589 | 
1590 |         # Sort by financial relevance and authority
1591 |         all_results.sort(
1592 |             key=lambda x: (
1593 |                 x.get("financial_relevance", 0),
1594 |                 x.get("is_authoritative", False),
1595 |                 x.get("score", 0),
1596 |             ),
1597 |             reverse=True,
1598 |         )
1599 | 
1600 |         return {
1601 |             "results": all_results[:num_results],
1602 |             "total_results": len(all_results),
1603 |             "search_strategy": strategy,
1604 |             "search_duration": (datetime.now() - start_time).total_seconds(),
1605 |             "enhanced_search": True,
1606 |         }
1607 | 
1608 |     def _build_graph(self):
1609 |         """Build research workflow graph with multi-step research process."""
1610 |         workflow = StateGraph(DeepResearchState)
1611 | 
1612 |         # Core research workflow nodes
1613 |         workflow.add_node("plan_research", self._plan_research)
1614 |         workflow.add_node("execute_searches", self._execute_searches)
1615 |         workflow.add_node("analyze_content", self._analyze_content)
1616 |         workflow.add_node("validate_sources", self._validate_sources)
1617 |         workflow.add_node("synthesize_findings", self._synthesize_findings)
1618 |         workflow.add_node("generate_citations", self._generate_citations)
1619 | 
1620 |         # Specialized research nodes
1621 |         workflow.add_node("sentiment_analysis", self._sentiment_analysis)
1622 |         workflow.add_node("fundamental_analysis", self._fundamental_analysis)
1623 |         workflow.add_node("competitive_analysis", self._competitive_analysis)
1624 | 
1625 |         # Quality control nodes
1626 |         workflow.add_node("fact_validation", self._fact_validation)
1627 |         workflow.add_node("source_credibility", self._source_credibility)
1628 | 
1629 |         # Define workflow edges
1630 |         workflow.add_edge(START, "plan_research")
1631 |         workflow.add_edge("plan_research", "execute_searches")
1632 |         workflow.add_edge("execute_searches", "analyze_content")
1633 | 
1634 |         # Conditional routing based on research type
1635 |         workflow.add_conditional_edges(
1636 |             "analyze_content",
1637 |             self._route_specialized_analysis,
1638 |             {
1639 |                 "sentiment": "sentiment_analysis",
1640 |                 "fundamental": "fundamental_analysis",
1641 |                 "competitive": "competitive_analysis",
1642 |                 "validation": "validate_sources",
1643 |                 "synthesis": "synthesize_findings",
1644 |             },
1645 |         )
1646 | 
1647 |         # Specialized analysis flows
1648 |         workflow.add_edge("sentiment_analysis", "validate_sources")
1649 |         workflow.add_edge("fundamental_analysis", "validate_sources")
1650 |         workflow.add_edge("competitive_analysis", "validate_sources")
1651 | 
1652 |         # Quality control flow
1653 |         workflow.add_edge("validate_sources", "fact_validation")
1654 |         workflow.add_edge("fact_validation", "source_credibility")
1655 |         workflow.add_edge("source_credibility", "synthesize_findings")
1656 | 
1657 |         # Final steps
1658 |         workflow.add_edge("synthesize_findings", "generate_citations")
1659 |         workflow.add_edge("generate_citations", END)
1660 | 
1661 |         return workflow.compile(checkpointer=self.checkpointer)
1662 | 
1663 |     @log_method_call(component="DeepResearchAgent", include_timing=True)
1664 |     async def research_comprehensive(
1665 |         self,
1666 |         topic: str,
1667 |         session_id: str,
1668 |         depth: str | None = None,
1669 |         focus_areas: list[str] | None = None,
1670 |         timeframe: str = "30d",
1671 |         timeout_budget: float | None = None,  # Total timeout budget in seconds
1672 |         **kwargs,
1673 |     ) -> dict[str, Any]:
1674 |         """
1675 |         Comprehensive research on a financial topic.
1676 | 
1677 |         Args:
1678 |             topic: Research topic or company/symbol
1679 |             session_id: Session identifier
1680 |             depth: Research depth (basic/standard/comprehensive/exhaustive)
1681 |             focus_areas: Specific areas to focus on
1682 |             timeframe: Time range for research
1683 |             timeout_budget: Total timeout budget in seconds (enables budget allocation)
1684 |             **kwargs: Additional parameters
1685 | 
1686 |         Returns:
1687 |             Comprehensive research results with analysis and citations
1688 |         """
1689 |         # Ensure search providers are loaded (cached for performance)
1690 |         await self._ensure_search_providers_loaded()
1691 | 
1692 |         # Check if search providers are available
1693 |         if not self.search_providers:
1694 |             return {
1695 |                 "error": "Research functionality unavailable - no search providers configured",
1696 |                 "details": "Please configure EXA_API_KEY environment variable to enable research capabilities",
1697 |                 "topic": topic,
1698 |                 "available_functionality": "Limited to pre-existing data and basic analysis",
1699 |             }
1700 | 
1701 |         start_time = datetime.now()
1702 |         depth = depth or self.default_depth
1703 | 
1704 |         # Calculate timeout budget allocation for generous research timeouts
1705 |         timeout_budgets = {}
1706 |         if timeout_budget and timeout_budget > 0:
1707 |             timeout_budgets = {
1708 |                 "search_budget": timeout_budget
1709 |                 * 0.50,  # 50% for search operations (generous allocation)
1710 |                 "analysis_budget": timeout_budget * 0.30,  # 30% for content analysis
1711 |                 "synthesis_budget": timeout_budget * 0.20,  # 20% for result synthesis
1712 |                 "total_budget": timeout_budget,
1713 |                 "allocation_strategy": "comprehensive_research",
1714 |             }
1715 |             logger.info(
1716 |                 f"TIMEOUT_BUDGET_ALLOCATION: total={timeout_budget}s → "
1717 |                 f"search={timeout_budgets['search_budget']:.1f}s, "
1718 |                 f"analysis={timeout_budgets['analysis_budget']:.1f}s, "
1719 |                 f"synthesis={timeout_budgets['synthesis_budget']:.1f}s"
1720 |             )
1721 | 
1722 |         # Initialize research state
1723 |         initial_state = {
1724 |             "messages": [HumanMessage(content=f"Research: {topic}")],
1725 |             "persona": self.persona.name,
1726 |             "session_id": session_id,
1727 |             "timestamp": datetime.now(),
1728 |             "research_topic": topic,
1729 |             "research_depth": depth,
1730 |             "focus_areas": focus_areas
1731 |             or PERSONA_RESEARCH_FOCUS[self.persona.name.lower()]["keywords"],
1732 |             "timeframe": timeframe,
1733 |             "search_queries": [],
1734 |             "search_results": [],
1735 |             "analyzed_content": [],
1736 |             "validated_sources": [],
1737 |             "research_findings": [],
1738 |             "sentiment_analysis": {},
1739 |             "source_credibility_scores": {},
1740 |             "citations": [],
1741 |             "research_status": "planning",
1742 |             "research_confidence": 0.0,
1743 |             "source_diversity_score": 0.0,
1744 |             "fact_validation_results": [],
1745 |             "execution_time_ms": 0.0,
1746 |             "api_calls_made": 0,
1747 |             "cache_hits": 0,
1748 |             "cache_misses": 0,
1749 |             # Timeout budget allocation for intelligent time management
1750 |             "timeout_budgets": timeout_budgets,
1751 |             # Legacy fields
1752 |             "token_count": 0,
1753 |             "error": None,
1754 |             "analyzed_stocks": {},
1755 |             "key_price_levels": {},
1756 |             "last_analysis_time": {},
1757 |             "conversation_context": {},
1758 |         }
1759 | 
1760 |         # Add additional parameters
1761 |         initial_state.update(kwargs)
1762 | 
1763 |         # Set up orchestration logging
1764 |         orchestration_logger = get_orchestration_logger("DeepResearchAgent")
1765 |         orchestration_logger.set_request_context(
1766 |             session_id=session_id,
1767 |             research_topic=topic[:50],  # Truncate for logging
1768 |             research_depth=depth,
1769 |         )
1770 | 
1771 |         # Check if parallel execution is enabled and requested
1772 |         use_parallel = kwargs.get(
1773 |             "use_parallel_execution", self.enable_parallel_execution
1774 |         )
1775 | 
1776 |         orchestration_logger.info(
1777 |             "🔍 RESEARCH_START",
1778 |             execution_mode="parallel" if use_parallel else "sequential",
1779 |             focus_areas=focus_areas[:3] if focus_areas else None,
1780 |             timeframe=timeframe,
1781 |         )
1782 | 
1783 |         if use_parallel:
1784 |             orchestration_logger.info("🚀 PARALLEL_EXECUTION_SELECTED")
1785 |             try:
1786 |                 result = await self._execute_parallel_research(
1787 |                     topic=topic,
1788 |                     session_id=session_id,
1789 |                     depth=depth,
1790 |                     focus_areas=focus_areas,
1791 |                     timeframe=timeframe,
1792 |                     initial_state=initial_state,
1793 |                     start_time=start_time,
1794 |                     **kwargs,
1795 |                 )
1796 |                 orchestration_logger.info("✅ PARALLEL_EXECUTION_SUCCESS")
1797 |                 return result
1798 |             except Exception as e:
1799 |                 orchestration_logger.warning(
1800 |                     "⚠️ PARALLEL_FALLBACK_TRIGGERED",
1801 |                     error=str(e),
1802 |                     fallback_mode="sequential",
1803 |                 )
1804 |                 # Fall through to sequential execution
1805 | 
1806 |         # Execute research workflow (sequential)
1807 |         orchestration_logger.info("🔄 SEQUENTIAL_EXECUTION_START")
1808 |         try:
1809 |             result = await self.graph.ainvoke(
1810 |                 initial_state,
1811 |                 config={
1812 |                     "configurable": {
1813 |                         "thread_id": session_id,
1814 |                         "checkpoint_ns": "deep_research",
1815 |                     }
1816 |                 },
1817 |             )
1818 | 
1819 |             # Calculate execution time
1820 |             execution_time = (datetime.now() - start_time).total_seconds() * 1000
1821 |             result["execution_time_ms"] = execution_time
1822 | 
1823 |             return self._format_research_response(result)
1824 | 
1825 |         except Exception as e:
1826 |             logger.error(f"Error in deep research: {e}")
1827 |             return {
1828 |                 "status": "error",
1829 |                 "error": str(e),
1830 |                 "execution_time_ms": (datetime.now() - start_time).total_seconds()
1831 |                 * 1000,
1832 |                 "agent_type": "deep_research",
1833 |             }
1834 | 
1835 |     # Workflow node implementations
1836 | 
1837 |     async def _plan_research(self, state: DeepResearchState) -> Command:
1838 |         """Plan research strategy based on topic and persona."""
1839 |         topic = state["research_topic"]
1840 |         depth_config = RESEARCH_DEPTH_LEVELS[state["research_depth"]]
1841 |         persona_focus = PERSONA_RESEARCH_FOCUS[self.persona.name.lower()]
1842 | 
1843 |         # Generate search queries based on topic and persona
1844 |         search_queries = await self._generate_search_queries(
1845 |             topic, persona_focus, depth_config
1846 |         )
1847 | 
1848 |         return Command(
1849 |             goto="execute_searches",
1850 |             update={"search_queries": search_queries, "research_status": "searching"},
1851 |         )
1852 | 
1853 |     async def _safe_search(
1854 |         self,
1855 |         provider: WebSearchProvider,
1856 |         query: str,
1857 |         num_results: int = 5,
1858 |         timeout_budget: float | None = None,
1859 |     ) -> list[dict[str, Any]]:
1860 |         """Safely execute search with a provider, handling exceptions gracefully."""
1861 |         try:
1862 |             return await provider.search(
1863 |                 query, num_results=num_results, timeout_budget=timeout_budget
1864 |             )
1865 |         except Exception as e:
1866 |             logger.warning(
1867 |                 f"Search failed for '{query}' with provider {type(provider).__name__}: {e}"
1868 |             )
1869 |             return []  # Return empty list on failure
1870 | 
1871 |     async def _execute_searches(self, state: DeepResearchState) -> Command:
1872 |         """Execute web searches using available providers with timeout budget awareness."""
1873 |         search_queries = state["search_queries"]
1874 |         depth_config = RESEARCH_DEPTH_LEVELS[state["research_depth"]]
1875 | 
1876 |         # Calculate timeout budget per search operation
1877 |         timeout_budgets = state.get("timeout_budgets", {})
1878 |         search_budget = timeout_budgets.get("search_budget")
1879 | 
1880 |         if search_budget:
1881 |             # Divide search budget across queries and providers
1882 |             total_search_operations = len(
1883 |                 search_queries[: depth_config["max_searches"]]
1884 |             ) * len(self.search_providers)
1885 |             timeout_per_search = (
1886 |                 search_budget / max(total_search_operations, 1)
1887 |                 if total_search_operations > 0
1888 |                 else search_budget
1889 |             )
1890 |             logger.info(
1891 |                 f"SEARCH_BUDGET_ALLOCATION: {search_budget:.1f}s total → "
1892 |                 f"{timeout_per_search:.1f}s per search ({total_search_operations} operations)"
1893 |             )
1894 |         else:
1895 |             timeout_per_search = None
1896 | 
1897 |         all_results = []
1898 | 
1899 |         # Create all search tasks for parallel execution with budget-aware timeouts
1900 |         search_tasks = []
1901 |         for query in search_queries[: depth_config["max_searches"]]:
1902 |             for provider in self.search_providers:
1903 |                 # Create async task for each provider/query combination with timeout budget
1904 |                 search_tasks.append(
1905 |                     self._safe_search(
1906 |                         provider,
1907 |                         query,
1908 |                         num_results=5,
1909 |                         timeout_budget=timeout_per_search,
1910 |                     )
1911 |                 )
1912 | 
1913 |         # Execute all searches in parallel using asyncio.gather()
1914 |         if search_tasks:
1915 |             parallel_results = await asyncio.gather(
1916 |                 *search_tasks, return_exceptions=True
1917 |             )
1918 | 
1919 |             # Process results and filter out exceptions
1920 |             for result in parallel_results:
1921 |                 if isinstance(result, Exception):
1922 |                     # Log the exception but continue with other results
1923 |                     logger.warning(f"Search task failed: {result}")
1924 |                 elif isinstance(result, list):
1925 |                     all_results.extend(result)
1926 |                 elif result is not None:
1927 |                     all_results.append(result)
1928 | 
1929 |         # Deduplicate and limit results
1930 |         unique_results = []
1931 |         seen_urls = set()
1932 |         for result in all_results:
1933 |             if (
1934 |                 result["url"] not in seen_urls
1935 |                 and len(unique_results) < depth_config["max_sources"]
1936 |             ):
1937 |                 unique_results.append(result)
1938 |                 seen_urls.add(result["url"])
1939 | 
1940 |         logger.info(
1941 |             f"Search completed: {len(unique_results)} unique results from {len(all_results)} total"
1942 |         )
1943 | 
1944 |         return Command(
1945 |             goto="analyze_content",
1946 |             update={"search_results": unique_results, "research_status": "analyzing"},
1947 |         )
1948 | 
1949 |     async def _analyze_content(self, state: DeepResearchState) -> Command:
1950 |         """Analyze search results using AI content analysis."""
1951 |         search_results = state["search_results"]
1952 |         analyzed_content = []
1953 | 
1954 |         # Analyze each piece of content
1955 |         for result in search_results:
1956 |             if result.get("content"):
1957 |                 analysis = await self.content_analyzer.analyze_content(
1958 |                     content=result["content"],
1959 |                     persona=self.persona.name.lower(),
1960 |                     analysis_focus=state["research_depth"],
1961 |                 )
1962 | 
1963 |                 analyzed_content.append({**result, "analysis": analysis})
1964 | 
1965 |         return Command(
1966 |             goto="validate_sources",
1967 |             update={
1968 |                 "analyzed_content": analyzed_content,
1969 |                 "research_status": "validating",
1970 |             },
1971 |         )
1972 | 
1973 |     def _route_specialized_analysis(self, state: DeepResearchState) -> str:
1974 |         """Route to specialized analysis based on research focus."""
1975 |         focus_areas = state.get("focus_areas", [])
1976 | 
1977 |         if any(word in focus_areas for word in ["sentiment", "news", "social"]):
1978 |             return "sentiment"
1979 |         elif any(
1980 |             word in focus_areas for word in ["fundamental", "financial", "earnings"]
1981 |         ):
1982 |             return "fundamental"
1983 |         elif any(word in focus_areas for word in ["competitive", "market", "industry"]):
1984 |             return "competitive"
1985 |         else:
1986 |             return "validation"
1987 | 
1988 |     async def _validate_sources(self, state: DeepResearchState) -> Command:
1989 |         """Validate source credibility and filter results."""
1990 |         analyzed_content = state["analyzed_content"]
1991 |         validated_sources = []
1992 |         credibility_scores = {}
1993 | 
1994 |         for content in analyzed_content:
1995 |             # Calculate credibility score based on multiple factors
1996 |             credibility_score = self._calculate_source_credibility(content)
1997 |             credibility_scores[content["url"]] = credibility_score
1998 | 
1999 |             # Only include sources above credibility threshold
2000 |             if credibility_score >= 0.6:  # Configurable threshold
2001 |                 validated_sources.append(content)
2002 | 
2003 |         return Command(
2004 |             goto="synthesize_findings",
2005 |             update={
2006 |                 "validated_sources": validated_sources,
2007 |                 "source_credibility_scores": credibility_scores,
2008 |                 "research_status": "synthesizing",
2009 |             },
2010 |         )
2011 | 
2012 |     async def _synthesize_findings(self, state: DeepResearchState) -> Command:
2013 |         """Synthesize research findings into coherent insights."""
2014 |         validated_sources = state["validated_sources"]
2015 | 
2016 |         # Generate synthesis using LLM
2017 |         synthesis_prompt = self._build_synthesis_prompt(validated_sources, state)
2018 | 
2019 |         synthesis_response = await self.llm.ainvoke(
2020 |             [
2021 |                 SystemMessage(content="You are a financial research synthesizer."),
2022 |                 HumanMessage(content=synthesis_prompt),
2023 |             ]
2024 |         )
2025 | 
2026 |         raw_synthesis = ContentAnalyzer._coerce_message_content(
2027 |             synthesis_response.content
2028 |         )
2029 | 
2030 |         research_findings = {
2031 |             "synthesis": raw_synthesis,
2032 |             "key_insights": self._extract_key_insights(validated_sources),
2033 |             "overall_sentiment": self._calculate_overall_sentiment(validated_sources),
2034 |             "risk_assessment": self._assess_risks(validated_sources),
2035 |             "investment_implications": self._derive_investment_implications(
2036 |                 validated_sources
2037 |             ),
2038 |             "confidence_score": self._calculate_research_confidence(validated_sources),
2039 |         }
2040 | 
2041 |         return Command(
2042 |             goto="generate_citations",
2043 |             update={
2044 |                 "research_findings": research_findings,
2045 |                 "research_confidence": research_findings["confidence_score"],
2046 |                 "research_status": "completing",
2047 |             },
2048 |         )
2049 | 
2050 |     async def _generate_citations(self, state: DeepResearchState) -> Command:
2051 |         """Generate proper citations for all sources."""
2052 |         validated_sources = state["validated_sources"]
2053 | 
2054 |         citations = []
2055 |         for i, source in enumerate(validated_sources, 1):
2056 |             citation = {
2057 |                 "id": i,
2058 |                 "title": source.get("title", "Untitled"),
2059 |                 "url": source["url"],
2060 |                 "published_date": source.get("published_date"),
2061 |                 "author": source.get("author"),
2062 |                 "credibility_score": state["source_credibility_scores"].get(
2063 |                     source["url"], 0.5
2064 |                 ),
2065 |                 "relevance_score": source.get("analysis", {}).get(
2066 |                     "relevance_score", 0.5
2067 |                 ),
2068 |             }
2069 |             citations.append(citation)
2070 | 
2071 |         return Command(
2072 |             goto="__end__",
2073 |             update={"citations": citations, "research_status": "completed"},
2074 |         )
2075 | 
2076 |     # Helper methods
2077 | 
2078 |     async def _generate_search_queries(
2079 |         self, topic: str, persona_focus: dict[str, Any], depth_config: dict[str, Any]
2080 |     ) -> list[str]:
2081 |         """Generate search queries optimized for the research topic and persona."""
2082 | 
2083 |         base_queries = [
2084 |             f"{topic} financial analysis",
2085 |             f"{topic} investment research",
2086 |             f"{topic} market outlook",
2087 |         ]
2088 | 
2089 |         # Add persona-specific queries
2090 |         persona_queries = [
2091 |             f"{topic} {keyword}" for keyword in persona_focus["keywords"][:3]
2092 |         ]
2093 | 
2094 |         # Add source-specific queries
2095 |         source_queries = [
2096 |             f"{topic} {source}" for source in persona_focus["sources"][:2]
2097 |         ]
2098 | 
2099 |         all_queries = base_queries + persona_queries + source_queries
2100 |         return all_queries[: depth_config["max_searches"]]
2101 | 
2102 |     def _calculate_source_credibility(self, content: dict[str, Any]) -> float:
2103 |         """Calculate credibility score for a source."""
2104 |         score = 0.5  # Base score
2105 | 
2106 |         # Domain credibility
2107 |         url = content.get("url", "")
2108 |         if any(domain in url for domain in [".gov", ".edu", ".org"]):
2109 |             score += 0.2
2110 |         elif any(
2111 |             domain in url
2112 |             for domain in [
2113 |                 "sec.gov",
2114 |                 "investopedia.com",
2115 |                 "bloomberg.com",
2116 |                 "reuters.com",
2117 |             ]
2118 |         ):
2119 |             score += 0.3
2120 | 
2121 |         # Publication date recency
2122 |         pub_date = content.get("published_date")
2123 |         if pub_date:
2124 |             try:
2125 |                 date_obj = datetime.fromisoformat(pub_date.replace("Z", "+00:00"))
2126 |                 days_old = (datetime.now() - date_obj).days
2127 |                 if days_old < 30:
2128 |                     score += 0.1
2129 |                 elif days_old < 90:
2130 |                     score += 0.05
2131 |             except (ValueError, TypeError, AttributeError):
2132 |                 pass
2133 | 
2134 |         # Content analysis credibility
2135 |         if "analysis" in content:
2136 |             analysis_cred = content["analysis"].get("credibility_score", 0.5)
2137 |             score = (score + analysis_cred) / 2
2138 | 
2139 |         return min(score, 1.0)
2140 | 
2141 |     def _build_synthesis_prompt(
2142 |         self, sources: list[dict[str, Any]], state: DeepResearchState
2143 |     ) -> str:
2144 |         """Build synthesis prompt for final research output."""
2145 |         topic = state["research_topic"]
2146 |         persona = self.persona.name
2147 | 
2148 |         prompt = f"""
2149 |         Synthesize comprehensive research findings on '{topic}' for a {persona} investor.
2150 | 
2151 |         Research Sources ({len(sources)} validated sources):
2152 |         """
2153 | 
2154 |         for i, source in enumerate(sources, 1):
2155 |             analysis = source.get("analysis", {})
2156 |             prompt += f"\n{i}. {source.get('title', 'Unknown Title')}"
2157 |             prompt += f"   - Insights: {', '.join(analysis.get('insights', [])[:2])}"
2158 |             prompt += f"   - Sentiment: {analysis.get('sentiment', {}).get('direction', 'neutral')}"
2159 |             prompt += f"   - Credibility: {state['source_credibility_scores'].get(source['url'], 0.5):.2f}"
2160 | 
2161 |         prompt += f"""
2162 | 
2163 |         Please provide a comprehensive synthesis that includes:
2164 |         1. Executive Summary (2-3 sentences)
2165 |         2. Key Findings (5-7 bullet points)
2166 |         3. Investment Implications for {persona} investors
2167 |         4. Risk Considerations
2168 |         5. Recommended Actions
2169 |         6. Confidence Level and reasoning
2170 | 
2171 |         Tailor the analysis specifically for {persona} investment characteristics and risk tolerance.
2172 |         """
2173 | 
2174 |         return prompt
2175 | 
2176 |     def _extract_key_insights(self, sources: list[dict[str, Any]]) -> list[str]:
2177 |         """Extract and consolidate key insights from all sources."""
2178 |         all_insights = []
2179 |         for source in sources:
2180 |             analysis = source.get("analysis", {})
2181 |             insights = analysis.get("insights", [])
2182 |             all_insights.extend(insights)
2183 | 
2184 |         # Simple deduplication (could be enhanced with semantic similarity)
2185 |         unique_insights = list(dict.fromkeys(all_insights))
2186 |         return unique_insights[:10]  # Return top 10 insights
2187 | 
2188 |     def _calculate_overall_sentiment(
2189 |         self, sources: list[dict[str, Any]]
2190 |     ) -> dict[str, Any]:
2191 |         """Calculate overall sentiment from all sources."""
2192 |         sentiments = []
2193 |         weights = []
2194 | 
2195 |         for source in sources:
2196 |             analysis = source.get("analysis", {})
2197 |             sentiment = analysis.get("sentiment", {})
2198 | 
2199 |             # Convert sentiment to numeric value
2200 |             direction = sentiment.get("direction", "neutral")
2201 |             if direction == "bullish":
2202 |                 sentiment_value = 1
2203 |             elif direction == "bearish":
2204 |                 sentiment_value = -1
2205 |             else:
2206 |                 sentiment_value = 0
2207 | 
2208 |             confidence = sentiment.get("confidence", 0.5)
2209 |             credibility = source.get("credibility_score", 0.5)
2210 | 
2211 |             sentiments.append(sentiment_value)
2212 |             weights.append(confidence * credibility)
2213 | 
2214 |         if not sentiments:
2215 |             return {"direction": "neutral", "confidence": 0.5, "consensus": 0.5}
2216 | 
2217 |         # Weighted average sentiment
2218 |         weighted_sentiment = sum(
2219 |             s * w for s, w in zip(sentiments, weights, strict=False)
2220 |         ) / sum(weights)
2221 | 
2222 |         # Convert back to direction
2223 |         if weighted_sentiment > 0.2:
2224 |             overall_direction = "bullish"
2225 |         elif weighted_sentiment < -0.2:
2226 |             overall_direction = "bearish"
2227 |         else:
2228 |             overall_direction = "neutral"
2229 | 
2230 |         # Calculate consensus (how much sources agree)
2231 |         sentiment_variance = sum(weights) / len(sentiments) if sentiments else 0
2232 |         consensus = 1 - sentiment_variance if sentiment_variance < 1 else 0
2233 | 
2234 |         return {
2235 |             "direction": overall_direction,
2236 |             "confidence": abs(weighted_sentiment),
2237 |             "consensus": consensus,
2238 |             "source_count": len(sentiments),
2239 |         }
2240 | 
2241 |     def _assess_risks(self, sources: list[dict[str, Any]]) -> list[str]:
2242 |         """Consolidate risk assessments from all sources."""
2243 |         all_risks = []
2244 |         for source in sources:
2245 |             analysis = source.get("analysis", {})
2246 |             risks = analysis.get("risk_factors", [])
2247 |             all_risks.extend(risks)
2248 | 
2249 |         # Deduplicate and return top risks
2250 |         unique_risks = list(dict.fromkeys(all_risks))
2251 |         return unique_risks[:8]
2252 | 
2253 |     def _derive_investment_implications(
2254 |         self, sources: list[dict[str, Any]]
2255 |     ) -> dict[str, Any]:
2256 |         """Derive investment implications based on research findings."""
2257 |         opportunities = []
2258 |         threats = []
2259 | 
2260 |         for source in sources:
2261 |             analysis = source.get("analysis", {})
2262 |             opps = analysis.get("opportunities", [])
2263 |             risks = analysis.get("risk_factors", [])
2264 | 
2265 |             opportunities.extend(opps)
2266 |             threats.extend(risks)
2267 | 
2268 |         return {
2269 |             "opportunities": list(dict.fromkeys(opportunities))[:5],
2270 |             "threats": list(dict.fromkeys(threats))[:5],
2271 |             "recommended_action": self._recommend_action(sources),
2272 |             "time_horizon": PERSONA_RESEARCH_FOCUS[self.persona.name.lower()][
2273 |                 "time_horizon"
2274 |             ],
2275 |         }
2276 | 
2277 |     def _recommend_action(self, sources: list[dict[str, Any]]) -> str:
2278 |         """Recommend investment action based on research findings."""
2279 |         overall_sentiment = self._calculate_overall_sentiment(sources)
2280 | 
2281 |         if (
2282 |             overall_sentiment["direction"] == "bullish"
2283 |             and overall_sentiment["confidence"] > 0.7
2284 |         ):
2285 |             if self.persona.name.lower() == "conservative":
2286 |                 return "Consider gradual position building with proper risk management"
2287 |             else:
2288 |                 return "Consider initiating position with appropriate position sizing"
2289 |         elif (
2290 |             overall_sentiment["direction"] == "bearish"
2291 |             and overall_sentiment["confidence"] > 0.7
2292 |         ):
2293 |             return "Exercise caution - consider waiting for better entry or avoiding"
2294 |         else:
2295 |             return "Monitor closely - mixed signals suggest waiting for clarity"
2296 | 
2297 |     def _calculate_research_confidence(self, sources: list[dict[str, Any]]) -> float:
2298 |         """Calculate overall confidence in research findings."""
2299 |         if not sources:
2300 |             return 0.0
2301 | 
2302 |         # Factors that increase confidence
2303 |         source_count_factor = min(
2304 |             len(sources) / 10, 1.0
2305 |         )  # More sources = higher confidence
2306 | 
2307 |         avg_credibility = sum(
2308 |             source.get("credibility_score", 0.5) for source in sources
2309 |         ) / len(sources)
2310 | 
2311 |         avg_relevance = sum(
2312 |             source.get("analysis", {}).get("relevance_score", 0.5) for source in sources
2313 |         ) / len(sources)
2314 | 
2315 |         # Diversity of sources (different domains)
2316 |         unique_domains = len(
2317 |             {source["url"].split("/")[2] for source in sources if "url" in source}
2318 |         )
2319 |         diversity_factor = min(unique_domains / 5, 1.0)
2320 | 
2321 |         # Combine factors
2322 |         confidence = (
2323 |             source_count_factor + avg_credibility + avg_relevance + diversity_factor
2324 |         ) / 4
2325 | 
2326 |         return round(confidence, 2)
2327 | 
2328 |     def _format_research_response(self, result: dict[str, Any]) -> dict[str, Any]:
2329 |         """Format research response for consistent output."""
2330 |         return {
2331 |             "status": "success",
2332 |             "agent_type": "deep_research",
2333 |             "persona": result.get("persona"),
2334 |             "research_topic": result.get("research_topic"),
2335 |             "research_depth": result.get("research_depth"),
2336 |             "findings": result.get("research_findings", {}),
2337 |             "sources_analyzed": len(result.get("validated_sources", [])),
2338 |             "confidence_score": result.get("research_confidence", 0.0),
2339 |             "citations": result.get("citations", []),
2340 |             "execution_time_ms": result.get("execution_time_ms", 0.0),
2341 |             "search_queries_used": result.get("search_queries", []),
2342 |             "source_diversity": result.get("source_diversity_score", 0.0),
2343 |         }
2344 | 
2345 |     # Specialized research analysis methods
2346 |     async def _sentiment_analysis(self, state: DeepResearchState) -> Command:
2347 |         """Perform specialized sentiment analysis."""
2348 |         logger.info("Performing sentiment analysis")
2349 | 
2350 |         # For now, route to content analysis with sentiment focus
2351 |         original_focus = state.get("focus_areas", [])
2352 |         state["focus_areas"] = ["market_sentiment", "sentiment", "mood"]
2353 |         result = await self._analyze_content(state)
2354 |         state["focus_areas"] = original_focus  # Restore original focus
2355 |         return result
2356 | 
2357 |     async def _fundamental_analysis(self, state: DeepResearchState) -> Command:
2358 |         """Perform specialized fundamental analysis."""
2359 |         logger.info("Performing fundamental analysis")
2360 | 
2361 |         # For now, route to content analysis with fundamental focus
2362 |         original_focus = state.get("focus_areas", [])
2363 |         state["focus_areas"] = ["fundamentals", "financials", "valuation"]
2364 |         result = await self._analyze_content(state)
2365 |         state["focus_areas"] = original_focus  # Restore original focus
2366 |         return result
2367 | 
2368 |     async def _competitive_analysis(self, state: DeepResearchState) -> Command:
2369 |         """Perform specialized competitive analysis."""
2370 |         logger.info("Performing competitive analysis")
2371 | 
2372 |         # For now, route to content analysis with competitive focus
2373 |         original_focus = state.get("focus_areas", [])
2374 |         state["focus_areas"] = ["competitive_landscape", "market_share", "competitors"]
2375 |         result = await self._analyze_content(state)
2376 |         state["focus_areas"] = original_focus  # Restore original focus
2377 |         return result
2378 | 
2379 |     async def _fact_validation(self, state: DeepResearchState) -> Command:
2380 |         """Perform fact validation on research findings."""
2381 |         logger.info("Performing fact validation")
2382 | 
2383 |         # For now, route to source validation
2384 |         return await self._validate_sources(state)
2385 | 
2386 |     async def _source_credibility(self, state: DeepResearchState) -> Command:
2387 |         """Assess source credibility and reliability."""
2388 |         logger.info("Assessing source credibility")
2389 | 
2390 |         # For now, route to source validation
2391 |         return await self._validate_sources(state)
2392 | 
2393 |     async def research_company_comprehensive(
2394 |         self,
2395 |         symbol: str,
2396 |         session_id: str,
2397 |         include_competitive_analysis: bool = False,
2398 |         **kwargs,
2399 |     ) -> dict[str, Any]:
2400 |         """
2401 |         Comprehensive company research.
2402 | 
2403 |         Args:
2404 |             symbol: Stock symbol to research
2405 |             session_id: Session identifier
2406 |             include_competitive_analysis: Whether to include competitive analysis
2407 |             **kwargs: Additional parameters
2408 | 
2409 |         Returns:
2410 |             Comprehensive company research results
2411 |         """
2412 |         topic = f"{symbol} company financial analysis and outlook"
2413 |         if include_competitive_analysis:
2414 |             kwargs["focus_areas"] = kwargs.get("focus_areas", []) + [
2415 |                 "competitive_analysis",
2416 |                 "market_position",
2417 |             ]
2418 | 
2419 |         return await self.research_comprehensive(
2420 |             topic=topic, session_id=session_id, **kwargs
2421 |         )
2422 | 
2423 |     async def research_topic(
2424 |         self,
2425 |         query: str,
2426 |         session_id: str,
2427 |         focus_areas: list[str] | None = None,
2428 |         timeframe: str = "30d",
2429 |         **kwargs,
2430 |     ) -> dict[str, Any]:
2431 |         """
2432 |         General topic research.
2433 | 
2434 |         Args:
2435 |             query: Research query or topic
2436 |             session_id: Session identifier
2437 |             focus_areas: Specific areas to focus on
2438 |             timeframe: Time range for research
2439 |             **kwargs: Additional parameters
2440 | 
2441 |         Returns:
2442 |             Research results for the given topic
2443 |         """
2444 |         return await self.research_comprehensive(
2445 |             topic=query,
2446 |             session_id=session_id,
2447 |             focus_areas=focus_areas,
2448 |             timeframe=timeframe,
2449 |             **kwargs,
2450 |         )
2451 | 
2452 |     async def analyze_market_sentiment(
2453 |         self, topic: str, session_id: str, timeframe: str = "7d", **kwargs
2454 |     ) -> dict[str, Any]:
2455 |         """
2456 |         Analyze market sentiment around a topic.
2457 | 
2458 |         Args:
2459 |             topic: Topic to analyze sentiment for
2460 |             session_id: Session identifier
2461 |             timeframe: Time range for analysis
2462 |             **kwargs: Additional parameters
2463 | 
2464 |         Returns:
2465 |             Market sentiment analysis results
2466 |         """
2467 |         return await self.research_comprehensive(
2468 |             topic=f"market sentiment analysis: {topic}",
2469 |             session_id=session_id,
2470 |             focus_areas=["sentiment", "market_mood", "investor_sentiment"],
2471 |             timeframe=timeframe,
2472 |             **kwargs,
2473 |         )
2474 | 
2475 |     # Parallel Execution Implementation
2476 | 
2477 |     @log_method_call(component="DeepResearchAgent", include_timing=True)
2478 |     async def _execute_parallel_research(
2479 |         self,
2480 |         topic: str,
2481 |         session_id: str,
2482 |         depth: str,
2483 |         focus_areas: list[str] | None = None,
2484 |         timeframe: str = "30d",
2485 |         initial_state: dict[str, Any] | None = None,
2486 |         start_time: datetime | None = None,
2487 |         **kwargs,
2488 |     ) -> dict[str, Any]:
2489 |         """
2490 |         Execute research using parallel subagent execution.
2491 | 
2492 |         Args:
2493 |             topic: Research topic
2494 |             session_id: Session identifier
2495 |             depth: Research depth level
2496 |             focus_areas: Specific focus areas
2497 |             timeframe: Research timeframe
2498 |             initial_state: Initial state for backward compatibility
2499 |             start_time: Start time for execution measurement
2500 |             **kwargs: Additional parameters
2501 | 
2502 |         Returns:
2503 |             Research results in same format as sequential execution
2504 |         """
2505 |         orchestration_logger = get_orchestration_logger("ParallelExecution")
2506 |         orchestration_logger.set_request_context(session_id=session_id)
2507 | 
2508 |         try:
2509 |             # Generate research tasks using task distributor
2510 |             orchestration_logger.info("🎯 TASK_DISTRIBUTION_START")
2511 |             research_tasks = self.task_distributor.distribute_research_tasks(
2512 |                 topic=topic, session_id=session_id, focus_areas=focus_areas
2513 |             )
2514 | 
2515 |             orchestration_logger.info(
2516 |                 "📋 TASKS_GENERATED",
2517 |                 task_count=len(research_tasks),
2518 |                 task_types=[t.task_type for t in research_tasks],
2519 |             )
2520 | 
2521 |             # Execute tasks in parallel
2522 |             orchestration_logger.info("🚀 PARALLEL_ORCHESTRATION_START")
2523 |             research_result = (
2524 |                 await self.parallel_orchestrator.execute_parallel_research(
2525 |                     tasks=research_tasks,
2526 |                     research_executor=self._execute_subagent_task,
2527 |                     synthesis_callback=self._synthesize_parallel_results,
2528 |                 )
2529 |             )
2530 | 
2531 |             # Log parallel execution metrics
2532 |             log_performance_metrics(
2533 |                 "ParallelExecution",
2534 |                 {
2535 |                     "total_tasks": research_result.successful_tasks
2536 |                     + research_result.failed_tasks,
2537 |                     "successful_tasks": research_result.successful_tasks,
2538 |                     "failed_tasks": research_result.failed_tasks,
2539 |                     "parallel_efficiency": research_result.parallel_efficiency,
2540 |                     "execution_time": research_result.total_execution_time,
2541 |                 },
2542 |             )
2543 | 
2544 |             # Convert parallel results to expected format
2545 |             orchestration_logger.info("🔄 RESULT_FORMATTING_START")
2546 |             formatted_result = await self._format_parallel_research_response(
2547 |                 research_result=research_result,
2548 |                 topic=topic,
2549 |                 session_id=session_id,
2550 |                 depth=depth,
2551 |                 initial_state=initial_state,
2552 |                 start_time=start_time,
2553 |             )
2554 | 
2555 |             orchestration_logger.info(
2556 |                 "✅ PARALLEL_RESEARCH_COMPLETE",
2557 |                 result_confidence=formatted_result.get("confidence_score", 0.0),
2558 |                 sources_analyzed=formatted_result.get("sources_analyzed", 0),
2559 |             )
2560 | 
2561 |             return formatted_result
2562 | 
2563 |         except Exception as e:
2564 |             orchestration_logger.error("❌ PARALLEL_RESEARCH_FAILED", error=str(e))
2565 |             raise  # Re-raise to trigger fallback to sequential
2566 | 
2567 |     async def _execute_subagent_task(
2568 |         self, task
2569 |     ) -> dict[str, Any]:  # Type: ResearchTask
2570 |         """
2571 |         Execute a single research task using specialized subagent.
2572 | 
2573 |         Args:
2574 |             task: ResearchTask to execute
2575 | 
2576 |         Returns:
2577 |             Research results from specialized subagent
2578 |         """
2579 |         with log_agent_execution(
2580 |             task.task_type, task.task_id, task.focus_areas
2581 |         ) as agent_logger:
2582 |             agent_logger.info(
2583 |                 "🎯 SUBAGENT_ROUTING",
2584 |                 target_topic=task.target_topic[:50],
2585 |                 focus_count=len(task.focus_areas),
2586 |                 priority=task.priority,
2587 |             )
2588 | 
2589 |             # Route to appropriate subagent based on task type
2590 |             if task.task_type == "fundamental":
2591 |                 subagent = FundamentalResearchAgent(self)
2592 |                 return await subagent.execute_research(task)
2593 |             elif task.task_type == "technical":
2594 |                 subagent = TechnicalResearchAgent(self)
2595 |                 return await subagent.execute_research(task)
2596 |             elif task.task_type == "sentiment":
2597 |                 subagent = SentimentResearchAgent(self)
2598 |                 return await subagent.execute_research(task)
2599 |             elif task.task_type == "competitive":
2600 |                 subagent = CompetitiveResearchAgent(self)
2601 |                 return await subagent.execute_research(task)
2602 |             else:
2603 |                 # Default to fundamental analysis
2604 |                 agent_logger.warning("⚠️ UNKNOWN_TASK_TYPE", fallback="fundamental")
2605 |                 subagent = FundamentalResearchAgent(self)
2606 |                 return await subagent.execute_research(task)
2607 | 
2608 |     async def _synthesize_parallel_results(
2609 |         self,
2610 |         task_results,  # Type: dict[str, ResearchTask]
2611 |     ) -> dict[str, Any]:
2612 |         """
2613 |         Synthesize results from multiple parallel research tasks.
2614 | 
2615 |         Args:
2616 |             task_results: Dictionary of task IDs to ResearchTask objects
2617 | 
2618 |         Returns:
2619 |             Synthesized research insights
2620 |         """
2621 |         synthesis_logger = get_orchestration_logger("ResultSynthesis")
2622 | 
2623 |         log_synthesis_operation(
2624 |             "parallel_research_synthesis",
2625 |             len(task_results),
2626 |             f"Synthesizing from {len(task_results)} research tasks",
2627 |         )
2628 | 
2629 |         # Extract successful results
2630 |         successful_results = {
2631 |             task_id: task.result
2632 |             for task_id, task in task_results.items()
2633 |             if task.status == "completed" and task.result
2634 |         }
2635 | 
2636 |         synthesis_logger.info(
2637 |             "📊 SYNTHESIS_INPUT_ANALYSIS",
2638 |             total_tasks=len(task_results),
2639 |             successful_tasks=len(successful_results),
2640 |             failed_tasks=len(task_results) - len(successful_results),
2641 |         )
2642 | 
2643 |         if not successful_results:
2644 |             synthesis_logger.warning("⚠️ NO_SUCCESSFUL_RESULTS")
2645 |             return {
2646 |                 "synthesis": "No research results available for synthesis",
2647 |                 "confidence_score": 0.0,
2648 |             }
2649 | 
2650 |         all_insights = []
2651 |         all_risks = []
2652 |         all_opportunities = []
2653 |         sentiment_scores = []
2654 |         credibility_scores = []
2655 | 
2656 |         # Aggregate results from all successful tasks
2657 |         for task_id, task in task_results.items():
2658 |             if task.status == "completed" and task.result:
2659 |                 task_type = task_id.split("_")[-1] if "_" in task_id else "unknown"
2660 |                 synthesis_logger.debug(
2661 |                     "🔍 PROCESSING_TASK_RESULT",
2662 |                     task_id=task_id,
2663 |                     task_type=task_type,
2664 |                     has_insights="insights" in task.result
2665 |                     if isinstance(task.result, dict)
2666 |                     else False,
2667 |                 )
2668 | 
2669 |                 result = task.result
2670 | 
2671 |                 # Extract insights
2672 |                 insights = result.get("insights", [])
2673 |                 all_insights.extend(insights)
2674 | 
2675 |                 # Extract risks and opportunities
2676 |                 risks = result.get("risk_factors", [])
2677 |                 opportunities = result.get("opportunities", [])
2678 |                 all_risks.extend(risks)
2679 |                 all_opportunities.extend(opportunities)
2680 | 
2681 |                 # Extract sentiment
2682 |                 sentiment = result.get("sentiment", {})
2683 |                 if sentiment:
2684 |                     sentiment_scores.append(sentiment)
2685 | 
2686 |                 # Extract credibility
2687 |                 credibility = result.get("credibility_score", 0.5)
2688 |                 credibility_scores.append(credibility)
2689 | 
2690 |         # Calculate overall metrics
2691 |         overall_sentiment = self._calculate_aggregated_sentiment(sentiment_scores)
2692 |         average_credibility = (
2693 |             sum(credibility_scores) / len(credibility_scores)
2694 |             if credibility_scores
2695 |             else 0.5
2696 |         )
2697 | 
2698 |         # Generate synthesis using LLM
2699 |         synthesis_prompt = self._build_parallel_synthesis_prompt(
2700 |             task_results, all_insights, all_risks, all_opportunities, overall_sentiment
2701 |         )
2702 | 
2703 |         try:
2704 |             synthesis_response = await self.llm.ainvoke(
2705 |                 [
2706 |                     SystemMessage(
2707 |                         content="You are a financial research synthesizer. Combine insights from multiple specialized research agents."
2708 |                     ),
2709 |                     HumanMessage(content=synthesis_prompt),
2710 |                 ]
2711 |             )
2712 | 
2713 |             synthesis_text = ContentAnalyzer._coerce_message_content(
2714 |                 synthesis_response.content
2715 |             )
2716 |             synthesis_logger.info("🧠 LLM_SYNTHESIS_SUCCESS")
2717 |         except Exception as e:
2718 |             synthesis_logger.warning(
2719 |                 "⚠️ LLM_SYNTHESIS_FAILED", error=str(e), fallback="text_fallback"
2720 |             )
2721 |             synthesis_text = self._generate_fallback_synthesis(
2722 |                 all_insights, overall_sentiment
2723 |             )
2724 | 
2725 |         synthesis_result = {
2726 |             "synthesis": synthesis_text,
2727 |             "key_insights": list(dict.fromkeys(all_insights))[
2728 |                 :10
2729 |             ],  # Deduplicate and limit
2730 |             "overall_sentiment": overall_sentiment,
2731 |             "risk_assessment": list(dict.fromkeys(all_risks))[:8],
2732 |             "investment_implications": {
2733 |                 "opportunities": list(dict.fromkeys(all_opportunities))[:5],
2734 |                 "threats": list(dict.fromkeys(all_risks))[:5],
2735 |                 "recommended_action": self._derive_parallel_recommendation(
2736 |                     overall_sentiment
2737 |                 ),
2738 |             },
2739 |             "confidence_score": average_credibility,
2740 |             "task_breakdown": {
2741 |                 task_id: {
2742 |                     "type": task.task_type,
2743 |                     "status": task.status,
2744 |                     "execution_time": (task.end_time - task.start_time)
2745 |                     if task.start_time and task.end_time
2746 |                     else 0,
2747 |                 }
2748 |                 for task_id, task in task_results.items()
2749 |             },
2750 |         }
2751 | 
2752 |         synthesis_logger.info(
2753 |             "✅ SYNTHESIS_COMPLETE",
2754 |             insights_count=len(all_insights),
2755 |             overall_confidence=average_credibility,
2756 |             sentiment_direction=synthesis_result["overall_sentiment"]["direction"],
2757 |         )
2758 | 
2759 |         return synthesis_result
2760 | 
2761 |     async def _format_parallel_research_response(
2762 |         self,
2763 |         research_result,
2764 |         topic: str,
2765 |         session_id: str,
2766 |         depth: str,
2767 |         initial_state: dict[str, Any] | None,
2768 |         start_time: datetime | None,
2769 |     ) -> dict[str, Any]:
2770 |         """Format parallel research results to match expected sequential format."""
2771 | 
2772 |         if start_time is not None:
2773 |             execution_time = (datetime.now() - start_time).total_seconds() * 1000
2774 |         else:
2775 |             execution_time = 0.0
2776 | 
2777 |         # Extract synthesis from research result
2778 |         synthesis = research_result.synthesis or {}
2779 | 
2780 |         state_snapshot: dict[str, Any] = initial_state or {}
2781 | 
2782 |         # Create citations from task results
2783 |         citations = []
2784 |         all_sources = []
2785 |         citation_id = 1
2786 | 
2787 |         for _task_id, task in research_result.task_results.items():
2788 |             if task.status == "completed" and task.result:
2789 |                 sources = task.result.get("sources", [])
2790 |                 for source in sources:
2791 |                     citation = {
2792 |                         "id": citation_id,
2793 |                         "title": source.get("title", "Unknown Title"),
2794 |                         "url": source.get("url", ""),
2795 |                         "published_date": source.get("published_date"),
2796 |                         "author": source.get("author"),
2797 |                         "credibility_score": source.get("credibility_score", 0.5),
2798 |                         "relevance_score": source.get("relevance_score", 0.5),
2799 |                         "research_type": task.task_type,
2800 |                     }
2801 |                     citations.append(citation)
2802 |                     all_sources.append(source)
2803 |                     citation_id += 1
2804 | 
2805 |         return {
2806 |             "status": "success",
2807 |             "agent_type": "deep_research",
2808 |             "execution_mode": "parallel",
2809 |             "persona": state_snapshot.get("persona"),
2810 |             "research_topic": topic,
2811 |             "research_depth": depth,
2812 |             "findings": synthesis,
2813 |             "sources_analyzed": len(all_sources),
2814 |             "confidence_score": synthesis.get("confidence_score", 0.0),
2815 |             "citations": citations,
2816 |             "execution_time_ms": execution_time,
2817 |             "parallel_execution_stats": {
2818 |                 "total_tasks": len(research_result.task_results),
2819 |                 "successful_tasks": research_result.successful_tasks,
2820 |                 "failed_tasks": research_result.failed_tasks,
2821 |                 "parallel_efficiency": research_result.parallel_efficiency,
2822 |                 "task_breakdown": synthesis.get("task_breakdown", {}),
2823 |             },
2824 |             "search_queries_used": [],  # Will be populated by subagents
2825 |             "source_diversity": len({source.get("url", "") for source in all_sources})
2826 |             / max(len(all_sources), 1),
2827 |         }
2828 | 
2829 |     # Helper methods for parallel execution
2830 | 
2831 |     def _calculate_aggregated_sentiment(
2832 |         self, sentiment_scores: list[dict[str, Any]]
2833 |     ) -> dict[str, Any]:
2834 |         """Calculate overall sentiment from multiple sentiment scores."""
2835 |         if not sentiment_scores:
2836 |             return {"direction": "neutral", "confidence": 0.5}
2837 | 
2838 |         # Convert sentiment directions to numeric values
2839 |         numeric_scores = []
2840 |         confidences = []
2841 | 
2842 |         for sentiment in sentiment_scores:
2843 |             direction = sentiment.get("direction", "neutral")
2844 |             confidence = sentiment.get("confidence", 0.5)
2845 | 
2846 |             if direction == "bullish":
2847 |                 numeric_scores.append(1 * confidence)
2848 |             elif direction == "bearish":
2849 |                 numeric_scores.append(-1 * confidence)
2850 |             else:
2851 |                 numeric_scores.append(0)
2852 | 
2853 |             confidences.append(confidence)
2854 | 
2855 |         # Calculate weighted average
2856 |         avg_score = sum(numeric_scores) / len(numeric_scores)
2857 |         avg_confidence = sum(confidences) / len(confidences)
2858 | 
2859 |         # Convert back to direction
2860 |         if avg_score > 0.2:
2861 |             direction = "bullish"
2862 |         elif avg_score < -0.2:
2863 |             direction = "bearish"
2864 |         else:
2865 |             direction = "neutral"
2866 | 
2867 |         return {
2868 |             "direction": direction,
2869 |             "confidence": avg_confidence,
2870 |             "consensus": 1 - abs(avg_score) if abs(avg_score) < 1 else 0,
2871 |             "source_count": len(sentiment_scores),
2872 |         }
2873 | 
2874 |     def _build_parallel_synthesis_prompt(
2875 |         self,
2876 |         task_results: dict[str, Any],  # Actually dict[str, ResearchTask]
2877 |         all_insights: list[str],
2878 |         all_risks: list[str],
2879 |         all_opportunities: list[str],
2880 |         overall_sentiment: dict[str, Any],
2881 |     ) -> str:
2882 |         """Build synthesis prompt for parallel research results."""
2883 |         successful_tasks = [
2884 |             task for task in task_results.values() if task.status == "completed"
2885 |         ]
2886 | 
2887 |         prompt = f"""
2888 |         Synthesize comprehensive research findings from {len(successful_tasks)} specialized research agents.
2889 | 
2890 |         Research Task Results:
2891 |         """
2892 | 
2893 |         for task in successful_tasks:
2894 |             if task.result:
2895 |                 prompt += f"\n{task.task_type.upper()} RESEARCH:"
2896 |                 prompt += f"  - Status: {task.status}"
2897 |                 prompt += f"  - Key Insights: {', '.join(task.result.get('insights', [])[:3])}"
2898 |                 prompt += f"  - Sentiment: {task.result.get('sentiment', {}).get('direction', 'neutral')}"
2899 | 
2900 |         prompt += f"""
2901 | 
2902 |         AGGREGATED DATA:
2903 |         - Total Insights: {len(all_insights)}
2904 |         - Risk Factors: {len(all_risks)}
2905 |         - Opportunities: {len(all_opportunities)}
2906 |         - Overall Sentiment: {overall_sentiment.get("direction")} (confidence: {overall_sentiment.get("confidence", 0.5):.2f})
2907 | 
2908 |         Please provide a comprehensive synthesis that includes:
2909 |         1. Executive Summary (2-3 sentences)
2910 |         2. Key Findings from all research areas
2911 |         3. Investment Implications for {self.persona.name} investors
2912 |         4. Risk Assessment and Mitigation
2913 |         5. Recommended Actions based on parallel analysis
2914 |         6. Confidence Level and reasoning
2915 | 
2916 |         Focus on insights that are supported by multiple research agents and highlight any contradictions.
2917 |         """
2918 | 
2919 |         return prompt
2920 | 
2921 |     def _generate_fallback_synthesis(
2922 |         self, insights: list[str], sentiment: dict[str, Any]
2923 |     ) -> str:
2924 |         """Generate fallback synthesis when LLM synthesis fails."""
2925 |         return f"""
2926 |         Research synthesis generated from {len(insights)} insights.
2927 | 
2928 |         Overall sentiment: {sentiment.get("direction", "neutral")} with {sentiment.get("confidence", 0.5):.2f} confidence.
2929 | 
2930 |         Key insights identified:
2931 |         {chr(10).join(f"- {insight}" for insight in insights[:5])}
2932 | 
2933 |         This is a fallback synthesis due to LLM processing limitations.
2934 |         """
2935 | 
2936 |     def _derive_parallel_recommendation(self, sentiment: dict[str, Any]) -> str:
2937 |         """Derive investment recommendation from parallel analysis."""
2938 |         direction = sentiment.get("direction", "neutral")
2939 |         confidence = sentiment.get("confidence", 0.5)
2940 | 
2941 |         if direction == "bullish" and confidence > 0.7:
2942 |             return "Strong buy signal based on parallel analysis from multiple research angles"
2943 |         elif direction == "bullish" and confidence > 0.5:
2944 |             return "Consider position building with appropriate risk management"
2945 |         elif direction == "bearish" and confidence > 0.7:
2946 |             return "Exercise significant caution - multiple research areas show negative signals"
2947 |         elif direction == "bearish" and confidence > 0.5:
2948 |             return "Monitor closely - mixed to negative signals suggest waiting"
2949 |         else:
2950 |             return "Neutral stance recommended - parallel analysis shows mixed signals"
2951 | 
2952 | 
2953 | # Specialized Subagent Classes
2954 | 
2955 | 
2956 | class BaseSubagent:
2957 |     """Base class for specialized research subagents."""
2958 | 
2959 |     def __init__(self, parent_agent: DeepResearchAgent):
2960 |         self.parent = parent_agent
2961 |         self.llm = parent_agent.llm
2962 |         self.search_providers = parent_agent.search_providers
2963 |         self.content_analyzer = parent_agent.content_analyzer
2964 |         self.persona = parent_agent.persona
2965 |         self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
2966 | 
2967 |     async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
2968 |         """Execute research task - to be implemented by subclasses."""
2969 |         raise NotImplementedError
2970 | 
2971 |     async def _safe_search(
2972 |         self,
2973 |         provider: WebSearchProvider,
2974 |         query: str,
2975 |         num_results: int = 5,
2976 |         timeout_budget: float | None = None,
2977 |     ) -> list[dict[str, Any]]:
2978 |         """Safely execute search with a provider, handling exceptions gracefully."""
2979 |         try:
2980 |             return await provider.search(
2981 |                 query, num_results=num_results, timeout_budget=timeout_budget
2982 |             )
2983 |         except Exception as e:
2984 |             self.logger.warning(
2985 |                 f"Search failed for '{query}' with provider {type(provider).__name__}: {e}"
2986 |             )
2987 |             return []  # Return empty list on failure
2988 | 
2989 |     async def _perform_specialized_search(
2990 |         self,
2991 |         topic: str,
2992 |         specialized_queries: list[str],
2993 |         max_results: int = 10,
2994 |         timeout_budget: float | None = None,
2995 |     ) -> list[dict[str, Any]]:
2996 |         """Perform specialized web search for this subagent type."""
2997 |         all_results = []
2998 | 
2999 |         # Create all search tasks for parallel execution
3000 |         search_tasks = []
3001 |         results_per_query = (
3002 |             max_results // len(specialized_queries)
3003 |             if specialized_queries
3004 |             else max_results
3005 |         )
3006 | 
3007 |         # Calculate timeout per search if budget provided
3008 |         if timeout_budget:
3009 |             total_searches = len(specialized_queries) * len(self.search_providers)
3010 |             timeout_per_search = timeout_budget / max(total_searches, 1)
3011 |         else:
3012 |             timeout_per_search = None
3013 | 
3014 |         for query in specialized_queries:
3015 |             for provider in self.search_providers:
3016 |                 # Create async task for each provider/query combination
3017 |                 search_tasks.append(
3018 |                     self._safe_search(
3019 |                         provider,
3020 |                         query,
3021 |                         num_results=results_per_query,
3022 |                         timeout_budget=timeout_per_search,
3023 |                     )
3024 |                 )
3025 | 
3026 |         # Execute all searches in parallel using asyncio.gather()
3027 |         if search_tasks:
3028 |             parallel_results = await asyncio.gather(
3029 |                 *search_tasks, return_exceptions=True
3030 |             )
3031 | 
3032 |             # Process results and filter out exceptions
3033 |             for result in parallel_results:
3034 |                 if isinstance(result, Exception):
3035 |                     # Log the exception but continue with other results
3036 |                     self.logger.warning(f"Search task failed: {result}")
3037 |                 elif isinstance(result, list):
3038 |                     all_results.extend(result)
3039 |                 elif result is not None:
3040 |                     all_results.append(result)
3041 | 
3042 |         # Deduplicate results
3043 |         seen_urls = set()
3044 |         unique_results = []
3045 |         for result in all_results:
3046 |             if result.get("url") not in seen_urls:
3047 |                 seen_urls.add(result["url"])
3048 |                 unique_results.append(result)
3049 | 
3050 |         return unique_results[:max_results]
3051 | 
3052 |     async def _analyze_search_results(
3053 |         self, results: list[dict[str, Any]], analysis_focus: str
3054 |     ) -> list[dict[str, Any]]:
3055 |         """Analyze search results with specialized focus."""
3056 |         analyzed_results = []
3057 | 
3058 |         for result in results:
3059 |             if result.get("content"):
3060 |                 try:
3061 |                     analysis = await self.content_analyzer.analyze_content(
3062 |                         content=result["content"],
3063 |                         persona=self.persona.name.lower(),
3064 |                         analysis_focus=analysis_focus,
3065 |                     )
3066 | 
3067 |                     # Add source credibility
3068 |                     credibility_score = self._calculate_source_credibility(result)
3069 |                     analysis["credibility_score"] = credibility_score
3070 | 
3071 |                     analyzed_results.append(
3072 |                         {
3073 |                             **result,
3074 |                             "analysis": analysis,
3075 |                             "credibility_score": credibility_score,
3076 |                         }
3077 |                     )
3078 |                 except Exception as e:
3079 |                     self.logger.warning(
3080 |                         f"Content analysis failed for {result.get('url', 'unknown')}: {e}"
3081 |                     )
3082 | 
3083 |         return analyzed_results
3084 | 
3085 |     def _calculate_source_credibility(self, source: dict[str, Any]) -> float:
3086 |         """Calculate credibility score for a source - reuse from parent."""
3087 |         return self.parent._calculate_source_credibility(source)
3088 | 
3089 | 
3090 | class FundamentalResearchAgent(BaseSubagent):
3091 |     """Specialized agent for fundamental financial analysis."""
3092 | 
3093 |     async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
3094 |         """Execute fundamental analysis research."""
3095 |         self.logger.info(f"Executing fundamental research for: {task.target_topic}")
3096 | 
3097 |         # Generate fundamental-specific search queries
3098 |         queries = self._generate_fundamental_queries(task.target_topic)
3099 | 
3100 |         # Perform specialized search
3101 |         search_results = await self._perform_specialized_search(
3102 |             topic=task.target_topic, specialized_queries=queries, max_results=8
3103 |         )
3104 | 
3105 |         # Analyze results with fundamental focus
3106 |         analyzed_results = await self._analyze_search_results(
3107 |             search_results, analysis_focus="fundamental_analysis"
3108 |         )
3109 | 
3110 |         # Extract fundamental-specific insights
3111 |         insights = []
3112 |         risks = []
3113 |         opportunities = []
3114 |         sources = []
3115 | 
3116 |         for result in analyzed_results:
3117 |             analysis = result.get("analysis", {})
3118 |             insights.extend(analysis.get("insights", []))
3119 |             risks.extend(analysis.get("risk_factors", []))
3120 |             opportunities.extend(analysis.get("opportunities", []))
3121 |             sources.append(
3122 |                 {
3123 |                     "title": result.get("title", ""),
3124 |                     "url": result.get("url", ""),
3125 |                     "credibility_score": result.get("credibility_score", 0.5),
3126 |                     "published_date": result.get("published_date"),
3127 |                     "author": result.get("author"),
3128 |                 }
3129 |             )
3130 | 
3131 |         return {
3132 |             "research_type": "fundamental",
3133 |             "insights": list(dict.fromkeys(insights))[:8],  # Deduplicate
3134 |             "risk_factors": list(dict.fromkeys(risks))[:6],
3135 |             "opportunities": list(dict.fromkeys(opportunities))[:6],
3136 |             "sentiment": self._calculate_fundamental_sentiment(analyzed_results),
3137 |             "credibility_score": self._calculate_average_credibility(analyzed_results),
3138 |             "sources": sources,
3139 |             "focus_areas": [
3140 |                 "earnings",
3141 |                 "valuation",
3142 |                 "financial_health",
3143 |                 "growth_prospects",
3144 |             ],
3145 |         }
3146 | 
3147 |     def _generate_fundamental_queries(self, topic: str) -> list[str]:
3148 |         """Generate fundamental analysis specific queries."""
3149 |         return [
3150 |             f"{topic} earnings report financial results",
3151 |             f"{topic} revenue growth profit margins",
3152 |             f"{topic} balance sheet debt ratio financial health",
3153 |             f"{topic} valuation PE ratio price earnings",
3154 |             f"{topic} cash flow dividend payout",
3155 |         ]
3156 | 
3157 |     def _calculate_fundamental_sentiment(
3158 |         self, results: list[dict[str, Any]]
3159 |     ) -> dict[str, Any]:
3160 |         """Calculate sentiment specific to fundamental analysis."""
3161 |         sentiments = []
3162 |         for result in results:
3163 |             analysis = result.get("analysis", {})
3164 |             sentiment = analysis.get("sentiment", {})
3165 |             if sentiment:
3166 |                 sentiments.append(sentiment)
3167 | 
3168 |         if not sentiments:
3169 |             return {"direction": "neutral", "confidence": 0.5}
3170 | 
3171 |         # Simple aggregation for now
3172 |         bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
3173 |         bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")
3174 | 
3175 |         if bullish_count > bearish_count:
3176 |             return {"direction": "bullish", "confidence": 0.7}
3177 |         elif bearish_count > bullish_count:
3178 |             return {"direction": "bearish", "confidence": 0.7}
3179 |         else:
3180 |             return {"direction": "neutral", "confidence": 0.5}
3181 | 
3182 |     def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
3183 |         """Calculate average credibility of sources."""
3184 |         if not results:
3185 |             return 0.5
3186 | 
3187 |         credibility_scores = [r.get("credibility_score", 0.5) for r in results]
3188 |         return sum(credibility_scores) / len(credibility_scores)
3189 | 
3190 | 
3191 | class TechnicalResearchAgent(BaseSubagent):
3192 |     """Specialized agent for technical analysis research."""
3193 | 
3194 |     async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
3195 |         """Execute technical analysis research."""
3196 |         self.logger.info(f"Executing technical research for: {task.target_topic}")
3197 | 
3198 |         queries = self._generate_technical_queries(task.target_topic)
3199 |         search_results = await self._perform_specialized_search(
3200 |             topic=task.target_topic, specialized_queries=queries, max_results=6
3201 |         )
3202 | 
3203 |         analyzed_results = await self._analyze_search_results(
3204 |             search_results, analysis_focus="technical_analysis"
3205 |         )
3206 | 
3207 |         # Extract technical-specific insights
3208 |         insights = []
3209 |         risks = []
3210 |         opportunities = []
3211 |         sources = []
3212 | 
3213 |         for result in analyzed_results:
3214 |             analysis = result.get("analysis", {})
3215 |             insights.extend(analysis.get("insights", []))
3216 |             risks.extend(analysis.get("risk_factors", []))
3217 |             opportunities.extend(analysis.get("opportunities", []))
3218 |             sources.append(
3219 |                 {
3220 |                     "title": result.get("title", ""),
3221 |                     "url": result.get("url", ""),
3222 |                     "credibility_score": result.get("credibility_score", 0.5),
3223 |                     "published_date": result.get("published_date"),
3224 |                     "author": result.get("author"),
3225 |                 }
3226 |             )
3227 | 
3228 |         return {
3229 |             "research_type": "technical",
3230 |             "insights": list(dict.fromkeys(insights))[:8],
3231 |             "risk_factors": list(dict.fromkeys(risks))[:6],
3232 |             "opportunities": list(dict.fromkeys(opportunities))[:6],
3233 |             "sentiment": self._calculate_technical_sentiment(analyzed_results),
3234 |             "credibility_score": self._calculate_average_credibility(analyzed_results),
3235 |             "sources": sources,
3236 |             "focus_areas": [
3237 |                 "price_action",
3238 |                 "chart_patterns",
3239 |                 "technical_indicators",
3240 |                 "support_resistance",
3241 |             ],
3242 |         }
3243 | 
3244 |     def _generate_technical_queries(self, topic: str) -> list[str]:
3245 |         """Generate technical analysis specific queries."""
3246 |         return [
3247 |             f"{topic} technical analysis chart pattern",
3248 |             f"{topic} price target support resistance",
3249 |             f"{topic} RSI MACD technical indicators",
3250 |             f"{topic} breakout trend analysis",
3251 |             f"{topic} volume analysis price movement",
3252 |         ]
3253 | 
3254 |     def _calculate_technical_sentiment(
3255 |         self, results: list[dict[str, Any]]
3256 |     ) -> dict[str, Any]:
3257 |         """Calculate sentiment specific to technical analysis."""
3258 |         # Similar to fundamental but focused on technical indicators
3259 |         sentiments = [
3260 |             r.get("analysis", {}).get("sentiment", {})
3261 |             for r in results
3262 |             if r.get("analysis")
3263 |         ]
3264 |         sentiments = [s for s in sentiments if s]
3265 | 
3266 |         if not sentiments:
3267 |             return {"direction": "neutral", "confidence": 0.5}
3268 | 
3269 |         bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
3270 |         bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")
3271 | 
3272 |         if bullish_count > bearish_count:
3273 |             return {"direction": "bullish", "confidence": 0.6}
3274 |         elif bearish_count > bullish_count:
3275 |             return {"direction": "bearish", "confidence": 0.6}
3276 |         else:
3277 |             return {"direction": "neutral", "confidence": 0.5}
3278 | 
3279 |     def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
3280 |         """Calculate average credibility of sources."""
3281 |         if not results:
3282 |             return 0.5
3283 |         credibility_scores = [r.get("credibility_score", 0.5) for r in results]
3284 |         return sum(credibility_scores) / len(credibility_scores)
3285 | 
3286 | 
3287 | class SentimentResearchAgent(BaseSubagent):
3288 |     """Specialized agent for market sentiment analysis."""
3289 | 
3290 |     async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
3291 |         """Execute sentiment analysis research."""
3292 |         self.logger.info(f"Executing sentiment research for: {task.target_topic}")
3293 | 
3294 |         queries = self._generate_sentiment_queries(task.target_topic)
3295 |         search_results = await self._perform_specialized_search(
3296 |             topic=task.target_topic, specialized_queries=queries, max_results=10
3297 |         )
3298 | 
3299 |         analyzed_results = await self._analyze_search_results(
3300 |             search_results, analysis_focus="sentiment_analysis"
3301 |         )
3302 | 
3303 |         # Extract sentiment-specific insights
3304 |         insights = []
3305 |         risks = []
3306 |         opportunities = []
3307 |         sources = []
3308 | 
3309 |         for result in analyzed_results:
3310 |             analysis = result.get("analysis", {})
3311 |             insights.extend(analysis.get("insights", []))
3312 |             risks.extend(analysis.get("risk_factors", []))
3313 |             opportunities.extend(analysis.get("opportunities", []))
3314 |             sources.append(
3315 |                 {
3316 |                     "title": result.get("title", ""),
3317 |                     "url": result.get("url", ""),
3318 |                     "credibility_score": result.get("credibility_score", 0.5),
3319 |                     "published_date": result.get("published_date"),
3320 |                     "author": result.get("author"),
3321 |                 }
3322 |             )
3323 | 
3324 |         return {
3325 |             "research_type": "sentiment",
3326 |             "insights": list(dict.fromkeys(insights))[:8],
3327 |             "risk_factors": list(dict.fromkeys(risks))[:6],
3328 |             "opportunities": list(dict.fromkeys(opportunities))[:6],
3329 |             "sentiment": self._calculate_market_sentiment(analyzed_results),
3330 |             "credibility_score": self._calculate_average_credibility(analyzed_results),
3331 |             "sources": sources,
3332 |             "focus_areas": [
3333 |                 "market_sentiment",
3334 |                 "analyst_opinions",
3335 |                 "news_sentiment",
3336 |                 "social_sentiment",
3337 |             ],
3338 |         }
3339 | 
3340 |     def _generate_sentiment_queries(self, topic: str) -> list[str]:
3341 |         """Generate sentiment analysis specific queries."""
3342 |         return [
3343 |             f"{topic} analyst rating recommendation upgrade downgrade",
3344 |             f"{topic} market sentiment investor opinion",
3345 |             f"{topic} news sentiment positive negative",
3346 |             f"{topic} social sentiment reddit twitter discussion",
3347 |             f"{topic} institutional investor sentiment",
3348 |         ]
3349 | 
3350 |     def _calculate_market_sentiment(
3351 |         self, results: list[dict[str, Any]]
3352 |     ) -> dict[str, Any]:
3353 |         """Calculate overall market sentiment."""
3354 |         sentiments = [
3355 |             r.get("analysis", {}).get("sentiment", {})
3356 |             for r in results
3357 |             if r.get("analysis")
3358 |         ]
3359 |         sentiments = [s for s in sentiments if s]
3360 | 
3361 |         if not sentiments:
3362 |             return {"direction": "neutral", "confidence": 0.5}
3363 | 
3364 |         # Weighted by confidence
3365 |         weighted_scores = []
3366 |         total_confidence = 0
3367 | 
3368 |         for sentiment in sentiments:
3369 |             direction = sentiment.get("direction", "neutral")
3370 |             confidence = sentiment.get("confidence", 0.5)
3371 | 
3372 |             if direction == "bullish":
3373 |                 weighted_scores.append(1 * confidence)
3374 |             elif direction == "bearish":
3375 |                 weighted_scores.append(-1 * confidence)
3376 |             else:
3377 |                 weighted_scores.append(0)
3378 | 
3379 |             total_confidence += confidence
3380 | 
3381 |         if not weighted_scores:
3382 |             return {"direction": "neutral", "confidence": 0.5}
3383 | 
3384 |         avg_score = sum(weighted_scores) / len(weighted_scores)
3385 |         avg_confidence = total_confidence / len(sentiments)
3386 | 
3387 |         if avg_score > 0.3:
3388 |             return {"direction": "bullish", "confidence": avg_confidence}
3389 |         elif avg_score < -0.3:
3390 |             return {"direction": "bearish", "confidence": avg_confidence}
3391 |         else:
3392 |             return {"direction": "neutral", "confidence": avg_confidence}
3393 | 
3394 |     def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
3395 |         """Calculate average credibility of sources."""
3396 |         if not results:
3397 |             return 0.5
3398 |         credibility_scores = [r.get("credibility_score", 0.5) for r in results]
3399 |         return sum(credibility_scores) / len(credibility_scores)
3400 | 
3401 | 
3402 | class CompetitiveResearchAgent(BaseSubagent):
3403 |     """Specialized agent for competitive and industry analysis."""
3404 | 
3405 |     async def execute_research(self, task) -> dict[str, Any]:  # task: ResearchTask
3406 |         """Execute competitive analysis research."""
3407 |         self.logger.info(f"Executing competitive research for: {task.target_topic}")
3408 | 
3409 |         queries = self._generate_competitive_queries(task.target_topic)
3410 |         search_results = await self._perform_specialized_search(
3411 |             topic=task.target_topic, specialized_queries=queries, max_results=8
3412 |         )
3413 | 
3414 |         analyzed_results = await self._analyze_search_results(
3415 |             search_results, analysis_focus="competitive_analysis"
3416 |         )
3417 | 
3418 |         # Extract competitive-specific insights
3419 |         insights = []
3420 |         risks = []
3421 |         opportunities = []
3422 |         sources = []
3423 | 
3424 |         for result in analyzed_results:
3425 |             analysis = result.get("analysis", {})
3426 |             insights.extend(analysis.get("insights", []))
3427 |             risks.extend(analysis.get("risk_factors", []))
3428 |             opportunities.extend(analysis.get("opportunities", []))
3429 |             sources.append(
3430 |                 {
3431 |                     "title": result.get("title", ""),
3432 |                     "url": result.get("url", ""),
3433 |                     "credibility_score": result.get("credibility_score", 0.5),
3434 |                     "published_date": result.get("published_date"),
3435 |                     "author": result.get("author"),
3436 |                 }
3437 |             )
3438 | 
3439 |         return {
3440 |             "research_type": "competitive",
3441 |             "insights": list(dict.fromkeys(insights))[:8],
3442 |             "risk_factors": list(dict.fromkeys(risks))[:6],
3443 |             "opportunities": list(dict.fromkeys(opportunities))[:6],
3444 |             "sentiment": self._calculate_competitive_sentiment(analyzed_results),
3445 |             "credibility_score": self._calculate_average_credibility(analyzed_results),
3446 |             "sources": sources,
3447 |             "focus_areas": [
3448 |                 "competitive_position",
3449 |                 "market_share",
3450 |                 "industry_trends",
3451 |                 "competitive_advantages",
3452 |             ],
3453 |         }
3454 | 
3455 |     def _generate_competitive_queries(self, topic: str) -> list[str]:
3456 |         """Generate competitive analysis specific queries."""
3457 |         return [
3458 |             f"{topic} market share competitive position industry",
3459 |             f"{topic} competitors comparison competitive advantage",
3460 |             f"{topic} industry analysis market trends",
3461 |             f"{topic} competitive landscape market dynamics",
3462 |             f"{topic} industry outlook sector performance",
3463 |         ]
3464 | 
3465 |     def _calculate_competitive_sentiment(
3466 |         self, results: list[dict[str, Any]]
3467 |     ) -> dict[str, Any]:
3468 |         """Calculate sentiment specific to competitive positioning."""
3469 |         sentiments = [
3470 |             r.get("analysis", {}).get("sentiment", {})
3471 |             for r in results
3472 |             if r.get("analysis")
3473 |         ]
3474 |         sentiments = [s for s in sentiments if s]
3475 | 
3476 |         if not sentiments:
3477 |             return {"direction": "neutral", "confidence": 0.5}
3478 | 
3479 |         # Focus on competitive strength indicators
3480 |         bullish_count = sum(1 for s in sentiments if s.get("direction") == "bullish")
3481 |         bearish_count = sum(1 for s in sentiments if s.get("direction") == "bearish")
3482 | 
3483 |         if bullish_count > bearish_count:
3484 |             return {"direction": "bullish", "confidence": 0.6}
3485 |         elif bearish_count > bullish_count:
3486 |             return {"direction": "bearish", "confidence": 0.6}
3487 |         else:
3488 |             return {"direction": "neutral", "confidence": 0.5}
3489 | 
3490 |     def _calculate_average_credibility(self, results: list[dict[str, Any]]) -> float:
3491 |         """Calculate average credibility of sources."""
3492 |         if not results:
3493 |             return 0.5
3494 |         credibility_scores = [r.get("credibility_score", 0.5) for r in results]
3495 |         return sum(credibility_scores) / len(credibility_scores)
3496 | 
```
Page 39/39FirstPrevNextLast