This is page 26 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/providers/market_data.py:
--------------------------------------------------------------------------------
```python
"""
Market data providers and utilities for Maverick-MCP.
Provides market movers, gainers, losers, and other market-wide data.
"""
import asyncio
import logging
import os
from datetime import UTC, datetime, timedelta
from typing import Any, cast
# Suppress specific pyright warnings for pandas DataFrame column access
# pyright: reportAttributeAccessIssue=false
import pandas as pd
import requests
import yfinance as yf
from dotenv import load_dotenv
from finvizfinance.screener.overview import Overview
from requests.adapters import HTTPAdapter, Retry
from tiingo import TiingoClient
from maverick_mcp.utils.circuit_breaker_decorators import (
with_market_data_circuit_breaker,
)
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.market_data")
# Initialize Tiingo client
tiingo_config = {"session": True, "api_key": os.getenv("TIINGO_API_KEY")}
tiingo_client = TiingoClient(tiingo_config) if os.getenv("TIINGO_API_KEY") else None
# Market indices - these are standard references
MARKET_INDICES = {
"^GSPC": "S&P 500",
"^DJI": "Dow Jones",
"^IXIC": "NASDAQ",
"^RUT": "Russell 2000",
"^VIX": "VIX",
"^TNX": "10Y Treasury",
}
# Sector ETFs - these are standard references
SECTOR_ETFS = {
"Technology": "XLK",
"Healthcare": "XLV",
"Financials": "XLF",
"Consumer Discretionary": "XLY",
"Industrials": "XLI",
"Energy": "XLE",
"Utilities": "XLU",
"Materials": "XLB",
"Consumer Staples": "XLP",
"Real Estate": "XLRE",
"Communication Services": "XLC",
}
class ExternalAPIClient:
"""Client for External API."""
def __init__(self):
self.api_key = os.getenv("CAPITAL_COMPANION_API_KEY")
self.base_url = "https://capitalcompanion.io"
self.session = requests.Session()
# Attach the API key header only when one is configured.
if self.api_key:
    self.session.headers.update({"X-API-KEY": self.api_key})
# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"],
)
adapter = HTTPAdapter(
max_retries=retry_strategy, pool_connections=10, pool_maxsize=10
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
@with_market_data_circuit_breaker(use_fallback=False, service="external_api")
def _make_request(self, endpoint: str, params: dict[str, Any] | None = None) -> Any:
"""Make API request with circuit breaker protection."""
list_endpoints = [
"/gainers",
"/losers",
"/maverick-full",
"/maverick-bullish-stocks",
"/maverick-bearish-stocks",
"/top-ten-retail",
"/aggressive-small-caps",
"/undervalued",
"/tech-earnings-growth",
"/unusual-options-activity",
]
if not self.api_key:
logger.warning("External API key not configured")
return [] if endpoint in list_endpoints else {}
url = f"{self.base_url}{endpoint}"
response = self.session.get(url, params=params, timeout=(5, 30))
response.raise_for_status()
result = response.json()
return result
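# Assumed decorator semantics (see maverick_mcp/utils/circuit_breaker_decorators.py):
# after repeated failures the breaker opens and calls fail fast instead of
# retrying a downstream API that is already erroring.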
def get_gainers(self) -> list[dict[str, Any]]:
"""Get top gainers from External API."""
result = self._make_request("/gainers")
return result if isinstance(result, list) else []
def get_losers(self) -> list[dict[str, Any]]:
"""Get top losers from External API."""
result = self._make_request("/losers")
return result if isinstance(result, list) else []
def get_maverick_full(self) -> list[dict[str, Any]]:
"""Get full maverick stocks list."""
result = self._make_request("/maverick-full")
return result if isinstance(result, list) else []
def get_maverick_bullish(self) -> list[dict[str, Any]]:
"""Get maverick bullish stocks."""
result = self._make_request("/maverick-bullish-stocks")
return result if isinstance(result, list) else []
def get_maverick_bearish(self) -> list[dict[str, Any]]:
"""Get maverick bearish stocks."""
result = self._make_request("/maverick-bearish-stocks")
return result if isinstance(result, list) else []
def get_top_retail(self) -> list[dict[str, Any]]:
"""Get top retail traded stocks."""
# Note: The endpoint name uses hyphens, not underscores
result = self._make_request("/top-ten-retail")
return result if isinstance(result, list) else []
def get_aggressive_small_caps(self) -> list[dict[str, Any]]:
"""Get aggressive small cap stocks."""
result = self._make_request("/aggressive-small-caps")
return result if isinstance(result, list) else []
def get_undervalued(self) -> list[dict[str, Any]]:
"""Get potentially undervalued large cap stocks."""
result = self._make_request("/undervalued")
return result if isinstance(result, list) else []
def get_tech_earnings_growth(self) -> list[dict[str, Any]]:
"""Get tech stocks with earnings growth over 25%."""
result = self._make_request("/tech-earnings-growth")
return result if isinstance(result, list) else []
def get_quote(self, symbol: str) -> dict[str, Any]:
"""Get stock quote by symbol."""
result = self._make_request(f"/quote/{symbol}")
return result if isinstance(result, dict) else {}
# Initialize External API client
external_api_client = ExternalAPIClient()
@with_market_data_circuit_breaker(use_fallback=False, service="finviz")
def get_finviz_movers(mover_type: str = "gainers", limit: int = 50) -> list[str]:
"""
Get market movers using finvizfinance screener with circuit breaker protection.
Args:
mover_type: Type of movers to get ("gainers", "losers", "active")
limit: Maximum number of stocks to return
Returns:
List of ticker symbols
"""
foverview = Overview()
# Set up filters based on mover type
if mover_type == "gainers":
filters_dict = {
"Change": "Up 5%", # More than 5% gain
"Average Volume": "Over 1M", # Liquid stocks
"Price": "Over $5", # Avoid penny stocks
}
elif mover_type == "losers":
filters_dict = {
"Change": "Down 5%", # More than 5% loss
"Average Volume": "Over 1M",
"Price": "Over $5",
}
elif mover_type == "active":
filters_dict = {
"Average Volume": "Over 20M", # Very high volume
"Price": "Over $5",
}
else:
# Default to liquid stocks
filters_dict = {
"Average Volume": "Over 10M",
"Market Cap.": "Large (>10bln)",
"Price": "Over $10",
}
foverview.set_filter(filters_dict=filters_dict)
df = foverview.screener_view()
if df is not None and not df.empty:
# Sort by appropriate column
if mover_type == "gainers" and "Change" in df.columns:
df = df.sort_values("Change", ascending=False)
elif mover_type == "losers" and "Change" in df.columns:
df = df.sort_values("Change", ascending=True)
elif mover_type == "active" and "Volume" in df.columns:
df = df.sort_values("Volume", ascending=False)
# Get ticker symbols
if "Ticker" in df.columns:
return list(df["Ticker"].head(limit).tolist())
logger.debug(f"No finviz data available for {mover_type}")
return []
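# Illustrative behavior, per the filters above: get_finviz_movers("gainers",
# limit=10) returns up to ten liquid tickers that gained more than 5% on the
# day, and [] when the screener yields no rows.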
def get_finviz_stock_data(symbols: list[str]) -> list[dict[str, Any]]:
"""
Get stock data for symbols using finvizfinance.
Note: finvizfinance doesn't support direct symbol filtering,
so we use yfinance for specific symbol data instead.
Args:
symbols: List of ticker symbols
Returns:
List of dictionaries with stock data
"""
# Use yfinance for specific symbol data as finvizfinance
# doesn't support direct symbol filtering efficiently
results = []
for symbol in symbols[:20]:  # cap at 20 symbols to avoid overwhelming yfinance
try:
ticker = yf.Ticker(symbol)
info = ticker.info
if info and "currentPrice" in info:
price = info.get("currentPrice", 0)
prev_close = info.get("previousClose", price)
change = price - prev_close if prev_close else 0
change_percent = (change / prev_close * 100) if prev_close else 0
volume = info.get("volume", 0)
results.append(
{
"symbol": symbol,
"price": round(price, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": volume,
}
)
except Exception as e:
logger.debug(f"Error fetching data for {symbol}: {e}")
continue
return results
def fetch_tiingo_tickers():
"""
Fetch active US stock and ETF tickers.
Tries the External API first, falls back to Tiingo when a TIINGO_API_KEY
is configured, and finally to the finvizfinance screener.
Returns:
List of valid ticker symbols
"""
# Try External API first
try:
maverick_full = external_api_client.get_maverick_full()
if maverick_full:
# Extract symbols from the maverick full list
symbols = []
# Handle different response formats
if isinstance(maverick_full, dict):
# API returns {"maverick_stocks": [...]}
if "maverick_stocks" in maverick_full:
for item in maverick_full["maverick_stocks"]:
if isinstance(item, str):
symbols.append(item)
elif isinstance(item, dict) and "symbol" in item:
symbols.append(item["symbol"])
elif isinstance(maverick_full, list):
# Direct list format
for item in maverick_full:
if isinstance(item, dict) and "symbol" in item:
symbols.append(item["symbol"])
elif isinstance(item, str):
symbols.append(item)
if symbols:
return sorted(set(symbols))
except Exception as e:
logger.debug(f"Could not fetch from External API: {e}")
# Fall back to Tiingo if available
if tiingo_client:
try:
asset_types = frozenset(["Stock", "ETF"])
valid_exchanges = frozenset(["NYSE", "NASDAQ", "BATS", "NYSE ARCA", "AMEX"])
cutoff_date = datetime(2024, 7, 1)
tickers = tiingo_client.list_tickers(assetTypes=list(asset_types))
valid_tickers = set()
for t in tickers:
ticker = t["ticker"].strip()
if (
len(ticker) <= 5
and ticker.isalpha()
and t["exchange"].strip() in valid_exchanges
and t["priceCurrency"].strip() == "USD"
and t["assetType"].strip() in asset_types
and t["endDate"]
and datetime.fromisoformat(t["endDate"].rstrip("Z")) > cutoff_date
):
valid_tickers.add(ticker)
return sorted(valid_tickers)
except Exception as e:
logger.error(f"Error fetching tickers from Tiingo: {str(e)}")
# Fall back to finvizfinance
try:
# Get a mix of liquid stocks from finviz
finviz_symbols: set[str] = set()
# Get some active stocks
active = get_finviz_movers("active", limit=100)
finviz_symbols.update(active)
# Get some gainers
gainers = get_finviz_movers("gainers", limit=50)
finviz_symbols.update(gainers)
# Get some losers
losers = get_finviz_movers("losers", limit=50)
finviz_symbols.update(losers)
if finviz_symbols:
return sorted(finviz_symbols)
except Exception as e:
logger.debug(f"Error fetching from finvizfinance: {e}")
logger.warning("No ticker source available, returning empty list")
return []
class MarketDataProvider:
"""
Provider for market-wide data: top gainers, losers, most active stocks,
sector performance, and earnings calendars.
Sources are layered: the External API first, then finvizfinance and Yahoo Finance.
"""
def __init__(self):
self.session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET"],
)
adapter = HTTPAdapter(
max_retries=retry_strategy, pool_connections=10, pool_maxsize=10
)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
async def _run_in_executor(self, func, *args) -> Any:
"""Run a blocking function in an executor to make it non-blocking."""
loop = asyncio.get_running_loop()  # get_event_loop() is deprecated inside coroutines
return await loop.run_in_executor(None, func, *args)
def _fetch_data(
self, url: str, params: dict[str, Any] | None = None
) -> dict[str, Any]:
"""
Fetch data from an API with retry logic.
Args:
url: API endpoint URL
params: Optional query parameters
Returns:
JSON response as dictionary
"""
try:
response = self.session.get(
url,
params=params,
timeout=(5, 30), # Connect timeout, read timeout
headers={"User-Agent": "Maverick-MCP/1.0"},
)
response.raise_for_status()
result = response.json()
return result if isinstance(result, dict) else {}
except requests.Timeout:
logger.error(f"Timeout error fetching data from {url}")
return {}
except requests.HTTPError as e:
logger.error(f"HTTP error fetching data from {url}: {str(e)}")
return {}
except Exception as e:
logger.error(f"Unknown error fetching data from {url}: {str(e)}")
return {}
def get_market_summary(self) -> dict[str, Any]:
"""
Get a summary of major market indices.
Returns:
Dictionary with market summary data
"""
try:
data = {}
for index, name in MARKET_INDICES.items():
ticker = yf.Ticker(index)
history = ticker.history(period="2d")
if history.empty:
continue
# With period="2d" the first row is the prior session's close; when only
# one row is returned, prev_close equals the current close and the change
# is reported as 0.
prev_close = history["Close"].iloc[0]
current = history["Close"].iloc[-1]
change = current - prev_close
change_percent = (change / prev_close) * 100 if prev_close != 0 else 0
data[index] = {
"name": name,
"symbol": index,
"price": round(current, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
}
return data
except Exception as e:
logger.error(f"Error fetching market summary: {str(e)}")
return {}
def get_top_gainers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top gaining stocks in the market.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data
"""
try:
# First try External API
gainers_data = external_api_client.get_gainers()
if gainers_data:
results = []
# Handle different response formats
gainers_list = []
if isinstance(gainers_data, dict) and "gainers" in gainers_data:
gainers_list = gainers_data["gainers"]
elif isinstance(gainers_data, list):
gainers_list = gainers_data
for item in gainers_list[:limit]:
# Handle different response formats
if isinstance(item, dict):
# Extract standard fields
result = {
"symbol": item.get("symbol", item.get("ticker", "")),
"price": item.get("price", item.get("current_price", 0)),
"change": item.get("change", item.get("price_change", 0)),
"change_percent": item.get(
"percent_change", item.get("change_percent", 0)
),
"volume": item.get("volume", 0),
}
# Ensure numeric types
result["price"] = (
float(result["price"]) if result["price"] else 0
)
result["change"] = (
float(result["change"]) if result["change"] else 0
)
result["change_percent"] = (
float(result["change_percent"])
if result["change_percent"]
else 0
)
result["volume"] = (
int(result["volume"]) if result["volume"] else 0
)
if result["symbol"]:
results.append(result)
if results:
return results[:limit]
# If External API fails, try finvizfinance
logger.info("External API gainers unavailable, trying finvizfinance")
# Try to get gainers from finvizfinance
symbols = get_finviz_movers("gainers", limit=limit * 2)
if symbols:
# First try to get data directly from finviz
results = get_finviz_stock_data(symbols[:limit])
if results:
# Sort by percent change and return top gainers
results.sort(key=lambda x: x["change_percent"], reverse=True)
return results[:limit]
# If finviz doesn't have full data, use yfinance with the symbols
if not symbols:
# Last resort: try to get any liquid stocks from finviz
symbols = get_finviz_movers("active", limit=50)
if not symbols:
logger.warning("No symbols available for gainers calculation")
return []
# Fetch data for these symbols
results = []
batch_str = " ".join(symbols[:50]) # Limit to 50 symbols
data = yf.download(
batch_str,
period="2d",
group_by="ticker",
threads=True,
progress=False,
)
if data is None or data.empty:
logger.warning("No data available from yfinance")
return []
for symbol in symbols[:50]:
try:
if len(symbols) == 1:
ticker_data = data
else:
if symbol not in data.columns.get_level_values(0):
continue
ticker_data = data[symbol]
if len(ticker_data) < 2:
continue
prev_close = ticker_data["Close"].iloc[0]
current = ticker_data["Close"].iloc[-1]
if pd.isna(prev_close) or pd.isna(current) or prev_close == 0:
continue
change = current - prev_close
change_percent = (change / prev_close) * 100
volume = ticker_data["Volume"].iloc[-1]
if pd.notna(change_percent) and pd.notna(volume):
results.append(
{
"symbol": symbol,
"price": round(current, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(volume),
}
)
except Exception as e:
logger.debug(f"Error processing {symbol}: {str(e)}")
continue
# Sort by percent change and return top gainers
results.sort(key=lambda x: x["change_percent"], reverse=True)
return results[:limit]
except Exception as e:
logger.error(f"Error fetching top gainers: {str(e)}")
return []
def get_top_losers(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top losing stocks in the market.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data
"""
try:
# First try External API
losers_data = external_api_client.get_losers()
if losers_data:
results = []
# Handle different response formats
losers_list = []
if isinstance(losers_data, dict) and "losers" in losers_data:
losers_list = losers_data["losers"]
elif isinstance(losers_data, list):
losers_list = losers_data
for item in losers_list[:limit]:
# Handle different response formats
if isinstance(item, dict):
# Extract standard fields
result = {
"symbol": item.get("symbol", item.get("ticker", "")),
"price": item.get("price", item.get("current_price", 0)),
"change": item.get("change", item.get("price_change", 0)),
"change_percent": item.get(
"percent_change", item.get("change_percent", 0)
),
"volume": item.get("volume", 0),
}
# Ensure numeric types
result["price"] = (
float(result["price"]) if result["price"] else 0
)
result["change"] = (
float(result["change"]) if result["change"] else 0
)
result["change_percent"] = (
float(result["change_percent"])
if result["change_percent"]
else 0
)
result["volume"] = (
int(result["volume"]) if result["volume"] else 0
)
if result["symbol"]:
results.append(result)
if results:
return results[:limit]
# If External API fails, try finvizfinance
logger.info("External API losers unavailable, trying finvizfinance")
# Try to get losers from finvizfinance
symbols = get_finviz_movers("losers", limit=limit * 2)
if symbols:
# First try to get data directly from finviz
results = get_finviz_stock_data(symbols[:limit])
if results:
# Sort by percent change (ascending for losers) and return top losers
results.sort(key=lambda x: x["change_percent"])
return results[:limit]
# If finviz doesn't have full data, use yfinance with the symbols
if not symbols:
# Last resort: try to get any liquid stocks from finviz
symbols = get_finviz_movers("active", limit=50)
if not symbols:
logger.warning("No symbols available for losers calculation")
return []
# Fetch data for these symbols
results = []
batch_str = " ".join(symbols[:50]) # Limit to 50 symbols
data = yf.download(
batch_str,
period="2d",
group_by="ticker",
threads=True,
progress=False,
)
if data is None or data.empty:
logger.warning("No data available from yfinance")
return []
for symbol in symbols[:50]:
try:
if len(symbols) == 1:
ticker_data = data
else:
if symbol not in data.columns.get_level_values(0):
continue
ticker_data = data[symbol]
if len(ticker_data) < 2:
continue
prev_close = ticker_data["Close"].iloc[0]
current = ticker_data["Close"].iloc[-1]
if pd.isna(prev_close) or pd.isna(current) or prev_close == 0:
continue
change = current - prev_close
change_percent = (change / prev_close) * 100
volume = ticker_data["Volume"].iloc[-1]
if pd.notna(change_percent) and pd.notna(volume):
results.append(
{
"symbol": symbol,
"price": round(current, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(volume),
}
)
except Exception as e:
logger.debug(f"Error processing {symbol}: {str(e)}")
continue
# Sort by percent change (ascending for losers) and return top losers
results.sort(key=lambda x: x["change_percent"])
return results[:limit]
except Exception as e:
logger.error(f"Error fetching top losers: {str(e)}")
return []
def get_most_active(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get most active stocks by volume.
Args:
limit: Maximum number of stocks to return
Returns:
List of dictionaries with stock data
"""
try:
# Use External API's various endpoints for most active stocks
# First try gainers as they have high volume
active_data = external_api_client.get_gainers()
if not active_data:
# Fall back to maverick stocks
maverick_data = external_api_client.get_maverick_full()
if (
isinstance(maverick_data, dict)
and "maverick_stocks" in maverick_data
):
active_data = [
{"symbol": s}
for s in maverick_data["maverick_stocks"][: limit * 2]
]
if active_data:
results = []
symbols = []
# Extract data depending on format
data_list = []
if isinstance(active_data, dict) and "gainers" in active_data:
data_list = active_data["gainers"]
elif isinstance(active_data, list):
data_list = active_data
# Extract symbols from data
for item in data_list:
if isinstance(item, dict):
symbol = item.get("symbol", item.get("ticker", ""))
if symbol:
symbols.append(symbol)
# If the API already provides full data, use it
if all(
k in item
for k in ["price", "change", "change_percent", "volume"]
):
result = {
"symbol": symbol,
"price": float(item.get("price", 0)),
"change": float(item.get("change", 0)),
"change_percent": float(
item.get("change_percent", 0)
),
"volume": int(item.get("volume", 0)),
}
results.append(result)
elif isinstance(item, str):
symbols.append(item)
# If we have complete results from API, return them
if results:
return results[:limit]
# Otherwise fetch additional data for symbols
if symbols:
# Limit symbols to fetch
symbols = symbols[
: min(limit * 2, 30)
] # Fetch more than limit to account for potential errors
batch_str = " ".join(symbols)
data = yf.download(
batch_str,
period="2d",
group_by="ticker",
threads=True,
progress=False,
)
if data is None or data.empty:
logger.warning("No data available from yfinance")
return results[:limit]
for symbol in symbols:
try:
if len(symbols) == 1:
ticker_data = data
else:
if symbol not in data.columns.get_level_values(0):
continue
ticker_data = data[symbol]
if len(ticker_data) < 2:
continue
prev_close = ticker_data["Close"].iloc[0]
current = ticker_data["Close"].iloc[-1]
volume = ticker_data["Volume"].iloc[-1]
if (
pd.isna(prev_close)
or pd.isna(current)
or pd.isna(volume)
or prev_close == 0
):
continue
change = current - prev_close
change_percent = (change / prev_close) * 100
if pd.notna(change_percent) and pd.notna(volume):
results.append(
{
"symbol": symbol,
"price": round(current, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(volume),
}
)
except Exception as e:
logger.debug(f"Error processing {symbol}: {str(e)}")
continue
# Sort by volume and return most active
results.sort(key=lambda x: x["volume"], reverse=True)
return results[:limit]
# If no data from External API, try finvizfinance
logger.info("Trying finvizfinance for most active stocks")
# Get most active stocks from finviz
symbols = get_finviz_movers("active", limit=limit * 2)
if symbols:
# First try to get data directly from finviz
results = get_finviz_stock_data(symbols[:limit])
if results:
# Sort by volume and return most active
results.sort(key=lambda x: x["volume"], reverse=True)
return results[:limit]
# If finviz doesn't have full data, use yfinance
batch_str = " ".join(symbols[: limit * 2])
data = yf.download(
batch_str,
period="2d",
group_by="ticker",
threads=True,
progress=False,
)
if data is None or data.empty:
logger.warning("No data available from yfinance")
return []
results = []
for symbol in symbols[: limit * 2]:
try:
if len(symbols) == 1:
ticker_data = data
else:
if symbol not in data.columns.get_level_values(0):
continue
ticker_data = data[symbol]
if len(ticker_data) < 2:
continue
prev_close = ticker_data["Close"].iloc[0]
current = ticker_data["Close"].iloc[-1]
volume = ticker_data["Volume"].iloc[-1]
if (
pd.isna(prev_close)
or pd.isna(current)
or pd.isna(volume)
or prev_close == 0
):
continue
change = current - prev_close
change_percent = (change / prev_close) * 100
if pd.notna(change_percent) and pd.notna(volume):
results.append(
{
"symbol": symbol,
"price": round(current, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(volume),
}
)
except Exception as e:
logger.debug(f"Error processing {symbol}: {str(e)}")
continue
# Sort by volume and return most active
results.sort(key=lambda x: x["volume"], reverse=True)
return results[:limit]
logger.warning("No most active stocks data available")
return []
except Exception as e:
logger.error(f"Error fetching most active stocks: {str(e)}")
return []
def get_sector_performance(self) -> dict[str, float]:
"""
Get sector performance data.
Returns:
Dictionary mapping sector names to performance percentages
"""
try:
results = {}
for sector, etf in SECTOR_ETFS.items():
try:
data = yf.Ticker(etf)
hist = data.history(period="2d")
if len(hist) < 2:
continue
prev_close = hist["Close"].iloc[0]
current = hist["Close"].iloc[-1]
change_percent = ((current - prev_close) / prev_close) * 100
results[sector] = round(change_percent, 2)
except Exception as e:
logger.debug(f"Error processing sector {sector}: {str(e)}")
continue
return results
except Exception as e:
logger.error(f"Error fetching sector performance: {str(e)}")
return {}
def get_earnings_calendar(self, days: int = 7) -> list[dict[str, Any]]:
"""
Get upcoming earnings announcements.
Args:
days: Number of days to look ahead
Returns:
List of dictionaries with earnings data
"""
try:
# Get stocks to check for earnings from External API
stocks_to_check = []
# Try to get a diverse set of stocks from different External API endpoints
try:
# Get gainers for earnings check
gainers_data = external_api_client.get_gainers()
if gainers_data:
gainers_list = []
if isinstance(gainers_data, dict) and "gainers" in gainers_data:
gainers_list = gainers_data["gainers"]
elif isinstance(gainers_data, list):
gainers_list = gainers_data
for item in gainers_list[:15]:
if isinstance(item, dict) and "symbol" in item:
stocks_to_check.append(item["symbol"])
# Add some tech stocks with earnings growth
tech_stocks = external_api_client.get_tech_earnings_growth()
for item in tech_stocks[:10]:
if isinstance(item, dict) and "symbol" in item: # type: ignore[arg-type]
symbol = item["symbol"]
if symbol not in stocks_to_check:
stocks_to_check.append(symbol)
elif isinstance(item, str) and item not in stocks_to_check:
stocks_to_check.append(item)
# Add some undervalued stocks
undervalued = external_api_client.get_undervalued()
for item in undervalued[:10]:
if isinstance(item, dict) and "symbol" in item: # type: ignore[arg-type]
symbol = item["symbol"]
if symbol not in stocks_to_check:
stocks_to_check.append(symbol)
elif isinstance(item, str) and item not in stocks_to_check:
stocks_to_check.append(item)
except Exception as e:
logger.debug(
f"Could not fetch stocks from External API for earnings: {e}"
)
# If no stocks from External API, fall back to fetch_tiingo_tickers
if not stocks_to_check:
tickers = fetch_tiingo_tickers()
stocks_to_check = tickers[:50] if tickers else []
check_stocks = stocks_to_check[:50] # Limit to 50 stocks for performance
results = []
today = datetime.now(UTC).date()
end_date = today + timedelta(days=days)
for ticker in check_stocks:
try:
data = yf.Ticker(ticker)
# Try to get calendar info
if hasattr(data, "calendar") and data.calendar is not None:
try:
calendar = data.calendar
if "Earnings Date" in calendar.index:
earnings_date = calendar.loc["Earnings Date"]
# Handle different date formats
if hasattr(earnings_date, "date"):
earnings_date = earnings_date.date()
elif isinstance(earnings_date, str):
earnings_date = datetime.strptime(
earnings_date, "%Y-%m-%d"
).date()
else:
continue
# Check if earnings date is within our range
if today <= earnings_date <= end_date:
results.append(
{
"ticker": ticker,
"name": data.info.get("shortName", ticker),
"earnings_date": earnings_date.strftime(
"%Y-%m-%d"
),
"eps_estimate": float(
calendar.loc["EPS Estimate"]
)
if "EPS Estimate" in calendar.index
else None,
}
)
except Exception as e:
logger.debug(
f"Error parsing calendar for {ticker}: {str(e)}"
)
continue
except Exception as e:
logger.debug(f"Error fetching data for {ticker}: {str(e)}")
continue
# Sort by earnings date
results.sort(key=lambda x: x["earnings_date"])
return results
except Exception as e:
logger.error(f"Error fetching earnings calendar: {str(e)}")
return []
async def get_market_summary_async(self) -> dict[str, Any]:
"""
Get a summary of major market indices (async version).
"""
result = await self._run_in_executor(self.get_market_summary)
return cast(dict[str, Any], result)
async def get_top_gainers_async(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top gaining stocks in the market (async version).
"""
result = await self._run_in_executor(self.get_top_gainers, limit)
return cast(list[dict[str, Any]], result)
async def get_top_losers_async(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get top losing stocks in the market (async version).
"""
result = await self._run_in_executor(self.get_top_losers, limit)
return cast(list[dict[str, Any]], result)
async def get_most_active_async(self, limit: int = 10) -> list[dict[str, Any]]:
"""
Get most active stocks by volume (async version).
"""
result = await self._run_in_executor(self.get_most_active, limit)
return cast(list[dict[str, Any]], result)
async def get_sector_performance_async(self) -> dict[str, float]:
"""
Get sector performance data (async version).
"""
result = await self._run_in_executor(self.get_sector_performance)
return cast(dict[str, float], result)
async def get_market_overview_async(self) -> dict[str, Any]:
"""
Get comprehensive market overview including summary, gainers, losers, sectors (async version).
Uses concurrent execution for better performance.
"""
# Run all tasks concurrently
tasks = [
self.get_market_summary_async(),
self.get_top_gainers_async(5),
self.get_top_losers_async(5),
self.get_sector_performance_async(),
]
# Wait for all tasks to complete
summary, gainers, losers, sectors = await asyncio.gather(*tasks) # type: ignore[assignment]
return {
"timestamp": datetime.now(UTC).isoformat(),
"market_summary": summary,
"top_gainers": gainers,
"top_losers": losers,
"sector_performance": sectors,
}
def get_market_overview(self) -> dict[str, Any]:
"""
Get comprehensive market overview including summary, gainers, losers, sectors.
Returns:
Dictionary with market overview data
"""
summary = self.get_market_summary()
gainers = self.get_top_gainers(5)
losers = self.get_top_losers(5)
sectors = self.get_sector_performance()
return {
"timestamp": datetime.now(UTC).isoformat(),
"market_summary": summary,
"top_gainers": gainers,
"top_losers": losers,
"sector_performance": sectors,
}
```
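A minimal usage sketch for the provider above (not part of the source file). It assumes `maverick_mcp` is importable, e.g. after `pip install -e .`; `CAPITAL_COMPANION_API_KEY` and `TIINGO_API_KEY` may be unset, in which case the finviz and yfinance fallbacks shown above take over.
```python
# Usage sketch (assumptions: maverick_mcp installed; API keys optional).
import asyncio

from maverick_mcp.providers.market_data import MarketDataProvider

provider = MarketDataProvider()

# Synchronous calls degrade to empty results when every source fails.
print(provider.get_market_summary())      # {"^GSPC": {"name": "S&P 500", ...}, ...}
print(provider.get_top_gainers(limit=5))  # [{"symbol": ..., "change_percent": ...}, ...]

# The async overview fans out four queries concurrently via asyncio.gather.
overview = asyncio.run(provider.get_market_overview_async())
print(overview["timestamp"], sorted(overview))
```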
--------------------------------------------------------------------------------
/maverick_mcp/providers/stock_data.py:
--------------------------------------------------------------------------------
```python
"""
Enhanced stock data provider with SQLAlchemy integration and screening recommendations.
Provides comprehensive stock data retrieval with database caching and maverick screening.
"""
# Suppress specific pyright warnings for pandas operations
# pyright: reportOperatorIssue=false
import logging
from datetime import UTC, datetime, timedelta
import pandas as pd
import pandas_market_calendars as mcal
import pytz
import yfinance as yf
from dotenv import load_dotenv
from sqlalchemy import text
from sqlalchemy.orm import Session
from maverick_mcp.data.models import (
MaverickBearStocks,
MaverickStocks,
PriceCache,
SessionLocal,
Stock,
SupplyDemandBreakoutStocks,
bulk_insert_price_data,
get_latest_maverick_screening,
)
from maverick_mcp.data.session_management import get_db_session_read_only
from maverick_mcp.utils.circuit_breaker_decorators import (
with_stock_data_circuit_breaker,
)
from maverick_mcp.utils.yfinance_pool import get_yfinance_pool
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.stock_data")
class EnhancedStockDataProvider:
"""
Enhanced provider for stock data with database caching and screening recommendations.
"""
def __init__(self, db_session: Session | None = None):
"""
Initialize the stock data provider.
Args:
db_session: Optional database session for dependency injection.
If not provided, will get sessions as needed.
"""
self.timeout = 30
self.max_retries = 3
self.cache_days = 1 # Cache data for 1 day by default
# Initialize NYSE calendar for US stock market
self.market_calendar = mcal.get_calendar("NYSE")
self._db_session = db_session
# Initialize yfinance connection pool
self._yf_pool = get_yfinance_pool()
if db_session:
# Test the provided session
self._test_db_connection_with_session(db_session)
else:
# Test creating a new session
self._test_db_connection()
def _test_db_connection(self):
"""Test database connection on initialization."""
try:
# Use read-only context manager for automatic session management
with get_db_session_read_only() as session:
# Try a simple query
result = session.execute(text("SELECT 1"))
result.fetchone()
logger.info("Database connection successful")
except Exception as e:
logger.warning(
f"Database connection test failed: {e}. Caching will be disabled."
)
def _test_db_connection_with_session(self, session: Session):
"""Test provided database session."""
try:
# Try a simple query
result = session.execute(text("SELECT 1"))
result.fetchone()
logger.info("Database session test successful")
except Exception as e:
logger.warning(
f"Database session test failed: {e}. Caching may not work properly."
)
def _get_data_with_smart_cache(
self, symbol: str, start_date: str, end_date: str, interval: str
) -> pd.DataFrame:
"""
Get stock data using smart caching strategy.
This method:
1. Gets all available data from cache
2. Identifies missing date ranges
3. Fetches only missing data from yfinance
4. Combines and returns the complete dataset
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
interval: Data interval (only '1d' is cached)
Returns:
DataFrame with complete stock data
"""
symbol = symbol.upper()
session, should_close = self._get_db_session()
try:
# Step 1: Get ALL available cached data for the date range
logger.info(f"Checking cache for {symbol} from {start_date} to {end_date}")
cached_df = self._get_cached_data_flexible(
session, symbol, start_date, end_date
)
# Convert dates for comparison - ensure timezone-naive for consistency
start_dt = pd.to_datetime(start_date).tz_localize(None)
end_dt = pd.to_datetime(end_date).tz_localize(None)
# Step 2: Determine what data we need
if cached_df is not None and not cached_df.empty:
logger.info(f"Found {len(cached_df)} cached records for {symbol}")
# Check if we have all the data we need - ensure timezone-naive for comparison
cached_start = pd.to_datetime(cached_df.index.min()).tz_localize(None)
cached_end = pd.to_datetime(cached_df.index.max()).tz_localize(None)
# Identify missing ranges
missing_ranges = []
# Missing data at the beginning?
if start_dt < cached_start:
# Get trading days in the missing range
missing_start_trading = self._get_trading_days(
start_dt, cached_start - timedelta(days=1)
)
if len(missing_start_trading) > 0:
# Only request data if there are trading days
missing_ranges.append(
(
missing_start_trading[0].strftime("%Y-%m-%d"),
missing_start_trading[-1].strftime("%Y-%m-%d"),
)
)
# Missing recent data?
if end_dt > cached_end:
# Check if there are any trading days after our cached data
if self._is_trading_day_between(cached_end, end_dt):
# Get the actual trading days we need
missing_end_trading = self._get_trading_days(
cached_end + timedelta(days=1), end_dt
)
if len(missing_end_trading) > 0:
missing_ranges.append(
(
missing_end_trading[0].strftime("%Y-%m-%d"),
missing_end_trading[-1].strftime("%Y-%m-%d"),
)
)
# If no missing data, return cached data
if not missing_ranges:
logger.info(
f"Cache hit! Returning {len(cached_df)} cached records for {symbol}"
)
# Filter to requested range - ensure index is timezone-naive
cached_df.index = pd.to_datetime(cached_df.index).tz_localize(None)
mask = (cached_df.index >= start_dt) & (cached_df.index <= end_dt)
return cached_df.loc[mask]
# Step 3: Fetch only missing data
logger.info(f"Cache partial hit. Missing ranges: {missing_ranges}")
all_dfs = [cached_df]
for miss_start, miss_end in missing_ranges:
logger.info(
f"Fetching missing data for {symbol} from {miss_start} to {miss_end}"
)
missing_df = self._fetch_stock_data_from_yfinance(
symbol, miss_start, miss_end, None, interval
)
if not missing_df.empty:
all_dfs.append(missing_df)
# Cache the new data
self._cache_price_data(session, symbol, missing_df)
# Combine all data
combined_df = pd.concat(all_dfs).sort_index()
# Remove any duplicates (keep first)
combined_df = combined_df[~combined_df.index.duplicated(keep="first")]
# Filter to requested range - ensure index is timezone-naive
combined_df.index = pd.to_datetime(combined_df.index).tz_localize(None)
mask = (combined_df.index >= start_dt) & (combined_df.index <= end_dt)
return combined_df.loc[mask]
else:
# No cached data, fetch everything but only for trading days
logger.info(
f"No cached data found for {symbol}, fetching from yfinance"
)
# Adjust dates to trading days
trading_days = self._get_trading_days(start_date, end_date)
if len(trading_days) == 0:
logger.warning(
f"No trading days found between {start_date} and {end_date}"
)
return pd.DataFrame(
columns=[ # type: ignore[arg-type]
"Open",
"High",
"Low",
"Close",
"Volume",
"Dividends",
"Stock Splits",
]
)
# Fetch data only for the trading day range
fetch_start = trading_days[0].strftime("%Y-%m-%d")
fetch_end = trading_days[-1].strftime("%Y-%m-%d")
logger.info(
f"Fetching data for trading days: {fetch_start} to {fetch_end}"
)
df = self._fetch_stock_data_from_yfinance(
symbol, fetch_start, fetch_end, None, interval
)
if not df.empty:
# Ensure stock exists and cache the data
self._get_or_create_stock(session, symbol)
self._cache_price_data(session, symbol, df)
return df
finally:
if should_close:
session.close()
def _get_cached_data_flexible(
self, session: Session, symbol: str, start_date: str, end_date: str
) -> pd.DataFrame | None:
"""
Get cached data with flexible date range.
Unlike the strict version, this returns whatever cached data exists
within the requested range, even if incomplete.
Args:
session: Database session
symbol: Stock ticker symbol (will be uppercased)
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
Returns:
DataFrame with available cached data or None
"""
try:
# Get whatever data exists in the range
df = PriceCache.get_price_data(session, symbol, start_date, end_date)
if df.empty:
return None
# Add expected columns for compatibility
for col in ["Dividends", "Stock Splits"]:
if col not in df.columns:
df[col] = 0.0
# Ensure column names match yfinance format
column_mapping = {
"open": "Open",
"high": "High",
"low": "Low",
"close": "Close",
"volume": "Volume",
}
df.rename(columns=column_mapping, inplace=True)
# Ensure proper data types to match yfinance
# Convert Decimal to float for price columns
for col in ["Open", "High", "Low", "Close"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors="coerce").astype("float64")
# Convert volume to int
if "Volume" in df.columns:
df["Volume"] = (
pd.to_numeric(df["Volume"], errors="coerce")
.fillna(0)
.astype("int64")
)
# Ensure index is timezone-naive for consistency
df.index = pd.to_datetime(df.index).tz_localize(None)
return df
except Exception as e:
logger.error(f"Error getting flexible cached data: {e}")
return None
def _is_trading_day_between(
self, start_date: pd.Timestamp, end_date: pd.Timestamp
) -> bool:
"""
Check if there's a trading day between two dates using market calendar.
Args:
start_date: Start date
end_date: End date
Returns:
True if there's a trading day between the dates
"""
# Add one day to start since we're checking "between"
check_start = start_date + timedelta(days=1)
if check_start > end_date:
return False
# Get trading days in the range
trading_days = self._get_trading_days(check_start, end_date)
return len(trading_days) > 0
def _get_trading_days(self, start_date, end_date) -> pd.DatetimeIndex:
"""
Get all trading days between start and end dates.
Args:
start_date: Start date (can be string or datetime)
end_date: End date (can be string or datetime)
Returns:
DatetimeIndex of trading days (timezone-naive)
"""
# Normalize both bounds to timezone-naive Timestamps; pd.to_datetime
# accepts strings and datetimes alike, so no isinstance branching is needed.
start_date = pd.to_datetime(start_date).tz_localize(None)
end_date = pd.to_datetime(end_date).tz_localize(None)
# Get valid trading days from market calendar
schedule = self.market_calendar.schedule(
start_date=start_date, end_date=end_date
)
# Return timezone-naive index
return schedule.index.tz_localize(None)
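# e.g. _get_trading_days("2024-01-05", "2024-01-08") yields Jan 5 (Fri) and
# Jan 8 (Mon); the intervening weekend is excluded by the NYSE calendar.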
def _get_last_trading_day(self, date) -> pd.Timestamp:
"""
Get the last trading day on or before the given date.
Args:
date: Date to check (can be string or datetime)
Returns:
Last trading day as pd.Timestamp
"""
if isinstance(date, str):
date = pd.to_datetime(date)
# Check if the date itself is a trading day
if self._is_trading_day(date):
return date
# Otherwise, find the previous trading day
for i in range(1, 10): # Look back up to 10 days
check_date = date - timedelta(days=i)
if self._is_trading_day(check_date):
return check_date
# Fallback to the date itself if no trading day found
return date
def _is_trading_day(self, date) -> bool:
"""
Check if a specific date is a trading day.
Args:
date: Date to check
Returns:
True if it's a trading day
"""
if isinstance(date, str):
date = pd.to_datetime(date)
schedule = self.market_calendar.schedule(start_date=date, end_date=date)
return len(schedule) > 0
def _get_db_session(self) -> tuple[Session, bool]:
"""
Get a database session.
Returns:
Tuple of (session, should_close) where should_close indicates
whether the caller should close the session.
"""
# Use injected session if available - should NOT be closed
if self._db_session:
return self._db_session, False
# Otherwise, create a new session using session factory - should be closed
try:
session = SessionLocal()
return session, True
except Exception as e:
logger.error(f"Failed to get database session: {e}", exc_info=True)
raise
def _get_or_create_stock(self, session: Session, symbol: str) -> Stock:
"""
Get or create a stock in the database.
Args:
session: Database session
symbol: Stock ticker symbol
Returns:
Stock object
"""
stock = Stock.get_or_create(session, symbol)
# Try to update stock info if it's missing
company_name = getattr(stock, "company_name", None)
if company_name is None or company_name == "":
try:
# Use connection pool for info retrieval
info = self._yf_pool.get_info(symbol)
stock.company_name = info.get("longName", info.get("shortName"))
stock.sector = info.get("sector")
stock.industry = info.get("industry")
stock.exchange = info.get("exchange")
stock.currency = info.get("currency", "USD")
stock.country = info.get("country")
session.commit()
except Exception as e:
logger.warning(f"Could not update stock info for {symbol}: {e}")
session.rollback()
return stock
def _get_cached_price_data(
self, session: Session, symbol: str, start_date: str, end_date: str
) -> pd.DataFrame | None:
"""
DEPRECATED: Use _get_data_with_smart_cache instead.
This method is kept for backward compatibility but is no longer used
in the main flow. The new smart caching approach provides better
database prioritization.
"""
logger.warning("Using deprecated _get_cached_price_data method")
return self._get_cached_data_flexible(
session, symbol.upper(), start_date, end_date
)
def _cache_price_data(
self, session: Session, symbol: str, df: pd.DataFrame
) -> None:
"""
Cache price data in the database.
Args:
session: Database session
symbol: Stock ticker symbol
df: DataFrame with price data
"""
try:
if df.empty:
return
# Ensure symbol is uppercase to match database
symbol = symbol.upper()
# Prepare DataFrame for caching
cache_df = df.copy()
# Ensure proper column names
column_mapping = {
"Open": "open",
"High": "high",
"Low": "low",
"Close": "close",
"Volume": "volume",
}
cache_df.rename(columns=column_mapping, inplace=True)
# Log DataFrame info for debugging
logger.debug(
f"DataFrame columns before caching: {cache_df.columns.tolist()}"
)
logger.debug(f"DataFrame shape: {cache_df.shape}")
logger.debug(f"DataFrame index type: {type(cache_df.index)}")
if not cache_df.empty:
logger.debug(f"Sample row: {cache_df.iloc[0].to_dict()}")
# Insert data
count = bulk_insert_price_data(session, symbol, cache_df)
if count == 0:
logger.info(
f"No new records cached for {symbol} (data may already exist)"
)
else:
logger.info(f"Cached {count} new price records for {symbol}")
except Exception as e:
logger.error(f"Error caching price data for {symbol}: {e}", exc_info=True)
session.rollback()
def get_stock_data(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
use_cache: bool = True,
) -> pd.DataFrame:
"""
Fetch stock data with database caching support.
This method prioritizes cached data from the database and only fetches
missing data from yfinance when necessary.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
use_cache: Whether to use cached data if available
Returns:
DataFrame with stock data
"""
# For non-daily intervals or periods, always fetch fresh data
if interval != "1d" or period:
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
# Set default dates if not provided
if start_date is None:
start_date = (datetime.now(UTC) - timedelta(days=365)).strftime("%Y-%m-%d")
if end_date is None:
end_date = datetime.now(UTC).strftime("%Y-%m-%d")
# For daily data, adjust end date to last trading day if it's not a trading day
# This prevents unnecessary cache misses on weekends/holidays
if interval == "1d" and use_cache:
end_dt = pd.to_datetime(end_date)
if not self._is_trading_day(end_dt):
last_trading = self._get_last_trading_day(end_dt)
logger.debug(
f"Adjusting end date from {end_date} to last trading day {last_trading.strftime('%Y-%m-%d')}"
)
end_date = last_trading.strftime("%Y-%m-%d")
# If cache is disabled, fetch directly from yfinance
if not use_cache:
logger.info(f"Cache disabled, fetching from yfinance for {symbol}")
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
# Try a smarter caching approach
try:
return self._get_data_with_smart_cache(
symbol, start_date, end_date, interval
)
except Exception as e:
logger.warning(f"Smart cache failed, falling back to yfinance: {e}")
return self._fetch_stock_data_from_yfinance(
symbol, start_date, end_date, period, interval
)
async def get_stock_data_async(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
use_cache: bool = True,
) -> pd.DataFrame:
"""
Async version of get_stock_data for parallel processing.
This method wraps the synchronous get_stock_data method to provide
an async interface for use in parallel backtesting operations.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', '3mo', '1y', etc.)
interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
use_cache: Whether to use cached data if available
Returns:
DataFrame with stock data
"""
import asyncio
import functools
# Run the synchronous method in a thread pool to avoid blocking
        loop = asyncio.get_running_loop()
# Use functools.partial to create a callable with all arguments
sync_method = functools.partial(
self.get_stock_data,
symbol=symbol,
start_date=start_date,
end_date=end_date,
period=period,
interval=interval,
use_cache=use_cache,
)
# Execute in thread pool to avoid blocking the event loop
return await loop.run_in_executor(None, sync_method)
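    # Usage sketch (illustrative, not part of the original class): because the
    # sync method runs in the default thread-pool executor, several symbols
    # can be fetched concurrently, e.g.
    #
    #     frames = await asyncio.gather(
    #         *(provider.get_stock_data_async(s) for s in ["AAPL", "MSFT"])
    #     )
    #
    # The default executor caps its worker count, so very large symbol lists
    # are throttled rather than all dispatched at once.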
@with_stock_data_circuit_breaker(
use_fallback=False
) # Fallback handled at higher level
def _fetch_stock_data_from_yfinance(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
) -> pd.DataFrame:
"""
Fetch stock data from yfinance with circuit breaker protection.
Note: Circuit breaker is applied with use_fallback=False because
fallback strategies are handled at the get_stock_data level.
"""
logger.info(
f"Fetching data from yfinance for {symbol} - Start: {start_date}, End: {end_date}, Period: {period}, Interval: {interval}"
)
        # Use the optimized connection pool; it handles session
        # management and retries internally
df = self._yf_pool.get_history(
symbol=symbol,
start=start_date,
end=end_date,
period=period,
interval=interval,
)
# Check if dataframe is empty or if required columns are missing
if df.empty:
logger.warning(f"Empty dataframe returned for {symbol}")
return pd.DataFrame(
columns=["Open", "High", "Low", "Close", "Volume"] # type: ignore[arg-type]
)
# Ensure all expected columns exist
for col in ["Open", "High", "Low", "Close", "Volume"]:
if col not in df.columns:
logger.warning(
f"Column {col} missing from data for {symbol}, adding empty column"
)
# Use appropriate default values
if col == "Volume":
df[col] = 0
else:
df[col] = 0.0
df.index.name = "Date"
return df
def get_maverick_recommendations(
self, limit: int = 20, min_score: int | None = None
) -> list[dict]:
"""
Get top Maverick stock recommendations from the database.
Args:
limit: Maximum number of recommendations
min_score: Minimum combined score filter
Returns:
List of stock recommendations with details
"""
session, should_close = self._get_db_session()
try:
# Build query with filtering at database level
query = session.query(MaverickStocks)
# Apply min_score filter in the query if specified
            if min_score is not None:
query = query.filter(MaverickStocks.combined_score >= min_score)
# Order by score and limit results
stocks = (
query.order_by(MaverickStocks.combined_score.desc()).limit(limit).all()
)
# Process results with list comprehension for better performance
recommendations = [
{
**stock.to_dict(),
"recommendation_type": "maverick_bullish",
"reason": self._generate_maverick_reason(stock),
}
for stock in stocks
]
return recommendations
except Exception as e:
logger.error(f"Error getting maverick recommendations: {e}")
return []
finally:
if should_close:
session.close()
def get_maverick_bear_recommendations(
self, limit: int = 20, min_score: int | None = None
) -> list[dict]:
"""
Get top Maverick bear stock recommendations from the database.
Args:
limit: Maximum number of recommendations
min_score: Minimum score filter
Returns:
List of bear stock recommendations with details
"""
session, should_close = self._get_db_session()
try:
# Build query with filtering at database level
query = session.query(MaverickBearStocks)
# Apply min_score filter in the query if specified
            if min_score is not None:
query = query.filter(MaverickBearStocks.score >= min_score)
# Order by score and limit results
stocks = query.order_by(MaverickBearStocks.score.desc()).limit(limit).all()
# Process results with list comprehension for better performance
recommendations = [
{
**stock.to_dict(),
"recommendation_type": "maverick_bearish",
"reason": self._generate_bear_reason(stock),
}
for stock in stocks
]
return recommendations
except Exception as e:
logger.error(f"Error getting bear recommendations: {e}")
return []
finally:
if should_close:
session.close()
def get_supply_demand_breakout_recommendations(
self, limit: int = 20, min_momentum_score: float | None = None
) -> list[dict]:
"""
Get stocks showing supply/demand breakout patterns from accumulation phases.
Args:
limit: Maximum number of recommendations
min_momentum_score: Minimum momentum score filter
Returns:
List of supply/demand breakout recommendations with market structure analysis
"""
session, should_close = self._get_db_session()
try:
# Build query with all filters at database level
query = session.query(SupplyDemandBreakoutStocks).filter(
# Supply/demand breakout criteria: price above all moving averages (demand zone)
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_50,
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_150,
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_200,
# Moving average alignment indicates accumulation structure
SupplyDemandBreakoutStocks.sma_50 > SupplyDemandBreakoutStocks.sma_150,
SupplyDemandBreakoutStocks.sma_150 > SupplyDemandBreakoutStocks.sma_200,
)
# Apply min_momentum_score filter if specified
            if min_momentum_score is not None:
query = query.filter(
SupplyDemandBreakoutStocks.momentum_score >= min_momentum_score
)
# Order by momentum score and limit results
stocks = (
query.order_by(SupplyDemandBreakoutStocks.momentum_score.desc())
.limit(limit)
.all()
)
# Process results with list comprehension for better performance
recommendations = [
{
**stock.to_dict(),
"recommendation_type": "supply_demand_breakout",
"reason": self._generate_supply_demand_reason(stock),
}
for stock in stocks
]
return recommendations
except Exception as e:
logger.error(f"Error getting trending recommendations: {e}")
return []
finally:
if should_close:
session.close()
def get_all_screening_recommendations(self) -> dict[str, list[dict]]:
"""
Get all screening recommendations in one call.
Returns:
Dictionary with all screening types and their recommendations
"""
try:
results = get_latest_maverick_screening()
# Add recommendation reasons
for stock in results.get("maverick_stocks", []):
stock["recommendation_type"] = "maverick_bullish"
stock["reason"] = self._generate_maverick_reason_from_dict(stock)
for stock in results.get("maverick_bear_stocks", []):
stock["recommendation_type"] = "maverick_bearish"
stock["reason"] = self._generate_bear_reason_from_dict(stock)
for stock in results.get("supply_demand_breakouts", []):
stock["recommendation_type"] = "supply_demand_breakout"
stock["reason"] = self._generate_supply_demand_reason_from_dict(stock)
return results
except Exception as e:
logger.error(f"Error getting all screening recommendations: {e}")
return {
"maverick_stocks": [],
"maverick_bear_stocks": [],
"supply_demand_breakouts": [],
}
def _generate_maverick_reason(self, stock: MaverickStocks) -> str:
"""Generate recommendation reason for Maverick stock."""
reasons = []
combined_score = getattr(stock, "combined_score", None)
if combined_score is not None and combined_score >= 90:
reasons.append("Exceptional combined score")
elif combined_score is not None and combined_score >= 80:
reasons.append("Strong combined score")
momentum_score = getattr(stock, "momentum_score", None)
if momentum_score is not None and momentum_score >= 90:
reasons.append("outstanding relative strength")
elif momentum_score is not None and momentum_score >= 80:
reasons.append("strong relative strength")
pat = getattr(stock, "pat", None)
if pat is not None and pat != "":
reasons.append(f"{pat} pattern detected")
consolidation = getattr(stock, "consolidation", None)
if consolidation is not None and consolidation == "yes":
reasons.append("consolidation characteristics")
sqz = getattr(stock, "sqz", None)
if sqz is not None and sqz != "":
reasons.append(f"squeeze indicator: {sqz}")
return (
"Bullish setup with " + ", ".join(reasons)
if reasons
else "Strong technical setup"
)
def _generate_bear_reason(self, stock: MaverickBearStocks) -> str:
"""Generate recommendation reason for bear stock."""
reasons = []
score = getattr(stock, "score", None)
if score is not None and score >= 90:
reasons.append("Exceptional bear score")
elif score is not None and score >= 80:
reasons.append("Strong bear score")
momentum_score = getattr(stock, "momentum_score", None)
if momentum_score is not None and momentum_score <= 30:
reasons.append("weak relative strength")
rsi_14 = getattr(stock, "rsi_14", None)
if rsi_14 is not None and rsi_14 <= 30:
reasons.append("oversold RSI")
atr_contraction = getattr(stock, "atr_contraction", False)
if atr_contraction is True:
reasons.append("ATR contraction")
big_down_vol = getattr(stock, "big_down_vol", False)
if big_down_vol is True:
reasons.append("heavy selling volume")
return (
"Bearish setup with " + ", ".join(reasons)
if reasons
else "Weak technical setup"
)
def _generate_supply_demand_reason(self, stock: SupplyDemandBreakoutStocks) -> str:
"""Generate recommendation reason for supply/demand breakout stock."""
reasons = ["Supply/demand breakout from accumulation"]
momentum_score = getattr(stock, "momentum_score", None)
if momentum_score is not None and momentum_score >= 90:
reasons.append("exceptional relative strength")
elif momentum_score is not None and momentum_score >= 80:
reasons.append("strong relative strength")
reasons.append("price above all major moving averages")
reasons.append("moving averages in proper alignment")
pat = getattr(stock, "pat", None)
if pat is not None and pat != "":
reasons.append(f"{pat} pattern")
return " with ".join(reasons)
def _generate_maverick_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for Maverick stock from dict."""
reasons = []
score = stock.get("combined_score", 0)
if score >= 90:
reasons.append("Exceptional combined score")
elif score >= 80:
reasons.append("Strong combined score")
momentum = stock.get("momentum_score", 0)
if momentum >= 90:
reasons.append("outstanding relative strength")
elif momentum >= 80:
reasons.append("strong relative strength")
if stock.get("pattern"):
reasons.append(f"{stock['pattern']} pattern detected")
if stock.get("consolidation") == "yes":
reasons.append("consolidation characteristics")
if stock.get("squeeze"):
reasons.append(f"squeeze indicator: {stock['squeeze']}")
return (
"Bullish setup with " + ", ".join(reasons)
if reasons
else "Strong technical setup"
)
def _generate_bear_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for bear stock from dict."""
reasons = []
score = stock.get("score", 0)
if score >= 90:
reasons.append("Exceptional bear score")
elif score >= 80:
reasons.append("Strong bear score")
momentum = stock.get("momentum_score", 100)
if momentum <= 30:
reasons.append("weak relative strength")
rsi = stock.get("rsi_14")
        if rsi is not None and rsi <= 30:
reasons.append("oversold RSI")
if stock.get("atr_contraction"):
reasons.append("ATR contraction")
if stock.get("big_down_vol"):
reasons.append("heavy selling volume")
return (
"Bearish setup with " + ", ".join(reasons)
if reasons
else "Weak technical setup"
)
def _generate_supply_demand_reason_from_dict(self, stock: dict) -> str:
"""Generate recommendation reason for supply/demand breakout stock from dict."""
reasons = ["Supply/demand breakout from accumulation"]
momentum = stock.get("momentum_score", 0)
if momentum >= 90:
reasons.append("exceptional relative strength")
elif momentum >= 80:
reasons.append("strong relative strength")
reasons.append("price above all major moving averages")
reasons.append("moving averages in proper alignment")
if stock.get("pattern"):
reasons.append(f"{stock['pattern']} pattern")
return " with ".join(reasons)
# Keep all original methods for backward compatibility
@with_stock_data_circuit_breaker(use_fallback=False)
def get_stock_info(self, symbol: str) -> dict:
"""Get detailed stock information from yfinance with circuit breaker protection."""
# Use connection pool for better performance
return self._yf_pool.get_info(symbol)
def get_realtime_data(self, symbol):
"""Get the latest real-time data for a symbol using yfinance."""
try:
# Use connection pool for real-time data
data = self._yf_pool.get_history(symbol, period="1d")
if data.empty:
return None
latest = data.iloc[-1]
# Get previous close for change calculation
info = self._yf_pool.get_info(symbol)
prev_close = info.get("previousClose", None)
if prev_close is None:
# Try to get from 2-day history
data_2d = self._yf_pool.get_history(symbol, period="2d")
if len(data_2d) > 1:
prev_close = data_2d.iloc[0]["Close"]
else:
prev_close = latest["Close"]
# Calculate change
price = latest["Close"]
change = price - prev_close
change_percent = (change / prev_close) * 100 if prev_close != 0 else 0
return {
"symbol": symbol,
"price": round(price, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(latest["Volume"]),
"timestamp": data.index[-1],
"timestamp_display": data.index[-1].strftime("%Y-%m-%d %H:%M:%S"),
"is_real_time": False, # yfinance data has some delay
}
except Exception as e:
logger.error(f"Error fetching realtime data for {symbol}: {str(e)}")
return None
def get_all_realtime_data(self, symbols):
"""Get all latest real-time data for multiple symbols."""
results = {}
for symbol in symbols:
data = self.get_realtime_data(symbol)
if data:
results[symbol] = data
return results
def is_market_open(self) -> bool:
"""Check if the US stock market is currently open."""
now = datetime.now(pytz.timezone("US/Eastern"))
# Check if it's a weekday
if now.weekday() >= 5: # 5 and 6 are Saturday and Sunday
return False
# Check if it's between 9:30 AM and 4:00 PM Eastern Time
market_open = now.replace(hour=9, minute=30, second=0, microsecond=0)
market_close = now.replace(hour=16, minute=0, second=0, microsecond=0)
return market_open <= now <= market_close
def get_news(self, symbol: str, limit: int = 10) -> pd.DataFrame:
"""Get news for a stock from yfinance."""
try:
ticker = yf.Ticker(symbol)
news = ticker.news
if not news:
return pd.DataFrame(
columns=[ # type: ignore[arg-type]
"title",
"publisher",
"link",
"providerPublishTime",
"type",
]
)
df = pd.DataFrame(news[:limit])
# Convert timestamp to datetime
if "providerPublishTime" in df.columns:
df["providerPublishTime"] = pd.to_datetime(
df["providerPublishTime"], unit="s"
)
return df
except Exception as e:
logger.error(f"Error fetching news for {symbol}: {str(e)}")
return pd.DataFrame(
columns=["title", "publisher", "link", "providerPublishTime", "type"] # type: ignore[arg-type]
)
def get_earnings(self, symbol: str) -> dict:
"""Get earnings information for a stock."""
try:
ticker = yf.Ticker(symbol)
return {
"earnings": ticker.earnings.to_dict()
if hasattr(ticker, "earnings") and not ticker.earnings.empty
else {},
"earnings_dates": ticker.earnings_dates.to_dict()
if hasattr(ticker, "earnings_dates") and not ticker.earnings_dates.empty
else {},
"earnings_trend": ticker.earnings_trend
if hasattr(ticker, "earnings_trend")
else {},
}
except Exception as e:
logger.error(f"Error fetching earnings for {symbol}: {str(e)}")
return {"earnings": {}, "earnings_dates": {}, "earnings_trend": {}}
def get_recommendations(self, symbol: str) -> pd.DataFrame:
"""Get analyst recommendations for a stock."""
try:
ticker = yf.Ticker(symbol)
recommendations = ticker.recommendations
if recommendations is None or recommendations.empty:
return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"]) # type: ignore[arg-type]
return recommendations
except Exception as e:
logger.error(f"Error fetching recommendations for {symbol}: {str(e)}")
return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"]) # type: ignore[arg-type]
def is_etf(self, symbol: str) -> bool:
"""Check if a given symbol is an ETF."""
try:
stock = yf.Ticker(symbol)
# Check if quoteType exists and is ETF
if "quoteType" in stock.info:
return stock.info["quoteType"].upper() == "ETF" # type: ignore[no-any-return]
# Fallback check for common ETF identifiers
return any(
[
symbol.endswith(("ETF", "FUND")),
symbol
in [
"SPY",
"QQQ",
"IWM",
"DIA",
"XLB",
"XLE",
"XLF",
"XLI",
"XLK",
"XLP",
"XLU",
"XLV",
"XLY",
"XLC",
"XLRE",
"XME",
],
"ETF" in stock.info.get("longName", "").upper(),
]
)
except Exception as e:
logger.error(f"Error checking if {symbol} is ETF: {e}")
return False
# Maintain backward compatibility
StockDataProvider = EnhancedStockDataProvider
```
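A minimal usage sketch for the provider above (illustrative only: the import path and the no-argument constructor are assumptions, and live calls require network access to yfinance):

```python
# Hypothetical usage of EnhancedStockDataProvider; the import path and
# constructor signature below are assumptions, not taken from this file.
import asyncio

from maverick_mcp.providers.stock_data import EnhancedStockDataProvider

provider = EnhancedStockDataProvider()

# Daily bars flow through the smart database cache; non-daily intervals
# and period-based requests bypass it and hit yfinance directly.
df = provider.get_stock_data("AAPL", start_date="2024-01-01", end_date="2024-06-30")
print(df[["Open", "Close", "Volume"]].tail())


async def fetch_many(symbols: list[str]) -> list:
    # The async wrapper offloads the sync method to a thread pool,
    # so the fetches below run concurrently.
    return await asyncio.gather(
        *(provider.get_stock_data_async(s, start_date="2024-01-01") for s in symbols)
    )


frames = asyncio.run(fetch_many(["AAPL", "MSFT", "NVDA"]))
```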
--------------------------------------------------------------------------------
/tests/test_deep_research_functional.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive functional tests for DeepResearchAgent.
This test suite focuses on testing the actual research functionality including:
## Web Search Integration Tests (TestWebSearchIntegration):
- Exa and Tavily search provider query formatting and result processing
- Provider fallback behavior when APIs fail
- Search result deduplication from multiple providers
- Social media filtering and content processing
## Research Synthesis Tests (TestResearchSynthesis):
- Persona-aware content analysis with different investment styles
- Complete research synthesis workflow from query to findings
- Iterative research refinement based on initial results
- Fact validation and source credibility scoring
## Persona-Based Research Tests (TestPersonaBasedResearch):
- Conservative persona focus on stability, dividends, and risk factors
- Aggressive persona exploration of growth opportunities and innovation
- Day trader persona emphasis on short-term catalysts and volatility
- Research depth differences between conservative and aggressive approaches
## Multi-Step Research Workflow Tests (TestMultiStepResearchWorkflow):
- End-to-end research workflow from initial query to final report
- Handling of insufficient or conflicting information scenarios
- Research focusing and refinement based on discovered gaps
- Citation generation and source attribution
## Research Method Specialization Tests (TestResearchMethodSpecialization):
- Sentiment analysis specialization with news and social signals
- Fundamental analysis focusing on financials and company data
- Competitive analysis examining market position and rivals
- Proper routing to specialized analysis based on focus areas
## Error Handling and Resilience Tests (TestErrorHandlingAndResilience):
- Graceful degradation when search providers are unavailable
- Content analysis fallback when LLM services fail
- Partial search failure handling with provider redundancy
- Circuit breaker behavior and timeout handling
## Research Quality and Validation Tests (TestResearchQualityAndValidation):
- Research confidence calculation based on source quality and diversity
- Source credibility scoring (government, financial sites vs. blogs)
- Source diversity assessment for balanced research
- Investment recommendation logic based on persona and findings
## Key Features Tested:
- **Realistic Mock Data**: Uses comprehensive financial article samples
- **Provider Integration**: Tests both Exa and Tavily search providers
- **LangGraph Workflows**: Tests complete research state machine
- **Persona Adaptation**: Validates different investor behavior patterns
- **Error Resilience**: Ensures system continues operating with degraded capabilities
- **Research Logic**: Tests actual synthesis and analysis rather than just API calls
All tests use realistic mock data and test the research logic rather than just API connectivity.
26 test cases cover the complete research pipeline from initial search to final recommendations.
"""
import json
from unittest.mock import AsyncMock, MagicMock, Mock, patch
import pytest
from maverick_mcp.agents.deep_research import (
PERSONA_RESEARCH_FOCUS,
RESEARCH_DEPTH_LEVELS,
ContentAnalyzer,
DeepResearchAgent,
ExaSearchProvider,
TavilySearchProvider,
)
from maverick_mcp.exceptions import WebSearchError
# Mock Data Fixtures
@pytest.fixture
def mock_llm():
"""Mock LLM with realistic responses for content analysis."""
llm = MagicMock()
llm.ainvoke = AsyncMock()
llm.bind_tools = MagicMock(return_value=llm)
# Default response for content analysis
def mock_response(messages):
response = Mock()
response.content = json.dumps(
{
"KEY_INSIGHTS": [
"Strong revenue growth in cloud services",
"Market expansion in international segments",
"Increasing competitive pressure from rivals",
],
"SENTIMENT": {"direction": "bullish", "confidence": 0.75},
"RISK_FACTORS": [
"Regulatory scrutiny in international markets",
"Supply chain disruptions affecting hardware",
],
"OPPORTUNITIES": [
"AI integration driving new revenue streams",
"Subscription model improving recurring revenue",
],
"CREDIBILITY": 0.8,
"RELEVANCE": 0.9,
"SUMMARY": "Analysis shows strong fundamentals with growth opportunities despite some regulatory risks.",
}
)
return response
llm.ainvoke.side_effect = mock_response
return llm
@pytest.fixture
def comprehensive_search_results():
"""Comprehensive mock search results from multiple providers."""
return [
{
"url": "https://finance.yahoo.com/news/apple-earnings-q4-2024",
"title": "Apple Reports Strong Q4 2024 Earnings",
"content": """Apple Inc. reported quarterly earnings that beat Wall Street expectations,
driven by strong iPhone sales and growing services revenue. The company posted
revenue of $94.9 billion, up 6% year-over-year. CEO Tim Cook highlighted the
success of the iPhone 15 lineup and expressed optimism about AI integration
in future products. Services revenue reached $22.3 billion, representing
a 16% increase. The company also announced a 4% increase in quarterly dividend.""",
"published_date": "2024-01-25T10:30:00Z",
"score": 0.92,
"provider": "exa",
"author": "Financial Times Staff",
},
{
"url": "https://seekingalpha.com/article/apple-technical-analysis-2024",
"title": "Apple Stock Technical Analysis: Bullish Momentum Building",
"content": """Technical analysis of Apple stock shows bullish momentum building
with the stock breaking above key resistance at $190. Volume has been
increasing on up days, suggesting institutional accumulation. The RSI
is at 58, indicating room for further upside. Key support levels are
at $185 and $180. Price target for the next quarter is $210-$220 based
on chart patterns and momentum indicators.""",
"published_date": "2024-01-24T14:45:00Z",
"score": 0.85,
"provider": "exa",
"author": "Tech Analyst Pro",
},
{
"url": "https://reuters.com/apple-supply-chain-concerns",
"title": "Apple Faces Supply Chain Headwinds in 2024",
"content": """Apple is encountering supply chain challenges that could impact
production timelines for its upcoming product launches. Manufacturing
partners in Asia report delays due to component shortages, particularly
for advanced semiconductors. The company is working to diversify its
supplier base to reduce risks. Despite these challenges, analysts
remain optimistic about Apple's ability to meet demand through
strategic inventory management.""",
"published_date": "2024-01-23T08:15:00Z",
"score": 0.78,
"provider": "tavily",
"author": "Reuters Technology Team",
},
{
"url": "https://fool.com/apple-ai-strategy-competitive-advantage",
"title": "Apple's AI Strategy Could Be Its Next Competitive Moat",
"content": """Apple's approach to artificial intelligence differs significantly
from competitors, focusing on on-device processing and privacy protection.
The company's investment in AI chips and machine learning capabilities
positions it well for the next phase of mobile computing. Industry
experts predict Apple's AI integration will drive hardware upgrade
cycles and create new revenue opportunities in services. The privacy-first
approach could become a key differentiator in the market.""",
"published_date": "2024-01-22T16:20:00Z",
"score": 0.88,
"provider": "exa",
"author": "Investment Strategy Team",
},
{
"url": "https://barrons.com/apple-dividend-growth-analysis",
"title": "Apple's Dividend Growth Story Continues",
"content": """Apple has increased its dividend for the 12th consecutive year,
demonstrating strong cash flow generation and commitment to returning
capital to shareholders. The company's dividend yield of 0.5% may seem
modest, but the consistent growth rate of 7% annually makes it attractive
for income-focused investors. With over $162 billion in cash and
marketable securities, Apple has the financial flexibility to continue
rewarding shareholders while investing in growth initiatives.""",
"published_date": "2024-01-21T11:00:00Z",
"score": 0.82,
"provider": "tavily",
"author": "Dividend Analysis Team",
},
]
@pytest.fixture
def mock_research_agent(mock_llm):
"""Create a DeepResearchAgent with mocked dependencies."""
with (
patch("maverick_mcp.agents.deep_research.ExaSearchProvider") as mock_exa,
patch("maverick_mcp.agents.deep_research.TavilySearchProvider") as mock_tavily,
):
# Mock search providers
mock_exa_instance = Mock()
mock_tavily_instance = Mock()
mock_exa.return_value = mock_exa_instance
mock_tavily.return_value = mock_tavily_instance
agent = DeepResearchAgent(
llm=mock_llm,
persona="moderate",
exa_api_key="mock-key",
tavily_api_key="mock-key",
)
# Add mock providers to the agent for testing
agent.search_providers = [mock_exa_instance, mock_tavily_instance]
return agent
class TestWebSearchIntegration:
"""Test web search integration and result processing."""
@pytest.mark.asyncio
async def test_exa_search_provider_query_formatting(self):
"""Test that Exa search queries are properly formatted and sent."""
with patch("maverick_mcp.agents.deep_research.circuit_manager") as mock_circuit:
mock_circuit.get_or_create = AsyncMock()
mock_circuit_instance = AsyncMock()
mock_circuit.get_or_create.return_value = mock_circuit_instance
# Mock the Exa client response
mock_exa_response = Mock()
mock_exa_response.results = [
Mock(
url="https://example.com/test",
title="Test Article",
text="Test content for search",
summary="Test summary",
highlights=["key highlight"],
published_date="2024-01-25",
author="Test Author",
score=0.9,
)
]
with patch("exa_py.Exa") as mock_exa_client:
mock_client_instance = Mock()
mock_client_instance.search_and_contents.return_value = (
mock_exa_response
)
mock_exa_client.return_value = mock_client_instance
# Create actual provider (not mocked)
provider = ExaSearchProvider("test-api-key")
mock_circuit_instance.call.return_value = [
{
"url": "https://example.com/test",
"title": "Test Article",
"content": "Test content for search",
"summary": "Test summary",
"highlights": ["key highlight"],
"published_date": "2024-01-25",
"author": "Test Author",
"score": 0.9,
"provider": "exa",
}
]
# Test the search
results = await provider.search("AAPL stock analysis", num_results=5)
# Verify query was properly formatted
assert len(results) == 1
assert results[0]["url"] == "https://example.com/test"
assert results[0]["provider"] == "exa"
assert results[0]["score"] == 0.9
@pytest.mark.asyncio
async def test_tavily_search_result_processing(self):
"""Test Tavily search result processing and filtering."""
with patch("maverick_mcp.agents.deep_research.circuit_manager") as mock_circuit:
mock_circuit.get_or_create = AsyncMock()
mock_circuit_instance = AsyncMock()
mock_circuit.get_or_create.return_value = mock_circuit_instance
mock_tavily_response = {
"results": [
{
"url": "https://news.example.com/tech-news",
"title": "Tech News Article",
"content": "Content about technology trends",
"raw_content": "Extended raw content with more details",
"published_date": "2024-01-25",
"score": 0.85,
},
{
"url": "https://facebook.com/social-post", # Should be filtered out
"title": "Social Media Post",
"content": "Social media content",
"score": 0.7,
},
]
}
with patch("tavily.TavilyClient") as mock_tavily_client:
mock_client_instance = Mock()
mock_client_instance.search.return_value = mock_tavily_response
mock_tavily_client.return_value = mock_client_instance
provider = TavilySearchProvider("test-api-key")
mock_circuit_instance.call.return_value = [
{
"url": "https://news.example.com/tech-news",
"title": "Tech News Article",
"content": "Content about technology trends",
"raw_content": "Extended raw content with more details",
"published_date": "2024-01-25",
"score": 0.85,
"provider": "tavily",
}
]
results = await provider.search("tech trends analysis")
# Verify results are properly processed and social media filtered
assert len(results) == 1
assert results[0]["provider"] == "tavily"
assert "facebook.com" not in results[0]["url"]
@pytest.mark.asyncio
async def test_search_provider_fallback_behavior(self, mock_research_agent):
"""Test fallback behavior when search providers fail."""
# Mock the execute searches workflow step directly
with patch.object(mock_research_agent, "_execute_searches") as mock_execute:
# Mock first provider to fail, second to succeed
mock_research_agent.search_providers[0].search = AsyncMock(
side_effect=WebSearchError("Exa API rate limit exceeded")
)
mock_research_agent.search_providers[1].search = AsyncMock(
return_value=[
{
"url": "https://backup-source.com/article",
"title": "Backup Article",
"content": "Fallback content from secondary provider",
"provider": "tavily",
"score": 0.75,
}
]
)
# Mock successful execution with fallback results
mock_result = Mock()
mock_result.goto = "analyze_content"
mock_result.update = {
"search_results": [
{
"url": "https://backup-source.com/article",
"title": "Backup Article",
"content": "Fallback content from secondary provider",
"provider": "tavily",
"score": 0.75,
}
],
"research_status": "analyzing",
}
mock_execute.return_value = mock_result
# Test state for search execution
state = {"search_queries": ["AAPL analysis"], "research_depth": "standard"}
# Execute the search step
result = await mock_research_agent._execute_searches(state)
# Should handle provider failure gracefully
assert result.goto == "analyze_content"
assert len(result.update["search_results"]) > 0
@pytest.mark.asyncio
async def test_search_result_deduplication(self, comprehensive_search_results):
"""Test deduplication of search results from multiple providers."""
# Create search results with duplicates
duplicate_results = (
comprehensive_search_results
+ [
{
"url": "https://finance.yahoo.com/news/apple-earnings-q4-2024", # Duplicate URL
"title": "Apple Q4 Results (Duplicate)",
"content": "Duplicate content with different title",
"provider": "tavily",
"score": 0.7,
}
]
)
with patch.object(DeepResearchAgent, "_execute_searches") as mock_execute:
mock_execute.return_value = Mock()
DeepResearchAgent(llm=MagicMock(), persona="moderate")
# Test the deduplication logic directly
# Simulate search execution with duplicates
all_results = duplicate_results
unique_results = []
seen_urls = set()
depth_config = RESEARCH_DEPTH_LEVELS["standard"]
for result in all_results:
if (
result["url"] not in seen_urls
and len(unique_results) < depth_config["max_sources"]
):
unique_results.append(result)
seen_urls.add(result["url"])
# Verify deduplication worked
assert len(unique_results) == 5 # Should remove 1 duplicate
urls = [r["url"] for r in unique_results]
assert len(set(urls)) == len(urls) # All URLs should be unique
class TestResearchSynthesis:
"""Test research synthesis and iterative querying functionality."""
@pytest.mark.asyncio
async def test_content_analysis_with_persona_focus(
self, comprehensive_search_results
):
"""Test that content analysis adapts to persona focus areas."""
# Mock LLM with persona-specific responses
mock_llm = MagicMock()
def persona_aware_response(messages):
response = Mock()
# Check if content is about dividends for conservative persona
content = messages[1].content if len(messages) > 1 else ""
if "conservative" in content and "dividend" in content:
response.content = json.dumps(
{
"KEY_INSIGHTS": [
"Strong dividend yield provides stable income"
],
"SENTIMENT": {"direction": "bullish", "confidence": 0.7},
"RISK_FACTORS": ["Interest rate sensitivity"],
"OPPORTUNITIES": ["Consistent dividend growth"],
"CREDIBILITY": 0.85,
"RELEVANCE": 0.9,
"SUMMARY": "Dividend analysis shows strong income potential for conservative investors.",
}
)
else:
response.content = json.dumps(
{
"KEY_INSIGHTS": ["Growth opportunity in AI sector"],
"SENTIMENT": {"direction": "bullish", "confidence": 0.8},
"RISK_FACTORS": ["Market competition"],
"OPPORTUNITIES": ["Innovation leadership"],
"CREDIBILITY": 0.8,
"RELEVANCE": 0.85,
"SUMMARY": "Analysis shows strong growth opportunities through innovation.",
}
)
return response
mock_llm.ainvoke = AsyncMock(side_effect=persona_aware_response)
analyzer = ContentAnalyzer(mock_llm)
# Test conservative persona analysis with dividend content
conservative_result = await analyzer.analyze_content(
content=comprehensive_search_results[4]["content"], # Dividend article
persona="conservative",
)
# Verify conservative-focused analysis
assert conservative_result["relevance_score"] > 0.8
assert (
"dividend" in conservative_result["summary"].lower()
or "income" in conservative_result["summary"].lower()
)
# Test aggressive persona analysis with growth content
aggressive_result = await analyzer.analyze_content(
content=comprehensive_search_results[3]["content"], # AI strategy article
persona="aggressive",
)
# Verify aggressive-focused analysis
assert aggressive_result["relevance_score"] > 0.7
assert any(
keyword in aggressive_result["summary"].lower()
for keyword in ["growth", "opportunity", "innovation"]
)
@pytest.mark.asyncio
async def test_research_synthesis_workflow(
self, mock_research_agent, comprehensive_search_results
):
"""Test the complete research synthesis workflow."""
# Mock the workflow components using the actual graph structure
with patch.object(mock_research_agent, "graph") as mock_graph:
# Mock successful workflow execution with all required fields
mock_result = {
"research_topic": "AAPL",
"research_depth": "standard",
"search_queries": ["AAPL financial analysis", "Apple earnings 2024"],
"search_results": comprehensive_search_results,
"analyzed_content": [
{
**result,
"analysis": {
"insights": [
"Strong revenue growth",
"AI integration opportunity",
],
"sentiment": {"direction": "bullish", "confidence": 0.8},
"risk_factors": [
"Supply chain risks",
"Regulatory concerns",
],
"opportunities": ["AI monetization", "Services expansion"],
"credibility_score": 0.85,
"relevance_score": 0.9,
"summary": "Strong fundamentals with growth catalysts",
},
}
for result in comprehensive_search_results[:3]
],
"validated_sources": comprehensive_search_results[:3],
"research_findings": {
"synthesis": "Apple shows strong fundamentals with growth opportunities",
"key_insights": ["Revenue growth", "AI opportunities"],
"overall_sentiment": {"direction": "bullish", "confidence": 0.8},
"confidence_score": 0.82,
},
"citations": [
{"id": 1, "title": "Apple Earnings", "url": "https://example.com/1"}
],
"research_status": "completed",
"research_confidence": 0.82,
"execution_time_ms": 1500.0,
"persona": "moderate",
}
mock_graph.ainvoke = AsyncMock(return_value=mock_result)
# Execute research
result = await mock_research_agent.research_comprehensive(
topic="AAPL", session_id="test_synthesis", depth="standard"
)
# Verify synthesis was performed
assert result["status"] == "success"
assert "findings" in result
assert result["sources_analyzed"] > 0
@pytest.mark.asyncio
async def test_iterative_research_refinement(self, mock_research_agent):
"""Test iterative research with follow-up queries based on initial findings."""
# Mock initial research finding gaps
with patch.object(
mock_research_agent, "_generate_search_queries"
) as mock_queries:
# First iteration - general queries
mock_queries.return_value = [
"NVDA competitive analysis",
"NVIDIA market position 2024",
]
queries_first = await mock_research_agent._generate_search_queries(
topic="NVDA competitive position",
persona_focus=PERSONA_RESEARCH_FOCUS["moderate"],
depth_config=RESEARCH_DEPTH_LEVELS["standard"],
)
# Verify initial queries are broad
assert any("competitive" in q.lower() for q in queries_first)
assert any("NVDA" in q or "NVIDIA" in q for q in queries_first)
@pytest.mark.asyncio
async def test_fact_validation_and_source_credibility(self, mock_research_agent):
"""Test fact validation and source credibility scoring."""
# Test source credibility calculation
test_sources = [
{
"url": "https://sec.gov/filing/aapl-10k-2024",
"title": "Apple 10-K Filing",
"content": "Official SEC filing content",
"published_date": "2024-01-20T00:00:00Z",
"analysis": {"credibility_score": 0.9},
},
{
"url": "https://random-blog.com/apple-speculation",
"title": "Random Blog Post",
"content": "Speculative content with no sources",
"published_date": "2023-06-01T00:00:00Z", # Old content
"analysis": {"credibility_score": 0.3},
},
]
# Test credibility scoring
for source in test_sources:
credibility = mock_research_agent._calculate_source_credibility(source)
if "sec.gov" in source["url"]:
assert (
credibility >= 0.8
) # Government sources should be highly credible
elif "random-blog" in source["url"]:
assert credibility <= 0.6 # Random blogs should have lower credibility
class TestPersonaBasedResearch:
"""Test persona-based research behavior and adaptation."""
@pytest.mark.asyncio
async def test_conservative_persona_research_focus(self, mock_llm):
"""Test that conservative persona focuses on stability and risk factors."""
agent = DeepResearchAgent(llm=mock_llm, persona="conservative")
# Test search query generation for conservative persona
persona_focus = PERSONA_RESEARCH_FOCUS["conservative"]
depth_config = RESEARCH_DEPTH_LEVELS["standard"]
queries = await agent._generate_search_queries(
topic="AAPL", persona_focus=persona_focus, depth_config=depth_config
)
# Verify conservative-focused queries
query_text = " ".join(queries).lower()
assert any(
keyword in query_text for keyword in ["dividend", "stability", "risk"]
)
# Test that conservative persona performs more thorough fact-checking
assert persona_focus["risk_focus"] == "downside protection"
assert persona_focus["time_horizon"] == "long-term"
@pytest.mark.asyncio
async def test_aggressive_persona_research_behavior(self, mock_llm):
"""Test aggressive persona explores speculative opportunities."""
agent = DeepResearchAgent(llm=mock_llm, persona="aggressive")
persona_focus = PERSONA_RESEARCH_FOCUS["aggressive"]
# Test query generation for aggressive persona
queries = await agent._generate_search_queries(
topic="TSLA",
persona_focus=persona_focus,
depth_config=RESEARCH_DEPTH_LEVELS["standard"],
)
# Verify aggressive-focused queries
query_text = " ".join(queries).lower()
assert any(
keyword in query_text for keyword in ["growth", "momentum", "opportunity"]
)
# Verify aggressive characteristics
assert persona_focus["risk_focus"] == "upside potential"
assert "innovation" in persona_focus["keywords"]
@pytest.mark.asyncio
async def test_day_trader_persona_short_term_focus(self, mock_llm):
"""Test day trader persona focuses on short-term catalysts and volatility."""
DeepResearchAgent(llm=mock_llm, persona="day_trader")
persona_focus = PERSONA_RESEARCH_FOCUS["day_trader"]
# Test characteristics specific to day trader persona
assert persona_focus["time_horizon"] == "intraday to weekly"
assert "catalysts" in persona_focus["keywords"]
assert "volatility" in persona_focus["keywords"]
assert "earnings" in persona_focus["keywords"]
# Test sources preference
assert "breaking news" in persona_focus["sources"]
assert "social sentiment" in persona_focus["sources"]
@pytest.mark.asyncio
async def test_research_depth_differences_by_persona(self, mock_llm):
"""Test that conservative personas do more thorough research."""
conservative_agent = DeepResearchAgent(
llm=mock_llm, persona="conservative", default_depth="comprehensive"
)
aggressive_agent = DeepResearchAgent(
llm=mock_llm, persona="aggressive", default_depth="standard"
)
# Conservative should use more comprehensive depth by default
assert conservative_agent.default_depth == "comprehensive"
# Aggressive can use standard depth for faster decisions
assert aggressive_agent.default_depth == "standard"
# Test depth level configurations
comprehensive_config = RESEARCH_DEPTH_LEVELS["comprehensive"]
standard_config = RESEARCH_DEPTH_LEVELS["standard"]
assert comprehensive_config["max_sources"] > standard_config["max_sources"]
assert comprehensive_config["validation_required"]
class TestMultiStepResearchWorkflow:
"""Test complete multi-step research workflows."""
@pytest.mark.asyncio
async def test_complete_research_workflow_success(
self, mock_research_agent, comprehensive_search_results
):
"""Test complete research workflow from query to final report."""
# Mock all workflow steps
with patch.object(mock_research_agent, "graph") as mock_graph:
# Mock successful workflow execution
mock_result = {
"research_topic": "AAPL",
"research_depth": "standard",
"search_queries": ["AAPL analysis", "Apple earnings"],
"search_results": comprehensive_search_results,
"analyzed_content": [
{
**result,
"analysis": {
"insights": ["Strong performance"],
"sentiment": {"direction": "bullish", "confidence": 0.8},
"credibility_score": 0.85,
},
}
for result in comprehensive_search_results
],
"validated_sources": comprehensive_search_results[:3],
"research_findings": {
"synthesis": "Apple shows strong fundamentals with growth opportunities",
"key_insights": [
"Revenue growth",
"AI opportunities",
"Strong cash flow",
],
"overall_sentiment": {"direction": "bullish", "confidence": 0.8},
"confidence_score": 0.82,
},
"citations": [
{
"id": 1,
"title": "Apple Earnings",
"url": "https://example.com/1",
},
{
"id": 2,
"title": "Technical Analysis",
"url": "https://example.com/2",
},
],
"research_status": "completed",
"research_confidence": 0.82,
"execution_time_ms": 1500.0,
}
mock_graph.ainvoke = AsyncMock(return_value=mock_result)
# Execute complete research
result = await mock_research_agent.research_comprehensive(
topic="AAPL", session_id="workflow_test", depth="standard"
)
# Verify complete workflow
assert result["status"] == "success"
assert result["agent_type"] == "deep_research"
assert result["research_topic"] == "AAPL"
assert result["sources_analyzed"] == 3
assert result["confidence_score"] == 0.82
assert len(result["citations"]) == 2
@pytest.mark.asyncio
async def test_research_workflow_with_insufficient_information(
self, mock_research_agent
):
"""Test workflow handling when insufficient information is found."""
# Mock scenario with limited/poor quality results
with patch.object(mock_research_agent, "graph") as mock_graph:
mock_result = {
"research_topic": "OBSCURE_STOCK",
"research_depth": "standard",
"search_results": [], # No results found
"validated_sources": [],
"research_findings": {},
"research_confidence": 0.1, # Very low confidence
"research_status": "completed",
"execution_time_ms": 800.0,
}
mock_graph.ainvoke = AsyncMock(return_value=mock_result)
result = await mock_research_agent.research_comprehensive(
topic="OBSCURE_STOCK", session_id="insufficient_test"
)
# Should handle insufficient information gracefully
assert result["status"] == "success"
assert result["confidence_score"] == 0.1
assert result["sources_analyzed"] == 0
@pytest.mark.asyncio
async def test_research_with_conflicting_information(self, mock_research_agent):
"""Test handling of conflicting information from different sources."""
conflicting_sources = [
{
"url": "https://bull-analyst.com/buy-rating",
"title": "Strong Buy Rating for AAPL",
"analysis": {
"sentiment": {"direction": "bullish", "confidence": 0.9},
"credibility_score": 0.8,
},
},
{
"url": "https://bear-analyst.com/sell-rating",
"title": "Sell Rating for AAPL Due to Overvaluation",
"analysis": {
"sentiment": {"direction": "bearish", "confidence": 0.8},
"credibility_score": 0.7,
},
},
]
# Test overall sentiment calculation with conflicting sources
overall_sentiment = mock_research_agent._calculate_overall_sentiment(
conflicting_sources
)
# Should handle conflicts by providing consensus information
assert overall_sentiment["direction"] in ["bullish", "bearish", "neutral"]
assert "consensus" in overall_sentiment
assert overall_sentiment["source_count"] == 2
@pytest.mark.asyncio
async def test_research_focus_and_refinement(self, mock_research_agent):
"""Test research focusing and refinement based on initial findings."""
# Test different research focus areas
focus_areas = ["sentiment", "fundamental", "competitive"]
for focus in focus_areas:
route = mock_research_agent._route_specialized_analysis(
{"focus_areas": [focus]}
)
if focus == "sentiment":
assert route == "sentiment"
elif focus == "fundamental":
assert route == "fundamental"
elif focus == "competitive":
assert route == "competitive"
class TestResearchMethodSpecialization:
"""Test specialized research methods: sentiment, fundamental, competitive analysis."""
@pytest.mark.asyncio
async def test_sentiment_analysis_specialization(self, mock_research_agent):
"""Test sentiment analysis research method."""
test_state = {
"focus_areas": [
"sentiment",
"news",
], # Use keywords that match routing logic
"analyzed_content": [],
}
# Test sentiment analysis routing
route = mock_research_agent._route_specialized_analysis(test_state)
assert route == "sentiment"
# Test sentiment analysis execution (mocked)
with patch.object(mock_research_agent, "_analyze_content") as mock_analyze:
mock_analyze.return_value = Mock()
await mock_research_agent._sentiment_analysis(test_state)
mock_analyze.assert_called_once()
@pytest.mark.asyncio
async def test_fundamental_analysis_specialization(self, mock_research_agent):
"""Test fundamental analysis research method."""
test_state = {
"focus_areas": [
"fundamental",
"financial",
], # Use exact keywords from routing logic
"analyzed_content": [],
}
# Test fundamental analysis routing
route = mock_research_agent._route_specialized_analysis(test_state)
assert route == "fundamental"
# Test fundamental analysis execution
with patch.object(mock_research_agent, "_analyze_content") as mock_analyze:
mock_analyze.return_value = Mock()
await mock_research_agent._fundamental_analysis(test_state)
mock_analyze.assert_called_once()
@pytest.mark.asyncio
async def test_competitive_analysis_specialization(self, mock_research_agent):
"""Test competitive analysis research method."""
test_state = {
"focus_areas": [
"competitive",
"market",
], # Use exact keywords from routing logic
"analyzed_content": [],
}
# Test competitive analysis routing
route = mock_research_agent._route_specialized_analysis(test_state)
assert route == "competitive"
# Test competitive analysis execution
with patch.object(mock_research_agent, "_analyze_content") as mock_analyze:
mock_analyze.return_value = Mock()
await mock_research_agent._competitive_analysis(test_state)
mock_analyze.assert_called_once()
class TestErrorHandlingAndResilience:
"""Test error handling and system resilience."""
@pytest.mark.asyncio
async def test_research_agent_with_no_search_providers(self, mock_llm):
"""Test research agent behavior with no available search providers."""
# Create agent without search providers
agent = DeepResearchAgent(llm=mock_llm, persona="moderate")
# Should initialize successfully but with limited capabilities
assert len(agent.search_providers) == 0
# Research should still attempt to work but with limited results
result = await agent.research_comprehensive(
topic="TEST", session_id="no_providers_test"
)
# Should not crash, may return limited results
assert "status" in result
@pytest.mark.asyncio
async def test_content_analysis_fallback_on_llm_failure(
self, comprehensive_search_results
):
"""Test content analysis fallback when LLM fails."""
# Mock LLM that fails
failing_llm = MagicMock()
failing_llm.ainvoke = AsyncMock(
side_effect=Exception("LLM service unavailable")
)
analyzer = ContentAnalyzer(failing_llm)
# Should use fallback analysis
result = await analyzer.analyze_content(
content=comprehensive_search_results[0]["content"], persona="conservative"
)
# Verify fallback was used
assert result["fallback_used"]
assert result["sentiment"]["direction"] in ["bullish", "bearish", "neutral"]
assert 0 <= result["credibility_score"] <= 1
assert 0 <= result["relevance_score"] <= 1
@pytest.mark.asyncio
async def test_partial_search_failure_handling(self, mock_research_agent):
"""Test handling when some but not all search providers fail."""
# Test the actual search execution logic directly
mock_research_agent.search_providers[0].search = AsyncMock(
side_effect=WebSearchError("Provider 1 failed")
)
mock_research_agent.search_providers[1].search = AsyncMock(
return_value=[
{
"url": "https://working-provider.com/article",
"title": "Working Provider Article",
"content": "Content from working provider",
"provider": "working_provider",
"score": 0.8,
}
]
)
# Test the search execution directly
state = {"search_queries": ["test query"], "research_depth": "standard"}
result = await mock_research_agent._execute_searches(state)
# Should continue with working providers and return results
assert hasattr(result, "update")
assert "search_results" in result.update
# Should have at least the working provider results
assert (
len(result.update["search_results"]) >= 0
) # May be 0 if all fail, but should not crash
@pytest.mark.asyncio
async def test_research_timeout_and_circuit_breaker(self, mock_research_agent):
"""Test research timeout handling and circuit breaker behavior."""
# Test would require actual circuit breaker implementation
# This is a placeholder for circuit breaker testing
with patch(
"maverick_mcp.agents.circuit_breaker.circuit_manager"
) as mock_circuit:
mock_circuit.get_or_create = AsyncMock()
circuit_instance = AsyncMock()
mock_circuit.get_or_create.return_value = circuit_instance
# Mock circuit breaker open state
circuit_instance.call = AsyncMock(
side_effect=Exception("Circuit breaker open")
)
# Research should handle circuit breaker gracefully
# Implementation depends on actual circuit breaker behavior
pass
class TestResearchQualityAndValidation:
"""Test research quality assurance and validation mechanisms."""
def test_research_confidence_calculation(self, mock_research_agent):
"""Test research confidence calculation based on multiple factors."""
# Test with high-quality sources
high_quality_sources = [
{
"url": "https://sec.gov/filing1",
"credibility_score": 0.95,
"analysis": {"relevance_score": 0.9},
},
{
"url": "https://bloomberg.com/article1",
"credibility_score": 0.85,
"analysis": {"relevance_score": 0.8},
},
{
"url": "https://reuters.com/article2",
"credibility_score": 0.8,
"analysis": {"relevance_score": 0.85},
},
]
confidence = mock_research_agent._calculate_research_confidence(
high_quality_sources
)
assert confidence >= 0.65 # Should be reasonably high confidence
# Test with low-quality sources
low_quality_sources = [
{
"url": "https://random-blog.com/post1",
"credibility_score": 0.3,
"analysis": {"relevance_score": 0.4},
}
]
low_confidence = mock_research_agent._calculate_research_confidence(
low_quality_sources
)
assert low_confidence < 0.5 # Should be low confidence
def test_source_diversity_scoring(self, mock_research_agent):
"""Test source diversity calculation."""
diverse_sources = [
{"url": "https://sec.gov/filing"},
{"url": "https://bloomberg.com/news"},
{"url": "https://reuters.com/article"},
{"url": "https://wsj.com/story"},
{"url": "https://ft.com/content"},
]
confidence = mock_research_agent._calculate_research_confidence(diverse_sources)
# More diverse sources should contribute to higher confidence
assert confidence > 0.6
def test_investment_recommendation_logic(self, mock_research_agent):
"""Test investment recommendation based on research findings."""
# Test bullish scenario
bullish_sources = [
{
"analysis": {
"sentiment": {"direction": "bullish", "confidence": 0.9},
"credibility_score": 0.8,
}
}
]
recommendation = mock_research_agent._recommend_action(bullish_sources)
# Conservative persona should be more cautious
if mock_research_agent.persona.name.lower() == "conservative":
assert (
"gradual" in recommendation.lower()
or "risk management" in recommendation.lower()
)
else:
assert (
"consider" in recommendation.lower()
and "position" in recommendation.lower()
)
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])
```
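The deduplication test above inlines its URL-dedup loop; extracted as a standalone helper, the same cap-and-dedup pass looks like the sketch below (a hypothetical helper, not part of the package — `max_sources` stands in for `RESEARCH_DEPTH_LEVELS[depth]["max_sources"]`):

```python
def dedupe_search_results(results: list[dict], max_sources: int) -> list[dict]:
    """Keep the first occurrence of each URL, capped at max_sources."""
    unique_results: list[dict] = []
    seen_urls: set[str] = set()
    for result in results:
        # First provider to return a URL wins; later duplicates are dropped.
        if result["url"] not in seen_urls and len(unique_results) < max_sources:
            unique_results.append(result)
            seen_urls.add(result["url"])
    return unique_results
```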
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/backtesting.py:
--------------------------------------------------------------------------------
```python
"""MCP router for VectorBT backtesting tools with structured logging."""
from typing import Any
import numpy as np
from fastmcp import Context
from maverick_mcp.backtesting import (
BacktestAnalyzer,
StrategyOptimizer,
VectorBTEngine,
)
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES, StrategyParser
from maverick_mcp.backtesting.strategies.templates import (
get_strategy_info,
list_available_strategies,
)
from maverick_mcp.backtesting.visualization import (
generate_equity_curve,
generate_optimization_heatmap,
generate_performance_dashboard,
generate_trade_scatter,
)
from maverick_mcp.utils.debug_utils import debug_operation
from maverick_mcp.utils.logging import get_logger
from maverick_mcp.utils.structured_logger import (
CorrelationIDGenerator,
get_performance_logger,
with_structured_logging,
)
# Initialize performance logger for backtesting router
performance_logger = get_performance_logger("backtesting_router")
logger = get_logger(__name__)
def convert_numpy_types(obj: Any) -> Any:
"""Recursively convert numpy types to Python native types for JSON serialization.
Args:
obj: Any object that might contain numpy types
Returns:
Object with all numpy types converted to Python native types
"""
import pandas as pd
# Check for numpy integer types (more robust using numpy's type hierarchy)
if isinstance(obj, np.integer):
return int(obj)
# Check for numpy floating point types
elif isinstance(obj, np.floating):
return float(obj)
# Check for numpy boolean type
    elif isinstance(obj, np.bool_):
return bool(obj)
# Check for numpy complex types
elif isinstance(obj, np.complexfloating):
return complex(obj)
# Handle numpy arrays
elif isinstance(obj, np.ndarray):
return obj.tolist()
# Handle pandas Series
elif isinstance(obj, pd.Series):
return obj.tolist()
# Handle pandas DataFrame
elif isinstance(obj, pd.DataFrame):
return obj.to_dict("records")
    # Handle scalar NaN/None values; pd.isna on a list/tuple returns an
    # array, which is ambiguous in boolean context, so guard with is_scalar
    elif pd.api.types.is_scalar(obj) and pd.isna(obj):
return None
# Handle other numpy scalars with .item() method
elif hasattr(obj, "item") and hasattr(obj, "dtype"):
try:
return obj.item()
except Exception:
return str(obj)
# Recursively handle dictionaries
elif isinstance(obj, dict):
return {key: convert_numpy_types(value) for key, value in obj.items()}
# Recursively handle lists and tuples
elif isinstance(obj, list | tuple):
return [convert_numpy_types(item) for item in obj]
# Try to handle custom objects with __dict__
elif hasattr(obj, "__dict__") and not isinstance(obj, type):
try:
return convert_numpy_types(obj.__dict__)
except Exception:
return str(obj)
else:
# Return as-is for regular Python types
return obj
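# Illustrative usage (not from the original source): a payload mixing numpy
# scalars and pandas containers becomes plain, JSON-safe Python, e.g.
#   convert_numpy_types({"ret": np.float64(0.12), "curve": pd.Series([1.0, 1.1])})
#   -> {"ret": 0.12, "curve": [1.0, 1.1]}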
def setup_backtesting_tools(mcp):
"""Set up VectorBT backtesting tools for MCP.
Args:
mcp: FastMCP instance
"""
@mcp.tool()
@with_structured_logging("run_backtest", include_performance=True, log_params=True)
@debug_operation("run_backtest", enable_profiling=True, symbol="backtest_symbol")
async def run_backtest(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
initial_capital: float = 10000.0,
fast_period: str | int | None = None,
slow_period: str | int | None = None,
period: str | int | None = None,
oversold: str | float | None = None,
overbought: str | float | None = None,
signal_period: str | int | None = None,
std_dev: str | float | None = None,
lookback: str | int | None = None,
threshold: str | float | None = None,
z_score_threshold: str | float | None = None,
breakout_factor: str | float | None = None,
) -> dict[str, Any]:
"""Run a VectorBT backtest with specified strategy and parameters.
Args:
symbol: Stock symbol to backtest
strategy: Strategy type (sma_cross, rsi, macd, bollinger, momentum, etc.)
start_date: Start date (YYYY-MM-DD), defaults to 1 year ago
end_date: End date (YYYY-MM-DD), defaults to today
initial_capital: Starting capital for backtest
Strategy-specific parameters passed as individual arguments (e.g., fast_period=10, slow_period=20)
Returns:
Comprehensive backtest results including metrics, trades, and analysis
Examples:
run_backtest("AAPL", "sma_cross", fast_period=10, slow_period=20)
run_backtest("TSLA", "rsi", period=14, oversold=30, overbought=70)
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
# Convert string parameters to appropriate types
def convert_param(value, param_type):
"""Convert string parameter to appropriate type."""
if value is None:
return None
if isinstance(value, str):
try:
if param_type is int:
return int(value)
elif param_type is float:
return float(value)
except (ValueError, TypeError) as e:
raise ValueError(
f"Invalid {param_type.__name__} value: {value}"
) from e
return value
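        # Expected coercion behavior (illustrative): convert_param("14", int)
        # -> 14; convert_param(None, float) -> None; convert_param("abc", int)
        # raises ValueError("Invalid int value: abc").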
# Build parameters dict from provided arguments with type conversion
param_map = {
"fast_period": convert_param(fast_period, int),
"slow_period": convert_param(slow_period, int),
"period": convert_param(period, int),
"oversold": convert_param(oversold, float),
"overbought": convert_param(overbought, float),
"signal_period": convert_param(signal_period, int),
"std_dev": convert_param(std_dev, float),
"lookback": convert_param(lookback, int),
"threshold": convert_param(threshold, float),
"z_score_threshold": convert_param(z_score_threshold, float),
"breakout_factor": convert_param(breakout_factor, float),
}
# Get default parameters for strategy
if strategy in STRATEGY_TEMPLATES:
parameters = dict(STRATEGY_TEMPLATES[strategy]["parameters"])
# Override with provided non-None parameters
for param_name, param_value in param_map.items():
if param_value is not None:
parameters[param_name] = param_value
else:
# Use only provided parameters for unknown strategies
parameters = {k: v for k, v in param_map.items() if v is not None}
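        # Merge semantics: template defaults form the base and any explicitly
        # provided argument overrides them (hypothetical example: defaults of
        # {"fast_period": 10, "slow_period": 20} plus fast_period=5 yield
        # {"fast_period": 5, "slow_period": 20}).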
# Initialize engine
engine = VectorBTEngine()
# Run backtest
results = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
initial_capital=initial_capital,
)
# Analyze results
analyzer = BacktestAnalyzer()
analysis = analyzer.analyze(results)
# Combine results and analysis
results["analysis"] = analysis
# Log business metrics
if results.get("metrics"):
metrics = results["metrics"]
performance_logger.log_business_metric(
"backtest_completion",
1,
symbol=symbol,
strategy=strategy,
total_return=metrics.get("total_return", 0),
sharpe_ratio=metrics.get("sharpe_ratio", 0),
max_drawdown=metrics.get("max_drawdown", 0),
total_trades=metrics.get("total_trades", 0),
)
# Set correlation context for downstream operations
CorrelationIDGenerator.set_correlation_id()
return results
@mcp.tool()
@with_structured_logging(
"optimize_strategy", include_performance=True, log_params=True
)
@debug_operation(
"optimize_strategy", enable_profiling=True, strategy="optimization_strategy"
)
async def optimize_strategy(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
optimization_metric: str = "sharpe_ratio",
optimization_level: str = "medium",
top_n: int = 10,
) -> dict[str, Any]:
"""Optimize strategy parameters using VectorBT grid search.
Args:
symbol: Stock symbol to optimize
strategy: Strategy type to optimize
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
optimization_metric: Metric to optimize (sharpe_ratio, total_return, win_rate, etc.)
optimization_level: Level of optimization (coarse, medium, fine)
top_n: Number of top results to return
Returns:
Optimization results with best parameters and performance metrics
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365 * 2)).strftime("%Y-%m-%d")
# Initialize engine and optimizer
engine = VectorBTEngine()
optimizer = StrategyOptimizer(engine)
# Generate parameter grid
param_grid = optimizer.generate_param_grid(strategy, optimization_level)
# Run optimization
results = await engine.optimize_parameters(
symbol=symbol,
strategy_type=strategy,
param_grid=param_grid,
start_date=start_date,
end_date=end_date,
optimization_metric=optimization_metric,
top_n=top_n,
)
return results
@mcp.tool()
async def walk_forward_analysis(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
window_size: int = 252,
step_size: int = 63,
) -> dict[str, Any]:
"""Perform walk-forward analysis to test strategy robustness.
Args:
symbol: Stock symbol to analyze
strategy: Strategy type
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
window_size: Test window size in trading days (default: 1 year)
step_size: Step size for rolling window (default: 1 quarter)
Returns:
Walk-forward analysis results with out-of-sample performance
"""
from datetime import datetime, timedelta
# Default date range (3 years for walk-forward)
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365 * 3)).strftime("%Y-%m-%d")
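        # Rough window count: with ~3 years of data (~756 trading days),
        # window_size=252 and step_size=63 give (756 - 252) / 63 = 8 rolling
        # out-of-sample windows (the exact count depends on the optimizer).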
# Initialize engine and optimizer
engine = VectorBTEngine()
optimizer = StrategyOptimizer(engine)
# Get default parameters
parameters = STRATEGY_TEMPLATES.get(strategy, {}).get("parameters", {})
# Run walk-forward analysis
results = await optimizer.walk_forward_analysis(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
window_size=window_size,
step_size=step_size,
)
return results
@mcp.tool()
async def monte_carlo_simulation(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
num_simulations: int = 1000,
fast_period: str | int | None = None,
slow_period: str | int | None = None,
period: str | int | None = None,
) -> dict[str, Any]:
"""Run Monte Carlo simulation on backtest results.
Args:
symbol: Stock symbol
strategy: Strategy type
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
num_simulations: Number of Monte Carlo simulations
Strategy-specific parameters as individual arguments
Returns:
Monte Carlo simulation results with confidence intervals
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
# Convert string parameters to appropriate types
def convert_param(value, param_type):
"""Convert string parameter to appropriate type."""
if value is None:
return None
if isinstance(value, str):
try:
if param_type is int:
return int(value)
elif param_type is float:
return float(value)
except (ValueError, TypeError) as e:
raise ValueError(
f"Invalid {param_type.__name__} value: {value}"
) from e
return value
# Build parameters dict from provided arguments with type conversion
param_map = {
"fast_period": convert_param(fast_period, int),
"slow_period": convert_param(slow_period, int),
"period": convert_param(period, int),
}
# Get parameters
if strategy in STRATEGY_TEMPLATES:
parameters = dict(STRATEGY_TEMPLATES[strategy]["parameters"])
# Override with provided non-None parameters
for param_name, param_value in param_map.items():
if param_value is not None:
parameters[param_name] = param_value
else:
# Use only provided parameters for unknown strategies
parameters = {k: v for k, v in param_map.items() if v is not None}
# Run backtest first
engine = VectorBTEngine()
backtest_results = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
)
# Run Monte Carlo simulation
optimizer = StrategyOptimizer(engine)
mc_results = await optimizer.monte_carlo_simulation(
backtest_results=backtest_results,
num_simulations=num_simulations,
)
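        # The simulation perturbs the realized backtest (commonly by
        # resampling its trade/return sequence) to produce distributional
        # confidence intervals instead of a single point estimate; the exact
        # resampling scheme is whatever StrategyOptimizer implements.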
return mc_results
@mcp.tool()
async def compare_strategies(
ctx: Context,
symbol: str,
strategies: list[str] | str | None = None,
start_date: str | None = None,
end_date: str | None = None,
) -> dict[str, Any]:
"""Compare multiple strategies on the same symbol.
Args:
symbol: Stock symbol
strategies: List of strategy types to compare (defaults to all)
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
Comparison results with rankings and analysis
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
        # Handle strategies passed as a JSON string by some clients
        if isinstance(strategies, str):
            import json
            try:
                parsed = json.loads(strategies)
                # A JSON-encoded scalar such as '"rsi"' still parses to a
                # plain string; wrap it so the loop below iterates strategy
                # names rather than individual characters
                strategies = parsed if isinstance(parsed, list) else [str(parsed)]
            except json.JSONDecodeError:
                # Not JSON at all; treat the raw value as a single strategy
                strategies = [strategies]
# Default to comparing top strategies
if not strategies:
strategies = ["sma_cross", "rsi", "macd", "bollinger", "momentum"]
# Run backtests for each strategy
engine = VectorBTEngine()
results_list = []
for strategy in strategies:
try:
# Get default parameters
parameters = STRATEGY_TEMPLATES.get(strategy, {}).get("parameters", {})
# Run backtest
results = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
)
results_list.append(results)
except Exception:
# Skip failed strategies
continue
# Compare results
analyzer = BacktestAnalyzer()
comparison = analyzer.compare_strategies(results_list)
return comparison
@mcp.tool()
async def list_strategies(ctx: Context) -> dict[str, Any]:
"""List all available VectorBT strategies with descriptions.
Returns:
Dictionary of available strategies and their information
"""
strategies = {}
for strategy_type in list_available_strategies():
strategies[strategy_type] = get_strategy_info(strategy_type)
return {
"available_strategies": strategies,
"total_count": len(strategies),
"categories": {
"trend_following": ["sma_cross", "ema_cross", "macd", "breakout"],
"mean_reversion": ["rsi", "bollinger", "mean_reversion"],
"momentum": ["momentum", "volume_momentum"],
},
}
@mcp.tool()
async def parse_strategy(ctx: Context, description: str) -> dict[str, Any]:
"""Parse natural language strategy description into VectorBT parameters.
Args:
description: Natural language description of trading strategy
Returns:
Parsed strategy configuration with type and parameters
Examples:
"Buy when RSI is below 30 and sell when above 70"
"Use 10-day and 20-day moving average crossover"
"MACD strategy with standard parameters"
"""
parser = StrategyParser()
config = parser.parse_simple(description)
# Validate the parsed strategy
if parser.validate_strategy(config):
return {
"success": True,
"strategy": config,
"message": f"Successfully parsed as {config['strategy_type']} strategy",
}
else:
return {
"success": False,
"strategy": config,
"message": "Could not fully parse strategy, using defaults",
}
@mcp.tool()
async def backtest_portfolio(
ctx: Context,
symbols: list[str],
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
initial_capital: float = 10000.0,
position_size: float = 0.1,
fast_period: str | int | None = None,
slow_period: str | int | None = None,
period: str | int | None = None,
) -> dict[str, Any]:
"""Backtest a strategy across multiple symbols (portfolio).
Args:
symbols: List of stock symbols
strategy: Strategy type to apply
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
initial_capital: Starting capital
position_size: Position size per symbol (0.1 = 10%)
Strategy-specific parameters as individual arguments
Returns:
Portfolio backtest results with aggregate metrics
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
# Convert string parameters to appropriate types
def convert_param(value, param_type):
"""Convert string parameter to appropriate type."""
if value is None:
return None
if isinstance(value, str):
try:
if param_type is int:
return int(value)
elif param_type is float:
return float(value)
except (ValueError, TypeError) as e:
raise ValueError(
f"Invalid {param_type.__name__} value: {value}"
) from e
return value
# Build parameters dict from provided arguments with type conversion
param_map = {
"fast_period": convert_param(fast_period, int),
"slow_period": convert_param(slow_period, int),
"period": convert_param(period, int),
}
# Get parameters
if strategy in STRATEGY_TEMPLATES:
parameters = dict(STRATEGY_TEMPLATES[strategy]["parameters"])
# Override with provided non-None parameters
for param_name, param_value in param_map.items():
if param_value is not None:
parameters[param_name] = param_value
else:
# Use only provided parameters for unknown strategies
parameters = {k: v for k, v in param_map.items() if v is not None}
# Run backtests for each symbol
engine = VectorBTEngine()
portfolio_results = []
capital_per_symbol = initial_capital * position_size
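        # Sizing example: initial_capital=10000 with position_size=0.1
        # allocates $1,000 per symbol, so ten symbols deploy the full account
        # and more than ten would over-allocate it.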
for symbol in symbols:
try:
results = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
initial_capital=capital_per_symbol,
)
portfolio_results.append(results)
except Exception:
# Skip failed symbols
continue
if not portfolio_results:
return {"error": "No symbols could be backtested"}
        # Aggregate portfolio metrics. "total_return" below is the
        # equal-weighted average of per-symbol returns, and taking max() of
        # the drawdowns assumes they are reported as positive magnitudes.
total_return = sum(
r["metrics"]["total_return"] for r in portfolio_results
) / len(portfolio_results)
avg_sharpe = sum(r["metrics"]["sharpe_ratio"] for r in portfolio_results) / len(
portfolio_results
)
max_drawdown = max(r["metrics"]["max_drawdown"] for r in portfolio_results)
total_trades = sum(r["metrics"]["total_trades"] for r in portfolio_results)
return {
"portfolio_metrics": {
"symbols_tested": len(portfolio_results),
"total_return": total_return,
"average_sharpe": avg_sharpe,
"max_drawdown": max_drawdown,
"total_trades": total_trades,
},
"individual_results": portfolio_results,
"summary": f"Portfolio backtest of {len(portfolio_results)} symbols with {strategy} strategy",
}
@mcp.tool()
async def generate_backtest_charts(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
theme: str = "light",
) -> dict[str, str]:
"""Generate comprehensive charts for a backtest.
Args:
symbol: Stock symbol
strategy: Strategy type
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
theme: Chart theme (light or dark)
Returns:
Dictionary of base64-encoded chart images
"""
from datetime import datetime, timedelta
import pandas as pd
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
# Run backtest
engine = VectorBTEngine()
# Get default parameters for the strategy
from maverick_mcp.backtesting.strategies import STRATEGY_TEMPLATES
parameters = STRATEGY_TEMPLATES.get(strategy, {}).get("parameters", {})
results = await engine.run_backtest(
symbol=symbol,
strategy_type=strategy,
parameters=parameters,
start_date=start_date,
end_date=end_date,
)
# Prepare data for charts
equity_curve_data = results["equity_curve"]
drawdown_data = results["drawdown_series"]
# Convert to pandas Series for charting
returns = pd.Series(equity_curve_data)
drawdown = pd.Series(drawdown_data)
trades = pd.DataFrame(results["trades"])
# Generate charts
charts = {
"equity_curve": generate_equity_curve(
returns, drawdown, f"{symbol} {strategy} Equity Curve", theme
),
"trade_scatter": generate_trade_scatter(
returns, trades, f"{symbol} {strategy} Trades", theme
),
"performance_dashboard": generate_performance_dashboard(
results["metrics"], f"{symbol} {strategy} Performance", theme
),
}
return charts
@mcp.tool()
async def generate_optimization_charts(
ctx: Context,
symbol: str,
strategy: str = "sma_cross",
start_date: str | None = None,
end_date: str | None = None,
theme: str = "light",
) -> dict[str, str]:
"""Generate chart for strategy parameter optimization.
Args:
symbol: Stock symbol
strategy: Strategy type
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
theme: Chart theme (light or dark)
Returns:
Dictionary of base64-encoded chart images
"""
from datetime import datetime, timedelta
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
# Run optimization
engine = VectorBTEngine()
optimizer = StrategyOptimizer(engine)
param_grid = optimizer.generate_param_grid(strategy, "medium")
        # Build the results dictionary the heatmap expects. Note that
        # generate_param_grid yields parameter candidates rather than
        # completed backtest results, so any entry without a metrics dict
        # falls back to a performance of 0.
        optimization_results = {}
        for param_set, results in param_grid.items():
            performance = (
                results.get("total_return", 0) if isinstance(results, dict) else 0
            )
            optimization_results[str(param_set)] = {"performance": performance}
# Generate optimization heatmap
heatmap = generate_optimization_heatmap(
optimization_results, f"{symbol} {strategy} Parameter Optimization", theme
)
return {"optimization_heatmap": heatmap}
# ============ ML-ENHANCED STRATEGY TOOLS ============
@mcp.tool()
async def run_ml_strategy_backtest(
ctx: Context,
symbol: str,
strategy_type: str = "ml_predictor",
start_date: str | None = None,
end_date: str | None = None,
initial_capital: float = 10000.0,
train_ratio: float = 0.8,
model_type: str = "random_forest",
n_estimators: int = 100,
max_depth: int | None = None,
learning_rate: float = 0.01,
adaptation_method: str = "gradient",
) -> dict[str, Any]:
"""Run backtest using ML-enhanced strategies.
Args:
symbol: Stock symbol to backtest
strategy_type: ML strategy type (ml_predictor, adaptive, ensemble, regime_aware)
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
initial_capital: Initial capital amount
train_ratio: Ratio of data for training (0.0-1.0)
Strategy-specific parameters passed as individual arguments
Returns:
Backtest results with ML-specific metrics
"""
from datetime import datetime, timedelta
from maverick_mcp.backtesting.strategies.ml import (
AdaptiveStrategy,
MLPredictor,
RegimeAwareStrategy,
StrategyEnsemble,
)
from maverick_mcp.backtesting.strategies.templates import (
SimpleMovingAverageStrategy,
)
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=730)).strftime(
"%Y-%m-%d"
) # 2 years for ML
# Get historical data
engine = VectorBTEngine()
data = await engine.get_historical_data(symbol, start_date, end_date)
# Enhanced data validation for ML strategies
min_total_data = 200 # Minimum total data points for ML strategies
if len(data) < min_total_data:
return {
"error": f"Insufficient data for ML strategy: {len(data)} < {min_total_data} required"
}
# Split data for training/testing
split_idx = int(len(data) * train_ratio)
train_data = data.iloc[:split_idx]
test_data = data.iloc[split_idx:]
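        # The split is chronological (no shuffling): the model trains on the
        # earlier window and is evaluated on the later one, which avoids
        # lookahead bias in the reported test metrics.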
# Validate split data sizes
min_train_data = 100
min_test_data = 50
if len(train_data) < min_train_data:
return {
"error": f"Insufficient training data: {len(train_data)} < {min_train_data} required"
}
if len(test_data) < min_test_data:
return {
"error": f"Insufficient test data: {len(test_data)} < {min_test_data} required"
}
logger.info(
f"ML backtest data split: {len(train_data)} training, {len(test_data)} testing samples"
)
try:
# Create ML strategy based on type
if strategy_type == "ml_predictor":
ml_strategy = MLPredictor(
model_type=model_type,
n_estimators=n_estimators,
max_depth=max_depth,
)
# Train the model
training_metrics = ml_strategy.train(train_data)
elif strategy_type == "adaptive" or strategy_type == "online_learning":
# online_learning is an alias for adaptive strategy
base_strategy = SimpleMovingAverageStrategy()
ml_strategy = AdaptiveStrategy(
base_strategy,
learning_rate=learning_rate,
adaptation_method=adaptation_method,
)
training_metrics = {
"adaptation_method": adaptation_method,
"strategy_alias": strategy_type,
}
elif strategy_type == "ensemble":
# Create ensemble with basic strategies
base_strategies = [
SimpleMovingAverageStrategy({"fast_period": 10, "slow_period": 20}),
SimpleMovingAverageStrategy({"fast_period": 5, "slow_period": 15}),
]
ml_strategy = StrategyEnsemble(base_strategies)
training_metrics = {"ensemble_size": len(base_strategies)}
elif strategy_type == "regime_aware":
base_strategies = {
0: SimpleMovingAverageStrategy(
{"fast_period": 5, "slow_period": 20}
), # Bear
1: SimpleMovingAverageStrategy(
{"fast_period": 10, "slow_period": 30}
), # Sideways
2: SimpleMovingAverageStrategy(
{"fast_period": 20, "slow_period": 50}
), # Bull
}
ml_strategy = RegimeAwareStrategy(base_strategies)
# Fit regime detector
ml_strategy.fit_regime_detector(train_data)
training_metrics = {"n_regimes": len(base_strategies)}
else:
return {"error": f"Unsupported ML strategy type: {strategy_type}"}
# Generate signals on test data
entry_signals, exit_signals = ml_strategy.generate_signals(test_data)
# Run backtest analysis on test period
analyzer = BacktestAnalyzer()
backtest_results = await analyzer.run_vectorbt_backtest(
data=test_data,
entry_signals=entry_signals,
exit_signals=exit_signals,
initial_capital=initial_capital,
)
# Add ML-specific metrics
ml_metrics = {
"strategy_type": strategy_type,
"training_period": len(train_data),
"testing_period": len(test_data),
"train_test_split": train_ratio,
"training_metrics": training_metrics,
}
# Add strategy-specific analysis
if hasattr(ml_strategy, "get_feature_importance"):
ml_metrics["feature_importance"] = ml_strategy.get_feature_importance()
if hasattr(ml_strategy, "get_regime_analysis"):
ml_metrics["regime_analysis"] = ml_strategy.get_regime_analysis()
if hasattr(ml_strategy, "get_strategy_weights"):
ml_metrics["strategy_weights"] = ml_strategy.get_strategy_weights()
backtest_results["ml_metrics"] = ml_metrics
# Convert all numpy types before returning
return convert_numpy_types(backtest_results)
except Exception as e:
return {"error": f"ML backtest failed: {str(e)}"}
@mcp.tool()
async def train_ml_predictor(
ctx: Context,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
model_type: str = "random_forest",
target_periods: int = 5,
return_threshold: float = 0.02,
n_estimators: int = 100,
max_depth: int | None = None,
min_samples_split: int = 2,
) -> dict[str, Any]:
"""Train an ML predictor model for trading signals.
Args:
symbol: Stock symbol to train on
start_date: Start date for training data
end_date: End date for training data
model_type: ML model type (random_forest)
target_periods: Forward periods for target variable
return_threshold: Return threshold for signal classification
n_estimators, max_depth, min_samples_split: Model-specific parameters
Returns:
Training results and model metrics
"""
from datetime import datetime, timedelta
from maverick_mcp.backtesting.strategies.ml import MLPredictor
# Default date range (2 years for good ML training)
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d")
try:
# Get training data
engine = VectorBTEngine()
data = await engine.get_historical_data(symbol, start_date, end_date)
if len(data) < 200:
return {
"error": "Insufficient data for ML training (minimum 200 data points)"
}
# Create and train ML predictor
ml_predictor = MLPredictor(
model_type=model_type,
n_estimators=n_estimators,
max_depth=max_depth,
min_samples_split=min_samples_split,
)
training_metrics = ml_predictor.train(
data=data,
target_periods=target_periods,
return_threshold=return_threshold,
)
# Create model parameters dictionary
model_params = {
"n_estimators": n_estimators,
"max_depth": max_depth,
"min_samples_split": min_samples_split,
}
# Add training details
training_results = {
"symbol": symbol,
"model_type": model_type,
"training_period": f"{start_date} to {end_date}",
"data_points": len(data),
"target_periods": target_periods,
"return_threshold": return_threshold,
"model_parameters": model_params,
"training_metrics": training_metrics,
}
# Convert all numpy types before returning
return convert_numpy_types(training_results)
except Exception as e:
return {"error": f"ML training failed: {str(e)}"}
@mcp.tool()
async def analyze_market_regimes(
ctx: Context,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
method: str = "hmm",
n_regimes: int = 3,
lookback_period: int = 50,
) -> dict[str, Any]:
"""Analyze market regimes for a stock using ML methods.
Args:
symbol: Stock symbol to analyze
start_date: Start date for analysis
end_date: End date for analysis
method: Detection method (hmm, kmeans, threshold)
n_regimes: Number of regimes to detect
lookback_period: Lookback period for regime detection
Returns:
Market regime analysis results
"""
from datetime import datetime, timedelta
from maverick_mcp.backtesting.strategies.ml.regime_aware import (
MarketRegimeDetector,
)
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
try:
# Get historical data
engine = VectorBTEngine()
data = await engine.get_historical_data(symbol, start_date, end_date)
if len(data) < lookback_period + 50:
return {
"error": f"Insufficient data for regime analysis (minimum {lookback_period + 50} data points)"
}
# Create regime detector and analyze
regime_detector = MarketRegimeDetector(
method=method, n_regimes=n_regimes, lookback_period=lookback_period
)
# Fit regime detector
regime_detector.fit_regimes(data)
# Analyze regimes over time
regime_history = []
regime_probabilities = []
for i in range(lookback_period, len(data)):
window_data = data.iloc[i - lookback_period : i + 1]
current_regime = regime_detector.detect_current_regime(window_data)
regime_probs = regime_detector.get_regime_probabilities(window_data)
regime_history.append(
{
"date": data.index[i].strftime("%Y-%m-%d"),
"regime": int(current_regime),
"probabilities": regime_probs.tolist(),
}
)
regime_probabilities.append(regime_probs)
# Calculate regime statistics
regimes = [r["regime"] for r in regime_history]
regime_counts = {i: regimes.count(i) for i in range(n_regimes)}
regime_percentages = {
k: (v / len(regimes)) * 100 for k, v in regime_counts.items()
}
# Calculate average regime durations
regime_durations = {i: [] for i in range(n_regimes)}
current_regime = regimes[0]
duration = 1
for regime in regimes[1:]:
if regime == current_regime:
duration += 1
else:
regime_durations[current_regime].append(duration)
current_regime = regime
duration = 1
regime_durations[current_regime].append(duration)
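            # Run-length example: regimes [0, 0, 1, 1, 1, 0] produce
            # durations {0: [2, 1], 1: [3]} before averaging.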
avg_durations = {
k: np.mean(v) if v else 0 for k, v in regime_durations.items()
}
analysis_results = {
"symbol": symbol,
"analysis_period": f"{start_date} to {end_date}",
"method": method,
"n_regimes": n_regimes,
"regime_names": {
0: "Bear/Declining",
1: "Sideways/Uncertain",
2: "Bull/Trending",
},
"current_regime": regimes[-1] if regimes else 1,
"regime_counts": regime_counts,
"regime_percentages": regime_percentages,
"average_regime_durations": avg_durations,
"recent_regime_history": regime_history[-20:], # Last 20 periods
"total_regime_switches": len(
[i for i in range(1, len(regimes)) if regimes[i] != regimes[i - 1]]
),
}
            # Convert numpy scalars (e.g. the np.mean durations) before
            # returning, matching the other tools in this router
            return convert_numpy_types(analysis_results)
except Exception as e:
return {"error": f"Regime analysis failed: {str(e)}"}
@mcp.tool()
async def create_strategy_ensemble(
ctx: Context,
symbols: list[str],
base_strategies: list[str] | None = None,
weighting_method: str = "performance",
start_date: str | None = None,
end_date: str | None = None,
initial_capital: float = 10000.0,
) -> dict[str, Any]:
"""Create and backtest a strategy ensemble across multiple symbols.
Args:
symbols: List of stock symbols
base_strategies: List of base strategy names to ensemble
weighting_method: Weighting method (performance, equal, volatility)
start_date: Start date for backtesting
end_date: End date for backtesting
initial_capital: Initial capital per symbol
Returns:
Ensemble backtest results with strategy weights
"""
from datetime import datetime, timedelta
from maverick_mcp.backtesting.strategies.ml import StrategyEnsemble
from maverick_mcp.backtesting.strategies.templates import (
SimpleMovingAverageStrategy,
)
# Default strategies if none provided
if base_strategies is None:
base_strategies = ["sma_cross", "rsi", "macd"]
# Default date range
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
if not start_date:
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
try:
# Create base strategy instances
strategy_instances = []
for strategy_name in base_strategies:
if strategy_name == "sma_cross":
strategy_instances.append(SimpleMovingAverageStrategy())
elif strategy_name == "rsi":
# Create RSI-based SMA strategy with different parameters
strategy_instances.append(
SimpleMovingAverageStrategy(
{"fast_period": 14, "slow_period": 28}
)
)
elif strategy_name == "macd":
# Create MACD-like SMA strategy with MACD default periods
strategy_instances.append(
SimpleMovingAverageStrategy(
{"fast_period": 12, "slow_period": 26}
)
)
# Add more strategies as needed
if not strategy_instances:
return {"error": "No valid base strategies provided"}
# Create ensemble strategy
ensemble = StrategyEnsemble(
strategies=strategy_instances, weighting_method=weighting_method
)
# Run ensemble backtest on multiple symbols
ensemble_results = []
total_return = 0
total_trades = 0
for symbol in symbols[:5]: # Limit to 5 symbols for performance
try:
# Get data and run backtest
engine = VectorBTEngine()
data = await engine.get_historical_data(
symbol, start_date, end_date
)
if len(data) < 100:
continue
# Generate ensemble signals
entry_signals, exit_signals = ensemble.generate_signals(data)
# Run backtest
analyzer = BacktestAnalyzer()
results = await analyzer.run_vectorbt_backtest(
data=data,
entry_signals=entry_signals,
exit_signals=exit_signals,
initial_capital=initial_capital,
)
# Add ensemble-specific metrics
results["ensemble_metrics"] = {
"strategy_weights": ensemble.get_strategy_weights(),
"strategy_performance": ensemble.get_strategy_performance(),
}
ensemble_results.append({"symbol": symbol, "results": results})
total_return += results["metrics"]["total_return"]
total_trades += results["metrics"]["total_trades"]
except Exception:
continue
if not ensemble_results:
return {"error": "No symbols could be processed"}
# Calculate aggregate metrics
avg_return = total_return / len(ensemble_results)
avg_trades = total_trades / len(ensemble_results)
# Convert all numpy types before returning
return convert_numpy_types(
{
"ensemble_summary": {
"symbols_tested": len(ensemble_results),
"base_strategies": base_strategies,
"weighting_method": weighting_method,
"average_return": avg_return,
"total_trades": total_trades,
"average_trades_per_symbol": avg_trades,
},
"individual_results": ensemble_results,
"final_strategy_weights": ensemble.get_strategy_weights(),
"strategy_performance_analysis": ensemble.get_strategy_performance(),
}
)
except Exception as e:
return {"error": f"Ensemble creation failed: {str(e)}"}
```