This is page 14 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/workflows/agents/market_analyzer.py:
--------------------------------------------------------------------------------
```python
"""
Market Analyzer Agent for intelligent market regime detection.
This agent analyzes market conditions to determine the current market regime
(trending, ranging, volatile, etc.) and provides context for strategy selection.
"""
import logging
import math
from datetime import datetime, timedelta
from typing import Any
import numpy as np
import pandas as pd
import pandas_ta as ta
from maverick_mcp.data.cache import CacheManager
from maverick_mcp.providers.stock_data import EnhancedStockDataProvider
from maverick_mcp.workflows.state import BacktestingWorkflowState
logger = logging.getLogger(__name__)
class MarketAnalyzerAgent:
    """Intelligent market regime analyzer for backtesting workflows.

    Combines trend, volatility, volume, and market-structure analyses of a
    symbol's daily price history to classify the current market regime
    (e.g. "trending", "ranging", "volatile_trending") and writes the result,
    with a confidence score and supporting indicators, onto the shared
    ``BacktestingWorkflowState``.
    """

    def __init__(
        self,
        data_provider: EnhancedStockDataProvider | None = None,
        cache_manager: CacheManager | None = None,
    ):
        """Initialize market analyzer agent.

        Args:
            data_provider: Stock data provider instance; a default
                ``EnhancedStockDataProvider`` is constructed when omitted.
            cache_manager: Cache manager for performance optimization; a
                default ``CacheManager`` is constructed when omitted.
        """
        self.data_provider = data_provider or EnhancedStockDataProvider()
        self.cache = cache_manager or CacheManager()

        # Market regime detection thresholds.
        # NOTE(review): TREND_THRESHOLD and VOLATILITY_THRESHOLD are not
        # referenced in this class's methods below — confirm whether they are
        # used elsewhere or are dead configuration.
        self.TREND_THRESHOLD = 0.15  # 15% for strong trend
        self.VOLATILITY_THRESHOLD = 0.02  # 2% daily volatility threshold
        self.VOLUME_THRESHOLD = 1.5  # 1.5x average volume for high volume

        # Analysis periods (trading days) for different time horizons.
        self.SHORT_PERIOD = 20  # Short-term trend analysis
        self.MEDIUM_PERIOD = 50  # Medium-term trend analysis
        self.LONG_PERIOD = 200  # Long-term trend analysis

        logger.info("MarketAnalyzerAgent initialized")

    async def analyze_market_regime(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Analyze market regime and update state.

        On success, fills the regime-related keys of ``state`` and advances
        the workflow to the strategy-selection step. On any failure, records
        the error in ``state["errors_encountered"]``, falls back to the
        "unknown" regime with zero confidence, and still returns ``state``
        (this method never raises to the caller).

        Args:
            state: Current workflow state; must contain "symbol",
                "start_date", and "end_date".

        Returns:
            The same ``state`` object, updated in place with the market
            regime analysis (or fallback values on error).
        """
        start_time = datetime.now()

        try:
            logger.info(f"Analyzing market regime for {state['symbol']}")

            # Fetch extra history before start_date so long-window
            # indicators (up to LONG_PERIOD days) have enough data.
            extended_start = self._calculate_extended_start_date(state["start_date"])
            price_data = await self._get_price_data(
                state["symbol"], extended_start, state["end_date"]
            )

            if price_data is None or len(price_data) < self.LONG_PERIOD:
                raise ValueError(
                    f"Insufficient data for market regime analysis: {state['symbol']}"
                )

            # Perform comprehensive market analysis
            regime_analysis = self._perform_regime_analysis(price_data)

            # Update state with analysis results
            state["market_regime"] = regime_analysis["regime"]
            state["regime_confidence"] = regime_analysis["confidence"]
            state["regime_indicators"] = regime_analysis["indicators"]
            state["volatility_percentile"] = regime_analysis["volatility_percentile"]
            state["trend_strength"] = regime_analysis["trend_strength"]
            state["market_conditions"] = regime_analysis["market_conditions"]
            state["volume_profile"] = regime_analysis["volume_profile"]
            state["support_resistance_levels"] = regime_analysis["support_resistance"]

            # Record execution time in milliseconds.
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            state["regime_analysis_time_ms"] = execution_time

            # Update workflow status
            state["workflow_status"] = "selecting_strategies"
            state["current_step"] = "market_analysis_completed"
            state["steps_completed"].append("market_regime_analysis")

            logger.info(
                f"Market regime analysis completed for {state['symbol']}: "
                f"{state['market_regime']} (confidence: {state['regime_confidence']:.2f})"
            )

            return state

        except Exception as e:
            error_info = {
                "step": "market_regime_analysis",
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
                "symbol": state["symbol"],
            }
            state["errors_encountered"].append(error_info)

            # Set fallback regime so downstream steps can still proceed.
            state["market_regime"] = "unknown"
            state["regime_confidence"] = 0.0
            state["fallback_strategies_used"].append("regime_detection_fallback")

            logger.error(f"Market regime analysis failed for {state['symbol']}: {e}")
            return state

    def _calculate_extended_start_date(self, start_date: str) -> str:
        """Calculate extended start date to ensure sufficient data for analysis.

        Moves ``start_date`` (format "%Y-%m-%d") back by LONG_PERIOD + 50
        calendar days as a buffer for long-window technical indicators.
        """
        start_dt = datetime.strptime(start_date, "%Y-%m-%d")
        # Add extra buffer for technical indicators
        extended_start = start_dt - timedelta(days=self.LONG_PERIOD + 50)
        return extended_start.strftime("%Y-%m-%d")

    async def _get_price_data(
        self, symbol: str, start_date: str, end_date: str
    ) -> pd.DataFrame | None:
        """Get daily price data with caching.

        Returns a DataFrame of OHLCV rows, or None when the provider has no
        data or the fetch fails (failures are logged, not raised).
        """
        cache_key = f"market_analysis:{symbol}:{start_date}:{end_date}"

        # Try cache first
        cached_data = await self.cache.get(cache_key)
        if cached_data is not None:
            # Cached payload was stored via DataFrame.to_dict(); rebuild it.
            return pd.DataFrame(cached_data)

        try:
            # Fetch from provider
            data = self.data_provider.get_stock_data(
                symbol=symbol, start_date=start_date, end_date=end_date, interval="1d"
            )

            if data is not None and not data.empty:
                # Cache for 30 minutes
                await self.cache.set(cache_key, data.to_dict(), ttl=1800)
                return data

            return None

        except Exception as e:
            logger.error(f"Failed to fetch price data for {symbol}: {e}")
            return None

    def _perform_regime_analysis(self, data: pd.DataFrame) -> dict[str, Any]:
        """Perform comprehensive market regime analysis.

        Runs the trend, volatility, volume, support/resistance, and
        market-structure sub-analyses and merges them into a single result
        dict consumed by ``analyze_market_regime``.
        """
        # Ensure column names are lowercase.
        # NOTE(review): this mutates the caller's DataFrame in place.
        data.columns = [col.lower() for col in data.columns]

        # Calculate technical indicators
        close = data["close"]
        high = data["high"]
        low = data["low"]
        volume = data["volume"]

        # Trend analysis
        trend_analysis = self._analyze_trend(close)

        # Volatility analysis
        volatility_analysis = self._analyze_volatility(close)

        # Volume analysis
        volume_analysis = self._analyze_volume(volume, close)

        # Support/resistance analysis
        support_resistance = self._identify_support_resistance(high, low, close)

        # Market structure analysis
        market_structure = self._analyze_market_structure(high, low, close)

        # Determine overall regime
        regime_info = self._classify_regime(
            trend_analysis, volatility_analysis, volume_analysis, market_structure
        )

        return {
            "regime": regime_info["regime"],
            "confidence": regime_info["confidence"],
            "indicators": {
                "trend_slope": trend_analysis["slope"],
                "trend_r2": trend_analysis["r_squared"],
                "volatility_20d": volatility_analysis["volatility_20d"],
                "volume_ratio": volume_analysis["volume_ratio"],
                "rsi_14": trend_analysis["rsi"],
                "adx": trend_analysis["adx"],
            },
            "volatility_percentile": volatility_analysis["percentile"],
            "trend_strength": trend_analysis["strength"],
            "market_conditions": {
                "trend_direction": trend_analysis["direction"],
                "trend_consistency": trend_analysis["consistency"],
                "volatility_regime": volatility_analysis["regime"],
                "volume_regime": volume_analysis["regime"],
                "market_structure": market_structure["structure_type"],
            },
            "volume_profile": volume_analysis["profile"],
            "support_resistance": support_resistance,
        }

    def _analyze_trend(self, close: pd.Series) -> dict[str, Any]:
        """Analyze trend characteristics.

        Fits a linear regression to the last MEDIUM_PERIOD closes to measure
        slope and fit quality (R²), classifies direction/strength from the
        price-normalized slope, and combines moving-average alignment with R²
        into a "consistency" score.
        """
        # Calculate moving averages
        ma_20 = ta.sma(close, length=self.SHORT_PERIOD)
        ma_50 = ta.sma(close, length=self.MEDIUM_PERIOD)
        ma_200 = ta.sma(close, length=self.LONG_PERIOD)

        # Calculate trend slope using linear regression
        recent_data = close.tail(self.MEDIUM_PERIOD).reset_index(drop=True)
        x = np.arange(len(recent_data))

        if len(recent_data) > 1:
            slope, intercept = np.polyfit(x, recent_data, 1)
            # R² of the fit; 1 - SS_res / SS_tot.
            y_pred = slope * x + intercept
            r_squared = 1 - (
                np.sum((recent_data - y_pred) ** 2)
                / np.sum((recent_data - np.mean(recent_data)) ** 2)
            )
        else:
            slope = 0
            r_squared = 0

        # Normalize slope by price for comparability across symbols.
        normalized_slope = slope / close.iloc[-1] if close.iloc[-1] != 0 else 0

        # Calculate RSI and ADX for trend strength (neutral defaults when
        # there is not enough data: RSI 50, ADX 25).
        rsi = ta.rsi(close, length=14).iloc[-1] if len(close) >= 14 else 50
        # NOTE(review): only `close` is available here, so the close series is
        # passed (renamed) as both the high and low inputs to ta.adx. The
        # resulting ADX is computed from a degenerate high/low range — confirm
        # whether true high/low series should be threaded through instead.
        adx_result = ta.adx(
            close.to_frame().rename(columns={"close": "high"}),
            close.to_frame().rename(columns={"close": "low"}),
            close,
            length=14,
        )
        adx = (
            adx_result.iloc[-1, 0]
            if adx_result is not None and len(adx_result) > 0
            else 25
        )

        # Determine trend direction and strength
        if normalized_slope > 0.001:  # 0.1% daily trend
            direction = "bullish"
            strength = min(abs(normalized_slope) * 1000, 1.0)  # Cap at 1.0
        elif normalized_slope < -0.001:
            direction = "bearish"
            strength = min(abs(normalized_slope) * 1000, 1.0)
        else:
            direction = "sideways"
            strength = 0.2  # Low strength for sideways

        # Calculate trend consistency from moving-average alignment.
        ma_alignment = 0
        if len(ma_20) > 0 and len(ma_50) > 0 and len(ma_200) > 0:
            current_price = close.iloc[-1]
            if ma_20.iloc[-1] > ma_50.iloc[-1] > ma_200.iloc[-1] > current_price * 0.95:
                ma_alignment = 1.0  # Bullish alignment
            elif (
                ma_20.iloc[-1] < ma_50.iloc[-1] < ma_200.iloc[-1] < current_price * 1.05
            ):
                ma_alignment = -1.0  # Bearish alignment
            else:
                ma_alignment = 0.0  # Mixed alignment

        consistency = (abs(ma_alignment) + r_squared) / 2

        return {
            "slope": normalized_slope,
            "r_squared": r_squared,
            "direction": direction,
            "strength": strength,
            "consistency": consistency,
            "rsi": rsi,
            "adx": adx,
        }

    def _analyze_volatility(self, close: pd.Series) -> dict[str, Any]:
        """Analyze volatility characteristics.

        Computes annualized standard deviation of daily returns over 5/20/60
        day windows (sqrt(252) annualization), the current volatility's
        percentile versus the rolling 20-day history, and a coarse regime
        label based on the 20-day figure.
        """
        # Calculate various volatility measures
        returns = close.pct_change().dropna()

        volatility_5d = (
            returns.tail(5).std() * math.sqrt(252) if len(returns) >= 5 else 0
        )
        volatility_20d = (
            returns.tail(20).std() * math.sqrt(252) if len(returns) >= 20 else 0
        )
        volatility_60d = (
            returns.tail(60).std() * math.sqrt(252) if len(returns) >= 60 else 0
        )

        # Calculate historical volatility percentile
        historical_vol = returns.rolling(20).std() * math.sqrt(252)
        if len(historical_vol.dropna()) > 0:
            current_vol = historical_vol.iloc[-1]
            # Fraction of history strictly below the current level.
            percentile = (historical_vol < current_vol).sum() / len(
                historical_vol.dropna()
            )
        else:
            percentile = 0.5

        # Classify volatility regime
        if volatility_20d > 0.4:  # > 40% annualized
            regime = "high_volatility"
        elif volatility_20d > 0.2:  # 20-40% annualized
            regime = "medium_volatility"
        else:
            regime = "low_volatility"

        return {
            "volatility_5d": volatility_5d,
            "volatility_20d": volatility_20d,
            "volatility_60d": volatility_60d,
            "percentile": percentile,
            "regime": regime,
        }

    def _analyze_volume(self, volume: pd.Series, close: pd.Series) -> dict[str, Any]:
        """Analyze volume characteristics.

        Compares the latest volume to its 20-day average, labels the
        short-term volume trend, and measures the correlation between recent
        price changes and volume changes.
        """
        # Calculate volume moving averages
        volume_ma_20 = volume.rolling(20).mean()

        # Current volume ratio vs average (guard against empty/zero values).
        current_volume = volume.iloc[-1] if len(volume) > 0 else 0
        avg_volume = volume_ma_20.iloc[-1] if len(volume_ma_20.dropna()) > 0 else 1
        volume_ratio = current_volume / avg_volume if avg_volume > 0 else 1

        # Volume trend over the last 10 bars.
        recent_volume = volume.tail(10)
        volume_trend = (
            "increasing"
            if recent_volume.iloc[-1] > recent_volume.mean()
            else "decreasing"
        )

        # Price-volume relationship over the last 10 bars.
        price_change = close.pct_change().tail(10)
        volume_change = volume.pct_change().tail(10)
        correlation = price_change.corr(volume_change) if len(price_change) >= 2 else 0

        # Volume regime classification
        if volume_ratio > 2.0:
            regime = "high_volume"
        elif volume_ratio > 1.5:
            regime = "elevated_volume"
        elif volume_ratio < 0.5:
            regime = "low_volume"
        else:
            regime = "normal_volume"

        return {
            "volume_ratio": volume_ratio,
            "volume_trend": volume_trend,
            "price_volume_correlation": correlation,
            "regime": regime,
            "profile": {
                "current_vs_20d": volume_ratio,
                "trend_direction": volume_trend,
                "price_correlation": correlation,
            },
        }

    def _identify_support_resistance(
        self, high: pd.Series, low: pd.Series, close: pd.Series
    ) -> list[float]:
        """Identify key support and resistance levels.

        Uses scipy peak detection on highs (resistance) and inverted lows
        (support) when scipy is available, de-duplicates levels that lie
        within 1% of the recent price range of each other, and keeps at most
        the 10 highest remaining levels. Falls back to simple min/max (or
        ±5% of the last close on error).
        """
        levels = []

        try:
            # Recent price range
            recent_data = close.tail(50) if len(close) >= 50 else close
            price_range = recent_data.max() - recent_data.min()

            # Identify local peaks and troughs
            try:
                from scipy.signal import find_peaks

                # Find resistance levels (peaks)
                peaks, _ = find_peaks(
                    high.values, distance=5, prominence=price_range * 0.02
                )
                resistance_levels = high.iloc[peaks].tolist()

                # Find support levels (troughs) by peak-finding the negated lows.
                troughs, _ = find_peaks(
                    -low.values, distance=5, prominence=price_range * 0.02
                )
                support_levels = low.iloc[troughs].tolist()

            except ImportError:
                logger.warning("scipy not available, using simple peak detection")
                # Fallback to simple method
                resistance_levels = [recent_data.max()]
                support_levels = [recent_data.min()]

            # Combine and filter levels
            all_levels = resistance_levels + support_levels

            # Remove levels too close to each other (within 1% of the range).
            filtered_levels = []
            for level in sorted(all_levels):
                if not any(
                    abs(level - existing) < price_range * 0.01
                    for existing in filtered_levels
                ):
                    filtered_levels.append(level)

            # Keep only most significant levels
            levels = sorted(filtered_levels)[-10:]  # Top 10 levels

        except Exception as e:
            logger.warning(f"Failed to calculate support/resistance levels: {e}")
            # Fallback to simple levels
            current_price = close.iloc[-1]
            levels = [
                current_price * 0.95,  # 5% below
                current_price * 1.05,  # 5% above
            ]

        return levels

    def _analyze_market_structure(
        self, high: pd.Series, low: pd.Series, close: pd.Series
    ) -> dict[str, Any]:
        """Analyze market structure patterns.

        Returns a structure label ("uptrend_structure", "downtrend_structure",
        or "ranging_structure") plus raw counts; falls back to
        "unknown_structure" on any error.
        """
        try:
            # Calculate recent highs and lows
            lookback = min(20, len(close))
            recent_highs = high.tail(lookback)
            recent_lows = low.tail(lookback)

            # NOTE(review): despite the names, these count bars that are the
            # max/min of their own trailing 3-bar window (local extremes),
            # not strictly "higher highs"/"higher lows" — confirm intent.
            higher_highs = (recent_highs.rolling(3).max() == recent_highs).sum()
            higher_lows = (recent_lows.rolling(3).min() == recent_lows).sum()

            # Classify structure
            if higher_highs > lookback * 0.3 and higher_lows > lookback * 0.3:
                structure_type = "uptrend_structure"
            elif higher_highs < lookback * 0.1 and higher_lows < lookback * 0.1:
                structure_type = "downtrend_structure"
            else:
                structure_type = "ranging_structure"

            return {
                "structure_type": structure_type,
                "higher_highs": higher_highs,
                "higher_lows": higher_lows,
            }

        except Exception as e:
            logger.warning(f"Failed to analyze market structure: {e}")
            return {
                "structure_type": "unknown_structure",
                "higher_highs": 0,
                "higher_lows": 0,
            }

    def _classify_regime(
        self,
        trend_analysis: dict,
        volatility_analysis: dict,
        volume_analysis: dict,
        market_structure: dict,
    ) -> dict[str, Any]:
        """Classify overall market regime based on component analyses.

        Scores four candidate regimes from the sub-analyses, picks the
        highest scorer (with compound labels for volatile-trending and
        low-volume-ranging overlaps), and derives confidence from the gap
        between the top two scores, clamped to [0.1, 0.95].
        """
        # Initialize scoring system
        regime_scores = {
            "trending": 0.0,
            "ranging": 0.0,
            "volatile": 0.0,
            "low_volume": 0.0,
        }

        # Trend scoring
        if trend_analysis["strength"] > 0.6 and trend_analysis["consistency"] > 0.6:
            regime_scores["trending"] += 0.4
        if trend_analysis["adx"] > 25:  # Strong trend
            regime_scores["trending"] += 0.2

        # Ranging scoring
        if (
            trend_analysis["strength"] < 0.3
            and trend_analysis["direction"] == "sideways"
        ):
            regime_scores["ranging"] += 0.4
        if market_structure["structure_type"] == "ranging_structure":
            regime_scores["ranging"] += 0.2

        # Volatility scoring
        if volatility_analysis["regime"] == "high_volatility":
            regime_scores["volatile"] += 0.3
        if volatility_analysis["percentile"] > 0.8:  # High volatility percentile
            regime_scores["volatile"] += 0.2

        # Volume scoring
        if volume_analysis["regime"] == "low_volume":
            regime_scores["low_volume"] += 0.3

        # Determine primary regime
        primary_regime = max(regime_scores.items(), key=lambda x: x[1])
        regime_name = primary_regime[0]

        # Combine regimes for complex cases
        if regime_scores["volatile"] > 0.3 and regime_scores["trending"] > 0.3:
            regime_name = "volatile_trending"
        elif regime_scores["low_volume"] > 0.2 and regime_scores["ranging"] > 0.3:
            regime_name = "low_volume_ranging"

        # Calculate confidence based on score spread
        sorted_scores = sorted(regime_scores.values(), reverse=True)
        confidence = (
            sorted_scores[0] - sorted_scores[1]
            if len(sorted_scores) > 1
            else sorted_scores[0]
        )
        confidence = min(max(confidence, 0.1), 0.95)  # Clamp between 0.1 and 0.95

        return {
            "regime": regime_name,
            "confidence": confidence,
            "scores": regime_scores,
        }
```
--------------------------------------------------------------------------------
/maverick_mcp/monitoring/status_dashboard.py:
--------------------------------------------------------------------------------
```python
"""
Status Dashboard for Backtesting System Health Monitoring.
This module provides a comprehensive dashboard that aggregates health status
from all components and provides real-time metrics visualization data.
"""
import logging
import time
from datetime import UTC, datetime, timedelta
from typing import Any
from maverick_mcp.config.settings import get_settings
from maverick_mcp.utils.circuit_breaker import get_all_circuit_breaker_status
logger = logging.getLogger(__name__)
settings = get_settings()
# Dashboard refresh interval (seconds)
DASHBOARD_REFRESH_INTERVAL = 30
# Historical data retention (hours)
HISTORICAL_DATA_RETENTION = 24
class StatusDashboard:
    """Comprehensive status dashboard for the backtesting system.

    Aggregates component health, circuit breaker state, and resource usage
    into a single payload, maintains an in-memory rolling history for
    trending, and derives threshold-based alerts.
    """

    def __init__(self) -> None:
        # Process-local start time; used to report dashboard uptime.
        self.start_time = time.time()
        # Rolling list of data points, pruned to HISTORICAL_DATA_RETENTION hours.
        self.historical_data = []
        # Timestamp of the last successful dashboard build (None until first build).
        self.last_update = None
        # Thresholds consumed by _generate_alerts.
        self.alert_thresholds = {
            "cpu_usage": 80.0,
            "memory_usage": 85.0,
            "disk_usage": 90.0,
            "response_time_ms": 5000.0,
            "failure_rate": 0.1,
        }
    async def get_dashboard_data(self) -> dict[str, Any]:
        """Get comprehensive dashboard data.

        Returns a dict with overview, per-component, circuit-breaker,
        resource, metric, alert, historical, and metadata sections; on any
        failure a minimal error dashboard is returned instead of raising.
        """
        try:
            # Imported lazily here rather than at module level — presumably to
            # avoid a circular import with the health router; TODO confirm.
            from maverick_mcp.api.routers.health_enhanced import (
                _get_detailed_health_status,
            )
            # Get current health status
            health_status = await _get_detailed_health_status()
            # Get circuit breaker status
            circuit_breaker_status = get_all_circuit_breaker_status()
            # Calculate metrics
            metrics = await self._calculate_metrics(
                health_status, circuit_breaker_status
            )
            # Get alerts
            alerts = self._generate_alerts(health_status, metrics)
            # Build dashboard data
            dashboard_data = {
                "overview": self._build_overview(health_status),
                "components": self._build_component_summary(health_status),
                "circuit_breakers": self._build_circuit_breaker_summary(
                    circuit_breaker_status
                ),
                "resources": self._build_resource_summary(health_status),
                "metrics": metrics,
                "alerts": alerts,
                "historical": self._get_historical_data(),
                "metadata": {
                    "last_updated": datetime.now(UTC).isoformat(),
                    "uptime_seconds": time.time() - self.start_time,
                    "dashboard_version": "1.0.0",
                    "auto_refresh_interval": DASHBOARD_REFRESH_INTERVAL,
                },
            }
            # Update historical data
            self._update_historical_data(health_status, metrics)
            self.last_update = datetime.now(UTC)
            return dashboard_data
        except Exception as e:
            logger.error(f"Failed to generate dashboard data: {e}")
            return self._get_error_dashboard(str(e))
    def _build_overview(self, health_status: dict[str, Any]) -> dict[str, Any]:
        """Build overview section of the dashboard."""
        components = health_status.get("components", {})
        checks_summary = health_status.get("checks_summary", {})
        total_components = len(components)
        healthy_components = checks_summary.get("healthy", 0)
        degraded_components = checks_summary.get("degraded", 0)
        unhealthy_components = checks_summary.get("unhealthy", 0)
        # Calculate health percentage
        health_percentage = (
            (healthy_components / total_components * 100) if total_components > 0 else 0
        )
        return {
            "overall_status": health_status.get("status", "unknown"),
            "health_percentage": round(health_percentage, 1),
            "total_components": total_components,
            "component_breakdown": {
                "healthy": healthy_components,
                "degraded": degraded_components,
                "unhealthy": unhealthy_components,
            },
            "uptime_seconds": health_status.get("uptime_seconds", 0),
            "version": health_status.get("version", "unknown"),
        }
    def _build_component_summary(self, health_status: dict[str, Any]) -> dict[str, Any]:
        """Build component summary with status and response times.

        NOTE: component values are accessed via attributes (``status.status``),
        so they appear to be model objects rather than dicts — confirm against
        ``_get_detailed_health_status``.
        """
        components = health_status.get("components", {})
        component_summary = {}
        for name, status in components.items():
            component_summary[name] = {
                "status": status.status,
                "response_time_ms": status.response_time_ms,
                "last_check": status.last_check,
                "has_error": status.error is not None,
                "error_message": status.error,
            }
        return component_summary
    def _build_circuit_breaker_summary(
        self, circuit_breaker_status: dict[str, Any]
    ) -> dict[str, Any]:
        """Build circuit breaker summary (state counts plus per-breaker metrics)."""
        summary = {
            "total_breakers": len(circuit_breaker_status),
            "states": {"closed": 0, "open": 0, "half_open": 0},
            "breakers": {},
        }
        for name, status in circuit_breaker_status.items():
            state = status.get("state", "unknown")
            # Unknown states are reported per-breaker but not counted in "states".
            if state in summary["states"]:
                summary["states"][state] += 1
            metrics = status.get("metrics", {})
            summary["breakers"][name] = {
                "state": state,
                "failure_count": status.get("consecutive_failures", 0),
                "success_rate": metrics.get("success_rate", 0),
                "avg_response_time": metrics.get("avg_response_time", 0),
                "total_calls": metrics.get("total_calls", 0),
            }
        return summary
    def _build_resource_summary(self, health_status: dict[str, Any]) -> dict[str, Any]:
        """Build resource usage summary (CPU/memory/disk/load), defaulting to zeros."""
        resource_usage = health_status.get("resource_usage", {})
        return {
            "cpu_percent": resource_usage.get("cpu_percent", 0),
            "memory_percent": resource_usage.get("memory_percent", 0),
            "disk_percent": resource_usage.get("disk_percent", 0),
            "memory_used_mb": resource_usage.get("memory_used_mb", 0),
            "memory_total_mb": resource_usage.get("memory_total_mb", 0),
            "disk_used_gb": resource_usage.get("disk_used_gb", 0),
            "disk_total_gb": resource_usage.get("disk_total_gb", 0),
            "load_average": resource_usage.get("load_average", []),
        }
    async def _calculate_metrics(
        self, health_status: dict[str, Any], circuit_breaker_status: dict[str, Any]
    ) -> dict[str, Any]:
        """Calculate performance and availability metrics."""
        components = health_status.get("components", {})
        resource_usage = health_status.get("resource_usage", {})
        # Calculate average response time (only over components that report one)
        response_times = [
            comp.response_time_ms
            for comp in components.values()
            if comp.response_time_ms is not None
        ]
        avg_response_time = (
            sum(response_times) / len(response_times) if response_times else 0
        )
        # Calculate availability: degraded components still count as available
        total_components = len(components)
        available_components = sum(
            1 for comp in components.values() if comp.status in ["healthy", "degraded"]
        )
        availability_percentage = (
            (available_components / total_components * 100)
            if total_components > 0
            else 0
        )
        # Calculate circuit breaker metrics (no breakers == 100% healthy)
        total_breakers = len(circuit_breaker_status)
        closed_breakers = sum(
            1 for cb in circuit_breaker_status.values() if cb.get("state") == "closed"
        )
        breaker_health = (
            (closed_breakers / total_breakers * 100) if total_breakers > 0 else 100
        )
        # Get resource metrics
        cpu_usage = resource_usage.get("cpu_percent", 0)
        memory_usage = resource_usage.get("memory_percent", 0)
        disk_usage = resource_usage.get("disk_percent", 0)
        # Calculate system health score (0-100)
        health_score = self._calculate_health_score(
            availability_percentage,
            breaker_health,
            cpu_usage,
            memory_usage,
            avg_response_time,
        )
        return {
            "availability_percentage": round(availability_percentage, 2),
            "average_response_time_ms": round(avg_response_time, 2),
            "circuit_breaker_health": round(breaker_health, 2),
            "system_health_score": round(health_score, 1),
            "resource_utilization": {
                "cpu_percent": cpu_usage,
                "memory_percent": memory_usage,
                "disk_percent": disk_usage,
            },
            "performance_indicators": {
                "total_components": total_components,
                "available_components": available_components,
                "response_times_collected": len(response_times),
                "circuit_breakers_closed": closed_breakers,
                "circuit_breakers_total": total_breakers,
            },
        }
    def _calculate_health_score(
        self,
        availability: float,
        breaker_health: float,
        cpu_usage: float,
        memory_usage: float,
        response_time: float,
    ) -> float:
        """Calculate overall system health score (0-100).

        Weighted sum of five sub-scores; weights sum to 1.0 so the result
        is naturally bounded, with a final clamp as a safety net.
        """
        # Weighted scoring
        weights = {
            "availability": 0.3,
            "breaker_health": 0.25,
            "cpu_performance": 0.2,
            "memory_performance": 0.15,
            "response_time": 0.1,
        }
        # Calculate individual scores (higher is better)
        availability_score = availability  # Already 0-100
        breaker_score = breaker_health  # Already 0-100
        # CPU score (invert usage - lower usage is better)
        cpu_score = max(0, 100 - cpu_usage)
        # Memory score (invert usage - lower usage is better)
        memory_score = max(0, 100 - memory_usage)
        # Response time score (lower is better, scale to 0-100)
        if response_time <= 100:
            response_score = 100
        elif response_time <= 1000:
            response_score = (
                100 - (response_time - 100) / 9
            )  # Linear decay from 100 to 0
        else:
            response_score = max(
                0, 100 - response_time / 50
            )  # Slower decay for very slow responses
        # Calculate weighted score
        health_score = (
            availability_score * weights["availability"]
            + breaker_score * weights["breaker_health"]
            + cpu_score * weights["cpu_performance"]
            + memory_score * weights["memory_performance"]
            + response_score * weights["response_time"]
        )
        return min(100, max(0, health_score))
    def _generate_alerts(
        self, health_status: dict[str, Any], metrics: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """Generate alerts based on health status and metrics.

        Each alert is a dict with severity/type/title/message/timestamp.
        """
        alerts = []
        # Check overall system health
        if health_status.get("status") == "unhealthy":
            alerts.append(
                {
                    "severity": "critical",
                    "type": "system_health",
                    "title": "System Unhealthy",
                    "message": "One or more critical components are unhealthy",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        elif health_status.get("status") == "degraded":
            alerts.append(
                {
                    "severity": "warning",
                    "type": "system_health",
                    "title": "System Degraded",
                    "message": "System is operating with reduced functionality",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        # Check resource usage
        resource_usage = health_status.get("resource_usage", {})
        if resource_usage.get("cpu_percent", 0) > self.alert_thresholds["cpu_usage"]:
            alerts.append(
                {
                    "severity": "warning",
                    "type": "resource_usage",
                    "title": "High CPU Usage",
                    "message": f"CPU usage is {resource_usage.get('cpu_percent')}%, above threshold of {self.alert_thresholds['cpu_usage']}%",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        if (
            resource_usage.get("memory_percent", 0)
            > self.alert_thresholds["memory_usage"]
        ):
            alerts.append(
                {
                    "severity": "warning",
                    "type": "resource_usage",
                    "title": "High Memory Usage",
                    "message": f"Memory usage is {resource_usage.get('memory_percent')}%, above threshold of {self.alert_thresholds['memory_usage']}%",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        if resource_usage.get("disk_percent", 0) > self.alert_thresholds["disk_usage"]:
            alerts.append(
                {
                    "severity": "critical",
                    "type": "resource_usage",
                    "title": "High Disk Usage",
                    "message": f"Disk usage is {resource_usage.get('disk_percent')}%, above threshold of {self.alert_thresholds['disk_usage']}%",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        # Check response times
        avg_response_time = metrics.get("average_response_time_ms", 0)
        if avg_response_time > self.alert_thresholds["response_time_ms"]:
            alerts.append(
                {
                    "severity": "warning",
                    "type": "performance",
                    "title": "Slow Response Times",
                    "message": f"Average response time is {avg_response_time:.1f}ms, above threshold of {self.alert_thresholds['response_time_ms']}ms",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            )
        # Check circuit breakers.
        # NOTE(review): breakers here come from health_status and are read via
        # attribute access (breaker.state), unlike the dict-style access used on
        # get_all_circuit_breaker_status() results — confirm both shapes.
        circuit_breakers = health_status.get("circuit_breakers", {})
        for name, breaker in circuit_breakers.items():
            if breaker.state == "open":
                alerts.append(
                    {
                        "severity": "critical",
                        "type": "circuit_breaker",
                        "title": f"Circuit Breaker Open: {name}",
                        "message": f"Circuit breaker for {name} is open due to failures",
                        "timestamp": datetime.now(UTC).isoformat(),
                    }
                )
            elif breaker.state == "half_open":
                alerts.append(
                    {
                        "severity": "info",
                        "type": "circuit_breaker",
                        "title": f"Circuit Breaker Testing: {name}",
                        "message": f"Circuit breaker for {name} is testing recovery",
                        "timestamp": datetime.now(UTC).isoformat(),
                    }
                )
        return alerts
    def _update_historical_data(
        self, health_status: dict[str, Any], metrics: dict[str, Any]
    ):
        """Update historical data for trending."""
        timestamp = datetime.now(UTC)
        # Add current data point
        data_point = {
            "timestamp": timestamp.isoformat(),
            "health_score": metrics.get("system_health_score", 0),
            "availability": metrics.get("availability_percentage", 0),
            "response_time": metrics.get("average_response_time_ms", 0),
            "cpu_usage": health_status.get("resource_usage", {}).get("cpu_percent", 0),
            "memory_usage": health_status.get("resource_usage", {}).get(
                "memory_percent", 0
            ),
            "circuit_breaker_health": metrics.get("circuit_breaker_health", 100),
        }
        self.historical_data.append(data_point)
        # Clean up old data.
        # The "Z" -> "+00:00" replace is a no-op for timestamps produced above
        # (isoformat with UTC emits "+00:00"); it only matters for externally
        # injected "Z"-suffixed timestamps.
        cutoff_time = timestamp - timedelta(hours=HISTORICAL_DATA_RETENTION)
        self.historical_data = [
            point
            for point in self.historical_data
            if datetime.fromisoformat(point["timestamp"].replace("Z", "+00:00"))
            > cutoff_time
        ]
    def _get_historical_data(self) -> dict[str, Any]:
        """Get historical data for trending charts."""
        if not self.historical_data:
            return {"data": [], "summary": {"points": 0, "timespan_hours": 0}}
        # Calculate summary
        summary = {
            "points": len(self.historical_data),
            "timespan_hours": HISTORICAL_DATA_RETENTION,
            "avg_health_score": sum(p["health_score"] for p in self.historical_data)
            / len(self.historical_data),
            "avg_availability": sum(p["availability"] for p in self.historical_data)
            / len(self.historical_data),
            "avg_response_time": sum(p["response_time"] for p in self.historical_data)
            / len(self.historical_data),
        }
        # Downsample data if we have too many points (keep last 100 points for visualization)
        data = self.historical_data
        if len(data) > 100:
            step = len(data) // 100
            data = data[::step]
        return {
            "data": data,
            "summary": summary,
        }
    def _get_error_dashboard(self, error_message: str) -> dict[str, Any]:
        """Get minimal dashboard data when there's an error.

        Mirrors the normal payload shape so consumers can render it unchanged.
        """
        return {
            "overview": {
                "overall_status": "error",
                "health_percentage": 0,
                "error": error_message,
            },
            "components": {},
            "circuit_breakers": {},
            "resources": {},
            "metrics": {},
            "alerts": [
                {
                    "severity": "critical",
                    "type": "dashboard_error",
                    "title": "Dashboard Error",
                    "message": f"Failed to generate dashboard data: {error_message}",
                    "timestamp": datetime.now(UTC).isoformat(),
                }
            ],
            "historical": {"data": [], "summary": {"points": 0, "timespan_hours": 0}},
            "metadata": {
                "last_updated": datetime.now(UTC).isoformat(),
                "dashboard_version": "1.0.0",
                "error": True,
            },
        }
    def get_alert_summary(self) -> dict[str, Any]:
        """Get a summary of current alerts.

        NOTE: currently a placeholder returning a static zero-alert summary;
        the try/except is defensive only.
        """
        try:
            # This would typically use cached data or a quick check
            return {
                "total_alerts": 0,
                "critical": 0,
                "warning": 0,
                "info": 0,
                "last_check": datetime.now(UTC).isoformat(),
            }
        except Exception as e:
            logger.error(f"Failed to get alert summary: {e}")
            return {
                "total_alerts": 1,
                "critical": 1,
                "warning": 0,
                "info": 0,
                "error": str(e),
                "last_check": datetime.now(UTC).isoformat(),
            }
# Module-level singleton used by the convenience accessors below.
_dashboard = StatusDashboard()


def get_status_dashboard() -> StatusDashboard:
    """Return the shared module-level :class:`StatusDashboard` instance."""
    return _dashboard


async def get_dashboard_data() -> dict[str, Any]:
    """Convenience wrapper: build dashboard data from the shared instance."""
    return await _dashboard.get_dashboard_data()


def get_dashboard_metadata() -> dict[str, Any]:
    """Return metadata about the shared dashboard (version, uptime, config)."""
    last = _dashboard.last_update
    return {
        "version": "1.0.0",
        "last_updated": last.isoformat() if last is not None else None,
        "uptime_seconds": time.time() - _dashboard.start_time,
        "refresh_interval": DASHBOARD_REFRESH_INTERVAL,
        "retention_hours": HISTORICAL_DATA_RETENTION,
    }
```
--------------------------------------------------------------------------------
/maverick_mcp/workflows/backtesting_workflow.py:
--------------------------------------------------------------------------------
```python
"""
Intelligent Backtesting Workflow using LangGraph.
This workflow orchestrates market regime analysis, strategy selection, parameter optimization,
and validation to provide intelligent, confidence-scored backtesting recommendations.
"""
import logging
from datetime import datetime, timedelta
from typing import Any
from langchain_core.messages import HumanMessage
from langgraph.graph import END, StateGraph
from maverick_mcp.workflows.agents import (
MarketAnalyzerAgent,
OptimizerAgent,
StrategySelectorAgent,
ValidatorAgent,
)
from maverick_mcp.workflows.state import BacktestingWorkflowState
logger = logging.getLogger(__name__)
class BacktestingWorkflow:
    """Intelligent backtesting workflow orchestrator.

    Wires four agents (market analysis, strategy selection, parameter
    optimization, validation) into a compiled LangGraph with conditional
    fallback routing after each stage.

    NOTE(review): this class mixes attribute access on workflow state
    (``state.symbol``, ``final_state.workflow_status``) with subscripting
    (``state["market_regime"]`` in ``run_quick_analysis``). Only one form can
    match ``BacktestingWorkflowState``'s actual type (TypedDict vs. model) —
    confirm and unify.
    """

    def __init__(
        self,
        market_analyzer: MarketAnalyzerAgent | None = None,
        strategy_selector: StrategySelectorAgent | None = None,
        optimizer: OptimizerAgent | None = None,
        validator: ValidatorAgent | None = None,
    ):
        """Initialize backtesting workflow.

        Args:
            market_analyzer: Market regime analysis agent
            strategy_selector: Strategy selection agent
            optimizer: Parameter optimization agent
            validator: Results validation agent
        """
        # Each agent is injectable for testing; defaults construct real agents.
        self.market_analyzer = market_analyzer or MarketAnalyzerAgent()
        self.strategy_selector = strategy_selector or StrategySelectorAgent()
        self.optimizer = optimizer or OptimizerAgent()
        self.validator = validator or ValidatorAgent()
        # Build the workflow graph
        self.workflow = self._build_workflow_graph()
        logger.info("BacktestingWorkflow initialized")
    def _build_workflow_graph(self) -> StateGraph:
        """Build and compile the LangGraph workflow.

        Topology: initialize -> analyze -> select -> optimize -> validate ->
        finalize, where each conditional edge can short-circuit to finalize.
        """
        # Define the workflow graph
        workflow = StateGraph(BacktestingWorkflowState)
        # Add nodes for each step
        workflow.add_node("initialize", self._initialize_workflow)
        workflow.add_node("analyze_market_regime", self._analyze_market_regime_node)
        workflow.add_node("select_strategies", self._select_strategies_node)
        workflow.add_node("optimize_parameters", self._optimize_parameters_node)
        workflow.add_node("validate_results", self._validate_results_node)
        workflow.add_node("finalize_workflow", self._finalize_workflow)
        # Define the workflow flow
        workflow.set_entry_point("initialize")
        # Sequential workflow with conditional routing
        workflow.add_edge("initialize", "analyze_market_regime")
        workflow.add_conditional_edges(
            "analyze_market_regime",
            self._should_proceed_after_market_analysis,
            {
                "continue": "select_strategies",
                "fallback": "finalize_workflow",
            },
        )
        workflow.add_conditional_edges(
            "select_strategies",
            self._should_proceed_after_strategy_selection,
            {
                "continue": "optimize_parameters",
                "fallback": "finalize_workflow",
            },
        )
        workflow.add_conditional_edges(
            "optimize_parameters",
            self._should_proceed_after_optimization,
            {
                "continue": "validate_results",
                "fallback": "finalize_workflow",
            },
        )
        workflow.add_edge("validate_results", "finalize_workflow")
        workflow.add_edge("finalize_workflow", END)
        return workflow.compile()
    async def run_intelligent_backtest(
        self,
        symbol: str,
        start_date: str | None = None,
        end_date: str | None = None,
        initial_capital: float = 10000.0,
        requested_strategy: str | None = None,
    ) -> dict[str, Any]:
        """Run intelligent backtesting workflow.

        Args:
            symbol: Stock symbol to analyze
            start_date: Start date (YYYY-MM-DD), defaults to 1 year ago
            end_date: End date (YYYY-MM-DD), defaults to today
            initial_capital: Starting capital for backtest
            requested_strategy: User-requested strategy (optional)
        Returns:
            Comprehensive backtesting results with recommendations
        """
        start_time = datetime.now()
        try:
            logger.info(f"Starting intelligent backtest workflow for {symbol}")
            # Set default date range if not provided
            if not end_date:
                end_date = datetime.now().strftime("%Y-%m-%d")
            if not start_date:
                start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
            # Initialize workflow state
            initial_state = self._create_initial_state(
                symbol=symbol,
                start_date=start_date,
                end_date=end_date,
                initial_capital=initial_capital,
                requested_strategy=requested_strategy,
            )
            # Run the workflow
            final_state = await self.workflow.ainvoke(initial_state)
            # Convert state to results dictionary
            results = self._format_results(final_state)
            # Add execution metadata
            total_execution_time = (datetime.now() - start_time).total_seconds() * 1000
            # NOTE(review): attribute access on final_state — see class note.
            results["execution_metadata"] = {
                "total_execution_time_ms": total_execution_time,
                "workflow_completed": final_state.workflow_status == "completed",
                "steps_completed": final_state.steps_completed,
                "errors_encountered": final_state.errors_encountered,
                "fallback_strategies_used": final_state.fallback_strategies_used,
            }
            logger.info(
                f"Intelligent backtest completed for {symbol} in {total_execution_time:.0f}ms: "
                f"{final_state.recommended_strategy} recommended with {final_state.recommendation_confidence:.1%} confidence"
            )
            return results
        except Exception as e:
            logger.error(f"Intelligent backtest failed for {symbol}: {e}")
            # On failure, return a minimal payload rather than raising.
            return {
                "symbol": symbol,
                "error": str(e),
                "execution_metadata": {
                    "total_execution_time_ms": (
                        datetime.now() - start_time
                    ).total_seconds()
                    * 1000,
                    "workflow_completed": False,
                },
            }
    def _create_initial_state(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        initial_capital: float,
        requested_strategy: str | None,
    ) -> BacktestingWorkflowState:
        """Create initial workflow state with every field explicitly zeroed."""
        return BacktestingWorkflowState(
            # Base agent state
            messages=[
                HumanMessage(content=f"Analyze backtesting opportunities for {symbol}")
            ],
            session_id=f"backtest_{symbol}_{datetime.now().isoformat()}",
            persona="intelligent_backtesting_agent",
            timestamp=datetime.now(),
            token_count=0,
            error=None,
            analyzed_stocks={},
            key_price_levels={},
            last_analysis_time={},
            conversation_context={},
            execution_time_ms=None,
            api_calls_made=0,
            cache_hits=0,
            cache_misses=0,
            # Input parameters
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            initial_capital=initial_capital,
            requested_strategy=requested_strategy,
            # Market regime analysis (initialized)
            market_regime="unknown",
            regime_confidence=0.0,
            regime_indicators={},
            regime_analysis_time_ms=0.0,
            volatility_percentile=0.0,
            trend_strength=0.0,
            market_conditions={},
            sector_performance={},
            correlation_to_market=0.0,
            volume_profile={},
            support_resistance_levels=[],
            # Strategy selection (initialized)
            candidate_strategies=[],
            strategy_rankings={},
            selected_strategies=[],
            strategy_selection_reasoning="",
            strategy_selection_confidence=0.0,
            # Parameter optimization (initialized)
            optimization_config={},
            parameter_grids={},
            optimization_results={},
            best_parameters={},
            optimization_time_ms=0.0,
            optimization_iterations=0,
            # Validation (initialized)
            walk_forward_results={},
            monte_carlo_results={},
            out_of_sample_performance={},
            robustness_score={},
            validation_warnings=[],
            # Final recommendations (initialized)
            final_strategy_ranking=[],
            recommended_strategy="",
            recommended_parameters={},
            recommendation_confidence=0.0,
            risk_assessment={},
            # Performance metrics (initialized)
            comparative_metrics={},
            benchmark_comparison={},
            risk_adjusted_performance={},
            drawdown_analysis={},
            # Workflow control (initialized)
            workflow_status="initializing",
            current_step="initialization",
            steps_completed=[],
            total_execution_time_ms=0.0,
            # Error handling (initialized)
            errors_encountered=[],
            fallback_strategies_used=[],
            data_quality_issues=[],
            # Caching (initialized)
            cached_results={},
            cache_hit_rate=0.0,
            # Advanced analysis (initialized)
            regime_transition_analysis={},
            multi_timeframe_analysis={},
            correlation_analysis={},
            macroeconomic_context={},
        )
    async def _initialize_workflow(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Initialize the workflow and validate inputs.

        Marks the workflow failed (without raising) when the symbol is missing.
        """
        logger.info(f"Initializing backtesting workflow for {state.symbol}")
        # Validate inputs
        if not state.symbol:
            state.errors_encountered.append(
                {
                    "step": "initialization",
                    "error": "Symbol is required",
                    "timestamp": datetime.now().isoformat(),
                }
            )
            state.workflow_status = "failed"
            return state
        # Update workflow state
        state.workflow_status = "analyzing_regime"
        state.current_step = "initialization_completed"
        state.steps_completed.append("initialization")
        logger.info(f"Workflow initialized for {state.symbol}")
        return state
    async def _analyze_market_regime_node(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Market regime analysis node (delegates to the market analyzer agent)."""
        return await self.market_analyzer.analyze_market_regime(state)
    async def _select_strategies_node(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Strategy selection node (delegates to the strategy selector agent)."""
        return await self.strategy_selector.select_strategies(state)
    async def _optimize_parameters_node(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Parameter optimization node (delegates to the optimizer agent)."""
        return await self.optimizer.optimize_parameters(state)
    async def _validate_results_node(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Results validation node (delegates to the validator agent)."""
        return await self.validator.validate_strategies(state)
    async def _finalize_workflow(
        self, state: BacktestingWorkflowState
    ) -> BacktestingWorkflowState:
        """Finalize the workflow and prepare results.

        If the run ended early with optimized parameters but no recommendation,
        promotes the first optimized strategy as a low-confidence fallback.
        """
        if state.workflow_status != "completed":
            # Handle incomplete workflow
            if not state.recommended_strategy and state.best_parameters:
                # Select first available strategy as fallback
                state.recommended_strategy = list(state.best_parameters.keys())[0]
                state.recommended_parameters = state.best_parameters[
                    state.recommended_strategy
                ]
                # Low fixed confidence signals this was a fallback pick.
                state.recommendation_confidence = 0.3
                state.fallback_strategies_used.append("incomplete_workflow_fallback")
        state.current_step = "workflow_finalized"
        logger.info(f"Workflow finalized for {state.symbol}")
        return state
    def _should_proceed_after_market_analysis(
        self, state: BacktestingWorkflowState
    ) -> str:
        """Decide whether to proceed after market analysis.

        Returns "fallback" on recorded regime-analysis errors or when the
        regime is unknown AND confidence is near zero; otherwise "continue".
        """
        if state.errors_encountered and any(
            "market_regime_analysis" in err.get("step", "")
            for err in state.errors_encountered
        ):
            return "fallback"
        if state.market_regime == "unknown" and state.regime_confidence < 0.1:
            return "fallback"
        return "continue"
    def _should_proceed_after_strategy_selection(
        self, state: BacktestingWorkflowState
    ) -> str:
        """Decide whether to proceed after strategy selection."""
        if not state.selected_strategies:
            return "fallback"
        if state.strategy_selection_confidence < 0.2:
            return "fallback"
        return "continue"
    def _should_proceed_after_optimization(
        self, state: BacktestingWorkflowState
    ) -> str:
        """Decide whether to proceed after optimization."""
        if not state.best_parameters:
            return "fallback"
        return "continue"
    def _format_results(self, state: BacktestingWorkflowState) -> dict[str, Any]:
        """Format final results for output (pure state-to-dict projection)."""
        return {
            "symbol": state.symbol,
            "period": {
                "start_date": state.start_date,
                "end_date": state.end_date,
                "initial_capital": state.initial_capital,
            },
            "market_analysis": {
                "regime": state.market_regime,
                "regime_confidence": state.regime_confidence,
                "regime_indicators": state.regime_indicators,
                "volatility_percentile": state.volatility_percentile,
                "trend_strength": state.trend_strength,
                "market_conditions": state.market_conditions,
                "support_resistance_levels": state.support_resistance_levels,
            },
            "strategy_selection": {
                "selected_strategies": state.selected_strategies,
                "strategy_rankings": state.strategy_rankings,
                "selection_reasoning": state.strategy_selection_reasoning,
                "selection_confidence": state.strategy_selection_confidence,
                "candidate_strategies": state.candidate_strategies,
            },
            "optimization": {
                "optimization_config": state.optimization_config,
                "best_parameters": state.best_parameters,
                "optimization_iterations": state.optimization_iterations,
                "optimization_time_ms": state.optimization_time_ms,
            },
            "validation": {
                "robustness_scores": state.robustness_score,
                "validation_warnings": state.validation_warnings,
                "out_of_sample_performance": state.out_of_sample_performance,
            },
            "recommendation": {
                "recommended_strategy": state.recommended_strategy,
                "recommended_parameters": state.recommended_parameters,
                "recommendation_confidence": state.recommendation_confidence,
                "final_strategy_ranking": state.final_strategy_ranking,
                "risk_assessment": state.risk_assessment,
            },
            "performance_analysis": {
                "comparative_metrics": state.comparative_metrics,
                "benchmark_comparison": state.benchmark_comparison,
                "risk_adjusted_performance": state.risk_adjusted_performance,
            },
        }
    async def run_quick_analysis(
        self,
        symbol: str,
        start_date: str | None = None,
        end_date: str | None = None,
        initial_capital: float = 10000.0,
    ) -> dict[str, Any]:
        """Run quick analysis with market regime detection and basic strategy recommendations.

        This is a faster alternative that skips parameter optimization and validation
        for rapid insights.

        Args:
            symbol: Stock symbol to analyze
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            initial_capital: Starting capital
        Returns:
            Quick analysis results with strategy recommendations
        """
        start_time = datetime.now()
        try:
            logger.info(f"Running quick analysis for {symbol}")
            # Set default dates
            if not end_date:
                end_date = datetime.now().strftime("%Y-%m-%d")
            if not start_date:
                start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
            # Create initial state
            state = self._create_initial_state(
                symbol=symbol,
                start_date=start_date,
                end_date=end_date,
                initial_capital=initial_capital,
                requested_strategy=None,
            )
            # Run market analysis (agents invoked directly, bypassing the graph)
            state = await self.market_analyzer.analyze_market_regime(state)
            # Run strategy selection.
            # NOTE(review): subscript access here vs attribute access elsewhere —
            # see class-level note.
            if state["market_regime"] != "unknown" or state["regime_confidence"] > 0.3:
                state = await self.strategy_selector.select_strategies(state)
            # Format quick results
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            return {
                "symbol": symbol,
                "analysis_type": "quick_analysis",
                "market_regime": {
                    "regime": state["market_regime"],
                    "confidence": state["regime_confidence"],
                    "trend_strength": state["trend_strength"],
                    "volatility_percentile": state["volatility_percentile"],
                },
                "recommended_strategies": state["selected_strategies"][:3],  # Top 3
                "strategy_fitness": {
                    strategy: state["strategy_rankings"].get(strategy, 0)
                    for strategy in state["selected_strategies"][:3]
                },
                "market_conditions": state["market_conditions"],
                "selection_reasoning": state["strategy_selection_reasoning"],
                "execution_time_ms": execution_time,
                "data_quality": {
                    "errors": len(state["errors_encountered"]),
                    "warnings": state["data_quality_issues"],
                },
            }
        except Exception as e:
            logger.error(f"Quick analysis failed for {symbol}: {e}")
            return {
                "symbol": symbol,
                "analysis_type": "quick_analysis",
                "error": str(e),
                "execution_time_ms": (datetime.now() - start_time).total_seconds()
                * 1000,
            }
    def get_workflow_status(self, state: BacktestingWorkflowState) -> dict[str, Any]:
        """Get current workflow status and progress."""
        # Progress is measured against the five post-init stages.
        total_steps = 5  # initialize, analyze, select, optimize, validate
        completed_steps = len(state.steps_completed)
        return {
            "workflow_status": state.workflow_status,
            "current_step": state.current_step,
            "progress_percentage": (completed_steps / total_steps) * 100,
            "steps_completed": state.steps_completed,
            "errors_count": len(state.errors_encountered),
            "warnings_count": len(state.validation_warnings),
            "execution_time_ms": state.total_execution_time_ms,
            "recommended_strategy": state.recommended_strategy or "TBD",
            "recommendation_confidence": state.recommendation_confidence,
        }
```
--------------------------------------------------------------------------------
/tests/integration/test_config_management.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for configuration management features.
This module tests the integration of ToolEstimationConfig and DatabasePoolConfig
with the actual server implementation and other components. Tests verify:
- server.py correctly uses ToolEstimationConfig
- Database connections work with DatabasePoolConfig
- Configuration changes are properly applied
- Monitoring and logging functionality works end-to-end
- Real-world usage patterns work correctly
"""
import os
from unittest.mock import Mock, patch
import pytest
from sqlalchemy import create_engine, text
from maverick_mcp.config.database import (
DatabasePoolConfig,
validate_production_config,
)
from maverick_mcp.config.tool_estimation import (
get_tool_estimate,
get_tool_estimation_config,
should_alert_for_usage,
)
@pytest.mark.integration
class TestServerToolEstimationIntegration:
"""Test integration of ToolEstimationConfig with server.py."""
def test_server_imports_configuration_correctly(self):
"""Test that server.py can import and use tool estimation configuration."""
# This tests the import path used in server.py
from maverick_mcp.config.tool_estimation import (
get_tool_estimate,
get_tool_estimation_config,
should_alert_for_usage,
)
# Should work without errors
config = get_tool_estimation_config()
estimate = get_tool_estimate("get_stock_price")
should_alert, message = should_alert_for_usage("test_tool", 5, 1000)
assert config is not None
assert estimate is not None
assert isinstance(should_alert, bool)
@patch("maverick_mcp.config.tool_estimation.logger")
def test_server_logging_pattern_with_low_confidence(self, mock_logger):
"""Test the logging pattern used in server.py for low confidence estimates."""
config = get_tool_estimation_config()
# Find a tool with low confidence (< 0.8)
low_confidence_tool = None
for tool_name, estimate in config.tool_estimates.items():
if estimate.confidence < 0.8:
low_confidence_tool = tool_name
break
if low_confidence_tool:
# Simulate the server.py logging pattern
tool_estimate = get_tool_estimate(low_confidence_tool)
# This mimics the server.py code path
if tool_estimate.confidence < 0.8:
# Log the warning as server.py would
logger_extra = {
"tool_name": low_confidence_tool,
"confidence": tool_estimate.confidence,
"basis": tool_estimate.based_on.value,
"complexity": tool_estimate.complexity.value,
"estimated_llm_calls": tool_estimate.llm_calls,
"estimated_tokens": tool_estimate.total_tokens,
}
# Verify the data structure matches server.py expectations
assert "tool_name" in logger_extra
assert "confidence" in logger_extra
assert "basis" in logger_extra
assert "complexity" in logger_extra
assert "estimated_llm_calls" in logger_extra
assert "estimated_tokens" in logger_extra
# Values should be in expected formats
assert isinstance(logger_extra["confidence"], float)
assert isinstance(logger_extra["basis"], str)
assert isinstance(logger_extra["complexity"], str)
assert isinstance(logger_extra["estimated_llm_calls"], int)
assert isinstance(logger_extra["estimated_tokens"], int)
def test_server_error_handling_fallback_pattern(self):
"""Test the error handling pattern used in server.py."""
config = get_tool_estimation_config()
# Simulate the server.py error handling pattern
actual_tool_name = "nonexistent_tool"
tool_estimate = None
try:
tool_estimate = get_tool_estimate(actual_tool_name)
llm_calls = tool_estimate.llm_calls
total_tokens = tool_estimate.total_tokens
except Exception:
# Fallback to conservative defaults (server.py pattern)
fallback_estimate = config.unknown_tool_estimate
llm_calls = fallback_estimate.llm_calls
total_tokens = fallback_estimate.total_tokens
# Should have fallback values
assert llm_calls > 0
assert total_tokens > 0
assert tool_estimate == config.unknown_tool_estimate
def test_server_usage_estimates_integration(self):
    """Verify known-tool estimates expose every field server.py reads."""
    # Mapping of tool name -> substring expected in its complexity level.
    expectations = {
        "get_stock_price": "simple",
        "get_rsi_analysis": "standard",
        "get_full_technical_analysis": "complex",
        "analyze_market_with_agent": "premium",
    }
    required_attrs = (
        "llm_calls",
        "total_tokens",
        "confidence",
        "based_on",
        "complexity",
    )
    for tool_name, complexity_label in expectations.items():
        estimate = get_tool_estimate(tool_name)
        # server.py reads these attributes when logging/tracking usage.
        for attr in required_attrs:
            assert hasattr(estimate, attr)
        # Complexity classification should line up with expectations.
        assert complexity_label in estimate.complexity.value.lower()
        # Sanity bounds used by usage tracking.
        if complexity_label == "simple":
            assert estimate.llm_calls <= 1
        elif complexity_label == "premium":
            assert estimate.llm_calls >= 8
@pytest.mark.integration
class TestDatabasePoolConfigIntegration:
    """Test integration of DatabasePoolConfig with database operations."""

    def test_database_config_with_real_sqlite(self):
        """Test database configuration with real SQLite database."""
        # Use SQLite for testing (no external dependencies)
        database_url = "sqlite:///test_integration.db"
        config = DatabasePoolConfig(
            pool_size=5,
            max_overflow=2,
            pool_timeout=30,
            pool_recycle=3600,
            max_database_connections=20,
            expected_concurrent_users=3,
            connections_per_user=1.0,
        )
        # Create engine with configuration
        engine_kwargs = {
            "url": database_url,
            **config.get_pool_kwargs(),
        }
        # Remove poolclass for SQLite (not applicable)
        if "sqlite" in database_url:
            engine_kwargs.pop("poolclass", None)
        engine = create_engine(**engine_kwargs)
        try:
            # Test connection
            with engine.connect() as conn:
                result = conn.execute(text("SELECT 1"))
                assert result.scalar() == 1
            # Test monitoring setup (should not error)
            config.setup_pool_monitoring(engine)
        finally:
            engine.dispose()
            # BUGFIX: the cleanup previously ran *after* the try/finally, so a
            # failing assertion above would leak test_integration.db on disk.
            # Doing it in the finally guarantees cleanup on every exit path.
            if os.path.exists("test_integration.db"):
                os.remove("test_integration.db")

    @patch.dict(
        os.environ,
        {
            "DB_POOL_SIZE": "8",
            "DB_MAX_OVERFLOW": "4",
            "DB_POOL_TIMEOUT": "45",
        },
    )
    def test_config_respects_environment_variables(self):
        """Test that configuration respects environment variables in integration."""
        config = DatabasePoolConfig()
        # Should use environment variable values
        assert config.pool_size == 8
        assert config.max_overflow == 4
        assert config.pool_timeout == 45

    def test_legacy_compatibility_integration(self):
        """Test legacy DatabaseConfig compatibility in real usage."""
        from maverick_mcp.providers.interfaces.persistence import DatabaseConfig

        # Create enhanced config
        enhanced_config = DatabasePoolConfig(
            pool_size=10,
            max_overflow=5,
            pool_timeout=30,
            pool_recycle=1800,
        )
        # Convert to legacy format (no connection is opened; the URL is only
        # carried through, so no sqlite file is created here).
        database_url = "sqlite:///test_legacy.db"
        legacy_config = enhanced_config.to_legacy_config(database_url)
        # Should be usable with existing code patterns
        assert isinstance(legacy_config, DatabaseConfig)
        assert legacy_config.database_url == database_url
        assert legacy_config.pool_size == 10
        assert legacy_config.max_overflow == 5

    def test_production_validation_integration(self):
        """Test production validation with realistic configurations."""
        # Test development config - should warn but not fail
        dev_config = DatabasePoolConfig(
            pool_size=5,
            max_overflow=2,
            pool_timeout=30,
            pool_recycle=3600,
        )
        with patch("maverick_mcp.config.database.logger") as mock_logger:
            result = validate_production_config(dev_config)
            assert result is True  # Should pass with warnings
            # Should have logged warnings about small pool size
            assert mock_logger.warning.called
        # Test production config - should pass without warnings
        prod_config = DatabasePoolConfig(
            pool_size=25,
            max_overflow=15,
            pool_timeout=30,
            pool_recycle=3600,
        )
        with patch("maverick_mcp.config.database.logger") as mock_logger:
            result = validate_production_config(prod_config)
            assert result is True
            # Should have passed without warnings
            info_call = mock_logger.info.call_args[0][0]
            assert "validation passed" in info_call.lower()
@pytest.mark.integration
class TestConfigurationMonitoring:
    """Test monitoring and alerting integration."""

    def test_tool_estimation_alerting_integration(self):
        """Tool usage far above its estimate should trigger an alert."""
        get_tool_estimation_config()
        # (tool, llm calls, tokens, reason the alert is expected)
        cases = [
            ("get_stock_price", 10, 1000, "should alert on unexpected LLM usage"),
            ("calculate_sma", 1, 50000, "should alert on excessive tokens"),
            ("get_market_hours", 20, 40000, "should alert on both metrics"),
        ]
        for tool, calls, tokens, reason in cases:
            alerted, message = should_alert_for_usage(tool, calls, tokens)
            assert alerted, f"Failed: {reason}"
            assert len(message) > 0, f"Alert message should not be empty: {reason}"
            assert "Critical" in message or "Warning" in message

    def test_database_pool_monitoring_integration(self):
        """Pool monitoring should warn and then error as utilization climbs."""
        config = DatabasePoolConfig(pool_size=10, echo_pool=True)
        # Build a fake engine/pool pair so no real database is touched.
        fake_engine = Mock()
        fake_pool = Mock()
        fake_engine.pool = fake_pool
        # (connections checked out, label, expect warning, expect error)
        usage_cases = [
            (5, "normal usage", False, False),  # 50% usage
            (8, "warning usage", True, False),  # 80% usage
            (10, "critical usage", True, True),  # 100% usage
        ]
        with patch("maverick_mcp.config.database.event") as mock_event:
            config.setup_pool_monitoring(fake_engine)
            # Locate the listener that was registered for the "connect" event.
            listener = next(
                (
                    registration[0][2]
                    for registration in mock_event.listens_for.call_args_list
                    if registration[0][1] == "connect"
                ),
                None,
            )
            assert listener is not None
            for checked_out, _label, expect_warn, expect_error in usage_cases:
                fake_pool.checkedout.return_value = checked_out
                fake_pool.checkedin.return_value = 10 - checked_out
                with patch("maverick_mcp.config.database.logger") as mock_logger:
                    listener(None, None)
                    if expect_warn:
                        mock_logger.warning.assert_called()
                    if expect_error:
                        mock_logger.error.assert_called()

    def test_configuration_logging_integration(self):
        """Constructing a pool config should log a configuration summary."""
        with patch("maverick_mcp.config.database.logger") as mock_logger:
            DatabasePoolConfig(
                pool_size=15,
                max_overflow=8,
                expected_concurrent_users=20,
                connections_per_user=1.2,
                max_database_connections=100,
            )
            # A summary line with the key settings must have been logged.
            assert mock_logger.info.called
            summary = mock_logger.info.call_args[0][0]
            assert "Database pool configured" in summary
            assert "pool_size=15" in summary
@pytest.mark.integration
class TestRealWorldIntegrationScenarios:
    """Test realistic integration scenarios.

    Each test models a deployment profile (microservice, development,
    high-traffic) and checks that DatabasePoolConfig plus the tool
    estimation config behave sensibly together for that profile.
    """

    def test_microservice_deployment_scenario(self):
        """Test configuration for microservice deployment."""
        # Simulate microservice environment
        with patch.dict(
            os.environ,
            {
                "DB_POOL_SIZE": "8",
                "DB_MAX_OVERFLOW": "4",
                "DB_MAX_CONNECTIONS": "50",
                "DB_EXPECTED_CONCURRENT_USERS": "10",
                "ENVIRONMENT": "production",
            },
        ):
            # Get configuration from environment
            db_config = DatabasePoolConfig()
            # Should be suitable for microservice
            assert db_config.pool_size == 8
            assert db_config.max_overflow == 4
            assert db_config.expected_concurrent_users == 10
            # Should pass production validation
            assert validate_production_config(db_config) is True
            # Test tool estimation in this context
            # NOTE(review): return value deliberately unused — presumably
            # called to ensure the singleton initializes under these env
            # vars; confirm against get_tool_estimation_config's contract.
            get_tool_estimation_config()
            # Should handle typical microservice tools
            api_tools = [
                "get_stock_price",
                "get_company_info",
                "get_rsi_analysis",
                "fetch_stock_data",
            ]
            for tool in api_tools:
                estimate = get_tool_estimate(tool)
                assert estimate is not None
                assert estimate.confidence > 0.0

    def test_development_environment_scenario(self):
        """Test configuration for development environment."""
        # Simulate development environment
        with patch.dict(
            os.environ,
            {
                "DB_POOL_SIZE": "3",
                "DB_MAX_OVERFLOW": "1",
                "DB_ECHO_POOL": "true",
                "ENVIRONMENT": "development",
            },
        ):
            db_config = DatabasePoolConfig()
            # Should use development-friendly settings
            assert db_config.pool_size == 3
            assert db_config.max_overflow == 1
            assert db_config.echo_pool is True
            # Should handle development testing
            get_tool_estimation_config()
            # Should provide estimates for development tools
            dev_tools = ["generate_dev_token", "clear_cache", "get_cached_price_data"]
            for tool in dev_tools:
                estimate = get_tool_estimate(tool)
                # Dev tools are expected to be cheap (no LLM-heavy work).
                assert estimate.complexity.value in ["simple", "standard"]

    def test_high_traffic_scenario(self):
        """Test configuration for high traffic scenario."""
        # High traffic configuration
        db_config = DatabasePoolConfig(
            pool_size=50,
            max_overflow=30,
            expected_concurrent_users=100,
            connections_per_user=1.2,
            max_database_connections=200,
        )
        # Should handle the expected load: pool + overflow must cover
        # users * connections-per-user.
        total_capacity = db_config.pool_size + db_config.max_overflow
        expected_demand = (
            db_config.expected_concurrent_users * db_config.connections_per_user
        )
        assert total_capacity >= expected_demand
        # Should pass production validation
        assert validate_production_config(db_config) is True
        # Test tool estimation for high-usage tools
        high_usage_tools = [
            "get_full_technical_analysis",
            "analyze_market_with_agent",
            "get_all_screening_recommendations",
        ]
        for tool in high_usage_tools:
            estimate = get_tool_estimate(tool)
            # Should have monitoring in place for expensive tools:
            # doubling the estimated usage must cross the alert threshold.
            should_alert, _ = should_alert_for_usage(
                tool,
                estimate.llm_calls * 2,  # Double the expected usage
                estimate.total_tokens * 2,
            )
            assert should_alert  # Should trigger alerts for high usage

    def test_configuration_change_propagation(self):
        """Test that configuration changes propagate correctly."""
        # Start with one configuration
        original_config = get_tool_estimation_config()
        original_estimate = get_tool_estimate("get_stock_price")
        # Configuration should be singleton (identity, not just equality)
        new_config = get_tool_estimation_config()
        assert new_config is original_config
        # Estimates should be consistent
        new_estimate = get_tool_estimate("get_stock_price")
        assert new_estimate == original_estimate

    def test_error_recovery_integration(self):
        """Test error recovery in integrated scenarios."""
        # Test database connection failure recovery
        config = DatabasePoolConfig(
            pool_size=5,
            max_overflow=2,
            pool_timeout=1,  # Short timeout for testing
        )
        # Should handle connection errors gracefully.
        # NOTE(review): get_pool_kwargs() is unlikely to raise here — the
        # except branch documents intent (config survives engine errors)
        # more than it exercises a real failure path.
        try:
            # This would fail in a real scenario with invalid URL
            engine_kwargs = config.get_pool_kwargs()
            assert "pool_size" in engine_kwargs
        except Exception:
            # Should not prevent configuration from working
            assert config.pool_size == 5

    def test_monitoring_data_collection(self):
        """Test that monitoring data can be collected for analysis."""
        tool_config = get_tool_estimation_config()
        # Collect monitoring data
        stats = tool_config.get_summary_stats()
        # Should provide useful metrics
        assert "total_tools" in stats
        assert "by_complexity" in stats
        assert "avg_confidence" in stats
        # Should be suitable for monitoring dashboards
        assert stats["total_tools"] > 0
        assert 0 <= stats["avg_confidence"] <= 1
        # Complexity distribution should make sense: per-complexity counts
        # must sum to the total tool count.
        complexity_counts = stats["by_complexity"]
        total_by_complexity = sum(complexity_counts.values())
        assert total_by_complexity == stats["total_tools"]

    def test_configuration_validation_end_to_end(self):
        """Test end-to-end configuration validation."""
        # Test complete validation pipeline
        # 1. Tool estimation configuration
        tool_config = get_tool_estimation_config()
        assert (
            len(tool_config.tool_estimates) > 20
        )  # Should have substantial tool coverage
        # 2. Database configuration
        db_config = DatabasePoolConfig(
            pool_size=20,
            max_overflow=10,
            expected_concurrent_users=25,
            connections_per_user=1.2,
            max_database_connections=100,
        )
        # 3. Production readiness
        assert validate_production_config(db_config) is True
        # 4. Integration compatibility: enhanced -> legacy -> enhanced
        # round-trip must preserve the pool size.
        legacy_config = db_config.to_legacy_config("postgresql://test")
        enhanced_again = DatabasePoolConfig.from_legacy_config(legacy_config)
        assert enhanced_again.pool_size == db_config.pool_size
        # 5. Monitoring setup
        thresholds = db_config.get_monitoring_thresholds()
        assert thresholds["warning_threshold"] > 0
        assert thresholds["critical_threshold"] > thresholds["warning_threshold"]
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/optimized_stock_data.py:
--------------------------------------------------------------------------------
```python
"""
Optimized stock data provider with performance enhancements.
This module provides enhanced stock data access with:
- Request-level caching for expensive operations
- Optimized database queries with proper indexing
- Connection pooling and query monitoring
- Smart cache invalidation strategies
"""
import logging
from collections.abc import Awaitable, Callable
from datetime import datetime
from typing import Any
import pandas as pd
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from maverick_mcp.data.models import (
MaverickStocks,
PriceCache,
Stock,
)
from maverick_mcp.data.performance import (
cached,
monitored_db_session,
query_optimizer,
request_cache,
)
logger = logging.getLogger(__name__)
class OptimizedStockDataProvider:
    """
    Performance-optimized stock data provider.

    This provider implements:
    - Smart caching strategies for different data types
    - Optimized database queries with minimal N+1 issues
    - Connection pooling and query monitoring
    - Efficient bulk operations for large datasets

    NOTE(review): several raw-SQL paths use PostgreSQL-specific syntax
    (``::date`` casts, ``= ANY(:symbols)``) — they will not run against
    SQLite; confirm the deployment database before reuse.
    """

    def __init__(self):
        # NOTE(review): these TTLs mirror the hard-coded ttl= values on the
        # @cached decorators below (decorators are evaluated at class
        # definition time, so they cannot read instance attributes);
        # keep both in sync when tuning.
        self.cache_ttl_stock_data = 3600  # 1 hour for stock data
        self.cache_ttl_screening = 7200  # 2 hours for screening results
        self.cache_ttl_market_data = 300  # 5 minutes for real-time data

    @cached(data_type="stock_data", ttl=3600)
    @query_optimizer.monitor_query("get_stock_basic_info")
    async def get_stock_basic_info(self, symbol: str) -> dict[str, Any] | None:
        """
        Get basic stock information with caching.

        Args:
            symbol: Stock ticker symbol (case-insensitive; upper-cased here)

        Returns:
            Stock information dictionary or None if not found
        """
        async with monitored_db_session("get_stock_basic_info") as session:
            async_session: AsyncSession = session
            stmt = select(Stock).where(Stock.ticker_symbol == symbol.upper())
            result = await async_session.execute(stmt)
            stock = result.scalars().first()
            if stock:
                return {
                    "symbol": stock.ticker_symbol,
                    "name": stock.company_name,
                    "sector": stock.sector,
                    "industry": stock.industry,
                    "exchange": stock.exchange,
                    "country": stock.country,
                    "currency": stock.currency,
                }
            return None

    @cached(data_type="stock_data", ttl=1800)
    @query_optimizer.monitor_query("get_stock_price_data")
    async def get_stock_price_data(
        self,
        symbol: str,
        start_date: str,
        end_date: str | None = None,
        use_optimized_query: bool = True,
    ) -> pd.DataFrame:
        """
        Get stock price data with optimized queries and caching.

        Args:
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (defaults to today)
            use_optimized_query: Use optimized query with proper indexing

        Returns:
            DataFrame with OHLCV data, indexed by date (empty if no rows)
        """
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        async with monitored_db_session("get_stock_price_data") as session:
            async_session: AsyncSession = session
            if use_optimized_query:
                # Optimized query using the composite index (stock_id, date).
                # NOTE(review): the ``:param::date`` casts are PostgreSQL
                # syntax; SQLAlchemy passes the double colon through.
                query = text(
                    """
                    SELECT
                        pc.date,
                        pc.open_price as "open",
                        pc.high_price as "high",
                        pc.low_price as "low",
                        pc.close_price as "close",
                        pc.volume
                    FROM stocks_pricecache pc
                    INNER JOIN stocks_stock s ON pc.stock_id = s.stock_id
                    WHERE s.ticker_symbol = :symbol
                    AND pc.date >= :start_date::date
                    AND pc.date <= :end_date::date
                    ORDER BY pc.date
                    """
                )
                result = await async_session.execute(
                    query,
                    {
                        "symbol": symbol.upper(),
                        "start_date": start_date,
                        "end_date": end_date,
                    },
                )
                rows = result.fetchall()
                # Stringify result keys so pandas gets plain column labels.
                column_index = pd.Index([str(key) for key in result.keys()])
                df = pd.DataFrame(rows, columns=column_index)
            else:
                # Traditional SQLAlchemy query (for comparison)
                stmt = (
                    select(
                        PriceCache.date,
                        PriceCache.open_price.label("open"),
                        PriceCache.high_price.label("high"),
                        PriceCache.low_price.label("low"),
                        PriceCache.close_price.label("close"),
                        PriceCache.volume,
                    )
                    .join(Stock)
                    .where(
                        Stock.ticker_symbol == symbol.upper(),
                        PriceCache.date >= pd.to_datetime(start_date).date(),
                        PriceCache.date <= pd.to_datetime(end_date).date(),
                    )
                    .order_by(PriceCache.date)
                )
                result = await async_session.execute(stmt)
                rows = result.fetchall()
                column_index = pd.Index([str(key) for key in result.keys()])
                df = pd.DataFrame(rows, columns=column_index)
            if not df.empty:
                df["date"] = pd.to_datetime(df["date"])
                df.set_index("date", inplace=True)
                # Convert decimal types to float for performance
                for col in ["open", "high", "low", "close"]:
                    df[col] = pd.to_numeric(df[col], errors="coerce")
                df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
            return df

    @cached(data_type="screening", ttl=7200)
    @query_optimizer.monitor_query("get_maverick_recommendations")
    async def get_maverick_recommendations(
        self,
        limit: int = 50,
        min_score: float | None = None,
        use_optimized_query: bool = True,
    ) -> list[dict[str, Any]]:
        """
        Get Maverick bullish recommendations with performance optimizations.

        Args:
            limit: Maximum number of results
            min_score: Minimum score threshold (no filter when None)
            use_optimized_query: Use optimized query with proper indexing

        Returns:
            List of recommendation dictionaries
        """
        async with monitored_db_session("get_maverick_recommendations") as session:
            async_session: AsyncSession = session
            if use_optimized_query:
                # Use raw SQL with optimized indexes
                where_clause = ""
                params: dict[str, Any] = {"limit": limit}
                if min_score is not None:
                    where_clause = "WHERE ms.combined_score >= :min_score"
                    params["min_score"] = min_score
                # f-string only interpolates the optional WHERE clause built
                # above (a fixed literal); all values go through bind params.
                # NOTE(review): pattern_detected is aliased/exposed as "rank"
                # in both branches — confirm that mapping is intentional.
                query = text(
                    f"""
                    SELECT
                        s.ticker_symbol,
                        s.company_name,
                        s.sector,
                        s.industry,
                        ms.combined_score AS score,
                        ms.pattern_detected AS rank,
                        ms.date_analyzed,
                        ms.analysis_data
                    FROM stocks_maverickstocks ms
                    INNER JOIN stocks_stock s ON ms.stock_id = s.stock_id
                    {where_clause}
                    ORDER BY ms.combined_score DESC, ms.pattern_detected ASC
                    LIMIT :limit
                    """
                )
                result = await async_session.execute(query, params)
                rows = result.fetchall()
                return [
                    {
                        "symbol": row.ticker_symbol,
                        "name": row.company_name,
                        "sector": row.sector,
                        "industry": row.industry,
                        "score": float(getattr(row, "score", 0) or 0),
                        "rank": getattr(row, "rank", None),
                        "date_analyzed": (
                            row.date_analyzed.isoformat() if row.date_analyzed else None
                        ),
                        "analysis_data": getattr(row, "analysis_data", None),
                    }
                    for row in rows
                ]
            else:
                # Traditional SQLAlchemy query with eager loading to avoid
                # per-row lazy loads of the related Stock.
                stmt = (
                    select(MaverickStocks)
                    .options(joinedload(MaverickStocks.stock))
                    .order_by(
                        MaverickStocks.combined_score.desc(),
                        MaverickStocks.pattern_detected.asc(),
                    )
                    .limit(limit)
                )
                if min_score is not None:
                    stmt = stmt.where(MaverickStocks.combined_score >= min_score)
                result = await async_session.execute(stmt)
                recommendations = result.scalars().all()
                formatted: list[dict[str, Any]] = []
                for rec in recommendations:
                    stock = getattr(rec, "stock", None)
                    analysis_date = getattr(rec, "date_analyzed", None)
                    isoformatted = (
                        analysis_date.isoformat()
                        if analysis_date is not None
                        and hasattr(analysis_date, "isoformat")
                        else None
                    )
                    formatted.append(
                        {
                            "symbol": getattr(stock, "ticker_symbol", None),
                            "name": getattr(stock, "company_name", None),
                            "sector": getattr(stock, "sector", None),
                            "industry": getattr(stock, "industry", None),
                            "score": float(getattr(rec, "combined_score", 0) or 0),
                            "rank": getattr(rec, "pattern_detected", None),
                            "date_analyzed": isoformatted,
                            "analysis_data": getattr(rec, "analysis_data", None),
                        }
                    )
                return formatted

    @cached(data_type="screening", ttl=7200)
    @query_optimizer.monitor_query("get_trending_recommendations")
    async def get_trending_recommendations(
        self,
        limit: int = 50,
        min_momentum_score: float | None = None,
    ) -> list[dict[str, Any]]:
        """
        Get trending supply/demand breakout recommendations with optimized queries.

        Args:
            limit: Maximum number of results
            min_momentum_score: Minimum momentum score threshold (no filter
                when None)

        Returns:
            List of recommendation dictionaries
        """
        async with monitored_db_session("get_trending_recommendations") as session:
            async_session: AsyncSession = session
            # Use optimized raw SQL query
            where_clause = ""
            params: dict[str, Any] = {"limit": limit}
            if min_momentum_score is not None:
                where_clause = "WHERE ms.momentum_score >= :min_momentum_score"
                params["min_momentum_score"] = min_momentum_score
            query = text(
                f"""
                SELECT
                    s.ticker_symbol,
                    s.company_name,
                    s.sector,
                    s.industry,
                    ms.momentum_score,
                    ms.stage,
                    ms.date_analyzed,
                    ms.analysis_data
                FROM stocks_supply_demand_breakouts ms
                INNER JOIN stocks_stock s ON ms.stock_id = s.stock_id
                {where_clause}
                ORDER BY ms.momentum_score DESC
                LIMIT :limit
                """
            )
            result = await async_session.execute(query, params)
            rows = result.fetchall()
            return [
                {
                    "symbol": row.ticker_symbol,
                    "name": row.company_name,
                    "sector": row.sector,
                    "industry": row.industry,
                    "momentum_score": (
                        float(row.momentum_score) if row.momentum_score else 0
                    ),
                    "stage": row.stage,
                    "date_analyzed": (
                        row.date_analyzed.isoformat() if row.date_analyzed else None
                    ),
                    "analysis_data": row.analysis_data,
                }
                for row in rows
            ]

    @cached(data_type="market_data", ttl=300)
    @query_optimizer.monitor_query("get_high_volume_stocks")
    async def get_high_volume_stocks(
        self,
        date: str | None = None,
        limit: int = 100,
        min_volume: int = 1000000,
    ) -> list[dict[str, Any]]:
        """
        Get high volume stocks for a specific date with optimized query.

        Args:
            date: Date to filter in YYYY-MM-DD format (default: today's
                date, not necessarily the latest date with data)
            limit: Maximum number of results
            min_volume: Minimum volume threshold

        Returns:
            List of high volume stock data
        """
        if not date:
            date = datetime.now().strftime("%Y-%m-%d")
        async with monitored_db_session("get_high_volume_stocks") as session:
            async_session: AsyncSession = session
            # Use optimized query with volume index
            # NOTE(review): ``:date::date`` cast is PostgreSQL-specific.
            query = text(
                """
                SELECT
                    s.ticker_symbol,
                    s.company_name,
                    s.sector,
                    pc.volume,
                    pc.close_price,
                    pc.date
                FROM stocks_pricecache pc
                INNER JOIN stocks_stock s ON pc.stock_id = s.stock_id
                WHERE pc.date = :date::date
                AND pc.volume >= :min_volume
                ORDER BY pc.volume DESC
                LIMIT :limit
                """
            )
            result = await async_session.execute(
                query,
                {
                    "date": date,
                    "min_volume": min_volume,
                    "limit": limit,
                },
            )
            rows = result.fetchall()
            return [
                {
                    "symbol": row.ticker_symbol,
                    "name": row.company_name,
                    "sector": row.sector,
                    "volume": int(row.volume) if row.volume else 0,
                    "close_price": float(row.close_price) if row.close_price else 0,
                    "date": row.date.isoformat() if row.date else None,
                }
                for row in rows
            ]

    @query_optimizer.monitor_query("bulk_get_stock_data")
    async def bulk_get_stock_data(
        self,
        symbols: list[str],
        start_date: str,
        end_date: str | None = None,
    ) -> dict[str, pd.DataFrame]:
        """
        Efficiently fetch stock data for multiple symbols using bulk operations.

        Args:
            symbols: List of stock symbols
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (defaults to today)

        Returns:
            Dictionary mapping symbols to DataFrames; symbols with no rows
            map to an empty OHLCV DataFrame.
        """
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        # Convert symbols to uppercase for consistency
        symbols = [s.upper() for s in symbols]
        async with monitored_db_session("bulk_get_stock_data") as session:
            async_session: AsyncSession = session
            # Single query for all symbols instead of one per symbol.
            # NOTE(review): ``= ANY(:symbols)`` is PostgreSQL array syntax.
            query = text(
                """
                SELECT
                    s.ticker_symbol,
                    pc.date,
                    pc.open_price as "open",
                    pc.high_price as "high",
                    pc.low_price as "low",
                    pc.close_price as "close",
                    pc.volume
                FROM stocks_pricecache pc
                INNER JOIN stocks_stock s ON pc.stock_id = s.stock_id
                WHERE s.ticker_symbol = ANY(:symbols)
                AND pc.date >= :start_date::date
                AND pc.date <= :end_date::date
                ORDER BY s.ticker_symbol, pc.date
                """
            )
            result = await async_session.execute(
                query,
                {
                    "symbols": symbols,
                    "start_date": start_date,
                    "end_date": end_date,
                },
            )
            # Group results by symbol
            symbol_data = {}
            for row in result.fetchall():
                symbol = row.ticker_symbol
                if symbol not in symbol_data:
                    symbol_data[symbol] = []
                symbol_data[symbol].append(
                    {
                        "date": row.date,
                        "open": row.open,
                        "high": row.high,
                        "low": row.low,
                        "close": row.close,
                        "volume": row.volume,
                    }
                )
            # Convert to DataFrames
            result_dfs = {}
            for symbol in symbols:
                if symbol in symbol_data:
                    df = pd.DataFrame(symbol_data[symbol])
                    df["date"] = pd.to_datetime(df["date"])
                    df.set_index("date", inplace=True)
                    # Convert decimal types to float
                    for col in ["open", "high", "low", "close"]:
                        df[col] = pd.to_numeric(df[col], errors="coerce")
                    df["volume"] = pd.to_numeric(df["volume"], errors="coerce")
                    result_dfs[symbol] = df
                else:
                    # Return empty DataFrame for missing symbols
                    result_dfs[symbol] = pd.DataFrame(
                        columns=pd.Index(["open", "high", "low", "close", "volume"])
                    )
            return result_dfs

    async def invalidate_cache_for_symbol(self, symbol: str) -> None:
        """
        Invalidate all cached data for a specific symbol.

        Args:
            symbol: Stock symbol to invalidate
        """
        # The @cached decorator is expected to attach an ``invalidate_cache``
        # callable to the wrapped method; tolerate its absence.
        invalidate_basic_info: Callable[[str], Awaitable[None]] | None = getattr(
            self.get_stock_basic_info, "invalidate_cache", None
        )
        if invalidate_basic_info is not None:
            await invalidate_basic_info(symbol)
        # Invalidate stock price data (pattern-based)
        await request_cache.delete_pattern(
            f"cache:*get_stock_price_data*{symbol.upper()}*"
        )
        logger.info(f"Cache invalidated for symbol: {symbol}")

    async def invalidate_screening_cache(self) -> None:
        """Invalidate all screening-related cache."""
        patterns = [
            "cache:*get_maverick_recommendations*",
            "cache:*get_trending_recommendations*",
            "cache:*get_high_volume_stocks*",
        ]
        for pattern in patterns:
            await request_cache.delete_pattern(pattern)
        logger.info("Screening cache invalidated")

    async def get_performance_metrics(self) -> dict[str, Any]:
        """Get performance metrics for the optimized provider."""
        return {
            "cache_metrics": request_cache.get_metrics(),
            "query_stats": query_optimizer.get_query_stats(),
            "cache_ttl_config": {
                "stock_data": self.cache_ttl_stock_data,
                "screening": self.cache_ttl_screening,
                "market_data": self.cache_ttl_market_data,
            },
        }


# Global instance (module-level singleton; constructed at import time)
optimized_stock_provider = OptimizedStockDataProvider()
```
--------------------------------------------------------------------------------
/tests/test_optimized_research_agent.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive test suite for OptimizedDeepResearchAgent.
Tests the core functionality of the optimized research agent including:
- Model selection logic
- Token budgeting
- Confidence tracking
- Content filtering
- Parallel processing
- Error handling
"""
import time
from unittest.mock import AsyncMock, Mock, patch
import pytest
from maverick_mcp.agents.optimized_research import (
OptimizedContentAnalyzer,
OptimizedDeepResearchAgent,
create_optimized_research_agent,
)
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider, TaskType
from maverick_mcp.utils.llm_optimization import (
AdaptiveModelSelector,
ConfidenceTracker,
ModelConfiguration,
ProgressiveTokenBudgeter,
)
class TestOptimizedContentAnalyzer:
    """Test the OptimizedContentAnalyzer component.

    All tests mock the OpenRouter provider so no network or model calls
    are made; they exercise the analyzer's control flow and fallbacks.
    """

    @pytest.fixture
    def mock_openrouter(self):
        """Create a mock OpenRouter provider."""
        provider = Mock(spec=OpenRouterProvider)
        provider.get_llm = Mock()
        return provider

    @pytest.fixture
    def analyzer(self, mock_openrouter):
        """Create an OptimizedContentAnalyzer instance."""
        return OptimizedContentAnalyzer(mock_openrouter)

    @pytest.mark.asyncio
    async def test_analyze_content_optimized_success(self, analyzer, mock_openrouter):
        """Test successful optimized content analysis."""
        # Setup mock LLM response: the analyzer is expected to parse this
        # JSON string out of the response content.
        mock_llm = AsyncMock()
        mock_response = Mock()
        mock_response.content = '{"insights": ["Test insight"], "sentiment": {"direction": "bullish", "confidence": 0.8}}'
        mock_llm.ainvoke.return_value = mock_response
        mock_openrouter.get_llm.return_value = mock_llm
        # Test analysis
        result = await analyzer.analyze_content_optimized(
            content="Test financial content about stocks",
            persona="moderate",
            analysis_focus="market_analysis",
            time_budget_seconds=30.0,
            current_confidence=0.5,
        )
        # Verify results: parsed insights/sentiment plus optimization metadata
        assert result["insights"] == ["Test insight"]
        assert result["sentiment"]["direction"] == "bullish"
        assert result["sentiment"]["confidence"] == 0.8
        assert result["optimization_applied"] is True
        assert "model_used" in result
        assert "execution_time" in result

    @pytest.mark.asyncio
    async def test_analyze_empty_content(self, analyzer):
        """Test handling of empty content."""
        result = await analyzer.analyze_content_optimized(
            content="",
            persona="moderate",
            analysis_focus="general",
            time_budget_seconds=30.0,
        )
        # Empty content short-circuits to a neutral, empty result.
        assert result["empty_content"] is True
        assert result["insights"] == []
        assert result["sentiment"]["direction"] == "neutral"
        assert result["sentiment"]["confidence"] == 0.0

    @pytest.mark.asyncio
    async def test_analyze_with_timeout(self, analyzer, mock_openrouter):
        """Test timeout handling during analysis."""
        # Setup mock to timeout
        mock_llm = AsyncMock()
        mock_llm.ainvoke.side_effect = TimeoutError("Analysis timeout")
        mock_openrouter.get_llm.return_value = mock_llm
        result = await analyzer.analyze_content_optimized(
            content="Test content",
            persona="aggressive",
            analysis_focus="technical",
            time_budget_seconds=5.0,
        )
        # Should return fallback analysis rather than raising; the exact
        # sentiment direction is unspecified, only its domain is checked.
        assert "insights" in result
        assert "sentiment" in result
        assert result["sentiment"]["direction"] in ["bullish", "bearish", "neutral"]

    @pytest.mark.asyncio
    async def test_batch_analyze_content(self, analyzer, mock_openrouter):
        """Test batch content analysis with parallel processing."""
        # Patch the analyzer's parallel processor so the batch path is
        # exercised without real concurrent analysis.
        with patch.object(
            analyzer.parallel_processor,
            "parallel_content_analysis",
            new_callable=AsyncMock,
        ) as mock_parallel:
            mock_results = [
                {
                    "analysis": {
                        "insights": ["Insight 1"],
                        "sentiment": {"direction": "bullish", "confidence": 0.7},
                    }
                },
                {
                    "analysis": {
                        "insights": ["Insight 2"],
                        "sentiment": {"direction": "neutral", "confidence": 0.6},
                    }
                },
            ]
            mock_parallel.return_value = mock_results
            sources = [
                {"content": "Source 1 content", "url": "http://example1.com"},
                {"content": "Source 2 content", "url": "http://example2.com"},
            ]
            results = await analyzer.batch_analyze_content(
                sources=sources,
                persona="moderate",
                analysis_type="fundamental",
                time_budget_seconds=60.0,
                current_confidence=0.5,
            )
            # Results should pass through from the parallel processor.
            assert len(results) == 2
            assert results[0]["analysis"]["insights"] == ["Insight 1"]
            assert results[1]["analysis"]["sentiment"]["direction"] == "neutral"
