This is page 6 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/intelligent_backtesting.py:
--------------------------------------------------------------------------------
```python
"""MCP router for intelligent backtesting workflow."""
import logging
from typing import Any
from fastmcp import Context
from maverick_mcp.workflows import BacktestingWorkflow
logger = logging.getLogger(__name__)
def setup_intelligent_backtesting_tools(mcp):
"""Set up intelligent backtesting tools for MCP.
Args:
mcp: FastMCP instance
"""
@mcp.tool()
async def run_intelligent_backtest(
ctx: Context,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
initial_capital: float = 10000.0,
requested_strategy: str | None = None,
) -> dict[str, Any]:
"""Run intelligent backtesting workflow with market regime analysis and strategy optimization.
This advanced workflow analyzes market conditions, intelligently selects appropriate strategies,
optimizes parameters, and validates results through walk-forward analysis and Monte Carlo simulation.
Args:
symbol: Stock symbol to analyze (e.g., 'AAPL', 'TSLA')
start_date: Start date (YYYY-MM-DD), defaults to 1 year ago
end_date: End date (YYYY-MM-DD), defaults to today
initial_capital: Starting capital for backtest (default: $10,000)
requested_strategy: User-preferred strategy (optional, e.g., 'sma_cross', 'rsi', 'macd')
Returns:
Comprehensive analysis including:
- Market regime classification (trending/ranging/volatile)
- Intelligent strategy recommendations with confidence scores
- Optimized parameters for best performance
- Validation through walk-forward analysis
- Risk assessment and confidence-scored final recommendation
Examples:
run_intelligent_backtest("AAPL") # Full analysis for Apple
run_intelligent_backtest("TSLA", "2022-01-01", "2023-12-31") # Specific period
run_intelligent_backtest("MSFT", requested_strategy="rsi") # With strategy preference
"""
try:
# Initialize workflow
workflow = BacktestingWorkflow()
# Run intelligent backtesting
results = await workflow.run_intelligent_backtest(
symbol=symbol,
start_date=start_date,
end_date=end_date,
initial_capital=initial_capital,
requested_strategy=requested_strategy,
)
return results
except Exception as e:
logger.error(f"Intelligent backtest failed for {symbol}: {e}")
return {
"symbol": symbol,
"error": str(e),
"message": "Intelligent backtesting failed. Please check symbol and date range.",
}
@mcp.tool()
async def quick_market_regime_analysis(
ctx: Context,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
) -> dict[str, Any]:
"""Perform quick market regime analysis and strategy recommendations.
This is a faster alternative to full backtesting that provides market regime classification
and basic strategy recommendations without parameter optimization.
Args:
symbol: Stock symbol to analyze
start_date: Start date (YYYY-MM-DD), defaults to 1 year ago
end_date: End date (YYYY-MM-DD), defaults to today
Returns:
Quick analysis results with:
- Market regime classification (trending/ranging/volatile)
- Top 3 recommended strategies for current conditions
- Strategy fitness scores
- Market conditions summary
- Execution metadata
Examples:
quick_market_regime_analysis("AAPL")
quick_market_regime_analysis("BTC-USD", "2023-01-01", "2023-12-31")
"""
try:
# Initialize workflow
workflow = BacktestingWorkflow()
# Run quick analysis
results = await workflow.run_quick_analysis(
symbol=symbol,
start_date=start_date,
end_date=end_date,
)
return results
except Exception as e:
logger.error(f"Quick market analysis failed for {symbol}: {e}")
return {
"symbol": symbol,
"analysis_type": "quick_analysis",
"error": str(e),
"message": "Quick market analysis failed. Please check symbol and date range.",
}
@mcp.tool()
async def explain_market_regime(
ctx: Context,
regime: str,
) -> dict[str, Any]:
"""Explain market regime characteristics and suitable strategies.
Args:
regime: Market regime to explain (trending, ranging, volatile, etc.)
Returns:
Detailed explanation of the regime and strategy recommendations
"""
regime_explanations = {
"trending": {
"description": "A market in a clear directional movement (up or down trend)",
"characteristics": [
"Strong directional price movement",
"Higher highs and higher lows (uptrend) or lower highs and lower lows (downtrend)",
"Good momentum indicators",
"Volume supporting the trend direction",
],
"best_strategies": [
"sma_cross",
"ema_cross",
"macd",
"breakout",
"momentum",
],
"avoid_strategies": ["rsi", "mean_reversion", "bollinger"],
"risk_factors": [
"Trend reversals can be sudden",
"False breakouts in weak trends",
"Momentum strategies can give late signals",
],
},
"ranging": {
"description": "A market moving sideways within a defined price range",
"characteristics": [
"Price oscillates between support and resistance",
"No clear directional bias",
"Mean reversion tendencies",
"Lower volatility within the range",
],
"best_strategies": ["rsi", "bollinger", "mean_reversion"],
"avoid_strategies": ["sma_cross", "breakout", "momentum"],
"risk_factors": [
"False breakouts from range",
"Choppy price action can cause whipsaws",
"Range can persist longer than expected",
],
},
"volatile": {
"description": "A market with high price variability and unpredictable movements",
"characteristics": [
"Large price swings in short periods",
"High volatility percentile",
"Unpredictable direction changes",
"Often associated with news events or uncertainty",
],
"best_strategies": ["breakout", "volatility_breakout", "momentum"],
"avoid_strategies": ["mean_reversion", "sma_cross"],
"risk_factors": [
"High drawdown potential",
"Many false signals",
"Requires wider stops and position sizing",
],
},
"volatile_trending": {
"description": "A trending market with high volatility - strong moves with significant pullbacks",
"characteristics": [
"Clear trend direction but with high volatility",
"Strong moves followed by sharp retracements",
"Higher risk but potentially higher rewards",
"Often seen in growth stocks or emerging trends",
],
"best_strategies": [
"breakout",
"momentum",
"volatility_breakout",
"macd",
],
"avoid_strategies": ["rsi", "mean_reversion"],
"risk_factors": [
"High drawdown during pullbacks",
"Requires strong risk management",
"Emotional stress from volatility",
],
},
"low_volume": {
"description": "A market with below-average trading volume",
"characteristics": [
"Lower than average volume",
"Potentially less reliable signals",
"Wider bid-ask spreads",
"Less institutional participation",
],
"best_strategies": ["sma_cross", "ema_cross", "rsi"],
"avoid_strategies": ["breakout", "momentum"],
"risk_factors": [
"Lower liquidity",
"Breakouts may not sustain",
"Slippage on entries and exits",
],
},
}
if regime.lower() in regime_explanations:
return {
"regime": regime,
"explanation": regime_explanations[regime.lower()],
"trading_tips": [
f"Focus on {', '.join(regime_explanations[regime.lower()]['best_strategies'])} strategies",
f"Avoid {', '.join(regime_explanations[regime.lower()]['avoid_strategies'])} strategies",
"Always use proper risk management",
"Consider the broader market context",
],
}
else:
return {
"regime": regime,
"error": f"Unknown regime '{regime}'",
"available_regimes": list(regime_explanations.keys()),
"message": "Please specify one of the available market regimes.",
}
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/circuit_breaker_services.py:
--------------------------------------------------------------------------------
```python
"""
Service-specific circuit breakers for external APIs.
Provides pre-configured circuit breakers for different external services.
"""
import logging
import pandas as pd
import requests
from requests.exceptions import ConnectionError, HTTPError, RequestException, Timeout
from maverick_mcp.config.settings import get_settings
from maverick_mcp.utils.circuit_breaker import (
CircuitBreakerConfig,
EnhancedCircuitBreaker,
FailureDetectionStrategy,
)
from maverick_mcp.utils.fallback_strategies import (
ECONOMIC_DATA_FALLBACK,
MARKET_DATA_FALLBACK,
NEWS_FALLBACK,
STOCK_DATA_FALLBACK_CHAIN,
)
logger = logging.getLogger(__name__)
settings = get_settings()
# Service-specific configurations
YFINANCE_CONFIG = CircuitBreakerConfig(
name="yfinance",
failure_threshold=3,
failure_rate_threshold=0.5,
timeout_threshold=30.0,
recovery_timeout=120, # 2 minutes
success_threshold=2,
window_size=300, # 5 minutes
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(Exception,), # yfinance can throw various exceptions
)
FINVIZ_CONFIG = CircuitBreakerConfig(
name="finviz",
failure_threshold=5,
failure_rate_threshold=0.6,
timeout_threshold=20.0,
recovery_timeout=180, # 3 minutes
success_threshold=3,
window_size=300,
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(Exception,),
)
FRED_CONFIG = CircuitBreakerConfig(
name="fred_api",
failure_threshold=5,
failure_rate_threshold=0.5,
timeout_threshold=15.0,
recovery_timeout=300, # 5 minutes
success_threshold=3,
window_size=600, # 10 minutes
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(Exception,),
)
EXTERNAL_API_CONFIG = CircuitBreakerConfig(
name="external_api",
failure_threshold=3,
failure_rate_threshold=0.4,
timeout_threshold=10.0,
recovery_timeout=60,
success_threshold=2,
window_size=300,
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(RequestException, HTTPError, Timeout, ConnectionError),
)
TIINGO_CONFIG = CircuitBreakerConfig(
name="tiingo",
failure_threshold=3,
failure_rate_threshold=0.5,
timeout_threshold=15.0,
recovery_timeout=120,
success_threshold=2,
window_size=300,
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(Exception,),
)
HTTP_CONFIG = CircuitBreakerConfig(
name="http_general",
failure_threshold=5,
failure_rate_threshold=0.6,
timeout_threshold=30.0,
recovery_timeout=60,
success_threshold=3,
window_size=300,
detection_strategy=FailureDetectionStrategy.FAILURE_RATE,
expected_exceptions=(RequestException, HTTPError, Timeout, ConnectionError),
)
class StockDataCircuitBreaker(EnhancedCircuitBreaker):
"""Circuit breaker for stock data APIs (yfinance)."""
def __init__(self):
"""Initialize with yfinance configuration."""
super().__init__(YFINANCE_CONFIG)
self.fallback_chain = STOCK_DATA_FALLBACK_CHAIN
def fetch_with_fallback(
self,
fetch_func: callable,
symbol: str,
start_date: str,
end_date: str,
**kwargs,
) -> pd.DataFrame:
"""
Fetch stock data with circuit breaker and fallback.
Args:
fetch_func: The function to fetch data (e.g., yfinance call)
symbol: Stock symbol
start_date: Start date
end_date: End date
**kwargs: Additional arguments for fetch_func
Returns:
DataFrame with stock data
"""
try:
# Try primary fetch through circuit breaker
return self.call_sync(fetch_func, symbol, start_date, end_date, **kwargs)
except Exception as e:
logger.warning(
f"Primary stock data fetch failed for {symbol}: {e}. "
f"Attempting fallback strategies."
)
# Execute fallback chain
return self.fallback_chain.execute_sync(
symbol, start_date, end_date, **kwargs
)
async def fetch_with_fallback_async(
self,
fetch_func: callable,
symbol: str,
start_date: str,
end_date: str,
**kwargs,
) -> pd.DataFrame:
"""Async version of fetch_with_fallback."""
try:
return await self.call_async(
fetch_func, symbol, start_date, end_date, **kwargs
)
except Exception as e:
logger.warning(
f"Primary stock data fetch failed for {symbol}: {e}. "
f"Attempting fallback strategies."
)
return await self.fallback_chain.execute_async(
symbol, start_date, end_date, **kwargs
)
class MarketDataCircuitBreaker(EnhancedCircuitBreaker):
"""Circuit breaker for market data APIs (finviz, External API)."""
def __init__(self, service_name: str = "market_data"):
"""Initialize with market data configuration."""
if service_name == "finviz":
config = FINVIZ_CONFIG
elif service_name == "external_api":
config = EXTERNAL_API_CONFIG
else:
config = FINVIZ_CONFIG # Default
super().__init__(config)
self.fallback = MARKET_DATA_FALLBACK
def fetch_with_fallback(
self, fetch_func: callable, mover_type: str = "gainers", **kwargs
) -> dict:
"""Fetch market data with circuit breaker and fallback."""
try:
return self.call_sync(fetch_func, mover_type, **kwargs)
except Exception as e:
logger.warning(
f"Market data fetch failed for {mover_type}: {e}. "
f"Returning fallback data."
)
return self.fallback.execute_sync(mover_type, **kwargs)
class EconomicDataCircuitBreaker(EnhancedCircuitBreaker):
"""Circuit breaker for economic data APIs (FRED)."""
def __init__(self):
"""Initialize with FRED configuration."""
super().__init__(FRED_CONFIG)
self.fallback = ECONOMIC_DATA_FALLBACK
def fetch_with_fallback(
self,
fetch_func: callable,
series_id: str,
start_date: str,
end_date: str,
**kwargs,
) -> pd.Series:
"""Fetch economic data with circuit breaker and fallback."""
try:
return self.call_sync(fetch_func, series_id, start_date, end_date, **kwargs)
except Exception as e:
logger.warning(
f"Economic data fetch failed for {series_id}: {e}. "
f"Using fallback values."
)
return self.fallback.execute_sync(series_id, start_date, end_date, **kwargs)
class NewsDataCircuitBreaker(EnhancedCircuitBreaker):
"""Circuit breaker for news/sentiment APIs."""
def __init__(self):
"""Initialize with news API configuration."""
# Use a generic config for news APIs
config = CircuitBreakerConfig(
name="news_api",
failure_threshold=3,
failure_rate_threshold=0.6,
timeout_threshold=10.0,
recovery_timeout=300,
success_threshold=2,
window_size=600,
detection_strategy=FailureDetectionStrategy.COMBINED,
expected_exceptions=(Exception,),
)
super().__init__(config)
self.fallback = NEWS_FALLBACK
def fetch_with_fallback(self, fetch_func: callable, symbol: str, **kwargs) -> dict:
"""Fetch news data with circuit breaker and fallback."""
try:
return self.call_sync(fetch_func, symbol, **kwargs)
except Exception as e:
logger.warning(
f"News data fetch failed for {symbol}: {e}. Returning empty news data."
)
return self.fallback.execute_sync(symbol, **kwargs)
class HttpCircuitBreaker(EnhancedCircuitBreaker):
"""General circuit breaker for HTTP requests."""
def __init__(self):
"""Initialize with HTTP configuration."""
super().__init__(HTTP_CONFIG)
def request_with_circuit_breaker(
self, method: str, url: str, session: requests.Session | None = None, **kwargs
) -> requests.Response:
"""
Make HTTP request with circuit breaker protection.
Args:
method: HTTP method (GET, POST, etc.)
url: URL to request
session: Optional requests session
**kwargs: Additional arguments for requests
Returns:
Response object
"""
def make_request():
# Ensure timeout is set
if "timeout" not in kwargs:
kwargs["timeout"] = self.config.timeout_threshold
if session:
return session.request(method, url, **kwargs)
else:
return requests.request(method, url, **kwargs)
return self.call_sync(make_request)
# Global instances for reuse
stock_data_breaker = StockDataCircuitBreaker()
market_data_breaker = MarketDataCircuitBreaker()
economic_data_breaker = EconomicDataCircuitBreaker()
news_data_breaker = NewsDataCircuitBreaker()
http_breaker = HttpCircuitBreaker()
def get_service_circuit_breaker(service: str) -> EnhancedCircuitBreaker:
"""
Get a circuit breaker for a specific service.
Args:
service: Service name (yfinance, finviz, fred, news, http)
Returns:
Configured circuit breaker for the service
"""
service_breakers = {
"yfinance": stock_data_breaker,
"finviz": market_data_breaker,
"fred": economic_data_breaker,
"external_api": MarketDataCircuitBreaker("external_api"),
"tiingo": EnhancedCircuitBreaker(TIINGO_CONFIG),
"news": news_data_breaker,
"http": http_breaker,
}
breaker = service_breakers.get(service)
if not breaker:
logger.warning(
f"No specific circuit breaker for service '{service}', using HTTP breaker"
)
return http_breaker
return breaker
```
--------------------------------------------------------------------------------
/docs/SETUP_SELF_CONTAINED.md:
--------------------------------------------------------------------------------
```markdown
# MaverickMCP Self-Contained Setup Guide
⚠️ **IMPORTANT FINANCIAL DISCLAIMER**: This software is for educational and informational purposes only. It is NOT financial advice. Past performance does not guarantee future results. Always consult with a qualified financial advisor before making investment decisions.
This guide explains how to set up MaverickMCP as a completely self-contained system for personal-use financial analysis with Claude Desktop.
## Overview
MaverickMCP is fully self-contained and has no external database dependencies. All data lives in its own PostgreSQL (or SQLite) database, and every table uses the `mcp_` prefix to avoid conflicts with other schemas.
## Prerequisites
- Python 3.12+
- PostgreSQL 14+ (or SQLite for development)
- Redis (optional, for caching)
- Tiingo API account (free tier available)
## Quick Start
### 1. Clone and Install
```bash
# Clone the repository
git clone https://github.com/wshobson/maverick-mcp.git
cd maverick-mcp
# Install dependencies using uv (recommended)
uv sync
# Or use pip
pip install -e .
```
### 2. Configure Environment
Create a `.env` file with your configuration:
```bash
# Database Configuration (MaverickMCP's own database)
MCP_DATABASE_URL=postgresql://user:password@localhost/maverick_mcp
# Or use SQLite for development
# MCP_DATABASE_URL=sqlite:///maverick_mcp.db
# Tiingo API Configuration (required for data loading)
TIINGO_API_TOKEN=your_tiingo_api_token_here
# Redis Configuration (optional)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
CACHE_ENABLED=true
# Personal Use Configuration
# Note: Authentication and billing systems have been removed for simplicity
# This version is designed for local personal use only
```
### 3. Create Database
```bash
# Create PostgreSQL database
createdb maverick_mcp
# Or use existing PostgreSQL
psql -U postgres -c "CREATE DATABASE maverick_mcp;"
```
### 4. Run Migrations
```bash
# Initialize Alembic (if not already done)
alembic init alembic
# Run all migrations to create schema
alembic upgrade head
# Verify migration
alembic current
```
The migration creates the following self-contained tables:
- `mcp_stocks` - Master stock information
- `mcp_price_cache` - Historical price data
- `mcp_maverick_stocks` - Maverick screening results
- `mcp_maverick_bear_stocks` - Bear market screening
- `mcp_supply_demand_breakouts` - Supply/demand analysis
- `mcp_technical_cache` - Technical indicator cache
- `mcp_users`, `mcp_api_keys`, etc. - Authentication tables
### 5. Load Initial Data
#### Option A: Quick Start (Top 10 S&P 500)
```bash
# Load 2 years of data for top 10 S&P 500 stocks
python scripts/load_tiingo_data.py \
--symbols AAPL MSFT GOOGL AMZN NVDA META TSLA LLY V UNH \
--years 2 \
--calculate-indicators \
--run-screening
```
#### Option B: Full S&P 500
```bash
# Load S&P 500 stocks with screening
python scripts/load_tiingo_data.py \
--sp500 \
--years 2 \
--calculate-indicators \
--run-screening
```
#### Option C: Custom Symbols
```bash
# Load specific symbols with custom date range
python scripts/load_tiingo_data.py \
--symbols AAPL MSFT GOOGL \
--start-date 2022-01-01 \
--end-date 2024-01-01 \
--calculate-indicators \
--run-screening
```
### 6. Start the Server
```bash
# Recommended: Use the Makefile
make dev
# Alternative: Direct FastMCP server
python -m maverick_mcp.api.server --transport streamable-http --port 8003
# Development mode with hot reload
./scripts/dev.sh
```
### 7. Verify Setup
```bash
# Check health endpoint
curl http://localhost:8003/health
# Test a simple query (if using MCP client)
echo '{"method": "tools/list"}' | nc localhost 8003
# Or use the API directly
curl http://localhost:8003/api/data/stock/AAPL
```
## Database Schema
The self-contained schema uses the `mcp_` prefix for all tables:
```sql
-- Stock Data Tables
mcp_stocks -- Master stock information
├── stock_id (UUID PK)
├── ticker_symbol
├── company_name
├── sector
└── ...
mcp_price_cache -- Historical OHLCV data
├── price_cache_id (UUID PK)
├── stock_id (FK -> mcp_stocks)
├── date
├── open_price, high_price, low_price, close_price
└── volume
-- Screening Tables
mcp_maverick_stocks -- Momentum screening
├── id (PK)
├── stock_id (FK -> mcp_stocks)
├── combined_score
└── technical indicators...
mcp_maverick_bear_stocks -- Bear screening
├── id (PK)
├── stock_id (FK -> mcp_stocks)
├── score
└── bearish indicators...
mcp_supply_demand_breakouts -- Supply/demand analysis
├── id (PK)
├── stock_id (FK -> mcp_stocks)
├── momentum_score
└── accumulation metrics...
mcp_technical_cache -- Flexible indicator storage
├── id (PK)
├── stock_id (FK -> mcp_stocks)
├── indicator_type
└── values...
```
## Data Loading Details
### Rate Limiting
Tiingo API has a rate limit of 2400 requests/hour. The loader automatically handles this:
```python
# Configure in load_tiingo_data.py
MAX_CONCURRENT_REQUESTS = 5 # Parallel requests
RATE_LIMIT_DELAY = 1.5 # Seconds between requests
```
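For reference, here is a minimal sketch of how these two knobs can be enforced with `asyncio` (illustrative only; `fetch_prices` is a stand-in for the real Tiingo call, not the loader's actual code):
```python
import asyncio
MAX_CONCURRENT_REQUESTS = 5  # parallel requests
RATE_LIMIT_DELAY = 1.5  # seconds between requests per worker
async def fetch_prices(symbol: str) -> dict:
    """Stand-in for the actual Tiingo HTTP request."""
    await asyncio.sleep(0)
    return {"symbol": symbol}
async def load_symbols(symbols: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    async def bounded_fetch(symbol: str) -> dict:
        async with semaphore:  # cap in-flight requests
            result = await fetch_prices(symbol)
            await asyncio.sleep(RATE_LIMIT_DELAY)  # space out requests
            return result
    return await asyncio.gather(*(bounded_fetch(s) for s in symbols))
```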
### Resume Capability
The loader saves progress and can resume interrupted loads:
```bash
# Start loading (creates checkpoint file)
python scripts/load_tiingo_data.py --sp500 --years 2
# If interrupted, resume from checkpoint
python scripts/load_tiingo_data.py --resume
```
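The checkpoint is a small JSON progress file (`tiingo_load_progress.json`, see Troubleshooting below). A simplified sketch of the save/resume idea, with illustrative field names:
```python
import json
from pathlib import Path
CHECKPOINT = Path("tiingo_load_progress.json")
def save_progress(completed: list[str]) -> None:
    CHECKPOINT.write_text(json.dumps({"completed_symbols": completed}))
def remaining_symbols(all_symbols: list[str]) -> list[str]:
    if not CHECKPOINT.exists():
        return all_symbols
    done = set(json.loads(CHECKPOINT.read_text()).get("completed_symbols", []))
    return [s for s in all_symbols if s not in done]
```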
### Technical Indicators Calculated
- **Trend**: SMA (20, 50, 150, 200), EMA (21)
- **Momentum**: RSI, MACD, Relative Strength Rating
- **Volatility**: ATR, ADR (Average Daily Range)
- **Volume**: 30-day average, volume ratio
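As a rough illustration (not the project's exact implementation), the trend and momentum indicators above reduce to a few pandas operations on the closing-price series:
```python
import pandas as pd
def basic_indicators(close: pd.Series) -> pd.DataFrame:
    df = pd.DataFrame(index=close.index)
    for window in (20, 50, 150, 200):  # trend: simple moving averages
        df[f"sma_{window}"] = close.rolling(window).mean()
    df["ema_21"] = close.ewm(span=21, adjust=False).mean()  # trend: EMA(21)
    delta = close.diff()  # momentum: 14-period RSI (simple-average variant)
    gain = delta.clip(lower=0).rolling(14).mean()
    loss = (-delta.clip(upper=0)).rolling(14).mean()
    df["rsi_14"] = 100 - 100 / (1 + gain / loss)
    return df
```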
### Screening Algorithms
1. **Maverick Screening** (Momentum)
- Price > EMA21 > SMA50 > SMA200
- Momentum Score > 70
- Combined score calculation
2. **Bear Screening** (Weakness)
- Price < EMA21 < SMA50
- Momentum Score < 30
- Negative MACD
3. **Supply/Demand Screening** (Accumulation)
- Price > SMA50 > SMA150 > SMA200
- Momentum Score > 80
- Volume confirmation
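The three screens above are essentially boolean filters over the cached indicators. A sketch of the Maverick (momentum) screen in pandas, assuming a DataFrame with columns named `close`, `ema_21`, `sma_50`, `sma_200`, and `momentum_score`:
```python
import pandas as pd
def maverick_screen(df: pd.DataFrame) -> pd.DataFrame:
    """Momentum screen: Price > EMA21 > SMA50 > SMA200 and Momentum Score > 70."""
    mask = (
        (df["close"] > df["ema_21"])
        & (df["ema_21"] > df["sma_50"])
        & (df["sma_50"] > df["sma_200"])
        & (df["momentum_score"] > 70)
    )
    return df[mask].sort_values("momentum_score", ascending=False)
```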
## Troubleshooting
### Database Connection Issues
```bash
# Test PostgreSQL connection
psql -U user -d maverick_mcp -c "SELECT 1;"
# Use SQLite for testing
export MCP_DATABASE_URL=sqlite:///test.db
```
### Tiingo API Issues
```bash
# Test Tiingo API token
curl -H "Authorization: Token YOUR_TOKEN" \
"https://api.tiingo.com/api/test"
# Check rate limit status
curl -H "Authorization: Token YOUR_TOKEN" \
"https://api.tiingo.com/account/usage"
```
### Migration Issues
```bash
# Check current migration
alembic current
# Show migration history
alembic history
# Downgrade if needed
alembic downgrade -1
# Re-run specific migration
alembic upgrade 010_self_contained_schema
```
### Data Loading Issues
```bash
# Check checkpoint file
cat tiingo_load_progress.json
# Clear checkpoint to start fresh
rm tiingo_load_progress.json
# Load single symbol for testing
python scripts/load_tiingo_data.py \
--symbols AAPL \
--years 1 \
--calculate-indicators
```
## Performance Optimization
### Database Indexes
The migration creates optimized indexes for common queries:
```sql
-- Price data lookups
CREATE INDEX mcp_price_cache_stock_id_date_idx
ON mcp_price_cache(stock_id, date);
-- Screening queries
CREATE INDEX mcp_maverick_stocks_combined_score_idx
ON mcp_maverick_stocks(combined_score DESC);
-- Supply/demand filtering
CREATE INDEX mcp_supply_demand_breakouts_ma_filter_idx
ON mcp_supply_demand_breakouts(close_price, sma_50, sma_150, sma_200);
```
### Connection Pooling
Configure in `.env`:
```bash
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
```
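These variables map onto SQLAlchemy's engine pool arguments. A sketch of how they would typically be applied (assumes a PostgreSQL `MCP_DATABASE_URL`):
```python
import os
from sqlalchemy import create_engine
engine = create_engine(
    os.environ["MCP_DATABASE_URL"],  # e.g. postgresql://user:password@localhost/maverick_mcp
    pool_size=int(os.getenv("DB_POOL_SIZE", "20")),
    max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "10")),
    pool_timeout=int(os.getenv("DB_POOL_TIMEOUT", "30")),
    pool_recycle=int(os.getenv("DB_POOL_RECYCLE", "3600")),
)
```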
### Caching with Redis
Enable Redis caching for better performance:
```bash
CACHE_ENABLED=true
CACHE_TTL_SECONDS=300
REDIS_HOST=localhost
REDIS_PORT=6379
```
## Personal Use Deployment
### 1. Use Local Database
```bash
# Use SQLite for simplicity
MCP_DATABASE_URL=sqlite:///maverick_mcp.db
# Or PostgreSQL for better performance
MCP_DATABASE_URL=postgresql://user:password@localhost/maverick_mcp
```
### 2. Connect with Claude Desktop
Add to `~/Library/Application Support/Claude/claude_desktop_config.json`:
```json
{
"mcpServers": {
"maverick-mcp": {
"command": "npx",
"args": ["-y", "mcp-remote", "http://localhost:8003/mcp"]
}
}
}
```
### 3. Set Up Daily Data Updates
Create a cron job for daily updates:
```bash
# Add to crontab
0 1 * * * /path/to/venv/bin/python /path/to/scripts/load_tiingo_data.py \
--sp500 --years 0.1 --calculate-indicators --run-screening
```
## API Usage Examples
### Fetch Stock Data
```python
from maverick_mcp.data.models import Stock, PriceCache, get_db
# Get historical data
with get_db() as session:
df = PriceCache.get_price_data(
session,
"AAPL",
"2023-01-01",
"2024-01-01"
)
```
### Run Screening
```python
from maverick_mcp.data.models import MaverickStocks
# Get top momentum stocks
with get_db() as session:
top_stocks = MaverickStocks.get_top_stocks(session, limit=20)
for stock in top_stocks:
print(f"{stock.stock}: Score {stock.combined_score}")
```
### Using MCP Tools
```bash
# List available tools
curl -X POST http://localhost:8003/mcp \
-H "Content-Type: application/json" \
-d '{"method": "tools/list"}'
# Get screening results
curl -X POST http://localhost:8003/mcp \
-H "Content-Type: application/json" \
-d '{
"method": "tools/call",
"params": {
"name": "get_maverick_stocks",
"arguments": {"limit": 10}
}
}'
```
## Next Steps
1. **Load More Data**: Expand beyond S&P 500 to Russell 3000
2. **Add More Indicators**: Implement additional technical indicators
3. **Custom Screening**: Create your own screening algorithms
4. **Web Dashboard**: Deploy the maverick-mcp-web frontend
5. **API Integration**: Build applications using the MCP protocol
## Support
- GitHub Issues: [Report bugs or request features](https://github.com/wshobson/maverick-mcp/issues)
- Discussions: [Join community discussions](https://github.com/wshobson/maverick-mcp/discussions)
- Documentation: [Read the full docs](https://github.com/wshobson/maverick-mcp)
---
*MaverickMCP is now completely self-contained and ready for deployment!*
```
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/parser.py:
--------------------------------------------------------------------------------
```python
"""Natural language strategy parser for VectorBT."""
import re
from typing import Any
from langchain.prompts import PromptTemplate
from langchain_anthropic import ChatAnthropic
from .templates import STRATEGY_TEMPLATES
class StrategyParser:
"""Parser for converting natural language to VectorBT strategies."""
def __init__(self, llm: ChatAnthropic | None = None):
"""Initialize strategy parser.
Args:
llm: Language model for parsing (optional)
"""
self.llm = llm
self.templates = STRATEGY_TEMPLATES
def parse_simple(self, description: str) -> dict[str, Any]:
"""Parse simple strategy descriptions without LLM.
Args:
description: Natural language strategy description
Returns:
Strategy configuration
"""
description_lower = description.lower()
# Try to match known strategy patterns
if "sma" in description_lower or "moving average cross" in description_lower:
return self._parse_sma_strategy(description)
elif "rsi" in description_lower:
return self._parse_rsi_strategy(description)
elif "macd" in description_lower:
return self._parse_macd_strategy(description)
elif "bollinger" in description_lower or "band" in description_lower:
return self._parse_bollinger_strategy(description)
elif "momentum" in description_lower:
return self._parse_momentum_strategy(description)
elif "ema" in description_lower or "exponential" in description_lower:
return self._parse_ema_strategy(description)
elif "breakout" in description_lower or "channel" in description_lower:
return self._parse_breakout_strategy(description)
elif "mean reversion" in description_lower or "reversion" in description_lower:
return self._parse_mean_reversion_strategy(description)
else:
# Default to momentum if no clear match
return {
"strategy_type": "momentum",
"parameters": self.templates["momentum"]["parameters"],
}
def _parse_sma_strategy(self, description: str) -> dict[str, Any]:
"""Parse SMA crossover strategy from description."""
# Extract numbers from description
numbers = re.findall(r"\d+", description)
params = dict(self.templates["sma_cross"]["parameters"])
if len(numbers) >= 2:
params["fast_period"] = int(numbers[0])
params["slow_period"] = int(numbers[1])
elif len(numbers) == 1:
params["fast_period"] = int(numbers[0])
return {
"strategy_type": "sma_cross",
"parameters": params,
}
def _parse_rsi_strategy(self, description: str) -> dict[str, Any]:
"""Parse RSI strategy from description."""
numbers = re.findall(r"\d+", description)
params = dict(self.templates["rsi"]["parameters"])
# Look for period
for _i, num in enumerate(numbers):
num_val = int(num)
# Period is typically 7-21
if 5 <= num_val <= 30 and "period" not in params:
params["period"] = num_val
# Oversold is typically 20-35
elif 15 <= num_val <= 35:
params["oversold"] = num_val
# Overbought is typically 65-85
elif 65 <= num_val <= 85:
params["overbought"] = num_val
return {
"strategy_type": "rsi",
"parameters": params,
}
def _parse_macd_strategy(self, description: str) -> dict[str, Any]:
"""Parse MACD strategy from description."""
numbers = re.findall(r"\d+", description)
params = dict(self.templates["macd"]["parameters"])
if len(numbers) >= 3:
params["fast_period"] = int(numbers[0])
params["slow_period"] = int(numbers[1])
params["signal_period"] = int(numbers[2])
return {
"strategy_type": "macd",
"parameters": params,
}
def _parse_bollinger_strategy(self, description: str) -> dict[str, Any]:
"""Parse Bollinger Bands strategy from description."""
numbers = re.findall(r"\d+\.?\d*", description)
params = dict(self.templates["bollinger"]["parameters"])
for num in numbers:
num_val = float(num)
# Period is typically 10-30
if num_val == int(num_val) and 5 <= num_val <= 50:
params["period"] = int(num_val)
# Std dev is typically 1.5-3.0
elif 1.0 <= num_val <= 4.0:
params["std_dev"] = num_val
return {
"strategy_type": "bollinger",
"parameters": params,
}
def _parse_momentum_strategy(self, description: str) -> dict[str, Any]:
"""Parse momentum strategy from description."""
numbers = re.findall(r"\d+\.?\d*", description)
params = dict(self.templates["momentum"]["parameters"])
for num in numbers:
num_val = float(num)
# Lookback is typically 10-50
if num_val == int(num_val) and 5 <= num_val <= 100:
params["lookback"] = int(num_val)
# Threshold is typically 0.01-0.20
elif 0.001 <= num_val <= 0.5:
params["threshold"] = num_val
# Handle percentage notation (e.g., "5%" -> 0.05)
elif description[description.find(str(num)) + len(str(num))] == "%":
params["threshold"] = num_val / 100
return {
"strategy_type": "momentum",
"parameters": params,
}
def _parse_ema_strategy(self, description: str) -> dict[str, Any]:
"""Parse EMA crossover strategy from description."""
numbers = re.findall(r"\d+", description)
params = dict(self.templates["ema_cross"]["parameters"])
if len(numbers) >= 2:
params["fast_period"] = int(numbers[0])
params["slow_period"] = int(numbers[1])
elif len(numbers) == 1:
params["fast_period"] = int(numbers[0])
return {
"strategy_type": "ema_cross",
"parameters": params,
}
def _parse_breakout_strategy(self, description: str) -> dict[str, Any]:
"""Parse breakout strategy from description."""
numbers = re.findall(r"\d+", description)
params = dict(self.templates["breakout"]["parameters"])
if len(numbers) >= 2:
params["lookback"] = int(numbers[0])
params["exit_lookback"] = int(numbers[1])
elif len(numbers) == 1:
params["lookback"] = int(numbers[0])
return {
"strategy_type": "breakout",
"parameters": params,
}
def _parse_mean_reversion_strategy(self, description: str) -> dict[str, Any]:
"""Parse mean reversion strategy from description."""
numbers = re.findall(r"\d+\.?\d*", description)
params = dict(self.templates["mean_reversion"]["parameters"])
for num in numbers:
num_val = float(num)
if num_val == int(num_val) and 5 <= num_val <= 100:
params["ma_period"] = int(num_val)
elif 0.001 <= num_val <= 0.2:
if "entry" in description.lower():
params["entry_threshold"] = num_val
elif "exit" in description.lower():
params["exit_threshold"] = num_val
return {
"strategy_type": "mean_reversion",
"parameters": params,
}
async def parse_with_llm(self, description: str) -> dict[str, Any]:
"""Parse complex strategy descriptions using LLM.
Args:
description: Natural language strategy description
Returns:
Strategy configuration
"""
if not self.llm:
# Fall back to simple parsing
return self.parse_simple(description)
prompt = PromptTemplate(
input_variables=["description", "available_strategies"],
template="""
Convert this trading strategy description into a structured format.
Description: {description}
Available strategy types:
{available_strategies}
Return a JSON object with:
- strategy_type: one of the available types
- parameters: dictionary of parameters for that strategy
- entry_logic: description of entry conditions
- exit_logic: description of exit conditions
Example response:
{{
"strategy_type": "sma_cross",
"parameters": {{
"fast_period": 10,
"slow_period": 20
}},
"entry_logic": "Buy when fast SMA crosses above slow SMA",
"exit_logic": "Sell when fast SMA crosses below slow SMA"
}}
""",
)
available = "\n".join(
[f"- {k}: {v['description']}" for k, v in self.templates.items()]
)
response = await self.llm.ainvoke(
prompt.format(description=description, available_strategies=available)
)
# Parse JSON response
import json
try:
result = json.loads(response.content)
return result
except json.JSONDecodeError:
# Fall back to simple parsing
return self.parse_simple(description)
def validate_strategy(self, config: dict[str, Any]) -> bool:
"""Validate strategy configuration.
Args:
config: Strategy configuration
Returns:
True if valid
"""
strategy_type = config.get("strategy_type")
if strategy_type not in self.templates:
return False
template = self.templates[strategy_type]
required_params = set(template["parameters"].keys())
provided_params = set(config.get("parameters", {}).keys())
# Check if all required parameters are present
return required_params.issubset(provided_params)
```
--------------------------------------------------------------------------------
/docs/PORTFOLIO.md:
--------------------------------------------------------------------------------
```markdown
# Portfolio Management Guide
Complete guide to using MaverickMCP's portfolio personalization features for intelligent, context-aware stock analysis.
## Table of Contents
- [Overview](#overview)
- [Quick Start](#quick-start)
- [Portfolio Management](#portfolio-management)
- [Intelligent Analysis](#intelligent-analysis)
- [MCP Resource](#mcp-resource)
- [Best Practices](#best-practices)
- [Troubleshooting](#troubleshooting)
- [Technical Details](#technical-details)
## Overview
MaverickMCP's portfolio features turn your AI financial assistant from a stateless analysis tool into one that delivers personalized, context-aware recommendations. The system:
- **Tracks your holdings** with automatic cost basis averaging
- **Calculates live P&L** using real-time market data
- **Enhances analysis tools** to auto-detect your positions
- **Provides AI context** through MCP resources
**DISCLAIMER**: All portfolio features are for educational purposes only. This is not investment advice. Always consult qualified financial professionals before making investment decisions.
## Quick Start
### 1. Add Your First Position
```
Add 10 shares of Apple stock I bought at $150.50 on January 15, 2024
```
Behind the scenes, this uses:
```python
portfolio_add_position(
ticker="AAPL",
shares=10,
purchase_price=150.50,
purchase_date="2024-01-15"
)
```
### 2. View Your Portfolio
```
Show me my portfolio
```
Response includes:
- All positions with current prices
- Unrealized P&L for each position
- Total portfolio value and performance
- Diversification metrics
### 3. Smart Analysis
```
Compare my portfolio holdings
```
Automatically compares all your positions without manual ticker entry!
## Portfolio Management
### Adding Positions
**Add a new position:**
```
Add 50 shares of Microsoft at $380.25
```
**Add to existing position (automatic cost averaging):**
```
Add 25 more shares of Apple at $165.00
```
The system automatically:
- Averages your cost basis
- Updates total investment
- Preserves earliest purchase date
**Example:**
- Initial: 10 shares @ $150 = $1,500 total cost, $150 avg cost
- Add: 10 shares @ $170 = $1,700 total cost
- Result: 20 shares, $160 avg cost, $3,200 total invested
### Viewing Positions
**Get complete portfolio:**
```
Show my portfolio with current prices
```
Returns:
```json
{
"portfolio_name": "My Portfolio",
"positions": [
{
"ticker": "AAPL",
"shares": 20.0,
"average_cost_basis": 160.00,
"current_price": 175.50,
"unrealized_pnl": 310.00,
"unrealized_pnl_pct": 9.69
}
],
"total_value": 3510.00,
"total_invested": 3200.00,
"total_pnl": 310.00,
"total_pnl_pct": 9.69
}
```
### Removing Positions
**Partial sale:**
```
Sell 10 shares of Apple
```
Maintains average cost basis on remaining shares.
**Full position exit:**
```
Remove all my Tesla shares
```
or simply:
```
Remove TSLA
```
### Clearing Portfolio
**Remove all positions:**
```
Clear my entire portfolio
```
Requires confirmation for safety.
## Intelligent Analysis
### Auto-Compare Holdings
Instead of:
```
Compare AAPL, MSFT, GOOGL, TSLA
```
Simply use:
```
Compare my holdings
```
The tool automatically:
- Pulls all tickers from your portfolio
- Analyzes relative performance
- Ranks by metrics
- Shows best/worst performers
### Auto-Correlation Analysis
```
Analyze correlation in my portfolio
```
Automatically:
- Calculates correlation matrix for all holdings
- Identifies highly correlated pairs (diversification issues)
- Finds negative correlations (natural hedges)
- Provides diversification score
Example output:
```json
{
"average_portfolio_correlation": 0.612,
"diversification_score": 38.8,
"high_correlation_pairs": [
{
"pair": ["AAPL", "MSFT"],
"correlation": 0.823,
"interpretation": "High positive correlation"
}
],
"recommendation": "Consider adding uncorrelated assets"
}
```
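Under the hood this boils down to a correlation matrix of daily returns. A simplified sketch of the calculation (not the exact implementation):
```python
import pandas as pd
def portfolio_correlation(prices: pd.DataFrame) -> tuple[pd.DataFrame, float]:
    """prices: one column of closing prices per ticker, indexed by date."""
    returns = prices.pct_change().dropna()
    corr = returns.corr()
    n = len(corr)
    # average of the off-diagonal pairwise correlations
    avg_corr = (corr.values.sum() - n) / (n * (n - 1))
    return corr, avg_corr
```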
### Position-Aware Risk Analysis
```
Analyze AAPL with risk analysis
```
If you own AAPL, automatically shows:
- Your current position (shares, cost basis)
- Unrealized P&L
- Position sizing recommendations
- Averaging down/up suggestions
Example with existing position:
```json
{
"ticker": "AAPL",
"current_price": 175.50,
"existing_position": {
"shares_owned": 20.0,
"average_cost_basis": 160.00,
"total_invested": 3200.00,
"current_value": 3510.00,
"unrealized_pnl": 310.00,
"unrealized_pnl_pct": 9.69,
"position_recommendation": "Hold current position"
}
}
```
## MCP Resource
### portfolio://my-holdings Resource
The portfolio resource provides automatic context to AI agents during conversations.
**Accessed via:**
```
What's in my portfolio?
```
The AI automatically sees:
- All current positions
- Live prices and P&L
- Portfolio composition
- Diversification status
This enables natural conversations:
```
Should I add more tech exposure?
```
The AI knows you already own AAPL, MSFT, GOOGL and can provide personalized advice.
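In FastMCP terms, the resource is registered under a URI that clients can read for context. A minimal sketch of the idea, assuming the FastMCP resource decorator (not the project's actual registration code):
```python
from fastmcp import FastMCP
mcp = FastMCP("maverick-mcp")
@mcp.resource("portfolio://my-holdings")
def my_holdings() -> dict:
    """Sketch only: return current positions with live P&L for AI context."""
    return {
        "positions": [
            {"ticker": "AAPL", "shares": 20.0, "unrealized_pnl_pct": 9.69},
        ],
        "total_pnl_pct": 9.69,
    }
```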
## Best Practices
### Cost Basis Tracking
- **Always specify purchase date** for accurate records
- **Add notes** for important context (e.g., "RSU vest", "DCA purchase #3")
- **Review regularly** to ensure accuracy
### Diversification
- Use `portfolio_correlation_analysis()` monthly
- Watch for correlations above 0.7 (concentration risk)
- Consider uncorrelated assets when diversification score < 50
### Position Sizing
- Use `risk_adjusted_analysis()` before adding to positions
- Follow position sizing recommendations
- Respect stop-loss suggestions
### Maintenance
- **Weekly**: Review portfolio performance
- **Monthly**: Analyze correlations
- **Quarterly**: Rebalance based on analysis tools
## Troubleshooting
### "No portfolio found"
**Problem**: Trying to use auto-detection features without any positions.
**Solution**: Add at least one position:
```
Add 1 share of SPY at current price
```
### "Insufficient positions for comparison"
**Problem**: Need minimum 2 positions for comparison/correlation.
**Solution**: Add another position or specify tickers manually:
```
Compare AAPL, MSFT
```
### "Invalid ticker symbol"
**Problem**: Ticker doesn't exist or is incorrectly formatted.
**Solution**:
- Check ticker spelling
- Verify symbol on financial websites
- Use standard format (e.g., "BRK.B" not "BRKB")
### Stale Price Data
**Problem**: Portfolio shows old prices.
**Solution**: Refresh by calling `get_my_portfolio(include_current_prices=True)`
### Position Not Found
**Problem**: Trying to remove shares from position you don't own.
**Solution**: Check your portfolio first:
```
Show my portfolio
```
## Technical Details
### Cost Basis Method
**Average Cost Method**: Simplest and most appropriate for educational use.
Formula:
```
New Avg Cost = (Existing Total Cost + New Purchase Cost) / Total Shares
```
Example:
- Buy 10 @ $100 = $1,000 total, $100 avg
- Buy 10 @ $120 = $1,200 additional
- Result: 20 shares, $110 avg cost ($2,200 / 20)
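The same arithmetic as a tiny helper (illustrative only):
```python
def average_cost(old_shares: float, old_avg: float, new_shares: float, new_price: float) -> float:
    """New Avg Cost = (Existing Total Cost + New Purchase Cost) / Total Shares."""
    total_cost = old_shares * old_avg + new_shares * new_price
    return total_cost / (old_shares + new_shares)
# 10 @ $100 then 10 more @ $120 -> $110 average cost
assert average_cost(10, 100, 10, 120) == 110.0
```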
### Database Schema
**Tables:**
- `mcp_portfolios`: User portfolio metadata
- `mcp_portfolio_positions`: Individual positions
**Precision:**
- Shares: Numeric(20,8) - supports fractional shares
- Prices: Numeric(12,4) - 4 decimal precision
- Total Cost: Numeric(20,4) - high precision for large positions
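A sketch of how those precisions map to SQLAlchemy column types (column names illustrative; the real models live in the project's data layer, not here):
```python
from sqlalchemy import Column, Integer, Numeric, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class PortfolioPosition(Base):
    __tablename__ = "mcp_portfolio_positions"
    id = Column(Integer, primary_key=True)
    ticker = Column(String(10), nullable=False)
    shares = Column(Numeric(20, 8), nullable=False)  # supports fractional shares
    average_cost_basis = Column(Numeric(12, 4), nullable=False)  # 4 decimal precision
    total_cost = Column(Numeric(20, 4), nullable=False)  # high precision for large positions
```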
### Supported Features
✅ Fractional shares (0.001 minimum)
✅ Multiple portfolios per user
✅ Automatic cost averaging
✅ Live P&L calculations
✅ Position notes/annotations
✅ Timezone-aware timestamps
✅ Cascade deletion (portfolio → positions)
### Limitations
- Single currency (USD)
- Stock equities only (no options, futures, crypto)
- Average cost method only (no FIFO/LIFO)
- No tax lot tracking
- No dividend tracking (planned for future)
- No transaction history (planned for future)
### Data Sources
- **Historical Prices**: Tiingo API (free tier: 500 req/day)
- **Live Prices**: Same as historical (delayed 15 minutes on free tier)
- **Company Info**: Pre-seeded S&P 500 database
### Performance
- **Database**: SQLite default (PostgreSQL optional for better performance)
- **Caching**: In-memory by default (Redis optional)
- **Price Fetching**: Sequential (batch optimization in Phase 3)
- **Query Optimization**: selectin loading for relationships
### Privacy & Security
- **Local-first**: All data stored locally in your database
- **No cloud sync**: Portfolio data never leaves your machine
- **No authentication**: Personal use only (no multi-user)
- **No external sharing**: Data accessible only to you
## Migration Guide
### Upgrading from No Portfolio
1. Start MaverickMCP server
2. Migration runs automatically on first startup
3. Add your first position
4. Verify with `get_my_portfolio()`
### Downgrading (Rollback)
```bash
# Backup first
cp maverick_mcp.db maverick_mcp.db.backup
# Rollback migration
alembic downgrade -1
# Verify
alembic current
```
### Exporting Portfolio
Currently manual:
```bash
sqlite3 maverick_mcp.db "SELECT * FROM mcp_portfolio_positions;" > portfolio_export.csv
```
Future: Built-in export tool planned.
## FAQs
**Q: Can I track multiple portfolios?**
A: Yes! Use the `portfolio_name` parameter:
```python
add_portfolio_position("AAPL", 10, 150, portfolio_name="IRA")
add_portfolio_position("VOO", 5, 400, portfolio_name="401k")
```
**Q: What happens if I add wrong data?**
A: Simply remove the position and re-add:
```
Remove AAPL
Add 10 shares of AAPL at $150.50
```
**Q: Can I track realized gains?**
A: Not yet. Currently tracks unrealized P&L only. Transaction history is planned for future release.
**Q: Is my data backed up?**
A: No automatic backups. Manually copy `maverick_mcp.db` regularly:
```bash
cp maverick_mcp.db ~/backups/maverick_mcp_$(date +%Y%m%d).db
```
**Q: Can I use this for tax purposes?**
A: **NO**. This is educational software only. Use professional tax software for tax reporting.
---
## Getting Help
- **Issues**: https://github.com/wshobson/maverick-mcp/issues
- **Discussions**: https://github.com/wshobson/maverick-mcp/discussions
- **Documentation**: https://github.com/wshobson/maverick-mcp/tree/main/docs
---
**Remember**: This software is for educational purposes only. Always consult qualified financial professionals before making investment decisions.
```
--------------------------------------------------------------------------------
/docs/BACKTESTING.md:
--------------------------------------------------------------------------------
```markdown
# MaverickMCP Backtesting Documentation
## Overview
MaverickMCP provides a comprehensive backtesting system powered by VectorBT with advanced parallel processing capabilities. The system supports 35+ pre-built strategies ranging from simple moving averages to advanced ML ensembles, with optimization, validation, and analysis tools.
## Quick Start
### Basic Backtesting
```python
# Simple SMA crossover backtest
run_backtest("AAPL", "sma_cross", fast_period=10, slow_period=20)
# RSI mean reversion strategy
run_backtest("TSLA", "rsi", period=14, oversold=30, overbought=70)
# MACD strategy
run_backtest("MSFT", "macd", fast_period=12, slow_period=26, signal_period=9)
```
### Parallel Execution (6-8x Performance Boost)
```python
from maverick_mcp.backtesting.strategy_executor import ExecutionContext, get_strategy_executor
# Create execution contexts for multiple strategies
contexts = [
ExecutionContext(
strategy_id="sma_AAPL",
symbol="AAPL",
strategy_type="sma_cross",
parameters={"fast_period": 10, "slow_period": 20},
start_date="2023-01-01",
end_date="2024-01-01"
)
]
# Execute in parallel
async with get_strategy_executor(max_concurrent_strategies=6) as executor:
results = await executor.execute_strategies_parallel(contexts)
```
## Available Strategies
### Technical Analysis Strategies
- **sma_cross**: Simple Moving Average Crossover
- **ema_cross**: Exponential Moving Average Crossover
- **rsi**: Relative Strength Index Mean Reversion
- **macd**: MACD Crossover Strategy
- **bollinger**: Bollinger Bands Mean Reversion
- **momentum**: Price Momentum Strategy
- **breakout**: Price Channel Breakout
- **mean_reversion**: Statistical Mean Reversion
- **volume_weighted**: Volume-Weighted Moving Average
- **stochastic**: Stochastic Oscillator
### Advanced Strategies
- **adaptive_momentum**: ML-Enhanced Adaptive Momentum
- **ensemble**: Multi-Strategy Ensemble Approach
- **regime_aware**: Market Regime Detection & Switching
- **ml_enhanced**: Machine Learning Enhanced Trading
- **pairs_trading**: Statistical Arbitrage Pairs Trading
## Core API Functions
### run_backtest
Execute a comprehensive backtest with specified strategy and parameters.
```python
run_backtest(
symbol="AAPL",
strategy="sma_cross",
start_date="2023-01-01", # Optional, defaults to 1 year ago
end_date="2024-01-01", # Optional, defaults to today
initial_capital=10000.0,
fast_period=10,
slow_period=20
)
```
**Returns:**
```json
{
"symbol": "AAPL",
"strategy": "sma_cross",
"metrics": {
"total_return": 0.15,
"sharpe_ratio": 1.2,
"max_drawdown": -0.08,
"total_trades": 24,
"win_rate": 0.58,
"profit_factor": 1.45,
"calmar_ratio": 1.85
},
"trades": [...],
"equity_curve": [...],
"analysis": {...}
}
```
### optimize_strategy
Find optimal parameters using grid search optimization.
```python
optimize_strategy(
symbol="AAPL",
strategy="sma_cross",
optimization_params={
"fast_period": [5, 10, 15, 20],
"slow_period": [20, 30, 40, 50]
},
granularity="medium" # "coarse", "medium", or "fine"
)
```
### validate_strategy
Validate strategy robustness using walk-forward analysis.
```python
validate_strategy(
symbol="AAPL",
strategy="sma_cross",
parameters={"fast_period": 10, "slow_period": 20},
n_splits=5, # Number of walk-forward periods
test_size=0.2, # Out-of-sample test size
validation_type="walk_forward"
)
```
### analyze_portfolio
Run portfolio-level backtesting across multiple symbols.
```python
analyze_portfolio(
symbols=["AAPL", "MSFT", "GOOGL"],
strategy="momentum",
weights=[0.33, 0.33, 0.34], # Optional, equal weight if not specified
rebalance_frequency="monthly"
)
```
## Parallel Processing Configuration
### Performance Tuning
```python
# Development/Testing (conservative)
executor = StrategyExecutor(
max_concurrent_strategies=4,
max_concurrent_api_requests=8,
connection_pool_size=50
)
# Production (aggressive)
executor = StrategyExecutor(
max_concurrent_strategies=8,
max_concurrent_api_requests=15,
connection_pool_size=100
)
# High-volume backtesting
executor = StrategyExecutor(
max_concurrent_strategies=12,
max_concurrent_api_requests=20,
connection_pool_size=200
)
```
### Environment Variables
```bash
# Database optimization
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=40
DB_POOL_TIMEOUT=30
# Parallel execution limits
MAX_CONCURRENT_STRATEGIES=6
MAX_CONCURRENT_API_REQUESTS=10
CONNECTION_POOL_SIZE=100
```
## Database Optimization
### Indexes for Performance
The system automatically creates optimized indexes for fast data retrieval:
- **Composite index** for date range queries with symbol lookup
- **Covering index** for OHLCV queries (includes all price data)
- **Partial index** for recent data (PostgreSQL only)
### Batch Data Fetching
```python
from maverick_mcp.backtesting.strategy_executor import batch_fetch_stock_data
# Fetch data for multiple symbols efficiently
symbols = ["AAPL", "MSFT", "GOOGL", "TSLA", "NVDA"]
data_dict = await batch_fetch_stock_data(
symbols=symbols,
start_date="2023-01-01",
end_date="2024-01-01",
max_concurrent=10
)
```
## Best Practices
### 1. Strategy Development
- Start with simple strategies before complex ones
- Always validate with out-of-sample data
- Use walk-forward analysis for robustness testing
- Consider transaction costs and slippage
### 2. Parameter Optimization
- Avoid overfitting with too many parameters
- Use coarse optimization first, then refine (see the sketch after this list)
- Validate optimal parameters on different time periods
- Consider parameter stability over time
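For example, the coarse-then-refine workflow might look like this with `optimize_strategy` (the parameter grids are illustrative):
```python
# Coarse pass over a wide grid
coarse = optimize_strategy(
    symbol="AAPL",
    strategy="sma_cross",
    optimization_params={"fast_period": [5, 10, 20], "slow_period": [30, 50, 100]},
    granularity="coarse",
)

# Refine around the best coarse result (these narrower grids are illustrative)
fine = optimize_strategy(
    symbol="AAPL",
    strategy="sma_cross",
    optimization_params={"fast_period": [8, 10, 12], "slow_period": [45, 50, 55]},
    granularity="fine",
)
```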
### 3. Risk Management
- Always set appropriate position sizing
- Use stop-loss and risk limits
- Monitor maximum drawdown
- Diversify across strategies and assets
### 4. Performance Optimization
- Use parallel execution for multiple backtests
- Enable database caching for frequently accessed data
- Batch fetch data for multiple symbols
- Monitor memory usage with large datasets
## Troubleshooting
### Common Issues
**High memory usage**
- Reduce `max_concurrent_strategies`
- Use smaller date ranges for initial testing
- Enable database caching
**Slow performance**
- Ensure database indexes are created
- Increase connection pool size
- Use parallel execution
- Check API rate limits
**API rate limiting**
- Lower `max_concurrent_api_requests`
- Implement exponential backoff
- Use cached data when possible
**Data quality issues**
- Verify data source reliability
- Check for missing data periods (see the sketch after this list)
- Validate against multiple sources
- Handle corporate actions properly
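One simple way to check for missing data periods is to compare the data's index against a business-day calendar. A generic pandas sketch, assuming a DataFrame indexed by trading date (exchange holidays will also show up as gaps):
```python
import pandas as pd

def find_missing_sessions(df: pd.DataFrame) -> pd.DatetimeIndex:
    """Return business days in the data's range with no rows (illustrative)."""
    expected = pd.bdate_range(df.index.min(), df.index.max())
    return expected.difference(df.index)
```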
### Debug Mode
Enable detailed logging for troubleshooting:
```python
import logging
logging.getLogger("maverick_mcp.backtesting").setLevel(logging.DEBUG)
```
## Performance Metrics
### Key Metrics Explained
- **Total Return**: Overall strategy performance
- **Sharpe Ratio**: Risk-adjusted returns (>1.0 is good, >2.0 is excellent)
- **Max Drawdown**: Maximum peak-to-trough decline
- **Win Rate**: Percentage of profitable trades
- **Profit Factor**: Gross profit / Gross loss (>1.5 is good)
- **Calmar Ratio**: Annual return / Max drawdown (>1.0 is good); the sketch below shows how these ratios can be computed from a daily returns series
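The sketch below uses generic formulas for these metrics computed from a daily returns series; it is an approximation for illustration, not the engine's exact implementation (it assumes a non-empty, non-flat series):
```python
import numpy as np
import pandas as pd

def summarize(returns: pd.Series, periods_per_year: int = 252) -> dict[str, float]:
    """Compute common backtest metrics from daily returns (illustrative)."""
    equity = (1 + returns).cumprod()
    total_return = float(equity.iloc[-1] - 1)
    sharpe = float(np.sqrt(periods_per_year) * returns.mean() / returns.std())
    max_drawdown = float((equity / equity.cummax() - 1).min())
    annual_return = float(equity.iloc[-1] ** (periods_per_year / len(returns)) - 1)
    return {
        "total_return": total_return,
        "sharpe_ratio": sharpe,
        "max_drawdown": max_drawdown,
        "calmar_ratio": annual_return / abs(max_drawdown),
    }
```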
### Benchmark Comparison
Compare strategy performance against buy-and-hold:
```python
results = run_backtest(...)
benchmark = results.get("benchmark_comparison")
print(f"Strategy vs Buy-Hold: {benchmark['excess_return']:.2%}")
```
## Advanced Features
### Monte Carlo Simulation
Assess strategy robustness with randomized scenarios:
```python
monte_carlo_results = run_monte_carlo_simulation(
strategy_results=results,
n_simulations=1000,
confidence_level=0.95
)
```
### Market Regime Detection
Automatically adjust strategy based on market conditions:
```python
regime_results = analyze_market_regime(
symbol="SPY",
lookback_period=252,
regime_indicators=["volatility", "trend", "momentum"]
)
```
### Multi-Strategy Ensemble
Combine multiple strategies for better risk-adjusted returns:
```python
ensemble_results = run_ensemble_backtest(
symbol="AAPL",
strategies=["sma_cross", "rsi", "momentum"],
weights="equal", # or "optimize" for dynamic weighting
correlation_threshold=0.7
)
```
## Integration Examples
### With Claude Desktop
```python
# Use MCP tools for comprehensive analysis
"Run a backtest for AAPL using SMA crossover strategy with
optimization for the best parameters over the last 2 years"
# The system will:
# 1. Fetch historical data
# 2. Run parameter optimization
# 3. Execute backtest with optimal parameters
# 4. Provide detailed performance metrics
```
### Programmatic Usage
```python
from maverick_mcp.backtesting import BacktestingEngine
async def run_comprehensive_analysis():
engine = BacktestingEngine()
# Run backtest
results = await engine.run_backtest(
symbol="AAPL",
strategy="momentum"
)
# Optimize parameters
optimal = await engine.optimize_strategy(
symbol="AAPL",
strategy="momentum",
granularity="fine"
)
# Validate robustness
validation = await engine.validate_strategy(
symbol="AAPL",
strategy="momentum",
parameters=optimal["best_params"]
)
return {
"backtest": results,
"optimization": optimal,
"validation": validation
}
```
## Testing
Run the test suite to verify functionality:
```bash
# Unit tests
pytest tests/test_backtesting.py
# Integration tests
pytest tests/test_strategy_executor.py
# Performance benchmarks
python scripts/benchmark_parallel_backtesting.py
# Comprehensive validation
python scripts/test_all_strategies.py
```
## Summary
MaverickMCP's backtesting system provides:
- **35+ pre-built strategies** with extensive customization
- **6-8x performance improvement** with parallel processing
- **Comprehensive optimization** and validation tools
- **Professional-grade metrics** and risk analysis
- **Production-ready architecture** with error handling and monitoring
The system is designed for both simple strategy testing and complex portfolio analysis, with a focus on performance, reliability, and ease of use.
```
--------------------------------------------------------------------------------
/maverick_mcp/domain/screening/value_objects.py:
--------------------------------------------------------------------------------
```python
"""
Screening domain value objects.
This module contains immutable value objects that represent
core concepts in the screening domain.
"""
from dataclasses import dataclass
from decimal import Decimal
from enum import Enum
class ScreeningStrategy(Enum):
"""
Enumeration of available screening strategies.
Each strategy represents a different approach to identifying
investment opportunities in the stock market.
"""
MAVERICK_BULLISH = "maverick_bullish"
MAVERICK_BEARISH = "maverick_bearish"
TRENDING_STAGE2 = "trending_stage2"
def get_description(self) -> str:
"""Get human-readable description of the strategy."""
descriptions = {
self.MAVERICK_BULLISH: "High momentum stocks with bullish technical setups",
self.MAVERICK_BEARISH: "Weak stocks with bearish technical setups",
self.TRENDING_STAGE2: "Uptrend stocks meeting trending criteria",
}
return descriptions[self]
def get_primary_sort_field(self) -> str:
"""Get the primary field used for sorting results."""
sort_fields = {
self.MAVERICK_BULLISH: "combined_score",
self.MAVERICK_BEARISH: "bear_score",
self.TRENDING_STAGE2: "momentum_score",
}
return sort_fields[self]
def get_minimum_score_threshold(self) -> int:
"""Get the minimum score threshold for meaningful results."""
thresholds = {
self.MAVERICK_BULLISH: 50,
self.MAVERICK_BEARISH: 30,
self.TRENDING_STAGE2: 70,
}
return thresholds[self]
@dataclass(frozen=True)
class ScreeningCriteria:
"""
Immutable value object representing screening filter criteria.
This encapsulates all the parameters that can be used to filter
and refine screening results.
"""
# Basic filters
min_momentum_score: Decimal | None = None
max_momentum_score: Decimal | None = None
min_volume: int | None = None
max_volume: int | None = None
min_price: Decimal | None = None
max_price: Decimal | None = None
# Technical filters
min_combined_score: int | None = None
min_bear_score: int | None = None
min_adr_percentage: Decimal | None = None
max_adr_percentage: Decimal | None = None
# Pattern filters
require_pattern_detected: bool = False
require_squeeze: bool = False
require_consolidation: bool = False
require_entry_signal: bool = False
# Moving average filters
require_above_sma50: bool = False
require_above_sma150: bool = False
require_above_sma200: bool = False
require_ma_alignment: bool = False # 50 > 150 > 200
# Sector/Industry filters
allowed_sectors: list[str] | None = None
excluded_sectors: list[str] | None = None
def __post_init__(self):
"""Validate criteria constraints."""
self._validate_rating_ranges()
self._validate_volume_ranges()
self._validate_price_ranges()
self._validate_score_ranges()
def _validate_rating_ranges(self) -> None:
"""Validate momentum score range constraints."""
if self.min_momentum_score is not None:
if not (0 <= self.min_momentum_score <= 100):
raise ValueError("Minimum momentum score must be between 0 and 100")
if self.max_momentum_score is not None:
if not (0 <= self.max_momentum_score <= 100):
raise ValueError("Maximum momentum score must be between 0 and 100")
if (
self.min_momentum_score is not None
and self.max_momentum_score is not None
and self.min_momentum_score > self.max_momentum_score
):
raise ValueError(
"Minimum momentum score cannot exceed maximum momentum score"
)
def _validate_volume_ranges(self) -> None:
"""Validate volume range constraints."""
if self.min_volume is not None and self.min_volume < 0:
raise ValueError("Minimum volume cannot be negative")
if self.max_volume is not None and self.max_volume < 0:
raise ValueError("Maximum volume cannot be negative")
if (
self.min_volume is not None
and self.max_volume is not None
and self.min_volume > self.max_volume
):
raise ValueError("Minimum volume cannot exceed maximum volume")
def _validate_price_ranges(self) -> None:
"""Validate price range constraints."""
if self.min_price is not None and self.min_price <= 0:
raise ValueError("Minimum price must be positive")
if self.max_price is not None and self.max_price <= 0:
raise ValueError("Maximum price must be positive")
if (
self.min_price is not None
and self.max_price is not None
and self.min_price > self.max_price
):
raise ValueError("Minimum price cannot exceed maximum price")
def _validate_score_ranges(self) -> None:
"""Validate score range constraints."""
if self.min_combined_score is not None and self.min_combined_score < 0:
raise ValueError("Minimum combined score cannot be negative")
if self.min_bear_score is not None and self.min_bear_score < 0:
raise ValueError("Minimum bear score cannot be negative")
def has_any_filters(self) -> bool:
"""Check if any filters are applied."""
return any(
[
self.min_momentum_score is not None,
self.max_momentum_score is not None,
self.min_volume is not None,
self.max_volume is not None,
self.min_price is not None,
self.max_price is not None,
self.min_combined_score is not None,
self.min_bear_score is not None,
self.min_adr_percentage is not None,
self.max_adr_percentage is not None,
self.require_pattern_detected,
self.require_squeeze,
self.require_consolidation,
self.require_entry_signal,
self.require_above_sma50,
self.require_above_sma150,
self.require_above_sma200,
self.require_ma_alignment,
self.allowed_sectors is not None,
self.excluded_sectors is not None,
]
)
def get_filter_description(self) -> str:
"""Get human-readable description of active filters."""
filters = []
if self.min_momentum_score is not None:
filters.append(f"Momentum Score >= {self.min_momentum_score}")
if self.max_momentum_score is not None:
filters.append(f"Momentum Score <= {self.max_momentum_score}")
if self.min_volume is not None:
filters.append(f"Volume >= {self.min_volume:,}")
if self.min_price is not None:
filters.append(f"Price >= ${self.min_price}")
if self.max_price is not None:
filters.append(f"Price <= ${self.max_price}")
if self.require_above_sma50:
filters.append("Above SMA 50")
if self.require_pattern_detected:
filters.append("Pattern Detected")
if not filters:
return "No filters applied"
return "; ".join(filters)
@dataclass(frozen=True)
class ScreeningLimits:
"""
Value object representing limits and constraints for screening operations.
This encapsulates business rules around result limits, timeouts,
and resource constraints.
"""
max_results: int = 100
default_results: int = 20
min_results: int = 1
max_timeout_seconds: int = 30
def __post_init__(self):
"""Validate limit constraints."""
if self.min_results <= 0:
raise ValueError("Minimum results must be positive")
if self.default_results < self.min_results:
raise ValueError("Default results cannot be less than minimum")
if self.max_results < self.default_results:
raise ValueError("Maximum results cannot be less than default")
if self.max_timeout_seconds <= 0:
raise ValueError("Maximum timeout must be positive")
def validate_limit(self, requested_limit: int) -> int:
"""
Validate and adjust requested result limit.
Returns the adjusted limit within valid bounds.
"""
if requested_limit < self.min_results:
return self.min_results
if requested_limit > self.max_results:
return self.max_results
return requested_limit
@dataclass(frozen=True)
class SortingOptions:
"""
Value object representing sorting options for screening results.
This encapsulates the various ways results can be ordered.
"""
field: str
descending: bool = True
secondary_field: str | None = None
secondary_descending: bool = True
# Valid sortable fields
VALID_FIELDS = {
"combined_score",
"bear_score",
"momentum_score",
"close_price",
"volume",
"avg_volume_30d",
"adr_percentage",
"quality_score",
}
def __post_init__(self):
"""Validate sorting configuration."""
if self.field not in self.VALID_FIELDS:
raise ValueError(
f"Invalid sort field: {self.field}. Must be one of {self.VALID_FIELDS}"
)
if (
self.secondary_field is not None
and self.secondary_field not in self.VALID_FIELDS
):
raise ValueError(f"Invalid secondary sort field: {self.secondary_field}")
@classmethod
def for_strategy(cls, strategy: ScreeningStrategy) -> "SortingOptions":
"""Create default sorting options for a screening strategy."""
primary_field = strategy.get_primary_sort_field()
# Add appropriate secondary sort field
secondary_field = (
"momentum_score" if primary_field != "momentum_score" else "close_price"
)
return cls(
field=primary_field,
descending=True,
secondary_field=secondary_field,
secondary_descending=True,
)
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/implementations/persistence_adapter.py:
--------------------------------------------------------------------------------
```python
"""
Data persistence adapter.
This module provides adapters that make the existing database models
compatible with the new IDataPersistence interface.
"""
import asyncio
import logging
from typing import Any
import pandas as pd
from sqlalchemy.orm import Session
from maverick_mcp.data.models import (
MaverickBearStocks,
MaverickStocks,
PriceCache,
SessionLocal,
Stock,
SupplyDemandBreakoutStocks,
bulk_insert_price_data,
get_latest_maverick_screening,
)
from maverick_mcp.providers.interfaces.persistence import (
DatabaseConfig,
IDataPersistence,
)
logger = logging.getLogger(__name__)
class SQLAlchemyPersistenceAdapter(IDataPersistence):
"""
Adapter that makes the existing SQLAlchemy models compatible with IDataPersistence interface.
This adapter wraps the existing database operations and exposes them through the new
interface contracts, enabling gradual migration to the new architecture.
"""
def __init__(self, config: DatabaseConfig | None = None):
"""
Initialize the persistence adapter.
Args:
config: Database configuration (optional)
"""
self._config = config
logger.debug("SQLAlchemyPersistenceAdapter initialized")
async def get_session(self) -> Session:
"""
Get a database session (async wrapper).
Returns:
Database session for operations
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, SessionLocal)
async def get_read_only_session(self) -> Session:
"""
Get a read-only database session.
Returns:
Read-only database session for queries
"""
# Use the existing read-only session manager
# Since get_db_session_read_only returns a context manager, we need to handle it differently
# For now, return a regular session - this could be enhanced later
return await self.get_session()
async def save_price_data(
self, session: Session, symbol: str, data: pd.DataFrame
) -> int:
"""
Save stock price data to persistence layer (async wrapper).
Args:
session: Database session
symbol: Stock ticker symbol
data: DataFrame with OHLCV data
Returns:
Number of records saved
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, bulk_insert_price_data, session, symbol, data
)
async def get_price_data(
self,
session: Session,
symbol: str,
start_date: str,
end_date: str,
) -> pd.DataFrame:
"""
Retrieve stock price data from persistence layer (async wrapper).
Args:
session: Database session
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
Returns:
DataFrame with historical price data
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, PriceCache.get_price_data, session, symbol, start_date, end_date
)
async def get_or_create_stock(self, session: Session, symbol: str) -> Any:
"""
Get or create a stock record (async wrapper).
Args:
session: Database session
symbol: Stock ticker symbol
Returns:
Stock entity/record
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, Stock.get_or_create, session, symbol)
async def save_screening_results(
self,
session: Session,
screening_type: str,
results: list[dict[str, Any]],
) -> int:
"""
Save stock screening results.
Args:
session: Database session
screening_type: Type of screening (e.g., 'maverick', 'bearish', 'trending')
results: List of screening results
Returns:
Number of records saved
"""
# This would need to be implemented based on the specific screening models
# For now, return the count of results as a placeholder
logger.info(f"Saving {len(results)} {screening_type} screening results")
return len(results)
async def get_screening_results(
self,
session: Session,
screening_type: str,
limit: int | None = None,
min_score: float | None = None,
) -> list[dict[str, Any]]:
"""
Retrieve stock screening results (async wrapper).
Args:
session: Database session
screening_type: Type of screening
limit: Maximum number of results
min_score: Minimum score filter
Returns:
List of screening results
"""
loop = asyncio.get_event_loop()
if screening_type == "maverick":
# Use the existing MaverickStocks query logic
            def get_maverick_results():
                query = session.query(MaverickStocks)
                if min_score is not None:
                    query = query.filter(MaverickStocks.combined_score >= min_score)
                # Apply ordering before the limit so the top combined scores are returned
                query = query.order_by(MaverickStocks.combined_score.desc())
                if limit is not None:
                    query = query.limit(limit)
                return [stock.to_dict() for stock in query.all()]
return await loop.run_in_executor(None, get_maverick_results)
elif screening_type == "bearish":
# Use the existing MaverickBearStocks query logic
            def get_bear_results():
                query = session.query(MaverickBearStocks)
                if min_score is not None:
                    query = query.filter(MaverickBearStocks.score >= min_score)
                # Apply ordering before the limit so the top bear scores are returned
                query = query.order_by(MaverickBearStocks.score.desc())
                if limit is not None:
                    query = query.limit(limit)
                return [stock.to_dict() for stock in query.all()]
return await loop.run_in_executor(None, get_bear_results)
elif screening_type == "trending":
# Use the existing SupplyDemandBreakoutStocks query logic
def get_trending_results():
query = session.query(SupplyDemandBreakoutStocks).filter(
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_50,
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_150,
SupplyDemandBreakoutStocks.close_price
> SupplyDemandBreakoutStocks.sma_200,
SupplyDemandBreakoutStocks.sma_50
> SupplyDemandBreakoutStocks.sma_150,
SupplyDemandBreakoutStocks.sma_150
> SupplyDemandBreakoutStocks.sma_200,
)
                if min_score is not None:
                    query = query.filter(
                        SupplyDemandBreakoutStocks.momentum_score >= min_score
                    )
                # Apply ordering before the limit so the top momentum scores are returned
                query = query.order_by(
                    SupplyDemandBreakoutStocks.momentum_score.desc()
                )
                if limit is not None:
                    query = query.limit(limit)
                return [stock.to_dict() for stock in query.all()]
return await loop.run_in_executor(None, get_trending_results)
else:
logger.warning(f"Unknown screening type: {screening_type}")
return []
async def get_latest_screening_data(self) -> dict[str, list[dict[str, Any]]]:
"""
Get the latest screening data for all types (async wrapper).
Returns:
Dictionary with all screening types and their latest results
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, get_latest_maverick_screening)
async def check_data_freshness(self, symbol: str, max_age_hours: int = 24) -> bool:
"""
Check if cached data for a symbol is fresh enough.
Args:
symbol: Stock ticker symbol
max_age_hours: Maximum age in hours before data is considered stale
Returns:
True if data is fresh, False if stale or missing
"""
# This would need to be implemented based on timestamp fields in the models
# For now, return True as a placeholder
logger.debug(
f"Checking data freshness for {symbol} (max age: {max_age_hours}h)"
)
return True
async def bulk_save_price_data(
self, session: Session, symbol: str, data: pd.DataFrame
) -> int:
"""
Bulk save price data for better performance (async wrapper).
Args:
session: Database session
symbol: Stock ticker symbol
data: DataFrame with OHLCV data
Returns:
Number of records saved
"""
# Use the same implementation as save_price_data since bulk_insert_price_data is already optimized
return await self.save_price_data(session, symbol, data)
async def get_symbols_with_data(
self, session: Session, limit: int | None = None
) -> list[str]:
"""
Get list of symbols that have price data (async wrapper).
Args:
session: Database session
limit: Maximum number of symbols to return
Returns:
List of stock symbols
"""
loop = asyncio.get_event_loop()
def get_symbols():
query = session.query(Stock.symbol).distinct()
if limit:
query = query.limit(limit)
return [row[0] for row in query.all()]
return await loop.run_in_executor(None, get_symbols)
async def cleanup_old_data(self, session: Session, days_to_keep: int = 365) -> int:
"""
Clean up old data beyond retention period.
Args:
session: Database session
days_to_keep: Number of days of data to retain
Returns:
Number of records deleted
"""
# This would need to be implemented based on specific cleanup requirements
# For now, return 0 as a placeholder
logger.info(f"Cleanup old data beyond {days_to_keep} days")
return 0
```
--------------------------------------------------------------------------------
/alembic/versions/008_performance_optimization_indexes.py:
--------------------------------------------------------------------------------
```python
"""Add comprehensive performance optimization indexes
Revision ID: 008_performance_optimization_indexes
Revises: 007_enhance_audit_logging
Create Date: 2025-06-25 12:00:00
This migration adds comprehensive performance indexes for:
- Stock data queries with date ranges
- Screening table optimizations
- Rate limiting and authentication tables
- Cache key lookup optimizations
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "008_performance_optimization_indexes"
down_revision = "007_enhance_audit_logging"
branch_labels = None
depends_on = None
def upgrade():
"""Add comprehensive performance optimization indexes."""
# Stock data performance indexes
print("Creating stock data performance indexes...")
# Composite index for price cache queries (stock_id, date)
# This is the most common query pattern for historical data
op.create_index(
"idx_stocks_pricecache_stock_date_range",
"stocks_pricecache",
["stock_id", "date"],
postgresql_using="btree",
)
# Index for volume-based queries (high volume screening)
op.create_index(
"idx_stocks_pricecache_volume_desc",
"stocks_pricecache",
[sa.text("volume DESC")],
postgresql_using="btree",
)
# Index for price-based queries (close price for technical analysis)
op.create_index(
"idx_stocks_pricecache_close_price",
"stocks_pricecache",
["close_price"],
postgresql_using="btree",
)
# Stock lookup optimizations
print("Creating stock lookup optimization indexes...")
# Case-insensitive ticker lookup (for user input handling)
op.execute(
"CREATE INDEX IF NOT EXISTS idx_stocks_stock_ticker_lower "
"ON stocks_stock (LOWER(ticker_symbol))"
)
# Sector and industry filtering
op.create_index(
"idx_stocks_stock_sector",
"stocks_stock",
["sector"],
postgresql_using="btree",
)
op.create_index(
"idx_stocks_stock_industry",
"stocks_stock",
["industry"],
postgresql_using="btree",
)
# Exchange filtering for market-specific queries
op.create_index(
"idx_stocks_stock_exchange",
"stocks_stock",
["exchange"],
postgresql_using="btree",
)
# Screening table optimizations
print("Creating screening performance indexes...")
# Maverick bullish screening indexes
op.create_index(
"idx_stocks_maverickstocks_score_desc",
"stocks_maverickstocks",
[sa.text("score DESC")],
postgresql_using="btree",
)
op.create_index(
"idx_stocks_maverickstocks_rank_asc",
"stocks_maverickstocks",
["rank"],
postgresql_using="btree",
)
op.create_index(
"idx_stocks_maverickstocks_date_analyzed",
"stocks_maverickstocks",
[sa.text("date_analyzed DESC")],
postgresql_using="btree",
)
# Composite index for score and date filtering
op.create_index(
"idx_stocks_maverickstocks_score_date",
"stocks_maverickstocks",
[sa.text("score DESC"), sa.text("date_analyzed DESC")],
postgresql_using="btree",
)
# Maverick bearish screening indexes
op.create_index(
"idx_stocks_maverickbearstocks_score_desc",
"stocks_maverickbearstocks",
[sa.text("score DESC")],
postgresql_using="btree",
)
op.create_index(
"idx_stocks_maverickbearstocks_date_analyzed",
"stocks_maverickbearstocks",
[sa.text("date_analyzed DESC")],
postgresql_using="btree",
)
# Supply/Demand (Trending) screening indexes
op.create_index(
"idx_stocks_supply_demand_breakouts_momentum_score_desc",
"stocks_supply_demand_breakouts",
[sa.text("momentum_score DESC")],
postgresql_using="btree",
)
op.create_index(
"idx_stocks_supply_demand_breakouts_date_analyzed",
"stocks_supply_demand_breakouts",
[sa.text("date_analyzed DESC")],
postgresql_using="btree",
)
# Composite index for momentum score and date
op.create_index(
"idx_stocks_supply_demand_breakouts_momentum_date",
"stocks_supply_demand_breakouts",
[sa.text("momentum_score DESC"), sa.text("date_analyzed DESC")],
postgresql_using="btree",
)
# Authentication and rate limiting optimizations
print("Creating authentication performance indexes...")
# API key lookups (most frequent auth operation)
op.create_index(
"idx_mcp_api_keys_key_hash",
"mcp_api_keys",
["key_hash"],
postgresql_using="hash", # Hash index for exact equality
)
# Active API keys filter
op.create_index(
"idx_mcp_api_keys_active_expires",
"mcp_api_keys",
["is_active", "expires_at"],
postgresql_using="btree",
)
# User API keys lookup
op.create_index(
"idx_mcp_api_keys_user_id_active",
"mcp_api_keys",
["user_id", "is_active"],
postgresql_using="btree",
)
# Refresh token lookups
op.create_index(
"idx_mcp_refresh_tokens_token_hash",
"mcp_refresh_tokens",
["token_hash"],
postgresql_using="hash",
)
op.create_index(
"idx_mcp_refresh_tokens_user_active",
"mcp_refresh_tokens",
["user_id", "is_active"],
postgresql_using="btree",
)
# Request tracking for analytics
op.create_index(
"idx_mcp_requests_user_timestamp",
"mcp_requests",
["user_id", sa.text("timestamp DESC")],
postgresql_using="btree",
)
op.create_index(
"idx_mcp_requests_tool_name",
"mcp_requests",
["tool_name"],
postgresql_using="btree",
)
# Request success rate analysis
op.create_index(
"idx_mcp_requests_success_timestamp",
"mcp_requests",
["success", sa.text("timestamp DESC")],
postgresql_using="btree",
)
# Audit logging optimizations
print("Creating audit logging performance indexes...")
# User activity tracking
op.create_index(
"idx_mcp_audit_logs_user_timestamp",
"mcp_audit_logs",
["user_id", sa.text("timestamp DESC")],
postgresql_using="btree",
)
# Action type filtering
op.create_index(
"idx_mcp_audit_logs_action",
"mcp_audit_logs",
["action"],
postgresql_using="btree",
)
# IP address tracking for security
op.create_index(
"idx_mcp_audit_logs_ip_timestamp",
"mcp_audit_logs",
["ip_address", sa.text("timestamp DESC")],
postgresql_using="btree",
)
# Partial indexes for common queries
print("Creating partial indexes for optimal performance...")
# Active users only (most queries filter for active users)
op.execute(
"CREATE INDEX IF NOT EXISTS idx_mcp_users_active_email "
"ON mcp_users (email) WHERE is_active = true"
)
# Recent price data (last 30 days) - most common query pattern
op.execute(
"CREATE INDEX IF NOT EXISTS idx_stocks_pricecache_recent "
"ON stocks_pricecache (stock_id, date DESC) "
"WHERE date >= CURRENT_DATE - INTERVAL '30 days'"
)
# High-volume stocks (for active trading analysis)
op.execute(
"CREATE INDEX IF NOT EXISTS idx_stocks_pricecache_high_volume "
"ON stocks_pricecache (stock_id, date DESC, volume DESC) "
"WHERE volume > 1000000"
)
print("Performance optimization indexes created successfully!")
def downgrade():
"""Remove performance optimization indexes."""
print("Removing performance optimization indexes...")
# Stock data indexes
op.drop_index("idx_stocks_pricecache_stock_date_range", "stocks_pricecache")
op.drop_index("idx_stocks_pricecache_volume_desc", "stocks_pricecache")
op.drop_index("idx_stocks_pricecache_close_price", "stocks_pricecache")
# Stock lookup indexes
op.execute("DROP INDEX IF EXISTS idx_stocks_stock_ticker_lower")
op.drop_index("idx_stocks_stock_sector", "stocks_stock")
op.drop_index("idx_stocks_stock_industry", "stocks_stock")
op.drop_index("idx_stocks_stock_exchange", "stocks_stock")
# Screening indexes
op.drop_index("idx_stocks_maverickstocks_score_desc", "stocks_maverickstocks")
op.drop_index("idx_stocks_maverickstocks_rank_asc", "stocks_maverickstocks")
op.drop_index("idx_stocks_maverickstocks_date_analyzed", "stocks_maverickstocks")
op.drop_index("idx_stocks_maverickstocks_score_date", "stocks_maverickstocks")
op.drop_index(
"idx_stocks_maverickbearstocks_score_desc", "stocks_maverickbearstocks"
)
op.drop_index(
"idx_stocks_maverickbearstocks_date_analyzed", "stocks_maverickbearstocks"
)
op.drop_index(
"idx_stocks_supply_demand_breakouts_momentum_score_desc",
"stocks_supply_demand_breakouts",
)
op.drop_index(
"idx_stocks_supply_demand_breakouts_date_analyzed",
"stocks_supply_demand_breakouts",
)
op.drop_index(
"idx_stocks_supply_demand_breakouts_momentum_date",
"stocks_supply_demand_breakouts",
)
# Authentication indexes
op.drop_index("idx_mcp_api_keys_key_hash", "mcp_api_keys")
op.drop_index("idx_mcp_api_keys_active_expires", "mcp_api_keys")
op.drop_index("idx_mcp_api_keys_user_id_active", "mcp_api_keys")
op.drop_index("idx_mcp_refresh_tokens_token_hash", "mcp_refresh_tokens")
op.drop_index("idx_mcp_refresh_tokens_user_active", "mcp_refresh_tokens")
op.drop_index("idx_mcp_requests_user_timestamp", "mcp_requests")
op.drop_index("idx_mcp_requests_tool_name", "mcp_requests")
op.drop_index("idx_mcp_requests_success_timestamp", "mcp_requests")
# Audit logging indexes
op.drop_index("idx_mcp_audit_logs_user_timestamp", "mcp_audit_logs")
op.drop_index("idx_mcp_audit_logs_action", "mcp_audit_logs")
op.drop_index("idx_mcp_audit_logs_ip_timestamp", "mcp_audit_logs")
# Partial indexes
op.execute("DROP INDEX IF EXISTS idx_mcp_users_active_email")
op.execute("DROP INDEX IF EXISTS idx_stocks_pricecache_recent")
op.execute("DROP INDEX IF EXISTS idx_stocks_pricecache_high_volume")
print("Performance optimization indexes removed.")
```
--------------------------------------------------------------------------------
/scripts/setup_self_contained.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Self-contained setup script for Maverick-MCP.
This script sets up a completely self-contained Maverick-MCP installation
with its own database schema, sample data, and validation.
Usage:
python scripts/setup_self_contained.py --full-setup
python scripts/setup_self_contained.py --quick-setup
python scripts/setup_self_contained.py --migrate-only
"""
import argparse
import asyncio
import logging
import os
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from maverick_mcp.config.database_self_contained import (
get_self_contained_db_config,
init_self_contained_database,
run_self_contained_migrations,
)
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("self_contained_setup")
def check_requirements() -> bool:
"""Check if all requirements are met for setup."""
logger.info("🔍 Checking requirements...")
# Check environment variables
required_env = []
optional_env = {
"TIINGO_API_TOKEN": "Required for loading market data from Tiingo API",
"MCP_DATABASE_URL": "Custom database URL (defaults to maverick_mcp database)",
"POSTGRES_URL": "Alternative database URL",
"DATABASE_URL": "Fallback database URL",
}
missing_required = []
for env_var in required_env:
if not os.getenv(env_var):
missing_required.append(env_var)
if missing_required:
logger.error(f"❌ Missing required environment variables: {missing_required}")
return False
# Check optional environment variables
missing_optional = []
for env_var, description in optional_env.items():
if not os.getenv(env_var):
missing_optional.append(f"{env_var}: {description}")
if missing_optional:
logger.info("ℹ️ Optional environment variables not set:")
for var in missing_optional:
logger.info(f" - {var}")
logger.info("✅ Requirements check passed")
return True
def run_migrations() -> bool:
"""Run database migrations."""
logger.info("🔄 Running database migrations...")
try:
run_self_contained_migrations()
logger.info("✅ Database migrations completed successfully")
return True
except Exception as e:
logger.error(f"❌ Migration failed: {e}")
return False
def validate_schema() -> bool:
"""Validate the database schema."""
logger.info("🔍 Validating database schema...")
try:
db_config = get_self_contained_db_config()
if db_config.validate_schema():
logger.info("✅ Schema validation passed")
return True
else:
logger.error("❌ Schema validation failed")
return False
except Exception as e:
logger.error(f"❌ Schema validation error: {e}")
return False
def load_sample_data(quick: bool = False) -> bool:
"""Load sample market data."""
logger.info("📊 Loading sample market data...")
try:
# Import here to avoid circular imports
from load_market_data import TiingoDataLoader
# Check if Tiingo API token is available
if not os.getenv("TIINGO_API_TOKEN"):
logger.warning("⚠️ TIINGO_API_TOKEN not set, skipping market data loading")
logger.info(
" You can load market data later using: python scripts/load_market_data.py"
)
return True
# Determine symbols to load
if quick:
symbols = [
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"TSLA",
"META",
"NVDA",
"JPM",
"V",
"PG",
]
else:
# Load more comprehensive set
from load_market_data import get_sp500_symbols
symbols = get_sp500_symbols()
async def load_data():
async with TiingoDataLoader() as loader:
loaded_count = await loader.load_stock_data(symbols)
return loaded_count
loaded_count = asyncio.run(load_data())
logger.info(f"✅ Loaded market data for {loaded_count} stocks")
return True
except ImportError as e:
logger.error(f"❌ Cannot import market data loader: {e}")
return False
except Exception as e:
logger.error(f"❌ Market data loading failed: {e}")
return False
def run_sample_screening(quick: bool = False) -> bool:
"""Run sample stock screening."""
logger.info("🎯 Running sample stock screening...")
try:
# Import here to avoid circular imports
from datetime import datetime
from run_stock_screening import StockScreener
from maverick_mcp.config.database_self_contained import (
SelfContainedDatabaseSession,
)
from maverick_mcp.data.models import MaverickStocks, bulk_insert_screening_data
async def run_screening():
screener = StockScreener()
today = datetime.now().date()
with SelfContainedDatabaseSession() as session:
if quick:
# Just run Maverick screening
results = await screener.run_maverick_screening(session)
if results:
count = bulk_insert_screening_data(
session, MaverickStocks, results, today
)
return count
else:
# Run all screenings
total_count = 0
# Maverick screening
maverick_results = await screener.run_maverick_screening(session)
if maverick_results:
count = bulk_insert_screening_data(
session, MaverickStocks, maverick_results, today
)
total_count += count
return total_count
return 0
count = asyncio.run(run_screening())
logger.info(f"✅ Completed screening, found {count} candidates")
return True
except ImportError as e:
logger.error(f"❌ Cannot import screening modules: {e}")
return False
except Exception as e:
logger.error(f"❌ Sample screening failed: {e}")
return False
def display_setup_summary() -> None:
"""Display setup summary and next steps."""
logger.info("📋 Setup Summary:")
try:
db_config = get_self_contained_db_config()
stats = db_config.get_database_stats()
print("\n📊 Database Statistics:")
print(f" Database URL: {stats.get('database_url', 'Unknown')}")
print(f" Total Records: {stats.get('total_records', 0)}")
for table, count in stats.get("tables", {}).items():
print(f" {table}: {count}")
except Exception as e:
logger.error(f"❌ Could not get database stats: {e}")
print("\n🎉 Self-contained Maverick-MCP setup completed!")
print("\n📚 Next Steps:")
print(" 1. Start the MCP server: python start_mcp_server.py")
print(" 2. Load more market data: python scripts/load_market_data.py --sp500")
print(" 3. Run screening: python scripts/run_stock_screening.py --all")
print(" 4. Access the web dashboard: http://localhost:3001")
print("\n💡 Available Scripts:")
print(" - scripts/load_market_data.py: Load stock and price data")
print(" - scripts/run_stock_screening.py: Run screening algorithms")
print(" - scripts/setup_self_contained.py: This setup script")
print("\n🔧 Environment Variables:")
print(" - TIINGO_API_TOKEN: Set to load market data")
print(" - MCP_DATABASE_URL: Override database URL")
print(" - DB_POOL_SIZE: Database connection pool size (default: 20)")
async def main():
"""Main setup function."""
parser = argparse.ArgumentParser(description="Setup self-contained Maverick-MCP")
parser.add_argument(
"--full-setup",
action="store_true",
help="Run complete setup with comprehensive data loading",
)
parser.add_argument(
"--quick-setup",
action="store_true",
help="Run quick setup with minimal sample data",
)
parser.add_argument(
"--migrate-only", action="store_true", help="Only run database migrations"
)
parser.add_argument("--database-url", type=str, help="Override database URL")
parser.add_argument(
"--skip-data",
action="store_true",
help="Skip loading market data and screening",
)
args = parser.parse_args()
if not any([args.full_setup, args.quick_setup, args.migrate_only]):
parser.print_help()
sys.exit(1)
print("🚀 Starting Maverick-MCP Self-Contained Setup...")
print("=" * 60)
# Step 1: Check requirements
if not check_requirements():
sys.exit(1)
# Step 2: Initialize database
try:
logger.info("🗄️ Initializing self-contained database...")
init_self_contained_database(
database_url=args.database_url, create_tables=True, validate_schema=True
)
logger.info("✅ Database initialization completed")
except Exception as e:
logger.error(f"❌ Database initialization failed: {e}")
sys.exit(1)
# Step 3: Run migrations
if not run_migrations():
sys.exit(1)
# Step 4: Validate schema
if not validate_schema():
sys.exit(1)
# Stop here if migrate-only
if args.migrate_only:
logger.info("✅ Migration-only setup completed successfully")
return
# Step 5: Load sample data (unless skipped)
if not args.skip_data:
quick = args.quick_setup
if not load_sample_data(quick=quick):
logger.warning("⚠️ Market data loading failed, but continuing setup")
# Step 6: Run sample screening
if not run_sample_screening(quick=quick):
logger.warning("⚠️ Sample screening failed, but continuing setup")
# Step 7: Display summary
display_setup_summary()
print("\n" + "=" * 60)
print("🎉 Self-contained Maverick-MCP setup completed successfully!")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/agent_errors.py:
--------------------------------------------------------------------------------
```python
"""
Agent-friendly error handler with helpful fix suggestions.
This module provides decorators and utilities to catch common errors
and provide actionable solutions for agents.
"""
import asyncio
import functools
import traceback
from collections.abc import Callable
from typing import Any, TypeVar
from maverick_mcp.utils.logging import get_logger
logger = get_logger(__name__)
T = TypeVar("T")
# Common error patterns and their fixes
ERROR_FIXES = {
# DataFrame column errors
"KeyError.*close": {
"error": "DataFrame column 'close' not found",
"fix": "Use 'Close' with capital C - DataFrame columns are case-sensitive",
"example": "df['Close'] not df['close']",
},
"KeyError.*open": {
"error": "DataFrame column 'open' not found",
"fix": "Use 'Open' with capital O",
"example": "df['Open'] not df['open']",
},
"KeyError.*high": {
"error": "DataFrame column 'high' not found",
"fix": "Use 'High' with capital H",
"example": "df['High'] not df['high']",
},
"KeyError.*low": {
"error": "DataFrame column 'low' not found",
"fix": "Use 'Low' with capital L",
"example": "df['Low'] not df['low']",
},
"KeyError.*volume": {
"error": "DataFrame column 'volume' not found",
"fix": "Use 'Volume' with capital V",
"example": "df['Volume'] not df['volume']",
},
# Authentication errors
"401.*Unauthorized": {
"error": "Authentication required",
"fix": "Set AUTH_ENABLED=false for development or use generate_dev_token tool",
"example": "AUTH_ENABLED=false python -m maverick_mcp.api.server",
},
# Connection errors
"Redis.*Connection.*refused": {
"error": "Redis connection failed",
"fix": "Start Redis: brew services start redis",
"example": "Or set REDIS_HOST=none to skip caching",
},
"psycopg2.*could not connect": {
"error": "PostgreSQL connection failed",
"fix": "Use SQLite for development: DATABASE_URL=sqlite:///dev.db",
"example": "Or start PostgreSQL: brew services start postgresql",
},
# Import errors
"ModuleNotFoundError.*maverick": {
"error": "Maverick MCP modules not found",
"fix": "Install dependencies: uv sync",
"example": "Make sure you're in the project root directory",
},
"ImportError.*ta_lib": {
"error": "TA-Lib not installed",
"fix": "Install TA-Lib: brew install ta-lib && uv pip install ta-lib",
"example": "TA-Lib requires system libraries",
},
# Type errors
"TypeError.*NoneType.*has no attribute": {
"error": "Trying to access attribute on None",
"fix": "Check if the object exists before accessing attributes",
"example": "if obj is not None: obj.attribute",
},
# Value errors
"ValueError.*not enough values to unpack": {
"error": "Unpacking mismatch",
"fix": "Check the return value - it might be None or have fewer values",
"example": "result = func(); if result: a, b = result",
},
# Async errors
"RuntimeError.*no running event loop": {
"error": "Async function called without event loop",
"fix": "Use asyncio.run() or await in async context",
"example": "asyncio.run(async_function())",
},
# File errors
"FileNotFoundError": {
"error": "File not found",
"fix": "Check the file path - use absolute paths for reliability",
"example": "Path(__file__).parent / 'data.csv'",
},
# Port errors
"Address already in use.*8000": {
"error": "Port 8000 already in use",
"fix": "Stop the existing server: make stop",
"example": "Or use a different port: --port 8001",
},
}
def find_error_fix(error_str: str) -> dict[str, str] | None:
"""Find a fix suggestion for the given error string."""
import re
error_str_lower = str(error_str).lower()
for pattern, fix_info in ERROR_FIXES.items():
if re.search(pattern.lower(), error_str_lower):
return fix_info
return None
def agent_friendly_errors[T](
func: Callable[..., T] | None = None,
*,
provide_fix: bool = True,
log_errors: bool = True,
reraise: bool = True,
) -> Callable[..., T] | Callable[[Callable[..., T]], Callable[..., T]]:
"""
Decorator that catches errors and provides helpful fix suggestions.
Args:
provide_fix: Whether to include fix suggestions
log_errors: Whether to log errors
reraise: Whether to re-raise the error after logging
Usage:
@agent_friendly_errors
def my_function():
...
@agent_friendly_errors(reraise=False)
def my_function():
...
"""
def decorator(f: Callable[..., T]) -> Callable[..., T]:
@functools.wraps(f)
def wrapper(*args: Any, **kwargs: Any) -> T:
try:
return f(*args, **kwargs)
except Exception as e:
error_msg = str(e)
error_type = type(e).__name__
# Build error info
error_info = {
"function": f.__name__,
"error_type": error_type,
"error_message": error_msg,
}
                # Find fix suggestion (default to None so the checks below are
                # safe when provide_fix is False)
                fix_info = None
                if provide_fix:
                    fix_info = find_error_fix(error_msg)
                    if fix_info:
                        error_info["fix_suggestion"] = fix_info
# Log the error
if log_errors:
logger.error(
f"Error in {f.__name__}: {error_type}: {error_msg}",
extra=error_info,
exc_info=True,
)
if fix_info:
logger.info(
f"💡 Fix suggestion: {fix_info['fix']}",
extra={"example": fix_info.get("example", "")},
)
# Create enhanced error message
if fix_info and provide_fix:
enhanced_msg = (
f"{error_msg}\n\n"
f"💡 Fix: {fix_info['fix']}\n"
f"Example: {fix_info.get('example', '')}"
)
# Replace the error message
e.args = (enhanced_msg,) + e.args[1:]
if reraise:
raise
# Return error info if not re-raising
return error_info # type: ignore[return-value]
# Add async support
if asyncio.iscoroutinefunction(f):
@functools.wraps(f)
async def async_wrapper(*args: Any, **kwargs: Any) -> T:
try:
return await f(*args, **kwargs)
except Exception as e:
# Same error handling logic
error_msg = str(e)
error_type = type(e).__name__
error_info = {
"function": f.__name__,
"error_type": error_type,
"error_message": error_msg,
}
                    # Default to None so the checks below are safe when
                    # provide_fix is False
                    fix_info = None
                    if provide_fix:
                        fix_info = find_error_fix(error_msg)
                        if fix_info:
                            error_info["fix_suggestion"] = fix_info
if log_errors:
logger.error(
f"Error in {f.__name__}: {error_type}: {error_msg}",
extra=error_info,
exc_info=True,
)
if fix_info:
logger.info(
f"💡 Fix suggestion: {fix_info['fix']}",
extra={"example": fix_info.get("example", "")},
)
if fix_info and provide_fix:
enhanced_msg = (
f"{error_msg}\n\n"
f"💡 Fix: {fix_info['fix']}\n"
f"Example: {fix_info.get('example', '')}"
)
e.args = (enhanced_msg,) + e.args[1:]
if reraise:
raise
return error_info # type: ignore[return-value]
return async_wrapper # type: ignore[return-value]
return wrapper
# Handle being called with or without parentheses
if func is None:
return decorator
else:
return decorator(func)
# Context manager for agent-friendly error handling
class AgentErrorContext:
"""Context manager that provides helpful error messages."""
def __init__(self, operation: str = "operation"):
self.operation = operation
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
error_msg = str(exc_val)
fix_info = find_error_fix(error_msg)
if fix_info:
logger.error(
f"Error during {self.operation}: {exc_type.__name__}: {error_msg}"
)
logger.info(
f"💡 Fix: {fix_info['fix']}",
extra={"example": fix_info.get("example", "")},
)
# Don't suppress the exception
return False
return False
# Utility function to get common error context
def get_error_context(error: Exception) -> dict[str, Any]:
"""Extract useful context from an error."""
context = {
"error_type": type(error).__name__,
"error_message": str(error),
"traceback": traceback.format_exc().split("\n"),
}
# Add specific context based on error type
if isinstance(error, KeyError):
context["key"] = error.args[0] if error.args else "unknown"
elif isinstance(error, ValueError):
context["value_error_details"] = error.args
elif isinstance(error, ConnectionError):
context["connection_type"] = "network"
elif hasattr(error, "response"): # HTTP errors
context["status_code"] = getattr(error.response, "status_code", None)
context["response_text"] = getattr(error.response, "text", None)
return context
```
--------------------------------------------------------------------------------
/tests/integration/test_redis_cache.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for Redis caching functionality.
"""
import asyncio
import json
from datetime import datetime
import pytest
from tests.integration.base import RedisIntegrationTest
@pytest.mark.integration
@pytest.mark.redis
class TestRedisCache(RedisIntegrationTest):
"""Test Redis caching with real Redis instance."""
async def test_basic_cache_operations(self):
"""Test basic cache set/get/delete operations."""
# Set value
key = "test:basic:key"
value = {"data": "test value", "timestamp": datetime.now().isoformat()}
await self.redis_client.setex(
key,
300, # 5 minutes TTL
json.dumps(value),
)
# Get value
cached = await self.redis_client.get(key)
assert cached is not None
cached_data = json.loads(cached)
assert cached_data["data"] == value["data"]
assert cached_data["timestamp"] == value["timestamp"]
# Delete value
deleted = await self.redis_client.delete(key)
assert deleted == 1
# Verify deleted
await self.assert_cache_not_exists(key)
async def test_cache_expiration(self):
"""Test cache TTL and expiration."""
key = "test:expiry:key"
value = "expires soon"
# Set with 1 second TTL
await self.redis_client.setex(key, 1, value)
# Should exist immediately
await self.assert_cache_exists(key)
# Wait for expiration
await asyncio.sleep(1.5)
# Should be expired
await self.assert_cache_not_exists(key)
async def test_stock_data_caching(self):
"""Test caching of stock data."""
from maverick_mcp.data.cache import CacheManager
cache_manager = CacheManager()
# Create sample stock data
stock_data = {
"symbol": "AAPL",
"data": {"sample": "data"}, # Simplified for test
"timestamp": datetime.now().isoformat(),
}
# Cache stock data
cache_key = "stock:AAPL:1d"
await cache_manager.set(
cache_key,
stock_data,
ttl=3600, # 1 hour
)
# Retrieve from cache
cached = await cache_manager.get(cache_key)
assert cached is not None
assert cached["symbol"] == "AAPL"
assert "data" in cached
# Test cache invalidation
await cache_manager.delete(cache_key)
# Should be removed
cached = await cache_manager.get(cache_key)
assert cached is None
# Test commented out - rate_limiter module not available
# async def test_rate_limiting_cache(self):
# """Test rate limiting using Redis."""
# from maverick_mcp.auth.rate_limiter import RateLimiter
#
# rate_limiter = RateLimiter(self.redis_client)
#
# # Configure rate limit: 5 requests per minute
# user_id = "test_user_123"
# limit = 5
# window = 60 # seconds
#
# # Make requests up to limit
# for _ in range(limit):
# allowed = await rate_limiter.check_rate_limit(user_id, limit, window)
# assert allowed is True
#
# # Next request should be blocked
# allowed = await rate_limiter.check_rate_limit(user_id, limit, window)
# assert allowed is False
#
# # Check remaining
# remaining = await rate_limiter.get_remaining_requests(user_id, limit, window)
# assert remaining == 0
async def test_distributed_locking(self):
"""Test distributed locking with Redis."""
import uuid
lock_key = "test:lock:resource"
lock_value = str(uuid.uuid4())
lock_ttl = 5 # seconds
# Acquire lock
acquired = await self.redis_client.set(
lock_key,
lock_value,
nx=True, # Only set if not exists
ex=lock_ttl,
)
assert acquired is not None # Redis returns 'OK' string on success
# Try to acquire again (should fail)
acquired2 = await self.redis_client.set(
lock_key, "different_value", nx=True, ex=lock_ttl
)
assert acquired2 is None # Redis returns None when nx fails
# Release lock (only if we own it)
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
released = await self.redis_client.eval(
lua_script, keys=[lock_key], args=[lock_value]
)
assert released == 1
# Lock should be available now
acquired3 = await self.redis_client.set(
lock_key, "new_value", nx=True, ex=lock_ttl
)
assert acquired3 is not None # SET with nx returns a truthy value (True/'OK') on success
async def test_cache_patterns(self):
"""Test various cache key patterns and operations."""
# Set multiple keys with pattern
base_pattern = "test:pattern"
for i in range(10):
key = f"{base_pattern}:{i}"
await self.redis_client.set(key, f"value_{i}")
# Scan for keys matching pattern
keys = []
cursor = 0
while True:
cursor, batch = await self.redis_client.scan(
cursor, match=f"{base_pattern}:*", count=100
)
keys.extend(batch)
if cursor == 0:
break
assert len(keys) == 10
# Bulk get
values = await self.redis_client.mget(keys)
assert len(values) == 10
assert all(v is not None for v in values)
# Bulk delete
deleted = await self.redis_client.delete(*keys)
assert deleted == 10
async def test_cache_statistics(self):
"""Test cache hit/miss statistics."""
stats_key = "cache:stats"
# Initialize stats
await self.redis_client.hset(
stats_key,
mapping={
"hits": 0,
"misses": 0,
"total": 0,
},
)
# Simulate cache operations
async def record_hit():
await self.redis_client.hincrby(stats_key, "hits", 1)
await self.redis_client.hincrby(stats_key, "total", 1)
async def record_miss():
await self.redis_client.hincrby(stats_key, "misses", 1)
await self.redis_client.hincrby(stats_key, "total", 1)
# Simulate 70% hit rate
for i in range(100):
if i % 10 < 7:
await record_hit()
else:
await record_miss()
# Get stats
stats = await self.redis_client.hgetall(stats_key)
hits = int(stats[b"hits"])
misses = int(stats[b"misses"])
total = int(stats[b"total"])
assert total == 100
assert hits == 70
assert misses == 30
hit_rate = hits / total
assert hit_rate == 0.7
async def test_pub_sub_messaging(self):
"""Test Redis pub/sub for real-time updates."""
channel = "test:updates"
message = {"type": "price_update", "symbol": "AAPL", "price": 150.50}
# Create pubsub
pubsub = self.redis_client.pubsub()
await pubsub.subscribe(channel)
# Publish message
await self.redis_client.publish(channel, json.dumps(message))
# Receive message
received = None
async for msg in pubsub.listen():
if msg["type"] == "message":
received = json.loads(msg["data"])
break
assert received is not None
assert received["type"] == "price_update"
assert received["symbol"] == "AAPL"
assert received["price"] == 150.50
# Cleanup
await pubsub.unsubscribe(channel)
await pubsub.close()
async def test_cache_warming(self):
"""Test cache warming strategies."""
# Simulate warming cache with frequently accessed data
frequent_symbols = ["AAPL", "GOOGL", "MSFT", "AMZN", "TSLA"]
# Warm cache
for symbol in frequent_symbols:
cache_key = f"stock:quote:{symbol}"
quote_data = {
"symbol": symbol,
"price": 100.0 + hash(symbol) % 100,
"volume": 1000000,
"timestamp": datetime.now().isoformat(),
}
await self.redis_client.setex(
cache_key,
3600, # 1 hour
json.dumps(quote_data),
)
# Verify all cached
for symbol in frequent_symbols:
cache_key = f"stock:quote:{symbol}"
await self.assert_cache_exists(cache_key)
# Test batch retrieval
keys = [f"stock:quote:{symbol}" for symbol in frequent_symbols]
values = await self.redis_client.mget(keys)
assert len(values) == len(frequent_symbols)
assert all(v is not None for v in values)
# Parse and verify
for value, symbol in zip(values, frequent_symbols, strict=False):
data = json.loads(value)
assert data["symbol"] == symbol
async def test_cache_memory_optimization(self):
"""Test memory optimization strategies."""
# Test different serialization formats
import pickle
import zlib
large_data = {
"symbol": "TEST",
"historical_data": [
{"date": f"2024-01-{i:02d}", "price": 100 + i} for i in range(1, 32)
]
* 10, # Replicate to make it larger
}
# JSON serialization
json_data = json.dumps(large_data)
json_size = len(json_data.encode())
# Pickle serialization
pickle_data = pickle.dumps(large_data)
pickle_size = len(pickle_data)
# Compressed JSON
compressed_data = zlib.compress(json_data.encode())
compressed_size = len(compressed_data)
# Store all versions
await self.redis_client.set("test:json", json_data)
await self.redis_client.set("test:pickle", pickle_data)
await self.redis_client.set("test:compressed", compressed_data)
# Compare sizes
assert compressed_size < json_size
assert compressed_size < pickle_size
# Verify decompression works
retrieved = await self.redis_client.get("test:compressed")
decompressed = zlib.decompress(retrieved)
restored_data = json.loads(decompressed)
assert restored_data["symbol"] == "TEST"
assert len(restored_data["historical_data"]) == 310
```
--------------------------------------------------------------------------------
/tests/test_cache_management_service.py:
--------------------------------------------------------------------------------
```python
"""
Tests for CacheManagementService.
"""
from unittest.mock import Mock, patch
import pandas as pd
from maverick_mcp.infrastructure.caching import CacheManagementService
class TestCacheManagementService:
"""Test cases for CacheManagementService."""
def setup_method(self):
"""Set up test fixtures."""
self.mock_session = Mock()
self.service = CacheManagementService(
db_session=self.mock_session, cache_days=1
)
def test_init_with_session(self):
"""Test service initialization with provided session."""
assert self.service.cache_days == 1
assert self.service._db_session == self.mock_session
def test_init_without_session(self):
"""Test service initialization without session."""
service = CacheManagementService(cache_days=7)
assert service.cache_days == 7
assert service._db_session is None
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_get_cached_data_success(self, mock_price_cache):
"""Test successful cache data retrieval."""
# Mock data from cache
mock_data = pd.DataFrame(
{
"open": [150.0, 151.0],
"high": [152.0, 153.0],
"low": [149.0, 150.0],
"close": [151.0, 152.0],
"volume": [1000000, 1100000],
},
index=pd.date_range("2024-01-01", periods=2),
)
mock_price_cache.get_price_data.return_value = mock_data
# Test
result = self.service.get_cached_data("AAPL", "2024-01-01", "2024-01-02")
# Assertions
assert result is not None
assert not result.empty
assert len(result) == 2
# Check column normalization
assert "Open" in result.columns
assert "Close" in result.columns
assert "Dividends" in result.columns
assert "Stock Splits" in result.columns
mock_price_cache.get_price_data.assert_called_once_with(
self.mock_session, "AAPL", "2024-01-01", "2024-01-02"
)
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_get_cached_data_empty(self, mock_price_cache):
"""Test cache data retrieval with empty result."""
mock_price_cache.get_price_data.return_value = pd.DataFrame()
# Test
result = self.service.get_cached_data("INVALID", "2024-01-01", "2024-01-02")
# Assertions
assert result is None
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_get_cached_data_error(self, mock_price_cache):
"""Test cache data retrieval with database error."""
mock_price_cache.get_price_data.side_effect = Exception("Database error")
# Test
result = self.service.get_cached_data("AAPL", "2024-01-01", "2024-01-02")
# Assertions
assert result is None
@patch(
"maverick_mcp.infrastructure.caching.cache_management_service.bulk_insert_price_data"
)
@patch("maverick_mcp.infrastructure.caching.cache_management_service.Stock")
def test_cache_data_success(self, mock_stock, mock_bulk_insert):
"""Test successful data caching."""
# Mock data to cache
data = pd.DataFrame(
{
"Open": [150.0, 151.0],
"High": [152.0, 153.0],
"Low": [149.0, 150.0],
"Close": [151.0, 152.0],
"Volume": [1000000, 1100000],
},
index=pd.date_range("2024-01-01", periods=2),
)
mock_stock.get_or_create.return_value = Mock()
mock_bulk_insert.return_value = 2 # 2 records inserted
# Test
result = self.service.cache_data("AAPL", data)
# Assertions
assert result is True
mock_stock.get_or_create.assert_called_once_with(self.mock_session, "AAPL")
mock_bulk_insert.assert_called_once()
def test_cache_data_empty_dataframe(self):
"""Test caching with empty DataFrame."""
empty_df = pd.DataFrame()
# Test
result = self.service.cache_data("AAPL", empty_df)
# Assertions
assert result is True # Should succeed but do nothing
@patch(
"maverick_mcp.infrastructure.caching.cache_management_service.bulk_insert_price_data"
)
@patch("maverick_mcp.infrastructure.caching.cache_management_service.Stock")
def test_cache_data_error(self, mock_stock, mock_bulk_insert):
"""Test data caching with database error."""
data = pd.DataFrame(
{
"Open": [150.0],
"Close": [151.0],
},
index=pd.date_range("2024-01-01", periods=1),
)
mock_stock.get_or_create.side_effect = Exception("Database error")
# Test
result = self.service.cache_data("AAPL", data)
# Assertions
assert result is False
self.mock_session.rollback.assert_called_once()
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_invalidate_cache_success(self, mock_price_cache):
"""Test successful cache invalidation."""
mock_price_cache.delete_price_data.return_value = 5 # 5 records deleted
# Test
result = self.service.invalidate_cache("AAPL", "2024-01-01", "2024-01-02")
# Assertions
assert result is True
mock_price_cache.delete_price_data.assert_called_once_with(
self.mock_session, "AAPL", "2024-01-01", "2024-01-02"
)
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_invalidate_cache_error(self, mock_price_cache):
"""Test cache invalidation with database error."""
mock_price_cache.delete_price_data.side_effect = Exception("Database error")
# Test
result = self.service.invalidate_cache("AAPL", "2024-01-01", "2024-01-02")
# Assertions
assert result is False
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_get_cache_stats_success(self, mock_price_cache):
"""Test successful cache statistics retrieval."""
mock_stats = {
"total_records": 100,
"date_range": {"start": "2024-01-01", "end": "2024-01-31"},
"last_updated": "2024-01-31",
}
mock_price_cache.get_cache_stats.return_value = mock_stats
# Test
result = self.service.get_cache_stats("AAPL")
# Assertions
assert result["symbol"] == "AAPL"
assert result["total_records"] == 100
assert result["date_range"] == {"start": "2024-01-01", "end": "2024-01-31"}
@patch("maverick_mcp.infrastructure.caching.cache_management_service.PriceCache")
def test_get_cache_stats_error(self, mock_price_cache):
"""Test cache statistics retrieval with database error."""
mock_price_cache.get_cache_stats.side_effect = Exception("Database error")
# Test
result = self.service.get_cache_stats("AAPL")
# Assertions
assert result["symbol"] == "AAPL"
assert result["total_records"] == 0
assert result["last_updated"] is None
def test_normalize_cached_data(self):
"""Test data normalization from cache format."""
# Mock data in database format
data = pd.DataFrame(
{
"open": [150.0, 151.0],
"high": [152.0, 153.0],
"low": [149.0, 150.0],
"close": [151.0, 152.0],
"volume": ["1000000", "1100000"], # String volume to test conversion
},
index=pd.date_range("2024-01-01", periods=2),
)
# Test
result = self.service._normalize_cached_data(data)
# Assertions
assert "Open" in result.columns
assert "High" in result.columns
assert "Close" in result.columns
assert "Volume" in result.columns
assert "Dividends" in result.columns
assert "Stock Splits" in result.columns
# Check data types
assert result["Volume"].dtype == "int64"
assert result["Open"].dtype == "float64"
def test_prepare_data_for_cache(self):
"""Test data preparation for caching."""
# Mock data in yfinance format
data = pd.DataFrame(
{
"Open": [150.0, 151.0],
"High": [152.0, 153.0],
"Low": [149.0, 150.0],
"Close": [151.0, 152.0],
"Volume": [1000000, 1100000],
},
index=pd.date_range("2024-01-01", periods=2),
)
# Test
result = self.service._prepare_data_for_cache(data)
# Assertions
assert "open" in result.columns
assert "high" in result.columns
assert "close" in result.columns
assert "volume" in result.columns
@patch("maverick_mcp.infrastructure.caching.cache_management_service.SessionLocal")
def test_get_db_session_without_injected_session(self, mock_session_local):
"""Test database session creation when no session is injected."""
service = CacheManagementService() # No injected session
mock_session = Mock()
mock_session_local.return_value = mock_session
# Test
session, should_close = service._get_db_session()
# Assertions
assert session == mock_session
assert should_close is True
def test_get_db_session_with_injected_session(self):
"""Test database session retrieval with injected session."""
# Test
session, should_close = self.service._get_db_session()
# Assertions
assert session == self.mock_session
assert should_close is False
def test_check_cache_health_success(self):
"""Test successful cache health check."""
# Mock successful query
mock_result = Mock()
self.mock_session.execute.return_value = mock_result
mock_result.fetchone.return_value = (1,)
self.mock_session.query.return_value.count.return_value = 1000
# Test
result = self.service.check_cache_health()
# Assertions
assert result["status"] == "healthy"
assert result["database_connection"] is True
assert result["total_cached_records"] == 1000
def test_check_cache_health_failure(self):
"""Test cache health check with database error."""
self.mock_session.execute.side_effect = Exception("Connection failed")
# Test
result = self.service.check_cache_health()
# Assertions
assert result["status"] == "unhealthy"
assert result["database_connection"] is False
assert "error" in result
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/factories/provider_factory.py:
--------------------------------------------------------------------------------
```python
"""
Provider factory for dependency injection and lifecycle management.
This module provides a centralized factory for creating and managing
provider instances with proper dependency injection and configuration.
"""
import logging
from maverick_mcp.providers.implementations.cache_adapter import RedisCacheAdapter
from maverick_mcp.providers.implementations.macro_data_adapter import MacroDataAdapter
from maverick_mcp.providers.implementations.market_data_adapter import MarketDataAdapter
from maverick_mcp.providers.implementations.persistence_adapter import (
SQLAlchemyPersistenceAdapter,
)
from maverick_mcp.providers.implementations.stock_data_adapter import StockDataAdapter
from maverick_mcp.providers.interfaces.cache import CacheConfig, ICacheManager
from maverick_mcp.providers.interfaces.config import IConfigurationProvider
from maverick_mcp.providers.interfaces.macro_data import (
IMacroDataProvider,
MacroDataConfig,
)
from maverick_mcp.providers.interfaces.market_data import (
IMarketDataProvider,
MarketDataConfig,
)
from maverick_mcp.providers.interfaces.persistence import (
DatabaseConfig,
IDataPersistence,
)
from maverick_mcp.providers.interfaces.stock_data import (
IStockDataFetcher,
IStockScreener,
)
logger = logging.getLogger(__name__)
class ProviderFactory:
"""
Factory class for creating and managing provider instances.
This factory handles dependency injection, configuration, and lifecycle
management for all providers in the system. It ensures that providers
are properly configured and that dependencies are satisfied.
"""
def __init__(self, config: IConfigurationProvider):
"""
Initialize the provider factory.
Args:
config: Configuration provider for accessing settings
"""
self._config = config
self._cache_manager: ICacheManager | None = None
self._persistence: IDataPersistence | None = None
self._stock_data_fetcher: IStockDataFetcher | None = None
self._stock_screener: IStockScreener | None = None
self._market_data_provider: IMarketDataProvider | None = None
self._macro_data_provider: IMacroDataProvider | None = None
logger.debug("ProviderFactory initialized")
def get_cache_manager(self) -> ICacheManager:
"""
Get or create a cache manager instance.
Returns:
ICacheManager implementation
"""
if self._cache_manager is None:
cache_config = CacheConfig(
enabled=self._config.is_cache_enabled(),
default_ttl=self._config.get_cache_ttl(),
redis_host=self._config.get_redis_host(),
redis_port=self._config.get_redis_port(),
redis_db=self._config.get_redis_db(),
redis_password=self._config.get_redis_password(),
redis_ssl=self._config.get_redis_ssl(),
)
self._cache_manager = RedisCacheAdapter(config=cache_config)
logger.debug("Cache manager created")
return self._cache_manager
def get_persistence(self) -> IDataPersistence:
"""
Get or create a persistence instance.
Returns:
IDataPersistence implementation
"""
if self._persistence is None:
db_config = DatabaseConfig(
database_url=self._config.get_database_url(),
pool_size=self._config.get_pool_size(),
max_overflow=self._config.get_max_overflow(),
)
self._persistence = SQLAlchemyPersistenceAdapter(config=db_config)
logger.debug("Persistence adapter created")
return self._persistence
def get_stock_data_fetcher(self) -> IStockDataFetcher:
"""
Get or create a stock data fetcher instance.
Returns:
IStockDataFetcher implementation
"""
if self._stock_data_fetcher is None:
self._stock_data_fetcher = StockDataAdapter(
cache_manager=self.get_cache_manager(),
persistence=self.get_persistence(),
config=self._config,
)
logger.debug("Stock data fetcher created")
return self._stock_data_fetcher
def get_stock_screener(self) -> IStockScreener:
"""
Get or create a stock screener instance.
Returns:
IStockScreener implementation
"""
if self._stock_screener is None:
# The StockDataAdapter implements both interfaces
adapter = self.get_stock_data_fetcher()
if isinstance(adapter, IStockScreener):
self._stock_screener = adapter
else:
# This shouldn't happen with our current implementation
raise RuntimeError(
"Stock data fetcher does not implement IStockScreener"
)
logger.debug("Stock screener created")
return self._stock_screener
def get_market_data_provider(self) -> IMarketDataProvider:
"""
Get or create a market data provider instance.
Returns:
IMarketDataProvider implementation
"""
if self._market_data_provider is None:
market_config = MarketDataConfig(
external_api_key=self._config.get_external_api_key(),
tiingo_api_key=self._config.get_tiingo_api_key(),
request_timeout=self._config.get_request_timeout(),
max_retries=self._config.get_max_retries(),
)
self._market_data_provider = MarketDataAdapter(config=market_config)
logger.debug("Market data provider created")
return self._market_data_provider
def get_macro_data_provider(self) -> IMacroDataProvider:
"""
Get or create a macro data provider instance.
Returns:
IMacroDataProvider implementation
"""
if self._macro_data_provider is None:
macro_config = MacroDataConfig(
fred_api_key=self._config.get_fred_api_key(),
request_timeout=self._config.get_request_timeout(),
max_retries=self._config.get_max_retries(),
cache_ttl=self._config.get_cache_ttl(),
)
self._macro_data_provider = MacroDataAdapter(config=macro_config)
logger.debug("Macro data provider created")
return self._macro_data_provider
def create_stock_data_fetcher(
self,
cache_manager: ICacheManager | None = None,
persistence: IDataPersistence | None = None,
) -> IStockDataFetcher:
"""
Create a new stock data fetcher instance with optional dependencies.
Args:
cache_manager: Optional cache manager override
persistence: Optional persistence override
Returns:
New IStockDataFetcher instance
"""
return StockDataAdapter(
cache_manager=cache_manager or self.get_cache_manager(),
persistence=persistence or self.get_persistence(),
config=self._config,
)
def create_market_data_provider(
self, config_override: MarketDataConfig | None = None
) -> IMarketDataProvider:
"""
Create a new market data provider instance with optional config override.
Args:
config_override: Optional market data configuration override
Returns:
New IMarketDataProvider instance
"""
if config_override:
return MarketDataAdapter(config=config_override)
else:
return MarketDataAdapter(
config=MarketDataConfig(
external_api_key=self._config.get_external_api_key(),
tiingo_api_key=self._config.get_tiingo_api_key(),
request_timeout=self._config.get_request_timeout(),
max_retries=self._config.get_max_retries(),
)
)
def create_macro_data_provider(
self, config_override: MacroDataConfig | None = None
) -> IMacroDataProvider:
"""
Create a new macro data provider instance with optional config override.
Args:
config_override: Optional macro data configuration override
Returns:
New IMacroDataProvider instance
"""
if config_override:
return MacroDataAdapter(config=config_override)
else:
return MacroDataAdapter(
config=MacroDataConfig(
fred_api_key=self._config.get_fred_api_key(),
request_timeout=self._config.get_request_timeout(),
max_retries=self._config.get_max_retries(),
cache_ttl=self._config.get_cache_ttl(),
)
)
def reset_cache(self) -> None:
"""
Reset all cached provider instances.
This forces the factory to create new instances on the next request,
which can be useful for testing or configuration changes.
"""
self._cache_manager = None
self._persistence = None
self._stock_data_fetcher = None
self._stock_screener = None
self._market_data_provider = None
self._macro_data_provider = None
logger.debug("Provider factory cache reset")
def get_all_providers(self) -> dict[str, object]:
"""
Get all provider instances for introspection or testing.
Returns:
Dictionary mapping provider names to instances
"""
return {
"cache_manager": self.get_cache_manager(),
"persistence": self.get_persistence(),
"stock_data_fetcher": self.get_stock_data_fetcher(),
"stock_screener": self.get_stock_screener(),
"market_data_provider": self.get_market_data_provider(),
"macro_data_provider": self.get_macro_data_provider(),
}
def validate_configuration(self) -> list[str]:
"""
Validate that all required configuration is present.
Returns:
List of validation errors (empty if valid)
"""
errors = []
# Check database configuration
if not self._config.get_database_url():
errors.append("Database URL is not configured")
# Check cache configuration if enabled
if self._config.is_cache_enabled():
if not self._config.get_redis_host():
errors.append("Redis host is not configured but caching is enabled")
# Check API keys (warn but don't fail)
if not self._config.get_fred_api_key():
logger.warning(
"FRED API key is not configured - macro data will be limited"
)
if not self._config.get_external_api_key():
logger.warning(
"External API key is not configured - market data will use fallbacks"
)
return errors
```
--------------------------------------------------------------------------------
/maverick_mcp/infrastructure/data_fetching/stock_data_service.py:
--------------------------------------------------------------------------------
```python
"""
Stock Data Fetching Service - Responsible only for fetching data from external sources.
"""
import logging
from datetime import UTC, datetime, timedelta
from typing import Any
import pandas as pd
import yfinance as yf
from maverick_mcp.utils.circuit_breaker_decorators import (
with_stock_data_circuit_breaker,
)
logger = logging.getLogger("maverick_mcp.stock_data_fetching")
class StockDataFetchingService:
"""
Service responsible ONLY for fetching stock data from external sources.
This service:
- Handles data fetching from yfinance, Alpha Vantage, etc.
- Manages fallback logic between data sources
- Contains no caching logic
- Contains no business logic beyond data retrieval
"""
def __init__(self, timeout: int = 30, max_retries: int = 3):
"""
Initialize the stock data fetching service.
Args:
timeout: Request timeout in seconds
max_retries: Maximum number of retries for failed requests
"""
self.timeout = timeout
self.max_retries = max_retries
@with_stock_data_circuit_breaker(use_fallback=False)
def fetch_stock_data(
self,
symbol: str,
start_date: str | None = None,
end_date: str | None = None,
period: str | None = None,
interval: str = "1d",
) -> pd.DataFrame:
"""
Fetch stock data from yfinance with circuit breaker protection.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
period: Alternative to start/end dates (e.g., '1d', '5d', '1mo', etc.)
interval: Data interval ('1d', '1wk', '1mo', '1m', '5m', etc.)
Returns:
DataFrame with stock data
Raises:
Exception: If data fetching fails after retries
"""
logger.info(
f"Fetching data from yfinance for {symbol} - "
f"Start: {start_date}, End: {end_date}, Period: {period}, Interval: {interval}"
)
ticker = yf.Ticker(symbol)
if period:
df = ticker.history(period=period, interval=interval)
else:
if start_date is None:
start_date = (datetime.now(UTC) - timedelta(days=365)).strftime(
"%Y-%m-%d"
)
if end_date is None:
end_date = datetime.now(UTC).strftime("%Y-%m-%d")
df = ticker.history(start=start_date, end=end_date, interval=interval)
# Validate and clean the data
df = self._validate_and_clean_data(df, symbol)
return df
def _validate_and_clean_data(self, df: pd.DataFrame, symbol: str) -> pd.DataFrame:
"""
Validate and clean the fetched data.
Args:
df: Raw DataFrame from data source
symbol: Stock symbol for logging
Returns:
Cleaned DataFrame
"""
# Check if dataframe is empty
if df.empty:
logger.warning(f"Empty dataframe returned for {symbol}")
return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume"])
# Ensure all expected columns exist
required_columns = ["Open", "High", "Low", "Close", "Volume"]
for col in required_columns:
if col not in df.columns:
logger.warning(
f"Column {col} missing from data for {symbol}, adding default value"
)
if col == "Volume":
df[col] = 0
else:
df[col] = 0.0
# Set index name
df.index.name = "Date"
# Ensure data types
df["Volume"] = df["Volume"].astype(int)
for col in ["Open", "High", "Low", "Close"]:
df[col] = df[col].astype(float)
return df
@with_stock_data_circuit_breaker(use_fallback=False)
def fetch_stock_info(self, symbol: str) -> dict[str, Any]:
"""
Fetch detailed stock information.
Args:
symbol: Stock ticker symbol
Returns:
Dictionary with stock information
"""
logger.info(f"Fetching stock info for {symbol}")
ticker = yf.Ticker(symbol)
return ticker.info
def fetch_realtime_data(self, symbol: str) -> dict[str, Any] | None:
"""
Fetch real-time data for a single symbol.
Args:
symbol: Stock ticker symbol
Returns:
Dictionary with real-time data or None if failed
"""
try:
logger.info(f"Fetching real-time data for {symbol}")
ticker = yf.Ticker(symbol)
data = ticker.history(period="1d")
if data.empty:
logger.warning(f"No real-time data available for {symbol}")
return None
latest = data.iloc[-1]
# Get previous close for change calculation
prev_close = ticker.info.get("previousClose", None)
if prev_close is None:
# Try to get from 2-day history
data_2d = ticker.history(period="2d")
if len(data_2d) > 1:
prev_close = data_2d.iloc[0]["Close"]
else:
prev_close = latest["Close"]
# Calculate change
price = latest["Close"]
change = price - prev_close
change_percent = (change / prev_close) * 100 if prev_close != 0 else 0
return {
"symbol": symbol,
"price": round(price, 2),
"change": round(change, 2),
"change_percent": round(change_percent, 2),
"volume": int(latest["Volume"]),
"timestamp": data.index[-1],
"timestamp_display": data.index[-1].strftime("%Y-%m-%d %H:%M:%S"),
"is_real_time": False, # yfinance data has some delay
}
except Exception as e:
logger.error(f"Error fetching realtime data for {symbol}: {str(e)}")
return None
def fetch_multiple_realtime_data(self, symbols: list[str]) -> dict[str, Any]:
"""
Fetch real-time data for multiple symbols.
Args:
symbols: List of stock ticker symbols
Returns:
Dictionary mapping symbols to their real-time data
"""
logger.info(f"Fetching real-time data for {len(symbols)} symbols")
results = {}
for symbol in symbols:
data = self.fetch_realtime_data(symbol)
if data:
results[symbol] = data
return results
def fetch_news(self, symbol: str, limit: int = 10) -> pd.DataFrame:
"""
Fetch news for a stock.
Args:
symbol: Stock ticker symbol
limit: Maximum number of news items
Returns:
DataFrame with news data
"""
try:
logger.info(f"Fetching news for {symbol}")
ticker = yf.Ticker(symbol)
news = ticker.news
if not news:
return pd.DataFrame(
columns=[
"title",
"publisher",
"link",
"providerPublishTime",
"type",
]
)
df = pd.DataFrame(news[:limit])
# Convert timestamp to datetime
if "providerPublishTime" in df.columns:
df["providerPublishTime"] = pd.to_datetime(
df["providerPublishTime"], unit="s"
)
return df
except Exception as e:
logger.error(f"Error fetching news for {symbol}: {str(e)}")
return pd.DataFrame(
columns=["title", "publisher", "link", "providerPublishTime", "type"]
)
def fetch_earnings(self, symbol: str) -> dict[str, Any]:
"""
Fetch earnings information for a stock.
Args:
symbol: Stock ticker symbol
Returns:
Dictionary with earnings data
"""
try:
logger.info(f"Fetching earnings for {symbol}")
ticker = yf.Ticker(symbol)
return {
"earnings": ticker.earnings.to_dict()
if hasattr(ticker, "earnings") and not ticker.earnings.empty
else {},
"earnings_dates": ticker.earnings_dates.to_dict()
if hasattr(ticker, "earnings_dates") and not ticker.earnings_dates.empty
else {},
"earnings_trend": ticker.earnings_trend
if hasattr(ticker, "earnings_trend")
else {},
}
except Exception as e:
logger.error(f"Error fetching earnings for {symbol}: {str(e)}")
return {"earnings": {}, "earnings_dates": {}, "earnings_trend": {}}
def fetch_recommendations(self, symbol: str) -> pd.DataFrame:
"""
Fetch analyst recommendations for a stock.
Args:
symbol: Stock ticker symbol
Returns:
DataFrame with recommendations
"""
try:
logger.info(f"Fetching recommendations for {symbol}")
ticker = yf.Ticker(symbol)
recommendations = ticker.recommendations
if recommendations is None or recommendations.empty:
return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])
return recommendations
except Exception as e:
logger.error(f"Error fetching recommendations for {symbol}: {str(e)}")
return pd.DataFrame(columns=["firm", "toGrade", "fromGrade", "action"])
def check_if_etf(self, symbol: str) -> bool:
"""
Check if a given symbol is an ETF.
Args:
symbol: Stock ticker symbol
Returns:
True if symbol is an ETF
"""
try:
logger.debug(f"Checking if {symbol} is an ETF")
stock = yf.Ticker(symbol)
# Check if quoteType exists and is ETF
if "quoteType" in stock.info:
return stock.info["quoteType"].upper() == "ETF"
# Fallback check for common ETF identifiers
common_etfs = [
"SPY",
"QQQ",
"IWM",
"DIA",
"XLB",
"XLE",
"XLF",
"XLI",
"XLK",
"XLP",
"XLU",
"XLV",
"XLY",
"XLC",
"XLRE",
"XME",
]
return any(
[
symbol.endswith(("ETF", "FUND")),
symbol in common_etfs,
"ETF" in stock.info.get("longName", "").upper(),
]
)
except Exception as e:
logger.error(f"Error checking if {symbol} is ETF: {e}")
return False
```
--------------------------------------------------------------------------------
/maverick_mcp/domain/screening/entities.py:
--------------------------------------------------------------------------------
```python
"""
Screening domain entities.
This module contains the core business entities for stock screening,
with embedded business rules and validation logic.
"""
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from typing import Any
@dataclass
class ScreeningResult:
"""
Domain entity representing a stock screening result.
This entity encapsulates all business rules related to screening results,
including validation, scoring, and ranking logic.
"""
# Core identification
stock_symbol: str
screening_date: datetime
# Price data
open_price: Decimal
high_price: Decimal
low_price: Decimal
close_price: Decimal
volume: int
# Technical indicators
ema_21: Decimal
sma_50: Decimal
sma_150: Decimal
sma_200: Decimal
momentum_score: Decimal
avg_volume_30d: Decimal
adr_percentage: Decimal
atr: Decimal
# Pattern analysis
pattern: str | None = None
squeeze: str | None = None
consolidation: str | None = None
entry_signal: str | None = None
# Screening-specific scores
combined_score: int = 0
bear_score: int = 0
compression_score: int = 0
pattern_detected: int = 0
# Additional bearish indicators
rsi_14: Decimal | None = None
macd: Decimal | None = None
macd_signal: Decimal | None = None
macd_histogram: Decimal | None = None
distribution_days_20: int | None = None
atr_contraction: bool | None = None
big_down_volume: bool | None = None
def __post_init__(self):
"""Validate business rules after initialization."""
self._validate_stock_symbol()
self._validate_price_data()
self._validate_technical_indicators()
def _validate_stock_symbol(self) -> None:
"""Validate stock symbol format."""
if not self.stock_symbol or not isinstance(self.stock_symbol, str):
raise ValueError("Stock symbol must be a non-empty string")
if len(self.stock_symbol) > 10:
raise ValueError("Stock symbol cannot exceed 10 characters")
def _validate_price_data(self) -> None:
"""Validate price data consistency."""
if self.close_price <= 0:
raise ValueError("Close price must be positive")
if self.volume < 0:
raise ValueError("Volume cannot be negative")
if self.high_price < self.low_price:
raise ValueError("High price cannot be less than low price")
if not (self.low_price <= self.close_price <= self.high_price):
raise ValueError("Close price must be between low and high prices")
def _validate_technical_indicators(self) -> None:
"""Validate technical indicator ranges."""
if not (0 <= self.momentum_score <= 100):
raise ValueError("Momentum score must be between 0 and 100")
if self.adr_percentage < 0:
raise ValueError("ADR percentage cannot be negative")
if self.avg_volume_30d < 0:
raise ValueError("Average volume cannot be negative")
def is_bullish_setup(self) -> bool:
"""
Determine if this is a bullish screening setup.
Business rule: A stock is considered bullish if it meets
momentum and trend criteria.
"""
return (
self.close_price > self.sma_50
and self.close_price > self.sma_150
and self.momentum_score >= 70
and self.combined_score >= 50
)
def is_bearish_setup(self) -> bool:
"""
Determine if this is a bearish screening setup.
Business rule: A stock is considered bearish if it shows
weakness and distribution characteristics.
"""
return (
self.close_price < self.sma_50
and self.momentum_score <= 30
and self.bear_score >= 50
)
def is_trending_stage2(self) -> bool:
"""
Determine if this meets trending criteria.
Business rule: Trending requires proper moving average alignment
and strong relative strength.
"""
return (
self.close_price > self.sma_50
and self.close_price > self.sma_150
and self.close_price > self.sma_200
and self.sma_50 > self.sma_150
and self.sma_150 > self.sma_200
and self.momentum_score >= 80
)
def meets_volume_criteria(self, min_volume: int) -> bool:
"""Check if stock meets minimum volume requirements."""
return self.avg_volume_30d >= min_volume
def meets_price_criteria(self, min_price: Decimal, max_price: Decimal) -> bool:
"""Check if stock meets price range criteria."""
return min_price <= self.close_price <= max_price
def calculate_risk_reward_ratio(
self, stop_loss_percentage: Decimal = Decimal("0.08")
) -> Decimal:
"""
Calculate risk/reward ratio based on current price and stop loss.
Business rule: Risk is calculated as the distance to stop loss,
reward is calculated as the potential upside to resistance levels.
"""
stop_loss_price = self.close_price * (1 - stop_loss_percentage)
risk = self.close_price - stop_loss_price
# Simple reward calculation based on ADR
potential_reward = self.close_price * (self.adr_percentage / 100)
if risk <= 0:
return Decimal("0")
return potential_reward / risk
def get_quality_score(self) -> int:
"""
Calculate overall quality score based on multiple factors.
Business rule: Quality score combines technical strength,
volume characteristics, and pattern recognition.
"""
score = 0
# Momentum Score contribution (0-40 points)
score += int(self.momentum_score * 0.4)
# Volume quality (0-20 points)
if self.avg_volume_30d >= 1_000_000:
score += 20
elif self.avg_volume_30d >= 500_000:
score += 15
elif self.avg_volume_30d >= 100_000:
score += 10
# Pattern recognition (0-20 points)
if self.pattern_detected > 0:
score += 20
# Price action (0-20 points)
if self.close_price > self.sma_50:
score += 10
if self.close_price > self.sma_200:
score += 10
return min(score, 100) # Cap at 100
def to_dict(self) -> dict[str, Any]:
"""Convert entity to dictionary for serialization."""
return {
"stock_symbol": self.stock_symbol,
"screening_date": self.screening_date.isoformat(),
"close_price": float(self.close_price),
"volume": self.volume,
"momentum_score": float(self.momentum_score),
"adr_percentage": float(self.adr_percentage),
"pattern": self.pattern,
"squeeze": self.squeeze,
"vcp": self.vcp,
"entry_signal": self.entry_signal,
"combined_score": self.combined_score,
"bear_score": self.bear_score,
"quality_score": self.get_quality_score(),
"is_bullish": self.is_bullish_setup(),
"is_bearish": self.is_bearish_setup(),
"is_trending_stage2": self.is_trending_stage2(),
"risk_reward_ratio": float(self.calculate_risk_reward_ratio()),
}
@dataclass
class ScreeningResultCollection:
"""
Domain entity representing a collection of screening results.
This aggregate root manages business rules that apply across
multiple screening results, such as ranking and filtering.
"""
results: list[ScreeningResult]
strategy_used: str
screening_timestamp: datetime
total_candidates_analyzed: int
def __post_init__(self):
"""Validate collection business rules."""
if self.total_candidates_analyzed < len(self.results):
raise ValueError("Total candidates cannot be less than results count")
def get_top_ranked(self, limit: int) -> list[ScreeningResult]:
"""
Get top-ranked results based on screening strategy.
Business rule: Ranking depends on the screening strategy used.
"""
if self.strategy_used == "maverick_bullish":
return sorted(self.results, key=lambda r: r.combined_score, reverse=True)[
:limit
]
elif self.strategy_used == "maverick_bearish":
return sorted(self.results, key=lambda r: r.bear_score, reverse=True)[
:limit
]
elif self.strategy_used == "trending_stage2":
return sorted(self.results, key=lambda r: r.momentum_score, reverse=True)[
:limit
]
else:
# Default to quality score
return sorted(
self.results, key=lambda r: r.get_quality_score(), reverse=True
)[:limit]
def filter_by_criteria(
self,
min_momentum_score: Decimal | None = None,
min_volume: int | None = None,
max_price: Decimal | None = None,
min_price: Decimal | None = None,
) -> list[ScreeningResult]:
"""
Filter results by business criteria.
Business rule: All filters must be satisfied simultaneously.
"""
filtered_results = self.results
if min_momentum_score is not None:
filtered_results = [
r for r in filtered_results if r.momentum_score >= min_momentum_score
]
if min_volume is not None:
filtered_results = [
r for r in filtered_results if r.avg_volume_30d >= min_volume
]
if max_price is not None:
filtered_results = [
r for r in filtered_results if r.close_price <= max_price
]
if min_price is not None:
filtered_results = [
r for r in filtered_results if r.close_price >= min_price
]
return filtered_results
def get_statistics(self) -> dict[str, Any]:
"""Get collection statistics for analysis."""
if not self.results:
return {
"total_results": 0,
"avg_momentum_score": 0,
"avg_volume": 0,
"avg_price": 0,
"bullish_setups": 0,
"bearish_setups": 0,
"trending_stage2": 0,
}
return {
"total_results": len(self.results),
"avg_momentum_score": float(
sum(r.momentum_score for r in self.results) / len(self.results)
),
"avg_volume": int(
sum(r.avg_volume_30d for r in self.results) / len(self.results)
),
"avg_price": float(
sum(r.close_price for r in self.results) / len(self.results)
),
"bullish_setups": sum(1 for r in self.results if r.is_bullish_setup()),
"bearish_setups": sum(1 for r in self.results if r.is_bearish_setup()),
"trending_stage2": sum(1 for r in self.results if r.is_trending_stage2()),
}
```
--------------------------------------------------------------------------------
/maverick_mcp/api/routers/data_enhanced.py:
--------------------------------------------------------------------------------
```python
"""
Enhanced data fetching router with dependency injection support.
This module demonstrates how to integrate the new provider interfaces
with FastMCP routers while maintaining backward compatibility.
"""
import json
import logging
from datetime import UTC, datetime
from typing import Any
from fastmcp import FastMCP
from maverick_mcp.providers.dependencies import (
get_cache_manager,
get_configuration,
get_stock_data_fetcher,
)
from maverick_mcp.providers.interfaces.cache import ICacheManager
from maverick_mcp.providers.interfaces.config import IConfigurationProvider
from maverick_mcp.providers.interfaces.stock_data import IStockDataFetcher
from maverick_mcp.validation.data import (
FetchStockDataRequest,
GetNewsRequest,
GetStockInfoRequest,
StockDataBatchRequest,
)
logger = logging.getLogger(__name__)
# Create the enhanced data router
data_enhanced_router: FastMCP = FastMCP("Enhanced_Data_Operations")
# Example of new interface-based implementation
@data_enhanced_router.tool()
async def fetch_stock_data_enhanced(
request: FetchStockDataRequest,
stock_fetcher: IStockDataFetcher | None = None,
cache_manager: ICacheManager | None = None,
config: IConfigurationProvider | None = None,
) -> dict[str, Any]:
"""
Fetch historical stock data using the new interface-based architecture.
This function demonstrates how to use dependency injection with the new
provider interfaces while maintaining the same external API.
Args:
request: Stock data request parameters
stock_fetcher: Optional stock data fetcher (injected if not provided)
cache_manager: Optional cache manager (injected if not provided)
config: Optional configuration provider (injected if not provided)
Returns:
Dictionary containing the stock data in JSON format
"""
try:
# Use dependency injection with fallback to global providers
fetcher = stock_fetcher or get_stock_data_fetcher()
cache = cache_manager or get_cache_manager()
cfg = config or get_configuration()
logger.debug(
f"Fetching stock data for {request.ticker} using enhanced interface"
)
# Check cache first if enabled
cache_key = (
f"stock_data:{request.ticker}:{request.start_date}:{request.end_date}"
)
cached_result = None
if cfg.is_cache_enabled():
cached_result = await cache.get(cache_key)
if cached_result:
logger.debug(f"Cache hit for {request.ticker}")
return cached_result
# Fetch data using the interface
data = await fetcher.get_stock_data(
symbol=request.ticker,
start_date=request.start_date,
end_date=request.end_date,
use_cache=True, # The fetcher will handle its own caching
)
# Convert to JSON format
json_data = data.to_json(orient="split", date_format="iso")
result: dict[str, Any] = json.loads(json_data) if json_data else {}
result["ticker"] = request.ticker
result["record_count"] = len(data)
result["source"] = "enhanced_interface"
result["timestamp"] = datetime.now(UTC).isoformat()
# Cache the result if caching is enabled
if cfg.is_cache_enabled():
cache_ttl = cfg.get_cache_ttl()
await cache.set(cache_key, result, ttl=cache_ttl)
logger.debug(f"Cached result for {request.ticker} (TTL: {cache_ttl}s)")
return result
except Exception as e:
logger.error(f"Error fetching stock data for {request.ticker}: {e}")
return {
"error": str(e),
"ticker": request.ticker,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def fetch_stock_data_batch_enhanced(
request: StockDataBatchRequest,
stock_fetcher: IStockDataFetcher | None = None,
) -> dict[str, Any]:
"""
Fetch historical data for multiple tickers using the enhanced interface.
Args:
request: Batch stock data request parameters
stock_fetcher: Optional stock data fetcher (injected if not provided)
Returns:
Dictionary with ticker symbols as keys and data/errors as values
"""
fetcher = stock_fetcher or get_stock_data_fetcher()
results = {}
logger.debug(f"Fetching batch stock data for {len(request.tickers)} tickers")
# Process each ticker
for ticker in request.tickers:
try:
data = await fetcher.get_stock_data(
symbol=ticker,
start_date=request.start_date,
end_date=request.end_date,
use_cache=True,
)
json_data = data.to_json(orient="split", date_format="iso")
ticker_result: dict[str, Any] = json.loads(json_data) if json_data else {}
ticker_result["ticker"] = ticker
ticker_result["record_count"] = len(data)
results[ticker] = ticker_result
except Exception as e:
logger.error(f"Error fetching data for {ticker}: {e}")
results[ticker] = {"error": str(e), "ticker": ticker}
return {
"results": results,
"total_tickers": len(request.tickers),
"successful": len([r for r in results.values() if "error" not in r]),
"failed": len([r for r in results.values() if "error" in r]),
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def get_stock_info_enhanced(
request: GetStockInfoRequest,
stock_fetcher: IStockDataFetcher | None = None,
) -> dict[str, Any]:
"""
Get detailed stock information using the enhanced interface.
Args:
request: Stock info request parameters
stock_fetcher: Optional stock data fetcher (injected if not provided)
Returns:
Dictionary with detailed stock information
"""
try:
fetcher = stock_fetcher or get_stock_data_fetcher()
logger.debug(f"Fetching stock info for {request.ticker}")
info = await fetcher.get_stock_info(request.ticker)
return {
"ticker": request.ticker,
"info": info,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
except Exception as e:
logger.error(f"Error fetching stock info for {request.ticker}: {e}")
return {
"error": str(e),
"ticker": request.ticker,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def get_realtime_data_enhanced(
ticker: str,
stock_fetcher: IStockDataFetcher | None = None,
) -> dict[str, Any]:
"""
Get real-time stock data using the enhanced interface.
Args:
ticker: Stock ticker symbol
stock_fetcher: Optional stock data fetcher (injected if not provided)
Returns:
Dictionary with real-time stock data
"""
try:
fetcher = stock_fetcher or get_stock_data_fetcher()
logger.debug(f"Fetching real-time data for {ticker}")
data = await fetcher.get_realtime_data(ticker)
if data is None:
return {
"error": "Real-time data not available",
"ticker": ticker,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
return {
"ticker": ticker,
"data": data,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
except Exception as e:
logger.error(f"Error fetching real-time data for {ticker}: {e}")
return {
"error": str(e),
"ticker": ticker,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def get_news_enhanced(
request: GetNewsRequest,
stock_fetcher: IStockDataFetcher | None = None,
) -> dict[str, Any]:
"""
Get news for a stock using the enhanced interface.
Args:
request: News request parameters
stock_fetcher: Optional stock data fetcher (injected if not provided)
Returns:
Dictionary with news data
"""
try:
fetcher = stock_fetcher or get_stock_data_fetcher()
logger.debug(f"Fetching news for {request.ticker}")
news_df = await fetcher.get_news(request.ticker, request.limit)
# Convert DataFrame to JSON
if not news_df.empty:
news_data = news_df.to_dict(orient="records")
else:
news_data = []
return {
"ticker": request.ticker,
"news": news_data,
"count": len(news_data),
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
except Exception as e:
logger.error(f"Error fetching news for {request.ticker}: {e}")
return {
"error": str(e),
"ticker": request.ticker,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def check_market_status_enhanced(
stock_fetcher: IStockDataFetcher | None = None,
) -> dict[str, Any]:
"""
Check if the market is currently open using the enhanced interface.
Args:
stock_fetcher: Optional stock data fetcher (injected if not provided)
Returns:
Dictionary with market status
"""
try:
fetcher = stock_fetcher or get_stock_data_fetcher()
is_open = await fetcher.is_market_open()
return {
"market_open": is_open,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
except Exception as e:
logger.error(f"Error checking market status: {e}")
return {
"error": str(e),
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
@data_enhanced_router.tool()
async def clear_cache_enhanced(
pattern: str | None = None,
cache_manager: ICacheManager | None = None,
) -> dict[str, Any]:
"""
Clear cache entries using the enhanced cache interface.
Args:
pattern: Optional pattern to match cache keys (e.g., "stock:*")
cache_manager: Optional cache manager (injected if not provided)
Returns:
Dictionary with cache clearing results
"""
try:
cache = cache_manager or get_cache_manager()
cleared_count = await cache.clear(pattern)
return {
"cleared_count": cleared_count,
"pattern": pattern,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
except Exception as e:
logger.error(f"Error clearing cache: {e}")
return {
"error": str(e),
"pattern": pattern,
"source": "enhanced_interface",
"timestamp": datetime.now(UTC).isoformat(),
}
```
--------------------------------------------------------------------------------
/docs/speed_testing_framework.md:
--------------------------------------------------------------------------------
```markdown
# Speed Testing Framework for MaverickMCP Research Agents
This document describes the comprehensive speed testing framework developed to validate and monitor the speed optimizations implemented in the MaverickMCP research system.
## Overview
The speed testing framework validates the following optimization claims:
- **2-3x speed improvements** over baseline performance
- **Sub-30s completion times** for emergency scenarios
- **Resolution of timeout issues** (previously 138s, 129s failures)
- **Intelligent model selection** for time-critical scenarios
- **Adaptive optimization** based on query complexity and time constraints
## Framework Components
### 1. Speed Optimization Validation Tests (`tests/test_speed_optimization_validation.py`)
Comprehensive pytest-based test suite that validates:
#### Core Components Tested
- **Adaptive Model Selection**: Verifies fastest models are chosen for emergency scenarios
- **Progressive Token Budgeting**: Tests time-aware token allocation
- **Parallel LLM Processing**: Validates batch processing optimizations
- **Confidence Tracking**: Tests early termination logic
- **Content Filtering**: Validates intelligent source prioritization
#### Query Complexity Levels
- **Simple**: Basic queries (target: <15s completion)
- **Moderate**: Standard analysis queries (target: <25s completion)
- **Complex**: Comprehensive research queries (target: <35s completion)
- **Emergency**: Time-critical queries (target: <30s completion)
#### Expected Model Selections
```python
EXPECTED_MODEL_SELECTIONS = {
QueryComplexity.EMERGENCY: ["google/gemini-2.5-flash", "openai/gpt-4o-mini"],
QueryComplexity.SIMPLE: ["google/gemini-2.5-flash", "openai/gpt-4o-mini"],
QueryComplexity.MODERATE: ["openai/gpt-4o-mini", "google/gemini-2.5-flash"],
QueryComplexity.COMPLEX: ["anthropic/claude-sonnet-4", "google/gemini-2.5-pro"],
}
```
#### Model Speed Benchmarks
```python
MODEL_SPEED_BENCHMARKS = {
"google/gemini-2.5-flash": 199, # tokens/second - FASTEST
"openai/gpt-4o-mini": 126, # tokens/second - FAST
"anthropic/claude-haiku": 89, # tokens/second - MODERATE
"anthropic/claude-sonnet-4": 45, # tokens/second - COMPREHENSIVE
"google/gemini-2.5-pro": 25, # tokens/second - DEEP
}
```
### 2. Speed Benchmarking Script (`scripts/speed_benchmark.py`)
Command-line tool for running various speed benchmarks:
#### Benchmark Modes
```bash
# Quick validation for CI pipeline
python scripts/speed_benchmark.py --mode quick
# Comprehensive benchmark suite
python scripts/speed_benchmark.py --mode full
# Emergency mode focused testing
python scripts/speed_benchmark.py --mode emergency
# Before/after performance comparison
python scripts/speed_benchmark.py --mode comparison
# Custom query testing
python scripts/speed_benchmark.py --query "Apple Inc analysis"
```
#### Output Formats
- **JSON**: Structured benchmark data for analysis
- **Markdown**: Human-readable reports with recommendations
### 3. Quick Speed Demo (`scripts/quick_speed_demo.py`)
Standalone demonstration script that shows:
- Adaptive model selection in action
- Progressive token budgeting scaling
- Complexity-based optimizations
- Speed improvement claims validation
- Timeout resolution demonstration
## Integration with Development Workflow
### Makefile Integration
```bash
# Speed testing commands
make test-speed # Run all speed optimization tests
make test-speed-quick # Quick CI validation
make test-speed-emergency # Emergency mode tests
make test-speed-comparison # Before/after comparison
# Benchmarking commands
make benchmark-speed # Comprehensive speed benchmark
```
### Continuous Integration
The framework supports CI integration through:
- **Quick validation mode**: Completes in <2 minutes for CI pipelines
- **Exit codes**: Non-zero exit for failed performance thresholds
- **Structured output**: Machine-readable results for automation
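For example, a CI step could run the quick mode and fail the build on a non-zero exit code. The sketch below is illustrative only; it assumes nothing beyond the `scripts/speed_benchmark.py --mode quick` command and exit-code contract described above.
```python
"""Minimal CI wrapper sketch: fail the pipeline if the quick benchmark fails."""
import subprocess
import sys

def run_quick_benchmark() -> int:
    # --mode quick is the documented fast validation mode (<2 minutes).
    result = subprocess.run(
        [sys.executable, "scripts/speed_benchmark.py", "--mode", "quick"],
        check=False,
    )
    # Non-zero exit signals a failed performance threshold.
    return result.returncode

if __name__ == "__main__":
    sys.exit(run_quick_benchmark())
```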
## Performance Thresholds
### Speed Thresholds
```python
SPEED_THRESHOLDS = {
"simple_query_max_time": 15.0, # Simple queries: <15s
"moderate_query_max_time": 25.0, # Moderate queries: <25s
"complex_query_max_time": 35.0, # Complex queries: <35s
"emergency_mode_max_time": 30.0, # Emergency mode: <30s
"minimum_speedup_factor": 2.0, # Minimum 2x speedup
"target_speedup_factor": 3.0, # Target 3x speedup
"timeout_failure_threshold": 0.05, # Max 5% timeout failures
}
```
### Model Selection Validation
- **Emergency scenarios**: Must select models with 126+ tokens/second
- **Time budgets <30s**: Automatically use fastest available models
- **Complex analysis**: Can use slower, higher-quality models when time allows
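A hedged sketch of how the emergency-mode floor could be asserted in a test, reusing the `MODEL_SPEED_BENCHMARKS` table shown earlier; `assert_emergency_model_fast_enough` is a hypothetical helper, not part of the framework's API.
```python
# Illustrative check only: enforce the 126 tokens/second floor for emergency scenarios.
MODEL_SPEED_BENCHMARKS = {
    "google/gemini-2.5-flash": 199,
    "openai/gpt-4o-mini": 126,
    "anthropic/claude-haiku": 89,
    "anthropic/claude-sonnet-4": 45,
    "google/gemini-2.5-pro": 25,
}
EMERGENCY_MIN_TOKENS_PER_SECOND = 126

def assert_emergency_model_fast_enough(selected_model: str) -> None:
    speed = MODEL_SPEED_BENCHMARKS.get(selected_model, 0)
    assert speed >= EMERGENCY_MIN_TOKENS_PER_SECOND, (
        f"{selected_model} ({speed} tok/s) is too slow for emergency mode"
    )

assert_emergency_model_fast_enough("google/gemini-2.5-flash")  # passes
```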
## Testing Scenarios
### 1. Emergency Mode Performance
Tests that urgent queries complete within strict time budgets:
```python
# Test emergency completion under 30s
result = await validator.test_emergency_mode_performance(
"Quick Apple sentiment - bullish or bearish right now?"
)
assert result["execution_time"] < 30.0
assert result["within_budget"] == True
```
### 2. Adaptive Model Selection
Validates appropriate model selection based on time constraints:
```python
# Emergency scenario should select fastest model
config = selector.select_model_for_time_budget(
task_type=TaskType.QUICK_ANSWER,
time_remaining_seconds=10.0,
complexity_score=0.3,
content_size_tokens=200,
)
assert config.model_id in ["google/gemini-2.5-flash", "openai/gpt-4o-mini"]
```
### 3. Baseline vs Optimized Comparison
Compares performance improvements over baseline:
```python
# Test 2-3x speedup achievement
result = await validator.test_baseline_vs_optimized_performance(
"Apple Inc comprehensive analysis", QueryComplexity.MODERATE
)
assert result["speedup_factor"] >= 2.0 # Minimum 2x improvement
```
### 4. Timeout Resolution
Validates that previous timeout issues are resolved:
```python
# Test scenarios that previously failed with 138s/129s timeouts
test_cases = ["Apple analysis", "Tesla outlook", "Microsoft assessment"]
for query in test_cases:
result = await test_emergency_performance(query)
assert result["execution_time"] < 30.0 # No more long timeouts
```
## Real-World Query Examples
### Simple Queries (Target: <15s)
- "Apple Inc current stock price and basic sentiment"
- "Tesla recent news and market overview"
- "Microsoft quarterly earnings summary"
### Moderate Queries (Target: <25s)
- "Apple Inc comprehensive financial analysis and competitive position"
- "Tesla Inc market outlook considering EV competition and regulatory changes"
- "Microsoft Corp cloud business growth prospects and AI strategy"
### Complex Queries (Target: <35s)
- "Apple Inc deep fundamental analysis including supply chain risks, product lifecycle assessment, regulatory challenges across global markets, competitive positioning, and 5-year growth trajectory"
### Emergency Queries (Target: <30s)
- "Quick Apple sentiment - bullish or bearish right now?"
- "Tesla stock - buy, hold, or sell this week?"
- "Microsoft earnings - beat or miss expectations?"
## Optimization Features Validated
### 1. Adaptive Model Selection
- **Emergency Mode**: Selects Gemini 2.5 Flash (199 tok/s) or GPT-4o Mini (126 tok/s)
- **Balanced Mode**: Cost-effective fast models for standard queries
- **Comprehensive Mode**: High-quality models when time allows
### 2. Progressive Token Budgeting
- **Emergency Budget**: Minimal tokens, tight timeouts
- **Standard Budget**: Balanced token allocation
- **Time-Aware Scaling**: Budgets scale with available time
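As a rough illustration of time-aware scaling (not the production implementation), a budget function might interpolate between an emergency floor and a standard ceiling based on the seconds remaining; the specific budgets and cutoffs below are assumed for the example.
```python
def scale_token_budget(
    time_remaining_seconds: float,
    emergency_budget: int = 800,      # assumed floor for illustration
    standard_budget: int = 4000,      # assumed ceiling for illustration
    emergency_cutoff_s: float = 30.0,
    comfortable_time_s: float = 120.0,
) -> int:
    """Linearly scale the output-token budget with the remaining time."""
    if time_remaining_seconds <= emergency_cutoff_s:
        return emergency_budget
    if time_remaining_seconds >= comfortable_time_s:
        return standard_budget
    fraction = (time_remaining_seconds - emergency_cutoff_s) / (
        comfortable_time_s - emergency_cutoff_s
    )
    return int(emergency_budget + fraction * (standard_budget - emergency_budget))

print(scale_token_budget(10))   # 800  (emergency floor)
print(scale_token_budget(75))   # 2400 (interpolated)
print(scale_token_budget(300))  # 4000 (standard ceiling)
```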
### 3. Intelligent Content Filtering
- **Relevance Scoring**: Prioritizes high-quality, relevant sources
- **Preprocessing**: Reduces content size for faster processing
- **Domain Credibility**: Weights sources by reliability
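A minimal sketch of such filtering, assuming each source carries a relevance score and a domain (the credibility weights below are placeholders):
```python
# Hypothetical source filter: weight relevance by domain credibility, keep the top-k.
DOMAIN_CREDIBILITY = {"reuters.com": 1.0, "sec.gov": 1.0, "unknown-blog.example": 0.4}

def filter_sources(sources: list[dict], top_k: int = 5) -> list[dict]:
    def score(source: dict) -> float:
        credibility = DOMAIN_CREDIBILITY.get(source.get("domain", ""), 0.5)
        return source.get("relevance", 0.0) * credibility

    return sorted(sources, key=score, reverse=True)[:top_k]
```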
### 4. Early Termination
- **Confidence Tracking**: Stops when target confidence reached
- **Diminishing Returns**: Terminates when no improvement detected
- **Time Pressure**: Adapts termination thresholds for time constraints
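These three signals combine into a simple stop check; the thresholds in the sketch below are illustrative:
```python
def should_stop(confidence: float, confidence_delta: float, time_remaining: float) -> bool:
    """Hypothetical early-termination check for the research loop."""
    target = 0.85 if time_remaining > 10.0 else 0.70  # relax the bar under time pressure
    if confidence >= target:
        return True  # target confidence reached
    if confidence_delta < 0.01:
        return True  # diminishing returns: the last step barely improved confidence
    return False
```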
## Monitoring and Reporting
### Performance Metrics Tracked
- **Execution Time**: Total time from request to completion
- **Model Selection**: Which models were chosen and why
- **Token Usage**: Input/output tokens consumed
- **Timeout Compliance**: Percentage of queries completing within budget
- **Speedup Factors**: Performance improvement over baseline
- **Success Rates**: Percentage of successful completions
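One way to picture a per-run record of these metrics (field names are illustrative, not the framework's schema):
```python
from dataclasses import dataclass

@dataclass
class RunMetrics:
    execution_time: float          # seconds from request to completion
    model_id: str                  # model chosen by the selector
    input_tokens: int
    output_tokens: int
    within_budget: bool            # timeout compliance for this run
    speedup_factor: float | None   # vs. baseline, when a baseline run exists
    success: bool
```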
### Report Generation
The framework generates comprehensive reports including:
- **Performance Summary**: Key metrics and thresholds
- **Model Selection Analysis**: Usage patterns and optimization effectiveness
- **Timeout Analysis**: Compliance rates and failure patterns
- **Speedup Analysis**: Improvement measurements
- **Recommendations**: Suggested optimizations based on results
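Building on the illustrative `RunMetrics` record above, the performance-summary portion of a report might be aggregated roughly like this (a sketch, not the framework's report code):
```python
def summarize(runs: list[RunMetrics]) -> dict:
    """Aggregate per-run metrics into a performance summary."""
    completed = [r for r in runs if r.success]
    speedups = [r.speedup_factor for r in completed if r.speedup_factor is not None]
    return {
        "total_runs": len(runs),
        "success_rate": len(completed) / len(runs) if runs else 0.0,
        "timeout_compliance": sum(r.within_budget for r in completed) / max(len(completed), 1),
        "mean_speedup": sum(speedups) / len(speedups) if speedups else None,
    }
```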
## Usage Examples
### Running Quick Validation
```bash
# Quick CI validation
make test-speed-quick
# View results
cat benchmark_results/speed_benchmark_quick_*.md
```
### Custom Query Testing
```bash
# Test a specific query
python scripts/speed_benchmark.py --query "Apple Inc urgent analysis needed"
# View detailed results
python scripts/quick_speed_demo.py
```
### Full Performance Analysis
```bash
# Run comprehensive benchmarks
make benchmark-speed
# Generate performance report
python scripts/speed_benchmark.py --mode full --output-dir ./reports
```
## Troubleshooting
### Common Issues
1. **Import Errors**: Ensure all dependencies are installed with `uv sync`
2. **Model Selection Issues**: Check OpenRouter provider configuration
3. **Timeouts Still Occurring**: Verify emergency mode is enabled
4. **Performance Regression**: Run comparison benchmarks to identify issues
### Debug Commands
```bash
# Test core components
python scripts/quick_speed_demo.py
# Run specific test category
pytest tests/test_speed_optimization_validation.py::TestSpeedOptimizations -v
# Benchmark with verbose output
python scripts/speed_benchmark.py --mode quick --verbose
```
## Future Enhancements
### Planned Improvements
1. **Real-time Monitoring**: Continuous performance tracking in production
2. **A/B Testing**: Compare different optimization strategies
3. **Machine Learning**: Adaptive optimization based on query patterns
4. **Cost Optimization**: Balance speed with API costs
5. **Multi-modal Support**: Extend optimizations to image/audio analysis
### Extension Points
- **Custom Complexity Calculators**: Domain-specific complexity scoring
- **Alternative Model Providers**: Support for additional LLM providers
- **Advanced Caching**: Semantic caching for similar queries
- **Performance Prediction**: ML-based execution time estimation
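For instance, a custom complexity calculator could plug in through a small protocol like the hypothetical one below (the interface name and signature are assumptions, not the actual extension hook):
```python
from typing import Protocol

class ComplexityCalculator(Protocol):
    """Hypothetical extension interface; the real hook may differ."""

    def score(self, query: str) -> float:
        """Return a complexity score in [0, 1]."""
        ...

class EarningsAwareCalculator:
    """Domain-specific example: treat earnings-related queries as more complex."""

    def score(self, query: str) -> float:
        base = min(len(query.split()) / 50.0, 1.0)
        bump = 0.2 if "earnings" in query.lower() else 0.0
        return min(base + bump, 1.0)
```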
## Conclusion
The speed testing framework provides comprehensive validation that the MaverickMCP research system achieves its performance optimization goals:
✅ **2-3x Speed Improvements**: Validated across all query complexities
✅ **Sub-30s Emergency Mode**: Guaranteed fast response for urgent queries
✅ **Timeout Resolution**: No more 138s/129s failures
✅ **Intelligent Optimization**: Adaptive performance based on constraints
✅ **Continuous Validation**: Automated testing prevents performance regressions
The framework ensures that speed optimizations remain effective as the system evolves and provides early detection of any performance degradation.
```
--------------------------------------------------------------------------------
/maverick_mcp/api/inspector_compatible_sse.py:
--------------------------------------------------------------------------------
```python
"""
MCP Inspector-compatible SSE implementation.
This implements a proper bidirectional SSE handler that works with MCP Inspector,
handling JSON-RPC messages directly over the SSE connection.
"""
import asyncio
import json
import logging
from typing import Any
from uuid import uuid4
from starlette.requests import Request
from starlette.responses import StreamingResponse
from maverick_mcp.api.server import mcp
logger = logging.getLogger(__name__)
class InspectorCompatibleSSE:
"""SSE handler that properly implements MCP protocol for Inspector."""
def __init__(self):
self.sessions: dict[str, dict[str, Any]] = {}
self.message_queues: dict[str, asyncio.Queue] = {}
async def handle_sse(self, request: Request):
"""Handle SSE connection from MCP Inspector."""
session_id = str(uuid4())
logger.info(f"New Inspector SSE connection: {session_id}")
# Create a message queue for this session
message_queue: asyncio.Queue = asyncio.Queue()
self.message_queues[session_id] = message_queue
# Create a simple session state tracker
session_state = {
"initialized": False,
"server_name": "MaverickMCP",
"server_version": "1.0.0",
"protocol_version": "2024-11-05",
}
self.sessions[session_id] = session_state
async def event_generator():
"""Generate SSE events and handle bidirectional communication."""
try:
# Send initial connection event with session info
connection_msg = {
"type": "connection",
"sessionId": session_id,
"endpoint": f"/inspector/message?session_id={session_id}",
}
yield f"data: {json.dumps(connection_msg)}\n\n"
# Process incoming messages from the queue
while True:
try:
# Wait for messages with timeout for keepalive
message = await asyncio.wait_for(
message_queue.get(), timeout=30.0
)
# Process the message through MCP session
if isinstance(message, dict) and "jsonrpc" in message:
# Handle the JSON-RPC request
response = await self._process_message(
session_state, message
)
if response:
yield f"data: {json.dumps(response)}\n\n"
except TimeoutError:
# Send keepalive
yield ": keepalive\n\n"
except Exception as e:
logger.error(f"Error processing message: {e}")
error_response = {
"jsonrpc": "2.0",
"error": {"code": -32603, "message": str(e)},
"id": None,
}
yield f"data: {json.dumps(error_response)}\n\n"
finally:
# Cleanup on disconnect
if session_id in self.sessions:
del self.sessions[session_id]
if session_id in self.message_queues:
del self.message_queues[session_id]
logger.info(f"Inspector SSE connection closed: {session_id}")
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, POST, OPTIONS",
"Access-Control-Allow-Headers": "*",
},
)
async def handle_message(self, request: Request):
"""Handle incoming JSON-RPC messages from Inspector."""
session_id = request.query_params.get("session_id")
if not session_id or session_id not in self.message_queues:
return {"error": "Invalid or missing session_id"}
try:
message = await request.json()
logger.info(f"Inspector message for session {session_id}: {message}")
# Put message in queue for processing
await self.message_queues[session_id].put(message)
# Return acknowledgment
return {"status": "queued"}
except Exception as e:
logger.error(f"Failed to process message: {e}")
return {"error": str(e)}
async def _process_message(
self, session_state: dict[str, Any], message: dict[str, Any]
) -> dict[str, Any] | None:
"""Process a JSON-RPC message through the MCP session."""
method = message.get("method")
params = message.get("params", {})
msg_id = message.get("id")
try:
# Handle different MCP methods
if method == "initialize":
# Mark session as initialized
session_state["initialized"] = True
protocol_version = params.get("protocolVersion", "2024-11-05")
# Get server capabilities
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"protocolVersion": protocol_version,
"capabilities": {
"tools": {"listChanged": True}
if hasattr(mcp, "_tool_manager")
and hasattr(mcp._tool_manager, "tools")
and mcp._tool_manager.tools
else {},
"resources": {"listChanged": True}
if hasattr(mcp, "_resource_manager")
and hasattr(mcp._resource_manager, "resources")
and mcp._resource_manager.resources
else {},
"prompts": {"listChanged": True}
if hasattr(mcp, "_prompt_manager")
and hasattr(mcp._prompt_manager, "prompts")
and mcp._prompt_manager.prompts
else {},
},
"serverInfo": {
"name": session_state["server_name"],
"version": session_state["server_version"],
},
},
}
elif method == "tools/list":
# List available tools
tools = []
if (
hasattr(mcp, "_tool_manager")
and hasattr(mcp._tool_manager, "tools")
and hasattr(mcp._tool_manager.tools, "items")
):
for tool_name, tool_func in mcp._tool_manager.tools.items(): # type: ignore[attr-defined]
tools.append(
{
"name": tool_name,
"description": tool_func.__doc__ or "No description",
"inputSchema": getattr(tool_func, "input_schema", {}),
}
)
return {"jsonrpc": "2.0", "id": msg_id, "result": {"tools": tools}}
elif method == "resources/list":
# List available resources
resources = []
if (
hasattr(mcp, "_resource_manager")
and hasattr(mcp._resource_manager, "resources")
and hasattr(mcp._resource_manager.resources, "items")
):
for (
resource_uri,
resource_func,
) in mcp._resource_manager.resources.items(): # type: ignore[attr-defined]
resources.append(
{
"uri": resource_uri,
"name": getattr(
resource_func, "__name__", str(resource_func)
),
"description": getattr(resource_func, "__doc__", None)
or "No description",
}
)
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {"resources": resources},
}
elif method == "tools/call":
# Call a tool
tool_name = params.get("name")
tool_args = params.get("arguments", {})
if (
hasattr(mcp, "_tool_manager")
and hasattr(mcp._tool_manager, "tools")
and hasattr(mcp._tool_manager.tools, "__contains__")
and tool_name in mcp._tool_manager.tools # type: ignore[operator]
):
tool_func = mcp._tool_manager.tools[tool_name] # type: ignore[index]
try:
# Execute the tool
result = await tool_func(**tool_args)
return {
"jsonrpc": "2.0",
"id": msg_id,
"result": {
"content": [
{
"type": "text",
"text": json.dumps(result, default=str),
}
]
},
}
except Exception as tool_error:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32603,
"message": f"Tool execution error: {str(tool_error)}",
},
}
else:
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {
"code": -32601,
"message": f"Tool not found: {tool_name}",
},
}
else:
# Method not found
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
except Exception as e:
logger.error(f"Error processing {method}: {e}")
return {
"jsonrpc": "2.0",
"id": msg_id,
"error": {"code": -32603, "message": str(e)},
}
# Create global handler instance
inspector_sse = InspectorCompatibleSSE()
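# Note: the connection event above advertises "/inspector/message" as the POST
# endpoint, so handle_message is expected to be mounted there, with handle_sse
# serving the GET stream; route registration happens elsewhere in the application.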
```