class TestOptimizedDeepResearchAgent:
"""Test the main OptimizedDeepResearchAgent."""
@pytest.fixture
def mock_openrouter(self):
    """Build a Mock OpenRouter provider whose get_llm yields an AsyncMock."""
    mocked_provider = Mock(spec=OpenRouterProvider)
    mocked_provider.get_llm = Mock(return_value=AsyncMock())
    return mocked_provider
@pytest.fixture
def mock_search_provider(self):
    """Build an async search-provider stub returning two canned results."""
    canned_results = [
        {
            "title": "Test Result 1",
            "url": "http://example1.com",
            "content": "Financial analysis content",
        },
        {
            "title": "Test Result 2",
            "url": "http://example2.com",
            "content": "Market research content",
        },
    ]
    stub = AsyncMock()
    stub.search = AsyncMock(return_value=canned_results)
    return stub
@pytest.fixture
def agent(self, mock_openrouter, mock_search_provider):
    """Create an OptimizedDeepResearchAgent wired with mocked dependencies.

    Uses the mocked OpenRouter provider and a single mocked search
    provider so tests never hit real services.
    """
    agent = OptimizedDeepResearchAgent(
        openrouter_provider=mock_openrouter,
        persona="moderate",
        optimization_enabled=True,
    )
    # Add mock search provider (replaces whatever the constructor set up)
    agent.search_providers = [mock_search_provider]
    # Initialize confidence tracker for tests that need it
    agent.confidence_tracker = ConfidenceTracker()
    return agent
@pytest.mark.asyncio
async def test_research_comprehensive_success(
    self, agent, mock_search_provider, mock_openrouter
):
    """Test successful comprehensive research.

    Patches the analysis phase to return two pre-analyzed sources, then
    checks the final result envelope produced by research_comprehensive.
    """
    # Setup mock LLM for the synthesis step
    mock_llm = AsyncMock()
    mock_response = Mock()
    mock_response.content = "Comprehensive synthesis of research findings."
    mock_llm.ainvoke.return_value = mock_response
    mock_openrouter.get_llm.return_value = mock_llm

    # Mock analysis phase to return analyzed sources (bypasses search +
    # per-source analysis entirely).
    async def mock_analysis_phase(*args, **kwargs):
        return {
            "analyzed_sources": [
                {
                    "title": "AAPL Analysis Report",
                    "url": "http://example.com",
                    "analysis": {
                        "insights": ["Key insight"],
                        "sentiment": {"direction": "bullish", "confidence": 0.8},
                        "credibility_score": 0.9,
                        "relevance_score": 0.85,
                        "optimization_applied": True,
                    },
                },
                {
                    "title": "Technical Analysis AAPL",
                    "url": "http://example2.com",
                    "analysis": {
                        "insights": ["Technical insight"],
                        "sentiment": {"direction": "bullish", "confidence": 0.7},
                        "credibility_score": 0.8,
                        "relevance_score": 0.8,
                        "optimization_applied": True,
                    },
                },
            ],
            "final_confidence": 0.8,
            "early_terminated": False,
            "processing_mode": "optimized",
        }

    with patch.object(
        agent, "_optimized_analysis_phase", new_callable=AsyncMock
    ) as mock_analysis:
        mock_analysis.side_effect = mock_analysis_phase
        result = await agent.research_comprehensive(
            topic="AAPL stock analysis",
            session_id="test_session",
            depth="standard",
            focus_areas=["fundamental", "technical"],
            timeframe="30d",
            time_budget_seconds=120.0,
            target_confidence=0.75,
        )
    # Verify successful research envelope
    assert result["status"] == "success"
    assert result["agent_type"] == "optimized_deep_research"
    assert result["optimization_enabled"] is True
    assert result["research_topic"] == "AAPL stock analysis"
    assert result["sources_analyzed"] > 0
    assert "findings" in result
    assert "citations" in result
    assert "optimization_metrics" in result
@pytest.mark.asyncio
async def test_research_with_no_providers(self, mock_openrouter):
"""Test research when no search providers are configured."""
agent = OptimizedDeepResearchAgent(
openrouter_provider=mock_openrouter,
optimization_enabled=True,
)
agent.search_providers = [] # No providers
result = await agent.research_comprehensive(
topic="Test topic",
session_id="test_session",
time_budget_seconds=60.0,
)
assert "error" in result
assert "no search providers configured" in result["error"].lower()
@pytest.mark.asyncio
async def test_research_with_early_termination(
self, agent, mock_search_provider, mock_openrouter
):
"""Test early termination based on confidence threshold."""
# Mock the entire analysis phase to return early termination
async def mock_analysis_phase(*args, **kwargs):
return {
"analyzed_sources": [
{
"title": "Mock Source",
"url": "http://example.com",
"analysis": {
"insights": ["High confidence insight"],
"sentiment": {"direction": "bullish", "confidence": 0.95},
"credibility_score": 0.95,
"relevance_score": 0.9,
},
}
],
"final_confidence": 0.92,
"early_terminated": True,
"termination_reason": "confidence_threshold_reached",
"processing_mode": "optimized",
}
with patch.object(
agent, "_optimized_analysis_phase", new_callable=AsyncMock
) as mock_analysis:
mock_analysis.side_effect = mock_analysis_phase
result = await agent.research_comprehensive(
topic="Quick research test",
session_id="test_session",
time_budget_seconds=120.0,
target_confidence=0.9,
)
assert result["findings"]["early_terminated"] is True
assert (
result["findings"]["termination_reason"]
== "confidence_threshold_reached"
)
@pytest.mark.asyncio
async def test_research_emergency_response(self, agent, mock_search_provider):
"""Test emergency response when time is critically low."""
# Test with very short time budget
result = agent._create_emergency_response(
topic="Emergency test",
search_results={"filtered_sources": [{"title": "Source 1"}]},
start_time=time.time() - 1, # 1 second ago
)
assert result["status"] == "partial_success"
assert result["emergency_mode"] is True
assert "Emergency mode" in result["findings"]["synthesis"]
assert result["findings"]["confidence_score"] == 0.3
class TestModelSelectionLogic:
    """Test the adaptive model selection logic."""

    @pytest.fixture
    def model_selector(self):
        """Build an AdaptiveModelSelector backed by a stub provider."""
        stub_provider = Mock(spec=OpenRouterProvider)
        return AdaptiveModelSelector(stub_provider)

    def test_calculate_task_complexity(self, model_selector):
        """Test task complexity calculation."""
        # Repeat a paragraph dense with financial jargon so the complexity
        # heuristic has plenty of indicator terms to pick up on.
        sample_text = (
            """
        This comprehensive financial analysis examines EBITDA, DCF valuation, and ROIC metrics.
        The company shows strong quarterly YoY growth with bullish sentiment from analysts.
        Technical analysis indicates RSI oversold conditions with MACD crossover signals.
        Support levels at $150 with resistance at $200. Volatility and beta measures suggest
        the stock outperforms relative to market. The Sharpe ratio indicates favorable
        risk-adjusted returns versus comparable companies in Q4 results.
        """
            * 20
        )
        score = model_selector.calculate_task_complexity(
            sample_text, TaskType.DEEP_RESEARCH, ["fundamental", "technical"]
        )
        # Complexity is normalized to [0, 1]; jargon-heavy text must not
        # score near zero.
        assert 0 <= score <= 1
        assert score > 0.1

    def test_select_model_for_time_budget(self, model_selector):
        """Test model selection based on time constraints."""
        # A tight deadline should yield a fast model with a short timeout.
        fast_config = model_selector.select_model_for_time_budget(
            task_type=TaskType.QUICK_ANSWER,
            time_remaining_seconds=10.0,
            complexity_score=0.3,
            content_size_tokens=100,
            current_confidence=0.5,
        )
        assert isinstance(fast_config, ModelConfiguration)
        # Emergency models get a little slack, hence <= 15s rather than <= 10s.
        assert fast_config.timeout_seconds <= 15.0
        assert fast_config.model_id is not None

        # A generous deadline permits a heavier, higher-quality model.
        deep_config = model_selector.select_model_for_time_budget(
            task_type=TaskType.DEEP_RESEARCH,
            time_remaining_seconds=300.0,
            complexity_score=0.8,
            content_size_tokens=5000,
            current_confidence=0.3,
        )
        assert deep_config.timeout_seconds > fast_config.timeout_seconds
        assert deep_config.max_tokens >= fast_config.max_tokens
class TestTokenBudgetingAndConfidence:
    """Test token budgeting and confidence tracking."""

    def test_progressive_token_budgeter(self):
        """Test progressive token budget allocation."""
        budgeter = ProgressiveTokenBudgeter(
            total_time_budget_seconds=120.0, confidence_target=0.8
        )

        # Early in the run: many sources remaining, low confidence.
        early = budgeter.get_next_allocation(
            sources_remaining=10,
            current_confidence=0.3,
            time_elapsed_seconds=10.0,
        )
        assert early["time_budget"] > 0
        assert early["max_tokens"] > 0
        assert early["priority"] in ["low", "medium", "high"]

        # Later in the run: fewer sources remaining, higher confidence.
        late = budgeter.get_next_allocation(
            sources_remaining=5,
            current_confidence=0.7,
            time_elapsed_seconds=60.0,
        )
        # Rising confidence should never *raise* the allocation priority.
        assert late["priority"] in ["low", "medium"]
        rank = {"low": 0, "medium": 1, "high": 2}
        assert rank[late["priority"]] <= rank[early["priority"]]

    def test_confidence_tracker(self):
        """Test confidence tracking and early termination."""
        tracker = ConfidenceTracker(
            target_confidence=0.8, min_sources=3, max_sources=20
        )
        sample_analysis = {
            "sentiment": {"confidence": 0.7},
            "insights": ["insight1", "insight2"],
        }

        # First update: counters start ticking and a decision is returned.
        update = tracker.update_confidence(sample_analysis, credibility_score=0.8)
        assert "current_confidence" in update
        assert "should_continue" in update
        assert update["sources_analyzed"] == 1

        # Even strong evidence must not terminate before min_sources is met.
        for _ in range(2):
            update = tracker.update_confidence(sample_analysis, credibility_score=0.9)
            if tracker.sources_analyzed < tracker.min_sources:
                assert update["should_continue"] is True
class TestErrorHandlingAndRecovery:
    """Test error handling and recovery mechanisms."""

    @pytest.mark.asyncio
    async def test_search_timeout_handling(self):
        """Test handling of search provider timeouts."""
        agent = OptimizedDeepResearchAgent(
            openrouter_provider=Mock(spec=OpenRouterProvider),
            optimization_enabled=True,
        )
        # Mock search provider that times out
        mock_provider = AsyncMock()
        mock_provider.search.side_effect = TimeoutError("Search timeout")
        # The agent should absorb the timeout instead of propagating it.
        results = await agent._search_with_timeout(
            mock_provider, "test query", timeout=1.0
        )
        assert results == []  # Should return empty list on timeout

    @pytest.mark.asyncio
    async def test_synthesis_fallback(self):
        """Test fallback synthesis when LLM fails."""
        agent = OptimizedDeepResearchAgent(
            openrouter_provider=Mock(spec=OpenRouterProvider),
            optimization_enabled=True,
        )
        # Mock LLM failure
        with patch.object(
            agent.openrouter_provider,
            "get_llm",
            side_effect=Exception("LLM unavailable"),
        ):
            result = await agent._optimized_synthesis_phase(
                analyzed_sources=[{"analysis": {"insights": ["test"]}}],
                topic="Test topic",
                time_budget_seconds=10.0,
            )
            # With no LLM available the agent must degrade gracefully to a
            # non-LLM synthesis path and flag that the fallback was used.
            assert "fallback_used" in result
            assert result["fallback_used"] is True
            assert "basic processing" in result["synthesis"]
class TestIntegrationWithParallelProcessing:
    """Test integration with parallel processing capabilities."""

    @pytest.mark.asyncio
    async def test_parallel_batch_processing(self):
        """Test parallel batch processing of sources."""
        analyzer = OptimizedContentAnalyzer(Mock(spec=OpenRouterProvider))
        # Mock parallel processor
        # Patch the analyzer's internal parallel path so the test verifies
        # delegation without running any real concurrent analysis.
        with patch.object(
            analyzer.parallel_processor,
            "parallel_content_analysis",
            new_callable=AsyncMock,
        ) as mock_parallel:
            mock_parallel.return_value = [
                {"analysis": {"insights": [f"Insight {i}"]}} for i in range(5)
            ]
            sources = [{"content": f"Source {i}"} for i in range(5)]
            results = await analyzer.batch_analyze_content(
                sources=sources,
                persona="moderate",
                analysis_type="general",
                time_budget_seconds=30.0,
            )
            # One result per source, produced via exactly one delegated call.
            assert len(results) == 5
            mock_parallel.assert_called_once()
class TestFactoryFunction:
    """Test the factory function for creating optimized agents."""

    def test_create_optimized_research_agent(self):
        """Test agent creation through factory function."""
        # Guarantee the factory sees an API key regardless of the host env.
        with patch.dict("os.environ", {"OPENROUTER_API_KEY": "test_key"}):
            created = create_optimized_research_agent(
                openrouter_api_key="test_key",
                persona="aggressive",
                time_budget_seconds=180.0,
                target_confidence=0.85,
            )
            # The factory should yield a fully configured optimized agent
            # with the requested persona resolved to its canonical name.
            assert isinstance(created, OptimizedDeepResearchAgent)
            assert created.optimization_enabled is True
            assert created.persona.name == "Aggressive"
# Allow running this module directly (e.g. `python <file>.py`) in addition
# to normal discovery via the pytest CLI; "-v" gives per-test output.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/tests/domain/test_portfolio_entities.py:
--------------------------------------------------------------------------------
```python
"""
Unit tests for portfolio domain entities.
Tests the pure business logic of Position and Portfolio entities without
any database or infrastructure dependencies.
"""
from datetime import UTC, datetime, timedelta
from decimal import Decimal
import pytest
from maverick_mcp.domain.portfolio import Portfolio, Position
class TestPosition:
    """Test suite for Position value object."""

    @staticmethod
    def _build(
        ticker="AAPL",
        shares="10",
        cost_basis="150.00",
        total="1500.00",
        when=None,
        **extra,
    ):
        """Construct a Position with the defaults used throughout this suite."""
        return Position(
            ticker=ticker,
            shares=Decimal(shares),
            average_cost_basis=Decimal(cost_basis),
            total_cost=Decimal(total),
            purchase_date=when if when is not None else datetime.now(UTC),
            **extra,
        )

    def test_position_creation(self):
        """Test creating a valid position."""
        pos = self._build()
        assert pos.ticker == "AAPL"
        assert pos.shares == Decimal("10")
        assert pos.average_cost_basis == Decimal("150.00")
        assert pos.total_cost == Decimal("1500.00")

    def test_position_normalizes_ticker(self):
        """Test that ticker is normalized to uppercase."""
        assert self._build(ticker="aapl").ticker == "AAPL"

    def test_position_rejects_zero_shares(self):
        """Test that positions cannot have zero shares."""
        with pytest.raises(ValueError, match="Shares must be positive"):
            self._build(shares="0")

    def test_position_rejects_negative_shares(self):
        """Test that positions cannot have negative shares."""
        with pytest.raises(ValueError, match="Shares must be positive"):
            self._build(shares="-10")

    def test_position_rejects_zero_cost_basis(self):
        """Test that positions cannot have zero cost basis."""
        with pytest.raises(ValueError, match="Average cost basis must be positive"):
            self._build(cost_basis="0")

    def test_position_rejects_negative_total_cost(self):
        """Test that positions cannot have negative total cost."""
        with pytest.raises(ValueError, match="Total cost must be positive"):
            self._build(total="-1500.00")

    def test_add_shares_averages_cost_basis(self):
        """Test that adding shares correctly averages the cost basis."""
        # 10 shares @ $150, then 10 more @ $170 -> 20 shares @ $160 average.
        pos = self._build()
        pos = pos.add_shares(Decimal("10"), Decimal("170.00"), datetime.now(UTC))
        assert pos.shares == Decimal("20")
        assert pos.average_cost_basis == Decimal("160.0000")
        assert pos.total_cost == Decimal("3200.00")

    def test_add_shares_updates_purchase_date(self):
        """Test that adding shares updates purchase date to earliest."""
        later = datetime.now(UTC)
        earlier = later - timedelta(days=30)
        pos = self._build(when=later)
        pos = pos.add_shares(Decimal("10"), Decimal("170.00"), earlier)
        assert pos.purchase_date == earlier

    def test_add_shares_rejects_zero_shares(self):
        """Test that adding zero shares raises error."""
        pos = self._build()
        with pytest.raises(ValueError, match="Shares to add must be positive"):
            pos.add_shares(Decimal("0"), Decimal("170.00"), datetime.now(UTC))

    def test_add_shares_rejects_zero_price(self):
        """Test that adding shares at zero price raises error."""
        pos = self._build()
        with pytest.raises(ValueError, match="Price must be positive"):
            pos.add_shares(Decimal("10"), Decimal("0"), datetime.now(UTC))

    def test_remove_shares_partial(self):
        """Test removing part of a position."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        pos = pos.remove_shares(Decimal("10"))
        assert pos is not None
        assert pos.shares == Decimal("10")
        # Cost basis stays the same; only quantity and total cost shrink.
        assert pos.average_cost_basis == Decimal("160.00")
        assert pos.total_cost == Decimal("1600.00")

    def test_remove_shares_full(self):
        """Test removing entire position returns None."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        assert pos.remove_shares(Decimal("20")) is None

    def test_remove_shares_more_than_held(self):
        """Test removing more shares than held closes position."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        assert pos.remove_shares(Decimal("25")) is None

    def test_remove_shares_rejects_zero(self):
        """Test that removing zero shares raises error."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        with pytest.raises(ValueError, match="Shares to remove must be positive"):
            pos.remove_shares(Decimal("0"))

    def test_calculate_current_value_with_gain(self):
        """Test calculating current value with unrealized gain."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        metrics = pos.calculate_current_value(Decimal("175.50"))
        assert metrics["current_value"] == Decimal("3510.00")
        assert metrics["unrealized_pnl"] == Decimal("310.00")
        assert metrics["pnl_percentage"] == Decimal("9.69")

    def test_calculate_current_value_with_loss(self):
        """Test calculating current value with unrealized loss."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        metrics = pos.calculate_current_value(Decimal("145.00"))
        assert metrics["current_value"] == Decimal("2900.00")
        assert metrics["unrealized_pnl"] == Decimal("-300.00")
        assert metrics["pnl_percentage"] == Decimal("-9.38")

    def test_calculate_current_value_unchanged(self):
        """Test calculating current value when price unchanged."""
        pos = self._build(shares="20", cost_basis="160.00", total="3200.00")
        metrics = pos.calculate_current_value(Decimal("160.00"))
        assert metrics["current_value"] == Decimal("3200.00")
        assert metrics["unrealized_pnl"] == Decimal("0.00")
        assert metrics["pnl_percentage"] == Decimal("0.00")

    def test_fractional_shares(self):
        """Test that fractional shares are supported."""
        pos = self._build(shares="10.5", cost_basis="150.25", total="1577.625")
        assert pos.shares == Decimal("10.5")
        metrics = pos.calculate_current_value(Decimal("175.50"))
        assert metrics["current_value"] == Decimal("1842.75")

    def test_to_dict(self):
        """Test converting position to dictionary."""
        bought_at = datetime.now(UTC)
        pos = self._build(when=bought_at, notes="Long-term hold")
        result = pos.to_dict()
        assert result["ticker"] == "AAPL"
        assert result["shares"] == 10.0
        assert result["average_cost_basis"] == 150.0
        assert result["total_cost"] == 1500.0
        assert result["purchase_date"] == bought_at.isoformat()
        assert result["notes"] == "Long-term hold"
class TestPortfolio:
    """Test suite for Portfolio aggregate root."""

    @staticmethod
    def _portfolio():
        """Create the empty fixture portfolio used throughout this suite."""
        return Portfolio(
            portfolio_id="test-id",
            user_id="default",
            name="My Portfolio",
        )

    @staticmethod
    def _buy(portfolio, ticker, shares, price):
        """Add a position from string amounts at the current UTC time."""
        portfolio.add_position(
            ticker=ticker,
            shares=Decimal(shares),
            price=Decimal(price),
            date=datetime.now(UTC),
        )

    def test_portfolio_creation(self):
        """Test creating an empty portfolio."""
        portfolio = self._portfolio()
        assert portfolio.portfolio_id == "test-id"
        assert portfolio.user_id == "default"
        assert portfolio.name == "My Portfolio"
        assert len(portfolio.positions) == 0

    def test_add_position_new(self):
        """Test adding a new position."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        assert len(portfolio.positions) == 1
        assert portfolio.positions[0].ticker == "AAPL"
        assert portfolio.positions[0].shares == Decimal("10")

    def test_add_position_existing_averages(self):
        """Test that adding to existing position averages cost basis."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")  # first purchase
        self._buy(portfolio, "AAPL", "10", "170.00")  # second purchase
        # Still one position; cost basis averaged to $160.
        assert len(portfolio.positions) == 1
        assert portfolio.positions[0].shares == Decimal("20")
        assert portfolio.positions[0].average_cost_basis == Decimal("160.0000")

    def test_add_position_case_insensitive(self):
        """Test that ticker matching is case-insensitive."""
        portfolio = self._portfolio()
        self._buy(portfolio, "aapl", "10", "150.00")
        self._buy(portfolio, "AAPL", "10", "170.00")
        assert len(portfolio.positions) == 1
        assert portfolio.positions[0].ticker == "AAPL"

    def test_remove_position_partial(self):
        """Test partially removing a position."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "20", "150.00")
        result = portfolio.remove_position("AAPL", Decimal("10"))
        assert result is True
        assert len(portfolio.positions) == 1
        assert portfolio.positions[0].shares == Decimal("10")

    def test_remove_position_full(self):
        """Test fully removing a position."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "20", "150.00")
        result = portfolio.remove_position("AAPL")
        assert result is True
        assert len(portfolio.positions) == 0

    def test_remove_position_nonexistent(self):
        """Test removing non-existent position returns False."""
        portfolio = self._portfolio()
        assert portfolio.remove_position("AAPL") is False

    def test_get_position(self):
        """Test getting a position by ticker."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        pos = portfolio.get_position("AAPL")
        assert pos is not None
        assert pos.ticker == "AAPL"

    def test_get_position_case_insensitive(self):
        """Test that get_position is case-insensitive."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        pos = portfolio.get_position("aapl")
        assert pos is not None
        assert pos.ticker == "AAPL"

    def test_get_position_nonexistent(self):
        """Test getting non-existent position returns None."""
        portfolio = self._portfolio()
        assert portfolio.get_position("AAPL") is None

    def test_get_total_invested(self):
        """Test calculating total capital invested."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        self._buy(portfolio, "MSFT", "5", "300.00")
        assert portfolio.get_total_invested() == Decimal("3000.00")

    def test_calculate_portfolio_metrics(self):
        """Test calculating comprehensive portfolio metrics."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        self._buy(portfolio, "MSFT", "5", "300.00")
        current_prices = {
            "AAPL": Decimal("175.50"),
            "MSFT": Decimal("320.00"),
        }
        metrics = portfolio.calculate_portfolio_metrics(current_prices)
        assert metrics["total_value"] == 3355.0  # (10 * 175.50) + (5 * 320)
        assert metrics["total_invested"] == 3000.0
        assert metrics["total_pnl"] == 355.0
        assert metrics["total_pnl_percentage"] == 11.83
        assert metrics["position_count"] == 2
        assert len(metrics["positions"]) == 2

    def test_calculate_portfolio_metrics_uses_fallback_price(self):
        """Test that missing prices fall back to cost basis."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        # With no current price supplied, cost basis stands in for it,
        # so the position shows its invested value and zero P&L.
        metrics = portfolio.calculate_portfolio_metrics({})
        assert metrics["total_value"] == 1500.0
        assert metrics["total_pnl"] == 0.0

    def test_clear_all_positions(self):
        """Test clearing all positions."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        self._buy(portfolio, "MSFT", "5", "300.00")
        portfolio.clear_all_positions()
        assert len(portfolio.positions) == 0

    def test_to_dict(self):
        """Test converting portfolio to dictionary."""
        portfolio = self._portfolio()
        self._buy(portfolio, "AAPL", "10", "150.00")
        result = portfolio.to_dict()
        assert result["portfolio_id"] == "test-id"
        assert result["user_id"] == "default"
        assert result["name"] == "My Portfolio"
        assert result["position_count"] == 1
        assert result["total_invested"] == 1500.0
        assert len(result["positions"]) == 1

    def test_multiple_positions_with_different_performance(self):
        """Test portfolio with positions having different performance."""
        portfolio = self._portfolio()
        self._buy(portfolio, "NVDA", "5", "450.00")  # winner
        self._buy(portfolio, "MARA", "50", "18.50")  # loser
        current_prices = {
            "NVDA": Decimal("520.00"),  # +15.6%
            "MARA": Decimal("13.50"),  # -27.0%
        }
        metrics = portfolio.calculate_portfolio_metrics(current_prices)

        # Check individual positions.
        nvda_pos = next(p for p in metrics["positions"] if p["ticker"] == "NVDA")
        mara_pos = next(p for p in metrics["positions"] if p["ticker"] == "MARA")
        assert nvda_pos["unrealized_pnl"] == 350.0  # (520 - 450) * 5
        assert mara_pos["unrealized_pnl"] == -250.0  # (13.50 - 18.50) * 50

        # Overall portfolio nets the two out.
        assert metrics["total_pnl"] == 100.0  # 350 - 250
```
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/introspection.py:
--------------------------------------------------------------------------------
```python
"""MCP Introspection Tools for Better Discovery and Understanding."""
from typing import Any
from fastmcp import FastMCP
def register_introspection_tools(mcp: FastMCP) -> None:
"""Register introspection tools for better discovery."""
@mcp.tool(name="discover_capabilities")
async def discover_capabilities() -> dict[str, Any]:
"""
Discover all available capabilities of the MaverickMCP server.
This tool provides comprehensive information about:
- Available strategies (traditional and ML)
- Tool categories and their functions
- Parameter requirements for each strategy
- Example usage patterns
Use this as your first tool to understand what's available.
"""
return {
"server_info": {
"name": "MaverickMCP",
"version": "1.0.0",
"description": "Advanced stock analysis and backtesting MCP server",
},
"capabilities": {
"backtesting": {
"description": "Run and optimize trading strategies",
"strategies_available": 15,
"ml_strategies": ["online_learning", "regime_aware", "ensemble"],
"traditional_strategies": [
"sma_cross",
"rsi",
"macd",
"bollinger",
"momentum",
"ema_cross",
"mean_reversion",
"breakout",
"volume_momentum",
],
"features": [
"parameter_optimization",
"strategy_comparison",
"walk_forward_analysis",
],
},
"technical_analysis": {
"description": "Calculate technical indicators and patterns",
"indicators": [
"SMA",
"EMA",
"RSI",
"MACD",
"Bollinger Bands",
"Support/Resistance",
],
"chart_analysis": True,
"pattern_recognition": True,
},
"screening": {
"description": "Pre-calculated S&P 500 screening results",
"strategies": [
"maverick_bullish",
"maverick_bearish",
"supply_demand_breakouts",
],
"database": "520 S&P 500 stocks pre-seeded",
},
"research": {
"description": "AI-powered research with parallel execution",
"features": [
"comprehensive_research",
"company_analysis",
"sentiment_analysis",
],
"performance": "7-256x speedup with parallel agents",
"ai_models": "400+ models via OpenRouter",
},
},
"quick_start": {
"first_command": "Run: discover_capabilities() to see this info",
"simple_backtest": "run_backtest(symbol='AAPL', strategy_type='sma_cross')",
"ml_strategy": "run_backtest(symbol='TSLA', strategy_type='online_learning')",
"get_help": "Use prompts like 'backtest_strategy_guide' for detailed guides",
},
}
@mcp.tool(name="list_all_strategies")
async def list_all_strategies() -> list[dict[str, Any]]:
"""
List all available backtesting strategies with their parameters.
Returns detailed information about each strategy including:
- Strategy name and description
- Required and optional parameters
- Default parameter values
- Example usage
"""
strategies = [] # Return as array
# Traditional strategies
strategies.extend(
[
{
"name": "sma_cross",
"description": "Simple Moving Average Crossover",
"parameters": {
"fast_period": {
"type": "int",
"default": 10,
"description": "Fast MA period",
},
"slow_period": {
"type": "int",
"default": 20,
"description": "Slow MA period",
},
},
"example": "run_backtest(symbol='AAPL', strategy_type='sma_cross', fast_period=10, slow_period=20)",
},
{
"name": "rsi",
"description": "RSI Mean Reversion",
"parameters": {
"period": {
"type": "int",
"default": 14,
"description": "RSI calculation period",
},
"oversold": {
"type": "int",
"default": 30,
"description": "Oversold threshold",
},
"overbought": {
"type": "int",
"default": 70,
"description": "Overbought threshold",
},
},
"example": "run_backtest(symbol='MSFT', strategy_type='rsi', period=14)",
},
{
"name": "macd",
"description": "MACD Signal Line Crossover",
"parameters": {
"fast_period": {
"type": "int",
"default": 12,
"description": "Fast EMA period",
},
"slow_period": {
"type": "int",
"default": 26,
"description": "Slow EMA period",
},
"signal_period": {
"type": "int",
"default": 9,
"description": "Signal line period",
},
},
"example": "run_backtest(symbol='GOOGL', strategy_type='macd')",
},
{
"name": "bollinger",
"description": "Bollinger Bands Mean Reversion",
"parameters": {
"period": {
"type": "int",
"default": 20,
"description": "BB calculation period",
},
"std_dev": {
"type": "float",
"default": 2,
"description": "Standard deviations",
},
},
"example": "run_backtest(symbol='AMZN', strategy_type='bollinger')",
},
{
"name": "momentum",
"description": "Momentum Trading Strategy",
"parameters": {
"period": {
"type": "int",
"default": 10,
"description": "Momentum period",
},
"threshold": {
"type": "float",
"default": 0.02,
"description": "Entry threshold",
},
},
"example": "run_backtest(symbol='NVDA', strategy_type='momentum')",
},
{
"name": "ema_cross",
"description": "Exponential Moving Average Crossover",
"parameters": {
"fast_period": {
"type": "int",
"default": 12,
"description": "Fast EMA period",
},
"slow_period": {
"type": "int",
"default": 26,
"description": "Slow EMA period",
},
},
"example": "run_backtest(symbol='META', strategy_type='ema_cross')",
},
{
"name": "mean_reversion",
"description": "Statistical Mean Reversion",
"parameters": {
"lookback": {
"type": "int",
"default": 20,
"description": "Lookback period",
},
"entry_z": {
"type": "float",
"default": -2,
"description": "Entry z-score",
},
"exit_z": {
"type": "float",
"default": 0,
"description": "Exit z-score",
},
},
"example": "run_backtest(symbol='SPY', strategy_type='mean_reversion')",
},
{
"name": "breakout",
"description": "Channel Breakout Strategy",
"parameters": {
"lookback": {
"type": "int",
"default": 20,
"description": "Channel period",
},
"breakout_factor": {
"type": "float",
"default": 1.5,
"description": "Breakout multiplier",
},
},
"example": "run_backtest(symbol='QQQ', strategy_type='breakout')",
},
{
"name": "volume_momentum",
"description": "Volume-Weighted Momentum",
"parameters": {
"period": {
"type": "int",
"default": 10,
"description": "Momentum period",
},
"volume_factor": {
"type": "float",
"default": 1.5,
"description": "Volume multiplier",
},
},
"example": "run_backtest(symbol='TSLA', strategy_type='volume_momentum')",
},
]
)
# ML strategies
strategies.extend(
[
{
"name": "ml_predictor",
"description": "Machine Learning predictor using Random Forest",
"parameters": {
"model_type": {
"type": "str",
"default": "random_forest",
"description": "ML model type",
},
"n_estimators": {
"type": "int",
"default": 100,
"description": "Number of trees",
},
"max_depth": {
"type": "int",
"default": None,
"description": "Max tree depth",
},
},
"example": "run_ml_strategy_backtest(symbol='AAPL', strategy_type='ml_predictor', model_type='random_forest')",
},
{
"name": "online_learning",
"description": "Online learning adaptive strategy (alias for adaptive)",
"parameters": {
"learning_rate": {
"type": "float",
"default": 0.01,
"description": "Adaptation rate",
},
"adaptation_method": {
"type": "str",
"default": "gradient",
"description": "Method for adaptation",
},
},
"example": "run_ml_strategy_backtest(symbol='AAPL', strategy_type='online_learning')",
},
{
"name": "regime_aware",
"description": "Market regime detection and adaptation",
"parameters": {
"regime_window": {
"type": "int",
"default": 50,
"description": "Regime detection window",
},
"threshold": {
"type": "float",
"default": 0.02,
"description": "Regime change threshold",
},
},
"example": "run_backtest(symbol='SPY', strategy_type='regime_aware')",
},
{
"name": "ensemble",
"description": "Ensemble voting with multiple strategies",
"parameters": {
"fast_period": {
"type": "int",
"default": 10,
"description": "Fast MA for ensemble",
},
"slow_period": {
"type": "int",
"default": 20,
"description": "Slow MA for ensemble",
},
"rsi_period": {
"type": "int",
"default": 14,
"description": "RSI period for ensemble",
},
},
"example": "run_ml_strategy_backtest(symbol='MSFT', strategy_type='ensemble')",
},
{
"name": "adaptive",
"description": "Adaptive strategy that adjusts based on performance",
"parameters": {
"learning_rate": {
"type": "float",
"default": 0.01,
"description": "How quickly to adapt",
},
"adaptation_method": {
"type": "str",
"default": "gradient",
"description": "Method for adaptation",
},
},
"example": "run_ml_strategy_backtest(symbol='GOOGL', strategy_type='adaptive')",
},
]
)
return strategies # Return array
@mcp.tool(name="get_strategy_help")
async def get_strategy_help(strategy_type: str) -> dict[str, Any]:
    """
    Get detailed help for a specific strategy.
    Args:
        strategy_type: Name of the strategy (e.g., 'sma_cross', 'online_learning')
    Returns:
        Detailed information about the strategy including theory, parameters, and best practices.
    """
    # Static catalog of per-strategy documentation; keys are the strategy_type
    # identifiers accepted by the backtest tools.
    help_catalog: dict[str, dict[str, Any]] = {
        "sma_cross": {
            "name": "Simple Moving Average Crossover",
            "theory": "Generates buy signals when fast SMA crosses above slow SMA, sell when opposite occurs",
            "best_for": "Trending markets with clear directional moves",
            "parameters": {
                "fast_period": "Typically 10-20 days for short-term trends",
                "slow_period": "Typically 20-50 days for medium-term trends",
            },
            "tips": [
                "Works best in trending markets",
                "Consider adding volume confirmation",
                "Use wider periods for less noise",
            ],
        },
        "ml_predictor": {
            "name": "Machine Learning Predictor",
            "theory": "Uses Random Forest or other ML models to predict price movements",
            "best_for": "Complex markets with multiple factors",
            "parameters": {
                "model_type": "Type of ML model (random_forest)",
                "n_estimators": "Number of trees in forest (50-200)",
                "max_depth": "Maximum tree depth (None or 5-20)",
            },
            "tips": [
                "More estimators for better accuracy but slower",
                "Limit depth to prevent overfitting",
                "Requires sufficient historical data",
            ],
        },
        "online_learning": {
            "name": "Online Learning Strategy",
            "theory": "Continuously adapts strategy parameters based on recent performance",
            "best_for": "Dynamic markets with changing patterns",
            "parameters": {
                "learning_rate": "How quickly to adapt (0.001-0.1)",
                "adaptation_method": "Method for adaptation (gradient, bayesian)",
            },
            "tips": [
                "Lower learning rates for stable adaptation",
                "Works well in volatile markets",
                "This is an alias for the adaptive strategy",
            ],
        },
        "adaptive": {
            "name": "Adaptive Strategy",
            "theory": "Dynamically adjusts strategy parameters based on performance",
            "best_for": "Markets with changing characteristics",
            "parameters": {
                "learning_rate": "How quickly to adapt (0.001-0.1)",
                "adaptation_method": "Method for adaptation (gradient, bayesian)",
            },
            "tips": [
                "Start with lower learning rates",
                "Monitor for overfitting",
                "Works best with stable base strategy",
            ],
        },
        "ensemble": {
            "name": "Strategy Ensemble",
            "theory": "Combines multiple strategies with weighted voting",
            "best_for": "Risk reduction through diversification",
            "parameters": {
                "base_strategies": "List of strategies to combine",
                "weighting_method": "How to weight strategies (equal, performance, volatility)",
            },
            "tips": [
                "Combine uncorrelated strategies",
                "Performance weighting adapts to market",
                "More strategies reduce single-point failure",
            ],
        },
        "regime_aware": {
            "name": "Market Regime Detection Strategy",
            "theory": "Identifies market regimes (trending vs ranging) and adapts strategy accordingly",
            "best_for": "Markets that alternate between trending and sideways movement",
            "parameters": {
                "regime_window": "Period for regime detection (30-100 days)",
                "threshold": "Sensitivity to regime changes (0.01-0.05)",
            },
            "tips": [
                "Longer windows for major regime shifts",
                "Lower thresholds for more sensitive detection",
                "Combines well with other indicators",
            ],
        },
    }

    entry = help_catalog.get(strategy_type)
    if entry is not None:
        return entry
    # Unknown strategy: return an error payload that guides discovery.
    return {
        "error": f"Strategy '{strategy_type}' not found",
        "available_strategies": list(help_catalog.keys()),
        "tip": "Use list_all_strategies() to see all available strategies",
    }
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_in_memory_routers.py:
--------------------------------------------------------------------------------
```python
"""
In-memory tests for domain-specific routers using FastMCP patterns.
Tests individual router functionality in isolation using FastMCP's
router mounting and in-memory testing capabilities.
"""
import ast
import asyncio
from unittest.mock import Mock, patch

import pytest
from fastmcp import Client, FastMCP
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from maverick_mcp.api.routers.data import data_router
from maverick_mcp.api.routers.portfolio import portfolio_router
from maverick_mcp.api.routers.screening import screening_router
from maverick_mcp.api.routers.technical import technical_router
from maverick_mcp.data.models import (
    Base,
    MaverickStocks,
    Stock,
    SupplyDemandBreakoutStocks,
)
@pytest.fixture
def test_server():
    """Provide a fresh FastMCP server so each test mounts only the routers it needs."""
    return FastMCP("TestMaverick-MCP")
@pytest.fixture
def screening_db():
    """Create test database with screening data.

    Seeds an in-memory SQLite database with five Stock rows, two
    MaverickStocks screening results (AAPL, MSFT) and one
    SupplyDemandBreakoutStocks result (GOOGL), then patches the module-level
    engine/SessionLocal so code under test transparently uses this database.
    """
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        # Add test stocks
        stocks = [
            Stock(
                ticker_symbol="AAPL",
                company_name="Apple Inc.",
                sector="Technology",
                industry="Consumer Electronics",
            ),
            Stock(
                ticker_symbol="MSFT",
                company_name="Microsoft Corp.",
                sector="Technology",
                industry="Software",
            ),
            Stock(
                ticker_symbol="GOOGL",
                company_name="Alphabet Inc.",
                sector="Technology",
                industry="Internet",
            ),
            Stock(
                ticker_symbol="AMZN",
                company_name="Amazon.com Inc.",
                sector="Consumer Cyclical",
                industry="Internet Retail",
            ),
            Stock(
                ticker_symbol="TSLA",
                company_name="Tesla Inc.",
                sector="Consumer Cyclical",
                industry="Auto Manufacturers",
            ),
        ]
        session.add_all(stocks)
        session.commit()
        # Add Maverick screening results
        # NOTE: AAPL's combined_score (92) > MSFT's (89); tests rely on this ordering.
        maverick_stocks = [
            MaverickStocks(
                id=1,
                stock="AAPL",
                close=150.0,
                open=148.0,
                high=152.0,
                low=147.0,
                volume=10000000,
                combined_score=92,
                momentum_score=88,
                adr_pct=2.5,
                atr=3.2,
                pat="Cup and Handle",
                sqz="Yes",
                consolidation="trending",
                entry="151.50",
                compression_score=85,
                pattern_detected=1,
                ema_21=149.0,
                sma_50=148.0,
                sma_150=145.0,
                sma_200=140.0,
                avg_vol_30d=9500000,
            ),
            MaverickStocks(
                id=2,
                stock="MSFT",
                close=300.0,
                open=298.0,
                high=302.0,
                low=297.0,
                volume=8000000,
                combined_score=89,
                momentum_score=82,
                adr_pct=2.1,
                atr=4.5,
                pat="Ascending Triangle",
                sqz="No",
                consolidation="trending",
                entry="301.00",
                compression_score=80,
                pattern_detected=1,
                ema_21=299.0,
                sma_50=298.0,
                sma_150=295.0,
                sma_200=290.0,
                avg_vol_30d=7500000,
            ),
        ]
        session.add_all(maverick_stocks)
        # Add trending screening results
        trending_stocks = [
            SupplyDemandBreakoutStocks(
                id=1,
                stock="GOOGL",
                close=140.0,
                open=138.0,
                high=142.0,
                low=137.0,
                volume=5000000,
                momentum_score=91,
                adr_pct=2.8,
                atr=3.5,
                pat="Base Breakout",
                sqz="Yes",
                consolidation="trending",
                entry="141.00",
                ema_21=139.0,
                sma_50=138.0,
                sma_150=135.0,
                sma_200=130.0,
                avg_volume_30d=4800000,
            ),
        ]
        session.add_all(trending_stocks)
        session.commit()
    # Redirect all model-layer database access to the in-memory engine for
    # the duration of the test.
    with patch("maverick_mcp.data.models.engine", engine):
        with patch("maverick_mcp.data.models.SessionLocal", lambda: Session(engine)):
            yield engine
class TestTechnicalRouter:
    """Test technical analysis router functionality.

    Price data is mocked at the StockDataProvider boundary; tool results are
    parsed with ast.literal_eval (not eval) since the text is a repr'd payload.
    """

    @pytest.mark.asyncio
    async def test_rsi_calculation(self, test_server, screening_db):
        """Test RSI calculation through the router."""
        test_server.mount("/technical", technical_router)

        # Mock price data for RSI calculation
        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            # Create 30 days of price data
            import pandas as pd

            dates = pd.date_range(end="2024-01-31", periods=30)
            prices = pd.DataFrame(
                {
                    "Close": [
                        100 + (i % 5) - 2 for i in range(30)
                    ],  # Oscillating prices
                    "High": [101 + (i % 5) - 2 for i in range(30)],
                    "Low": [99 + (i % 5) - 2 for i in range(30)],
                    "Open": [100 + (i % 5) - 2 for i in range(30)],
                    "Volume": [1000000] * 30,
                },
                index=dates,
            )
            mock_data.return_value = prices

            async with Client(test_server) as client:
                result = await client.call_tool(
                    "/technical_get_rsi_analysis", {"ticker": "AAPL", "period": 14}
                )

                assert len(result) > 0
                assert result[0].text is not None
                # RSI should be calculated
                assert "rsi" in result[0].text.lower()

    @pytest.mark.asyncio
    async def test_macd_analysis(self, test_server, screening_db):
        """Test MACD analysis with custom parameters."""
        test_server.mount("/technical", technical_router)

        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            # Create trending price data
            import pandas as pd

            dates = pd.date_range(end="2024-01-31", periods=50)
            prices = pd.DataFrame(
                {
                    "Close": [100 + (i * 0.5) for i in range(50)],  # Upward trend
                    "High": [101 + (i * 0.5) for i in range(50)],
                    "Low": [99 + (i * 0.5) for i in range(50)],
                    "Open": [100 + (i * 0.5) for i in range(50)],
                    "Volume": [1000000] * 50,
                },
                index=dates,
            )
            mock_data.return_value = prices

            async with Client(test_server) as client:
                result = await client.call_tool(
                    "/technical_get_macd_analysis",
                    {
                        "ticker": "MSFT",
                        "fast_period": 12,
                        "slow_period": 26,
                        "signal_period": 9,
                    },
                )

                assert len(result) > 0
                assert result[0].text is not None
                # Safe literal parse of the repr'd response (was eval()).
                data = ast.literal_eval(result[0].text)
                assert "analysis" in data
                assert "histogram" in data["analysis"]
                assert "indicator" in data["analysis"]

    @pytest.mark.asyncio
    async def test_support_resistance(self, test_server, screening_db):
        """Test support and resistance level detection."""
        test_server.mount("/technical", technical_router)

        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            # Create price data with clear levels
            import pandas as pd

            dates = pd.date_range(end="2024-01-31", periods=100)
            prices = []
            for i in range(100):
                if i % 20 < 10:
                    price = 100  # Support level
                else:
                    price = 110  # Resistance level
                prices.append(
                    {
                        "High": price + 1,
                        "Low": price - 1,
                        "Close": price,
                        "Open": price,
                        "Volume": 1000000,
                    }
                )
            prices_df = pd.DataFrame(prices, index=dates)
            mock_data.return_value = prices_df

            async with Client(test_server) as client:
                result = await client.call_tool(
                    "/technical_get_support_resistance",
                    {"ticker": "GOOGL", "days": 90},
                )

                assert len(result) > 0
                assert result[0].text is not None
                # Safe literal parse of the repr'd response (was eval()).
                data = ast.literal_eval(result[0].text)
                assert "support_levels" in data
                assert "resistance_levels" in data
                assert len(data["support_levels"]) > 0
                assert len(data["resistance_levels"]) > 0
class TestScreeningRouter:
    """Test stock screening router functionality.

    Uses the screening_db fixture's seeded results; responses are parsed with
    ast.literal_eval (not eval) since the text is a repr'd payload.
    """

    @pytest.mark.asyncio
    async def test_maverick_screening(self, test_server, screening_db):
        """Test Maverick bullish screening."""
        test_server.mount("/screening", screening_router)

        async with Client(test_server) as client:
            result = await client.call_tool(
                "/screening_get_maverick_stocks", {"limit": 10}
            )

            assert len(result) > 0
            assert result[0].text is not None
            # Safe literal parse of the repr'd response (was eval()).
            data = ast.literal_eval(result[0].text)
            assert "stocks" in data
            assert len(data["stocks"]) == 2  # AAPL and MSFT
            assert (
                data["stocks"][0]["combined_score"]
                > data["stocks"][1]["combined_score"]
            )  # Sorted by combined score
            assert all(
                stock["combined_score"] > 0 for stock in data["stocks"]
            )  # Score should be positive

    @pytest.mark.asyncio
    async def test_trending_screening(self, test_server, screening_db):
        """Test trending screening."""
        test_server.mount("/screening", screening_router)

        async with Client(test_server) as client:
            result = await client.call_tool(
                "/screening_get_trending_stocks", {"limit": 5}
            )

            assert len(result) > 0
            assert result[0].text is not None
            # Safe literal parse of the repr'd response (was eval()).
            data = ast.literal_eval(result[0].text)
            assert "stocks" in data
            assert len(data["stocks"]) == 1  # Only GOOGL
            assert data["stocks"][0]["stock"] == "GOOGL"
            assert (
                data["stocks"][0]["momentum_score"] > 0
            )  # Momentum score should be positive

    @pytest.mark.asyncio
    async def test_all_screenings(self, test_server, screening_db):
        """Test combined screening results."""
        test_server.mount("/screening", screening_router)

        async with Client(test_server) as client:
            result = await client.call_tool(
                "/screening_get_all_screening_recommendations", {}
            )

            assert len(result) > 0
            assert result[0].text is not None
            # Safe literal parse of the repr'd response (was eval()).
            data = ast.literal_eval(result[0].text)
            assert "maverick_stocks" in data
            assert "maverick_bear_stocks" in data
            assert "trending_stocks" in data
            assert len(data["maverick_stocks"]) == 2
            assert len(data["trending_stocks"]) == 1
class TestPortfolioRouter:
    """Test portfolio analysis router functionality.

    Random price series are generated with a fixed numpy seed so the tests
    are deterministic run-to-run; responses are parsed with ast.literal_eval
    (not eval) since the text is a repr'd payload.
    """

    @pytest.mark.asyncio
    async def test_risk_analysis(self, test_server, screening_db):
        """Test portfolio risk analysis."""
        test_server.mount("/portfolio", portfolio_router)

        # Mock stock data for risk calculations
        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            # Create price data with volatility
            import numpy as np
            import pandas as pd

            np.random.seed(42)  # deterministic fixture data
            prices = []
            base_price = 100.0
            for _ in range(252):  # One year of trading days
                # Add some random walk
                change = np.random.normal(0, 2)
                base_price = float(base_price * (1 + change / 100))
                prices.append(
                    {
                        "close": base_price,
                        "high": base_price + 1,
                        "low": base_price - 1,
                        "open": base_price,
                        "volume": 1000000,
                    }
                )
            dates = pd.date_range(end="2024-01-31", periods=252)
            prices_df = pd.DataFrame(prices, index=dates)
            mock_data.return_value = prices_df

            async with Client(test_server) as client:
                result = await client.call_tool(
                    "/portfolio_risk_adjusted_analysis",
                    {"ticker": "AAPL", "risk_level": 50.0},
                )

                assert len(result) > 0
                assert result[0].text is not None
                # Safe literal parse of the repr'd response (was eval()).
                data = ast.literal_eval(result[0].text)
                assert "risk_level" in data or "analysis" in data
                assert "ticker" in data

    @pytest.mark.asyncio
    async def test_correlation_analysis(self, test_server, screening_db):
        """Test correlation analysis between stocks."""
        test_server.mount("/portfolio", portfolio_router)

        # Mock correlated stock data
        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            import numpy as np

            np.random.seed(42)  # deterministic fixture data

            def create_correlated_data(base_return, correlation):
                import pandas as pd

                prices = []
                base_price = 100
                for _ in range(100):
                    # Create correlated returns
                    return_pct = base_return + (correlation * np.random.normal(0, 1))
                    base_price = base_price * (1 + return_pct / 100)
                    prices.append(
                        {
                            "close": base_price,
                            "high": base_price + 1,
                            "low": base_price - 1,
                            "open": base_price,
                            "volume": 1000000,
                        }
                    )
                dates = pd.date_range(end="2024-01-31", periods=100)
                return pd.DataFrame(prices, index=dates)

            # Return different data for different tickers
            mock_data.side_effect = [
                create_correlated_data(0.1, 0),  # AAPL
                create_correlated_data(0.1, 0.8),  # MSFT (high correlation)
                create_correlated_data(0.1, -0.3),  # GOOGL (negative correlation)
            ]

            async with Client(test_server) as client:
                result = await client.call_tool(
                    "/portfolio_portfolio_correlation_analysis",
                    {"tickers": ["AAPL", "MSFT", "GOOGL"]},
                )

                assert len(result) > 0
                assert result[0].text is not None
                # Handle NaN values in response
                result_text = result[0].text.replace("NaN", "null")
                import json

                data = json.loads(result_text.replace("'", '"'))
                assert "correlation_matrix" in data
                assert len(data["correlation_matrix"]) == 3
                assert "recommendation" in data
class TestDataRouter:
    """Test data fetching router functionality.

    Responses are parsed with ast.literal_eval (not eval) since the text is a
    repr'd payload; Redis access is mocked at the cache client boundary.
    """

    @pytest.mark.asyncio
    async def test_batch_fetch_with_validation(self, test_server, screening_db):
        """Test batch data fetching with validation."""
        test_server.mount("/data", data_router)

        async with Client(test_server) as client:
            # Test with valid tickers
            result = await client.call_tool(
                "/data_fetch_stock_data_batch",
                {
                    "request": {
                        "tickers": ["AAPL", "MSFT"],
                        "start_date": "2024-01-01",
                        "end_date": "2024-01-31",
                    }
                },
            )

            assert len(result) > 0
            assert result[0].text is not None
            # Safe literal parse of the repr'd response (was eval()).
            data = ast.literal_eval(result[0].text)
            assert "results" in data
            assert len(data["results"]) == 2

            # Test with invalid ticker format
            with pytest.raises(Exception) as exc_info:
                await client.call_tool(
                    "/data_fetch_stock_data_batch",
                    {
                        "request": {
                            "tickers": [
                                "AAPL",
                                "invalid_ticker",
                            ],  # lowercase not allowed
                            "start_date": "2024-01-01",
                            "end_date": "2024-01-31",
                        }
                    },
                )
            assert "validation error" in str(exc_info.value).lower()

    @pytest.mark.asyncio
    async def test_cache_operations(self, test_server, screening_db):
        """Test cache management operations."""
        test_server.mount("/data", data_router)

        # Patch the _get_redis_client to test cache operations
        with patch("maverick_mcp.data.cache._get_redis_client") as mock_redis_client:
            cache_instance = Mock()
            cache_instance.get.return_value = '{"cached": true, "data": "test"}'
            cache_instance.set.return_value = True
            cache_instance.delete.return_value = 1
            cache_instance.keys.return_value = [b"stock:AAPL:1", b"stock:AAPL:2"]
            mock_redis_client.return_value = cache_instance

            async with Client(test_server) as client:
                # Test cache clear
                result = await client.call_tool(
                    "/data_clear_cache", {"request": {"ticker": "AAPL"}}
                )

                assert len(result) > 0
                assert result[0].text is not None
                assert (
                    "clear" in result[0].text.lower()
                    or "success" in result[0].text.lower()
                )

                # Verify cache operations
                assert cache_instance.keys.called or cache_instance.delete.called
class TestConcurrentOperations:
    """Test concurrent operations and performance."""

    @pytest.mark.asyncio
    async def test_concurrent_router_calls(self, test_server, screening_db):
        """Test multiple routers being called concurrently."""
        # Mount every router so calls can be interleaved across them.
        for prefix, router in (
            ("/technical", technical_router),
            ("/screening", screening_router),
            ("/portfolio", portfolio_router),
            ("/data", data_router),
        ):
            test_server.mount(prefix, router)

        with patch(
            "maverick_mcp.providers.stock_data.StockDataProvider.get_stock_data"
        ) as mock_data:
            import pandas as pd

            # 30 days of a simple rising series shared by every mocked fetch.
            closes = [100 + i for i in range(30)]
            mock_data.return_value = pd.DataFrame(
                {
                    "Close": closes,
                    "High": [c + 1 for c in closes],
                    "Low": [c - 1 for c in closes],
                    "Open": list(closes),
                    "Volume": [1000000] * 30,
                },
                index=pd.date_range(end="2024-01-31", periods=30),
            )

            async with Client(test_server) as client:
                # Fan out one call per router and await them together.
                batch_payload = {
                    "request": {
                        "tickers": ["AAPL", "MSFT"],
                        "start_date": "2024-01-01",
                        "end_date": "2024-01-31",
                    }
                }
                results = await asyncio.gather(
                    client.call_tool(
                        "/technical_get_rsi_analysis",
                        {"ticker": "AAPL", "period": 14},
                    ),
                    client.call_tool("/screening_get_maverick_stocks", {"limit": 5}),
                    client.call_tool("/data_fetch_stock_data_batch", batch_payload),
                )

                # Every concurrent call should produce a non-empty response.
                assert len(results) == 3
                for result in results:
                    assert len(result) > 0
                    assert result[0].text is not None
# Allow running this test module directly (outside a normal pytest invocation).
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/maverick_mcp/monitoring/health_check.py:
--------------------------------------------------------------------------------
```python
"""
Health check module for MaverickMCP.
This module provides comprehensive health checking capabilities for all system components
including database, cache, APIs, and external services.
"""
import asyncio
import logging
import time
from dataclasses import dataclass
from datetime import UTC, datetime
from enum import Enum
from typing import Any
logger = logging.getLogger(__name__)
class HealthStatus(str, Enum):
    """Health status enumeration.

    Subclasses ``str`` so members serialize directly as their lowercase
    string values in JSON responses.
    """

    HEALTHY = "healthy"  # component fully operational
    DEGRADED = "degraded"  # operational but impaired
    UNHEALTHY = "unhealthy"  # component failing
    UNKNOWN = "unknown"  # state could not be determined (e.g. missing API key)
@dataclass
class ComponentHealth:
    """Health information for a single monitored component."""

    name: str  # component identifier (e.g. "database", "cache")
    status: HealthStatus
    message: str  # human-readable summary of the check outcome
    response_time_ms: float | None = None  # duration of the check itself
    details: dict[str, Any] | None = None  # component-specific extras
    last_check: datetime | None = None  # UTC timestamp of the check
@dataclass
class SystemHealth:
    """Overall system health information aggregated from all components."""

    status: HealthStatus  # worst-case rollup of component statuses
    components: dict[str, ComponentHealth]  # per-component results by name
    overall_response_time_ms: float  # wall time for the whole check run
    timestamp: datetime  # UTC time the check completed
    uptime_seconds: float | None = None  # seconds since the checker was created
    version: str | None = None  # application version, if importable
class HealthChecker:
    """
    Comprehensive health checker for MaverickMCP system components.
    This class provides health checking capabilities for:
    - Database connections
    - Redis cache
    - External APIs (Tiingo, OpenRouter, etc.)
    - System resources
    - Application services
    """

    def __init__(self):
        """Initialize the health checker."""
        # Creation time, used to report uptime in health responses.
        self.start_time = time.time()
        # Mapping of component name -> async checker callable; populated below.
        self._component_checkers = {}
        self._setup_component_checkers()
def _setup_component_checkers(self):
"""Setup component-specific health checkers."""
self._component_checkers = {
"database": self._check_database_health,
"cache": self._check_cache_health,
"tiingo_api": self._check_tiingo_api_health,
"openrouter_api": self._check_openrouter_api_health,
"exa_api": self._check_exa_api_health,
"system_resources": self._check_system_resources_health,
}
async def check_health(self, components: list[str] | None = None) -> SystemHealth:
"""
Check health of specified components or all components.
Args:
components: List of component names to check. If None, checks all components.
Returns:
SystemHealth object with overall and component-specific health information.
"""
start_time = time.time()
# Determine which components to check
components_to_check = components or list(self._component_checkers.keys())
# Run health checks concurrently
component_results = {}
tasks = []
for component_name in components_to_check:
if component_name in self._component_checkers:
task = asyncio.create_task(
self._check_component_with_timeout(component_name),
name=f"health_check_{component_name}",
)
tasks.append((component_name, task))
# Wait for all checks to complete
for component_name, task in tasks:
try:
component_results[component_name] = await task
except Exception as e:
logger.error(f"Health check failed for {component_name}: {e}")
component_results[component_name] = ComponentHealth(
name=component_name,
status=HealthStatus.UNHEALTHY,
message=f"Health check failed: {str(e)}",
last_check=datetime.now(UTC),
)
# Calculate overall response time
overall_response_time = (time.time() - start_time) * 1000
# Determine overall health status
overall_status = self._calculate_overall_status(component_results)
return SystemHealth(
status=overall_status,
components=component_results,
overall_response_time_ms=overall_response_time,
timestamp=datetime.now(UTC),
uptime_seconds=time.time() - self.start_time,
version=self._get_application_version(),
)
async def _check_component_with_timeout(
self, component_name: str, timeout: float = 10.0
) -> ComponentHealth:
"""
Check component health with timeout protection.
Args:
component_name: Name of the component to check
timeout: Timeout in seconds
Returns:
ComponentHealth for the component
"""
try:
return await asyncio.wait_for(
self._component_checkers[component_name](), timeout=timeout
)
except TimeoutError:
return ComponentHealth(
name=component_name,
status=HealthStatus.UNHEALTHY,
message=f"Health check timed out after {timeout}s",
last_check=datetime.now(UTC),
)
async def _check_database_health(self) -> ComponentHealth:
"""Check database health."""
start_time = time.time()
try:
from sqlalchemy import text
from maverick_mcp.data.database import get_db_session
with get_db_session() as session:
# Simple query to test database connectivity
result = session.execute(text("SELECT 1 as health_check"))
result.fetchone()
response_time = (time.time() - start_time) * 1000
return ComponentHealth(
name="database",
status=HealthStatus.HEALTHY,
message="Database connection successful",
response_time_ms=response_time,
last_check=datetime.now(UTC),
details={"connection_type": "SQLAlchemy"},
)
except Exception as e:
return ComponentHealth(
name="database",
status=HealthStatus.UNHEALTHY,
message=f"Database connection failed: {str(e)}",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
async def _check_cache_health(self) -> ComponentHealth:
"""Check cache health."""
start_time = time.time()
try:
from maverick_mcp.data.cache import get_cache_stats, get_redis_client
# Check Redis connection if available
redis_client = get_redis_client()
cache_details = {"type": "memory"}
if redis_client:
# Test Redis connection
await asyncio.get_event_loop().run_in_executor(None, redis_client.ping)
cache_details["type"] = "redis"
cache_details["redis_connected"] = True
# Get cache statistics
stats = get_cache_stats()
cache_details.update(
{
"hit_rate_percent": stats.get("hit_rate_percent", 0),
"total_requests": stats.get("total_requests", 0),
"memory_cache_size": stats.get("memory_cache_size", 0),
}
)
response_time = (time.time() - start_time) * 1000
return ComponentHealth(
name="cache",
status=HealthStatus.HEALTHY,
message="Cache system operational",
response_time_ms=response_time,
last_check=datetime.now(UTC),
details=cache_details,
)
except Exception as e:
return ComponentHealth(
name="cache",
status=HealthStatus.DEGRADED,
message=f"Cache issues detected: {str(e)}",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
    async def _check_tiingo_api_health(self) -> ComponentHealth:
        """Check Tiingo API health.

        Probes the live API with a single AAPL quote request. Returns UNKNOWN
        when no API key is configured, HEALTHY when a price comes back,
        DEGRADED when the API answers without a usable price, and UNHEALTHY
        on any exception. NOTE(review): assumes provider.get_quote returns a
        mapping with a "price" key — confirm against the provider contract.
        """
        start_time = time.time()
        try:
            from maverick_mcp.config.settings import get_settings
            from maverick_mcp.providers.data_provider import get_stock_provider

            settings = get_settings()
            if not settings.data_providers.tiingo_api_key:
                # Without a key there is nothing meaningful to probe.
                return ComponentHealth(
                    name="tiingo_api",
                    status=HealthStatus.UNKNOWN,
                    message="Tiingo API key not configured",
                    response_time_ms=(time.time() - start_time) * 1000,
                    last_check=datetime.now(UTC),
                )
            # Test API with a simple quote request
            provider = get_stock_provider()
            quote = await provider.get_quote("AAPL")
            response_time = (time.time() - start_time) * 1000
            if quote and quote.get("price"):
                return ComponentHealth(
                    name="tiingo_api",
                    status=HealthStatus.HEALTHY,
                    message="Tiingo API responding correctly",
                    response_time_ms=response_time,
                    last_check=datetime.now(UTC),
                    details={"test_symbol": "AAPL", "price_available": True},
                )
            else:
                # API answered but without a price field.
                return ComponentHealth(
                    name="tiingo_api",
                    status=HealthStatus.DEGRADED,
                    message="Tiingo API responding but data may be incomplete",
                    response_time_ms=response_time,
                    last_check=datetime.now(UTC),
                )
        except Exception as e:
            return ComponentHealth(
                name="tiingo_api",
                status=HealthStatus.UNHEALTHY,
                message=f"Tiingo API check failed: {str(e)}",
                response_time_ms=(time.time() - start_time) * 1000,
                last_check=datetime.now(UTC),
            )
async def _check_openrouter_api_health(self) -> ComponentHealth:
"""Check OpenRouter API health."""
start_time = time.time()
try:
from maverick_mcp.config.settings import get_settings
settings = get_settings()
if not settings.research.openrouter_api_key:
return ComponentHealth(
name="openrouter_api",
status=HealthStatus.UNKNOWN,
message="OpenRouter API key not configured",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
# For now, just check if the key is configured
# A full API test would require making an actual request
response_time = (time.time() - start_time) * 1000
return ComponentHealth(
name="openrouter_api",
status=HealthStatus.HEALTHY,
message="OpenRouter API key configured",
response_time_ms=response_time,
last_check=datetime.now(UTC),
details={"api_key_configured": True},
)
except Exception as e:
return ComponentHealth(
name="openrouter_api",
status=HealthStatus.UNHEALTHY,
message=f"OpenRouter API check failed: {str(e)}",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
async def _check_exa_api_health(self) -> ComponentHealth:
"""Check Exa API health."""
start_time = time.time()
try:
from maverick_mcp.config.settings import get_settings
settings = get_settings()
if not settings.research.exa_api_key:
return ComponentHealth(
name="exa_api",
status=HealthStatus.UNKNOWN,
message="Exa API key not configured",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
# For now, just check if the key is configured
# A full API test would require making an actual request
response_time = (time.time() - start_time) * 1000
return ComponentHealth(
name="exa_api",
status=HealthStatus.HEALTHY,
message="Exa API key configured",
response_time_ms=response_time,
last_check=datetime.now(UTC),
details={"api_key_configured": True},
)
except Exception as e:
return ComponentHealth(
name="exa_api",
status=HealthStatus.UNHEALTHY,
message=f"Exa API check failed: {str(e)}",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
async def _check_system_resources_health(self) -> ComponentHealth:
"""Check system resource health."""
start_time = time.time()
try:
import psutil
# Get system resource usage
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage("/")
# Determine status based on resource usage
status = HealthStatus.HEALTHY
messages = []
if cpu_percent > 80:
status = (
HealthStatus.DEGRADED
if cpu_percent < 90
else HealthStatus.UNHEALTHY
)
messages.append(f"High CPU usage: {cpu_percent:.1f}%")
if memory.percent > 85:
status = (
HealthStatus.DEGRADED
if memory.percent < 95
else HealthStatus.UNHEALTHY
)
messages.append(f"High memory usage: {memory.percent:.1f}%")
if disk.percent > 90:
status = (
HealthStatus.DEGRADED
if disk.percent < 95
else HealthStatus.UNHEALTHY
)
messages.append(f"High disk usage: {disk.percent:.1f}%")
message = (
"; ".join(messages)
if messages
else "System resources within normal limits"
)
response_time = (time.time() - start_time) * 1000
return ComponentHealth(
name="system_resources",
status=status,
message=message,
response_time_ms=response_time,
last_check=datetime.now(UTC),
details={
"cpu_percent": cpu_percent,
"memory_percent": memory.percent,
"disk_percent": disk.percent,
"memory_available_gb": memory.available / (1024**3),
"disk_free_gb": disk.free / (1024**3),
},
)
except ImportError:
return ComponentHealth(
name="system_resources",
status=HealthStatus.UNKNOWN,
message="psutil not available for system monitoring",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
except Exception as e:
return ComponentHealth(
name="system_resources",
status=HealthStatus.UNHEALTHY,
message=f"System resource check failed: {str(e)}",
response_time_ms=(time.time() - start_time) * 1000,
last_check=datetime.now(UTC),
)
def _calculate_overall_status(
self, components: dict[str, ComponentHealth]
) -> HealthStatus:
"""
Calculate overall system health status based on component health.
Args:
components: Dictionary of component health results
Returns:
Overall HealthStatus
"""
if not components:
return HealthStatus.UNKNOWN
statuses = [comp.status for comp in components.values()]
# If any component is unhealthy, system is unhealthy
if HealthStatus.UNHEALTHY in statuses:
return HealthStatus.UNHEALTHY
# If any component is degraded, system is degraded
if HealthStatus.DEGRADED in statuses:
return HealthStatus.DEGRADED
# If all components are healthy, system is healthy
if all(status == HealthStatus.HEALTHY for status in statuses):
return HealthStatus.HEALTHY
# Mixed healthy/unknown status defaults to degraded
return HealthStatus.DEGRADED
def _get_application_version(self) -> str | None:
"""Get application version."""
try:
from maverick_mcp import __version__
return __version__
except ImportError:
return None
async def check_component(self, component_name: str) -> ComponentHealth:
"""
Check health of a specific component.
Args:
component_name: Name of the component to check
Returns:
ComponentHealth for the specified component
Raises:
ValueError: If component_name is not supported
"""
if component_name not in self._component_checkers:
raise ValueError(
f"Unknown component: {component_name}. "
f"Supported components: {list(self._component_checkers.keys())}"
)
return await self._check_component_with_timeout(component_name)
def get_supported_components(self) -> list[str]:
"""
Get list of supported component names.
Returns:
List of component names that can be checked
"""
return list(self._component_checkers.keys())
def get_health_status(self) -> dict[str, Any]:
"""
Get comprehensive health status (synchronous wrapper).
Returns:
Dictionary with health status and component information
"""
import asyncio
try:
# Try to get the current event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# We're already in an async context, return simplified status
return {
"status": "HEALTHY",
"components": {
name: {"status": "UNKNOWN", "message": "Check pending"}
for name in self._component_checkers.keys()
},
"timestamp": datetime.now(UTC).isoformat(),
"message": "Health check in async context",
"uptime_seconds": time.time() - self.start_time,
}
else:
# Run the async check in the existing loop
result = loop.run_until_complete(self.check_health())
return self._health_to_dict(result)
except RuntimeError:
# No event loop exists, create one
result = asyncio.run(self.check_health())
return self._health_to_dict(result)
async def check_overall_health(self) -> dict[str, Any]:
"""
Async method to check overall health.
Returns:
Dictionary with health status information
"""
result = await self.check_health()
return self._health_to_dict(result)
def _health_to_dict(self, health: SystemHealth) -> dict[str, Any]:
"""
Convert SystemHealth object to dictionary.
Args:
health: SystemHealth object
Returns:
Dictionary representation
"""
return {
"status": health.status.value,
"components": {
name: {
"status": comp.status.value,
"message": comp.message,
"response_time_ms": comp.response_time_ms,
"details": comp.details,
"last_check": comp.last_check.isoformat()
if comp.last_check
else None,
}
for name, comp in health.components.items()
},
"overall_response_time_ms": health.overall_response_time_ms,
"timestamp": health.timestamp.isoformat(),
"uptime_seconds": health.uptime_seconds,
"version": health.version,
}
# Convenience function for quick health checks
async def check_system_health(components: list[str] | None = None) -> SystemHealth:
"""
Convenience function to check system health.
Args:
components: Optional list of component names to check
Returns:
SystemHealth object
"""
checker = HealthChecker()
return await checker.check_health(components)
# Global health checker instance
_global_health_checker: HealthChecker | None = None


def get_health_checker() -> HealthChecker:
    """Return the process-wide HealthChecker, creating it lazily on first use."""
    global _global_health_checker
    checker = _global_health_checker
    if checker is None:
        checker = HealthChecker()
        _global_health_checker = checker
    return checker
```