This is page 15 of 29. Use http://codebase.md/wshobson/maverick-mcp?page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/utils/monitoring.py:
--------------------------------------------------------------------------------
```python
"""
Monitoring and observability integration for MaverickMCP.
This module provides Sentry error tracking and Prometheus metrics integration
for production monitoring and alerting.
"""
import os
import time
from contextlib import contextmanager
from typing import Any
from maverick_mcp.config.settings import settings
from maverick_mcp.utils.logging import get_logger
# Module logger. Created before the optional import so the fallback branch can
# report the missing dependency without building a second logger.
logger = get_logger(__name__)

# Optional Prometheus integration. When prometheus_client cannot be imported,
# the names Counter/Gauge/Histogram/generate_latest are bound to no-op
# stand-ins so that every metric declaration and call in this module still runs.
try:
    from prometheus_client import Counter, Gauge, Histogram, generate_latest

    PROMETHEUS_AVAILABLE = True
except ImportError:
    logger.warning("Prometheus client not available. Metrics will be disabled.")
    PROMETHEUS_AVAILABLE = False

    class _MetricStub:
        """No-op replacement for a Prometheus metric.

        Accepts any constructor arguments and silently ignores every
        recording call. ``labels()`` returns the stub itself so chained calls
        such as ``metric.labels(...).inc()`` keep working.
        """

        def __init__(self, *args, **kwargs):
            pass

        def inc(self, *args, **kwargs):
            pass

        def observe(self, *args, **kwargs):
            pass

        def set(self, *args, **kwargs):
            pass

        def dec(self, *args, **kwargs):
            pass

        def labels(self, *args, **kwargs):
            return self

    Counter = Gauge = Histogram = _MetricStub

    def generate_latest(*args, **kwargs):
        """Return a placeholder exposition payload.

        Accepts (and ignores) any arguments so that callers passing a registry,
        as the real ``generate_latest`` allows, do not fail on the stub.
        """
        return b"# Prometheus not available"
# HTTP request metrics.
# Every metric below is created at import time. A `.labels()` call on a
# labelled metric has to provide a value for each label name declared here.

# Number of API requests, split by method, endpoint, status and user type.
request_counter = Counter(
"maverick_requests_total",
"Total number of API requests",
["method", "endpoint", "status", "user_type"],
)
# Request latency in seconds; finite bucket bounds run from 10 ms to 60 s.
# NOTE(review): prometheus_client is believed to append the +Inf bucket itself
# when it is missing, so the explicit float("inf") is likely redundant — confirm.
request_duration = Histogram(
"maverick_request_duration_seconds",
"Request duration in seconds",
["method", "endpoint", "user_type"],
buckets=(
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
10.0,
30.0,
60.0,
float("inf"),
),
)
# Request body size in bytes; finite bucket bounds run from 1 KiB to 4 MiB.
request_size_bytes = Histogram(
"maverick_request_size_bytes",
"HTTP request size in bytes",
["method", "endpoint"],
buckets=(1024, 4096, 16384, 65536, 262144, 1048576, 4194304, float("inf")),
)
# Response body size in bytes; same bucket bounds as the request size histogram.
response_size_bytes = Histogram(
"maverick_response_size_bytes",
"HTTP response size in bytes",
["method", "endpoint", "status"],
buckets=(1024, 4096, 16384, 65536, 262144, 1048576, 4194304, float("inf")),
)
# Connection metrics (unlabelled gauges; they go up and down with load).

# Incremented and decremented around each tracked request in this module.
active_connections = Gauge(
"maverick_active_connections", "Number of active connections"
)
# Declared here but not updated anywhere in this module.
# NOTE(review): presumably driven by middleware elsewhere — confirm it is used.
concurrent_requests = Gauge(
"maverick_concurrent_requests", "Number of concurrent requests being processed"
)
# Tool execution metrics.

# Tool invocations by tool, user and outcome.
# NOTE(review): `user_id` is a label, so every distinct user produces its own
# time series; confirm the user population keeps label cardinality acceptable.
tool_usage_counter = Counter(
"maverick_tool_usage_total",
"Total tool usage count",
["tool_name", "user_id", "status"],
)
# Tool run time in seconds; finite bucket bounds run from 0.1 s to 300 s.
tool_duration = Histogram(
"maverick_tool_duration_seconds",
"Tool execution duration in seconds",
["tool_name", "complexity"],
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, float("inf")),
)
# Tool failures by tool, error type and complexity.
tool_errors = Counter(
"maverick_tool_errors_total",
"Total tool execution errors",
["tool_name", "error_type", "complexity"],
)
# Error metrics.

# Errors by type, endpoint and severity. All three label values are required
# whenever `.labels()` is called on this counter.
error_counter = Counter(
"maverick_errors_total",
"Total number of errors",
["error_type", "endpoint", "severity"],
)
# Rate-limit violations by user, endpoint and limit type.
# NOTE(review): `user_id` as a label creates one series per user — confirm
# that this cardinality is acceptable.
rate_limit_hits = Counter(
"maverick_rate_limit_hits_total",
"Rate limit violations",
["user_id", "endpoint", "limit_type"],
)
# Cache metrics. Hit/miss counters are keyed by cache type and key prefix;
# size and key-count gauges are keyed by cache type only.

# Lookups that found a value.
cache_hits = Counter(
"maverick_cache_hits_total", "Total cache hits", ["cache_type", "key_prefix"]
)
# Lookups that did not find a value.
cache_misses = Counter(
"maverick_cache_misses_total", "Total cache misses", ["cache_type", "key_prefix"]
)
# Entries removed from a cache, with the reason supplied by the caller.
cache_evictions = Counter(
"maverick_cache_evictions_total", "Total cache evictions", ["cache_type", "reason"]
)
# Current cache size in bytes, as last reported by the caller.
cache_size_bytes = Gauge(
"maverick_cache_size_bytes", "Cache size in bytes", ["cache_type"]
)
# Current number of keys, as last reported by the caller.
cache_keys_total = Gauge(
"maverick_cache_keys_total", "Total number of keys in cache", ["cache_type"]
)
# Database metrics.

# Connection pool gauges (unlabelled); set together from one pool snapshot.
db_connection_pool_size = Gauge(
"maverick_db_connection_pool_size", "Database connection pool size"
)
db_active_connections = Gauge(
"maverick_db_active_connections", "Active database connections"
)
db_idle_connections = Gauge("maverick_db_idle_connections", "Idle database connections")
# Query latency in seconds by query type and table; finite bucket bounds run
# from 1 ms to 5 s.
db_query_duration = Histogram(
"maverick_db_query_duration_seconds",
"Database query duration in seconds",
["query_type", "table"],
buckets=(
0.001,
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
float("inf"),
),
)
# Query count by query type, table and outcome.
db_queries_total = Counter(
"maverick_db_queries_total",
"Total database queries",
["query_type", "table", "status"],
)
# Connection lifecycle counters; only the "closed" counter carries a label.
db_connections_created = Counter(
"maverick_db_connections_created_total", "Total database connections created"
)
db_connections_closed = Counter(
"maverick_db_connections_closed_total",
"Total database connections closed",
["reason"],
)
# Redis metrics.

# Current number of Redis connections, as last reported by the caller.
redis_connections = Gauge("maverick_redis_connections", "Number of Redis connections")
# Redis commands by operation name and outcome.
redis_operations = Counter(
"maverick_redis_operations_total", "Total Redis operations", ["operation", "status"]
)
# Redis command latency in seconds; finite bucket bounds run from 1 ms to 1 s.
redis_operation_duration = Histogram(
"maverick_redis_operation_duration_seconds",
"Redis operation duration in seconds",
["operation"],
buckets=(0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, float("inf")),
)
# Redis memory footprint in bytes, as last reported by the caller.
redis_memory_usage = Gauge(
"maverick_redis_memory_usage_bytes", "Redis memory usage in bytes"
)
# Keyspace hit/miss counters (unlabelled). Counters only ever increase, so
# callers are expected to add increments rather than absolute totals.
redis_keyspace_hits = Counter(
"maverick_redis_keyspace_hits_total", "Redis keyspace hits"
)
redis_keyspace_misses = Counter(
"maverick_redis_keyspace_misses_total", "Redis keyspace misses"
)
# External API metrics (calls made by this service to third-party services).

# Outbound calls by service, endpoint, HTTP method and outcome.
external_api_calls = Counter(
"maverick_external_api_calls_total",
"External API calls",
["service", "endpoint", "method", "status"],
)
# Outbound call latency in seconds; finite bucket bounds run from 0.1 s to 60 s.
external_api_duration = Histogram(
"maverick_external_api_duration_seconds",
"External API call duration in seconds",
["service", "endpoint"],
buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, float("inf")),
)
# Outbound call failures by service, endpoint and error type.
external_api_errors = Counter(
"maverick_external_api_errors_total",
"External API errors",
["service", "endpoint", "error_type"],
)
# Business metrics.

# Active-user gauges (unlabelled); overwritten with the latest counts.
daily_active_users = Gauge("maverick_daily_active_users", "Daily active users count")
monthly_active_users = Gauge(
"maverick_monthly_active_users", "Monthly active users count"
)
# Sessions by user type and authentication method.
user_sessions = Counter(
"maverick_user_sessions_total", "Total user sessions", ["user_type", "auth_method"]
)
# Session length in seconds; finite bucket bounds run from 60 s (1 minute)
# to 86400 s (24 hours).
user_session_duration = Histogram(
"maverick_user_session_duration_seconds",
"User session duration in seconds",
["user_type"],
buckets=(60, 300, 900, 1800, 3600, 7200, 14400, 28800, 86400, float("inf")),
)
# Process-level performance metrics.

# Memory used by this process, in bytes.
memory_usage_bytes = Gauge(
"maverick_memory_usage_bytes", "Process memory usage in bytes"
)
# CPU utilisation of this process, as a percentage.
cpu_usage_percent = Gauge("maverick_cpu_usage_percent", "Process CPU usage percentage")
# Number of file descriptors currently open in this process.
open_file_descriptors = Gauge(
"maverick_open_file_descriptors", "Number of open file descriptors"
)
# Garbage collector runs, labelled by generation index (as a string).
garbage_collections = Counter(
"maverick_garbage_collections_total", "Garbage collection events", ["generation"]
)
# Security metrics.

# Authentication attempts by method, outcome and client user agent.
# NOTE(review): `user_agent` is free-form client input used as a label; even
# when truncated by the caller it can produce many distinct series — confirm.
authentication_attempts = Counter(
"maverick_authentication_attempts_total",
"Authentication attempts",
["method", "status", "user_agent"],
)
# Authorization decisions by resource, action and outcome.
authorization_checks = Counter(
"maverick_authorization_checks_total",
"Authorization checks",
["resource", "action", "status"],
)
# Detected security violations by type and severity.
security_violations = Counter(
"maverick_security_violations_total",
"Security violations detected",
["violation_type", "severity"],
)
class MonitoringService:
"""Service for monitoring and observability."""
def __init__(self):
self.sentry_enabled = False
self._initialize_sentry()
def _initialize_sentry(self):
"""Initialize Sentry error tracking."""
sentry_dsn = os.getenv("SENTRY_DSN")
if not sentry_dsn:
if settings.environment == "production":
logger.warning("Sentry DSN not configured in production")
return
try:
import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
# Configure Sentry
sentry_sdk.init(
dsn=sentry_dsn,
environment=settings.environment,
traces_sample_rate=0.1 if settings.environment == "production" else 1.0,
profiles_sample_rate=0.1
if settings.environment == "production"
else 1.0,
integrations=[
AsyncioIntegration(),
LoggingIntegration(
level=None, # Capture all levels
event_level=None, # Don't create events from logs
),
SqlalchemyIntegration(),
],
before_send=self._before_send_sentry,
attach_stacktrace=True,
send_default_pii=False, # Don't send PII
release=os.getenv("RELEASE_VERSION", "unknown"),
)
# Set user context if available
sentry_sdk.set_context(
"app",
{
"name": settings.app_name,
"environment": settings.environment,
"auth_enabled": settings.auth.enabled,
},
)
self.sentry_enabled = True
logger.info("Sentry error tracking initialized")
except ImportError:
logger.warning("Sentry SDK not installed. Run: pip install sentry-sdk")
except Exception as e:
logger.error(f"Failed to initialize Sentry: {e}")
def _before_send_sentry(
self, event: dict[str, Any], hint: dict[str, Any]
) -> dict[str, Any] | None:
"""Filter events before sending to Sentry."""
# Don't send certain errors
if "exc_info" in hint:
_, exc_value, _ = hint["exc_info"]
# Skip client errors
error_message = str(exc_value).lower()
if any(
skip in error_message
for skip in [
"client disconnected",
"connection reset",
"broken pipe",
]
):
return None
# Remove sensitive data
if "request" in event:
request = event["request"]
# Remove auth headers
if "headers" in request:
request["headers"] = {
k: v
for k, v in request["headers"].items()
if k.lower() not in ["authorization", "cookie", "x-api-key"]
}
# Remove sensitive query params
if "query_string" in request:
# Parse and filter query string
pass
return event
def capture_exception(self, error: Exception, **context):
"""Capture exception with Sentry."""
if not self.sentry_enabled:
return
try:
import sentry_sdk
# Add context
for key, value in context.items():
sentry_sdk.set_context(key, value)
# Capture the exception
sentry_sdk.capture_exception(error)
except Exception as e:
logger.error(f"Failed to capture exception with Sentry: {e}")
def capture_message(self, message: str, level: str = "info", **context):
"""Capture message with Sentry."""
if not self.sentry_enabled:
return
try:
import sentry_sdk
# Add context
for key, value in context.items():
sentry_sdk.set_context(key, value)
# Capture the message
sentry_sdk.capture_message(message, level=level)
except Exception as e:
logger.error(f"Failed to capture message with Sentry: {e}")
def set_user_context(self, user_id: str | None, email: str | None = None):
"""Set user context for Sentry."""
if not self.sentry_enabled:
return
try:
import sentry_sdk
if user_id:
sentry_sdk.set_user(
{
"id": user_id,
"email": email,
}
)
else:
sentry_sdk.set_user(None)
except Exception as e:
logger.error(f"Failed to set user context: {e}")
@contextmanager
def transaction(self, name: str, op: str = "task"):
"""Create a Sentry transaction."""
if not self.sentry_enabled:
yield
return
try:
import sentry_sdk
with sentry_sdk.start_transaction(name=name, op=op) as transaction:
yield transaction
except Exception as e:
logger.error(f"Failed to create transaction: {e}")
yield
def add_breadcrumb(
self, message: str, category: str = "app", level: str = "info", **data
):
"""Add breadcrumb for Sentry."""
if not self.sentry_enabled:
return
try:
import sentry_sdk
sentry_sdk.add_breadcrumb(
message=message,
category=category,
level=level,
data=data,
)
except Exception as e:
logger.error(f"Failed to add breadcrumb: {e}")
# Process-wide MonitoringService, built on first request (see accessor below).
_monitoring_service: MonitoringService | None = None


def get_monitoring_service() -> MonitoringService:
    """Return the shared MonitoringService, constructing it on first use."""
    global _monitoring_service

    if _monitoring_service is not None:
        return _monitoring_service

    _monitoring_service = MonitoringService()
    return _monitoring_service
@contextmanager
def track_request(method: str, endpoint: str, user_type: str = "unknown"):
    """Track request metrics around the wrapped block.

    Increments the active-connection gauge on entry and decrements it on exit.
    On exit it also records the request counter and duration histogram. If the
    block raises an ``Exception``, the error counter is incremented, the
    exception is forwarded to the monitoring service, and it is re-raised.

    Args:
        method: HTTP method, used as a label value.
        endpoint: Endpoint identifier, used as a label value.
        user_type: Value for the ``user_type`` label that the request metrics
            declare; defaults to "unknown" for callers that do not know it.
    """
    start_time = time.perf_counter()
    active_connections.inc()

    # Stays "unknown" if the block exits via a non-Exception BaseException.
    status = "unknown"
    try:
        yield
        status = "success"
    except Exception as e:
        status = "error"

        # error_counter declares three labels; all of them must be supplied or
        # prometheus_client rejects the call.
        error_counter.labels(
            error_type=type(e).__name__, endpoint=endpoint, severity="error"
        ).inc()

        # Capture with Sentry
        get_monitoring_service().capture_exception(
            e,
            request={
                "method": method,
                "endpoint": endpoint,
            },
        )
        raise
    finally:
        duration = time.perf_counter() - start_time

        # Release the gauge first so it cannot leak if recording below fails.
        active_connections.dec()

        request_counter.labels(
            method=method, endpoint=endpoint, status=status, user_type=user_type
        ).inc()
        request_duration.labels(
            method=method, endpoint=endpoint, user_type=user_type
        ).observe(duration)
def track_tool_usage(
    tool_name: str,
    user_id: str,
    duration: float,
    status: str = "success",
    complexity: str = "standard",
):
    """Record one tool invocation and how long it took.

    Increments the usage counter for (tool, user, status) and observes
    ``duration`` on the histogram for (tool, complexity). ``user_id`` is
    converted to a string before being used as a label value.
    """
    usage_series = tool_usage_counter.labels(
        tool_name=tool_name, user_id=str(user_id), status=status
    )
    usage_series.inc()

    timing_series = tool_duration.labels(tool_name=tool_name, complexity=complexity)
    timing_series.observe(duration)
def track_tool_error(tool_name: str, error_type: str, complexity: str = "standard"):
    """Count one failed tool execution for (tool, error type, complexity)."""
    error_series = tool_errors.labels(
        tool_name=tool_name, error_type=error_type, complexity=complexity
    )
    error_series.inc()
def track_cache_operation(
    cache_type: str = "default",
    operation: str = "get",
    hit: bool = False,
    key_prefix: str = "unknown",
):
    """Count a cache lookup as a hit or a miss.

    ``hit`` selects which counter is incremented; ``cache_type`` and
    ``key_prefix`` become label values. ``operation`` is accepted but does not
    influence what is recorded.
    """
    outcome_counter = cache_hits if hit else cache_misses
    outcome_counter.labels(cache_type=cache_type, key_prefix=key_prefix).inc()
def track_cache_eviction(cache_type: str, reason: str):
    """Count one cache eviction for the given cache type and reason."""
    eviction_series = cache_evictions.labels(cache_type=cache_type, reason=reason)
    eviction_series.inc()
def update_cache_metrics(cache_type: str, size_bytes: int, key_count: int):
    """Set the size and key-count gauges for one cache type."""
    size_gauge = cache_size_bytes.labels(cache_type=cache_type)
    size_gauge.set(size_bytes)

    keys_gauge = cache_keys_total.labels(cache_type=cache_type)
    keys_gauge.set(key_count)
def track_database_query(
    query_type: str, table: str, duration: float, status: str = "success"
):
    """Record the duration and outcome of one database query."""
    latency_series = db_query_duration.labels(query_type=query_type, table=table)
    latency_series.observe(duration)

    count_series = db_queries_total.labels(
        query_type=query_type, table=table, status=status
    )
    count_series.inc()
def update_database_metrics(
    pool_size: int, active_connections: int, idle_connections: int
):
    """Set the three connection-pool gauges from one pool snapshot.

    The ``active_connections`` parameter is a plain count; inside this
    function it hides the module-level gauge of the same name, which is not
    touched here.
    """
    snapshot = (
        (db_connection_pool_size, pool_size),
        (db_active_connections, active_connections),
        (db_idle_connections, idle_connections),
    )
    for gauge, value in snapshot:
        gauge.set(value)
def track_database_connection_event(event_type: str, reason: str = "normal"):
    """Count a database connection being created or closed.

    ``reason`` is only used for "closed" events. Any other ``event_type``
    is ignored.
    """
    if event_type == "created":
        db_connections_created.inc()
        return

    if event_type == "closed":
        db_connections_closed.labels(reason=reason).inc()
def track_redis_operation(operation: str, duration: float, status: str = "success"):
    """Record one Redis command: its outcome count and its duration."""
    count_series = redis_operations.labels(operation=operation, status=status)
    count_series.inc()

    latency_series = redis_operation_duration.labels(operation=operation)
    latency_series.observe(duration)
def update_redis_metrics(connections: int, memory_bytes: int, hits: int, misses: int):
    """Update the Redis gauges and add to the keyspace hit/miss counters.

    ``hits`` and ``misses`` are added to their counters only when positive.
    NOTE(review): callers presumably pass increments since the last call, not
    running totals — verify against the caller.
    """
    redis_connections.set(connections)
    redis_memory_usage.set(memory_bytes)

    for counter, amount in (
        (redis_keyspace_hits, hits),
        (redis_keyspace_misses, misses),
    ):
        if amount > 0:
            counter.inc(amount)
def track_external_api_call(
    service: str,
    endpoint: str,
    method: str,
    status_code: int,
    duration: float,
    error_type: str | None = None,
):
    """Record one outbound API call.

    The call is labelled "success" for status codes from 200 up to (but not
    including) 300 and "error" otherwise. The duration is always observed;
    the error counter is incremented only when ``error_type`` is truthy.
    """
    is_2xx = 200 <= status_code < 300
    if is_2xx:
        status = "success"
    else:
        status = "error"

    call_series = external_api_calls.labels(
        service=service, endpoint=endpoint, method=method, status=status
    )
    call_series.inc()

    latency_series = external_api_duration.labels(service=service, endpoint=endpoint)
    latency_series.observe(duration)

    if not error_type:
        return

    failure_series = external_api_errors.labels(
        service=service, endpoint=endpoint, error_type=error_type
    )
    failure_series.inc()
def track_user_session(user_type: str, auth_method: str, duration: float | None = None):
    """Track user session metrics.

    Always increments the session counter for (user type, auth method).
    When ``duration`` is given, it is also observed on the session-duration
    histogram.

    Args:
        user_type: Label value for the kind of user.
        auth_method: Label value for how the user authenticated.
        duration: Session length in seconds, or None when not known.
    """
    user_sessions.labels(user_type=user_type, auth_method=auth_method).inc()

    # Compare against None explicitly: a duration of 0 is a real observation
    # and must not be dropped by a truthiness test.
    if duration is not None:
        user_session_duration.labels(user_type=user_type).observe(duration)
def update_active_users(daily_count: int, monthly_count: int):
    """Overwrite the daily and monthly active-user gauges."""
    for gauge, count in (
        (daily_active_users, daily_count),
        (monthly_active_users, monthly_count),
    ):
        gauge.set(count)
def track_authentication(method: str, status: str, user_agent: str = "unknown"):
    """Track authentication attempts.

    Args:
        method: Authentication method, used as a label value.
        status: Outcome of the attempt, used as a label value.
        user_agent: Client user agent. A missing or empty value is recorded as
            "unknown"; longer values are cut to 50 characters to bound the
            size of the label.
    """
    # Requests without a User-Agent header typically surface as None or "";
    # slicing None would raise, so normalise before truncating.
    agent_label = (user_agent or "unknown")[:50]

    authentication_attempts.labels(
        method=method,
        status=status,
        user_agent=agent_label,
    ).inc()
def track_authorization(resource: str, action: str, status: str):
    """Count one authorization check for (resource, action, status)."""
    check_series = authorization_checks.labels(
        resource=resource, action=action, status=status
    )
    check_series.inc()
def track_security_violation(violation_type: str, severity: str = "medium"):
    """Count one detected security violation of the given type and severity."""
    violation_series = security_violations.labels(
        violation_type=violation_type, severity=severity
    )
    violation_series.inc()
def track_rate_limit_hit(user_id: str, endpoint: str, limit_type: str):
    """Count one rate-limit violation.

    ``user_id`` is converted to a string before being used as a label value.
    """
    label_values = {
        "user_id": str(user_id),
        "endpoint": endpoint,
        "limit_type": limit_type,
    }
    rate_limit_hits.labels(**label_values).inc()
# Cumulative per-generation collection totals seen on the previous call of
# update_performance_metrics(), keyed by generation index.
_last_gc_collections: dict[int, int] = {}


def update_performance_metrics():
    """Update system performance metrics.

    Sets the memory, CPU and (where supported) open-file-descriptor gauges for
    the current process, and advances the garbage-collection counter by the
    number of collections that happened since the previous call.
    """
    import gc

    import psutil

    process = psutil.Process()

    # Memory usage (resident set size)
    memory_usage_bytes.set(process.memory_info().rss)

    # CPU usage
    cpu_usage_percent.set(process.cpu_percent())

    # File descriptors
    try:
        open_file_descriptors.set(process.num_fds())
    except AttributeError:
        # Windows doesn't support num_fds
        pass

    # gc.get_stats() reports running totals since interpreter start. A counter
    # must be advanced by the difference from the last reading only; adding the
    # full total on every call would count earlier collections again each time.
    for generation, stat in enumerate(gc.get_stats()):
        total = stat.get("collections")
        if total is None:
            continue

        delta = total - _last_gc_collections.get(generation, 0)
        _last_gc_collections[generation] = total
        if delta > 0:
            garbage_collections.labels(generation=str(generation)).inc(delta)
def get_metrics() -> str:
    """Return the current metrics as text.

    When the Prometheus client is unavailable, a fixed placeholder comment is
    returned instead.
    """
    if not PROMETHEUS_AVAILABLE:
        return "# Prometheus not available"

    payload = generate_latest()
    return payload.decode("utf-8")
def initialize_monitoring():
    """Create the global monitoring service and log what is enabled."""
    logger.info("Initializing monitoring systems...")

    # Obtaining the service triggers its one-time setup on first use.
    service = get_monitoring_service()

    sentry_message = (
        "Sentry error tracking initialized"
        if service.sentry_enabled
        else "Sentry error tracking disabled (no DSN configured)"
    )
    logger.info(sentry_message)

    prometheus_message = (
        "Prometheus metrics initialized"
        if PROMETHEUS_AVAILABLE
        else "Prometheus metrics disabled (client not available)"
    )
    logger.info(prometheus_message)

    logger.info("Monitoring systems initialization complete")
```
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/batch_processing.py:
--------------------------------------------------------------------------------
```python
"""
Batch Processing Extensions for VectorBTEngine.
This module adds batch processing capabilities to the VectorBT engine,
allowing for parallel execution of multiple backtest strategies,
parameter optimization, and strategy validation.
"""
import asyncio
import gc
import time
from typing import Any
import numpy as np
from maverick_mcp.utils.memory_profiler import (
cleanup_dataframes,
get_memory_stats,
profile_memory,
)
from maverick_mcp.utils.structured_logger import (
get_structured_logger,
with_structured_logging,
)
logger = get_structured_logger(__name__)
class BatchProcessingMixin:
"""Mixin class to add batch processing methods to VectorBTEngine."""
@with_structured_logging(
    "run_batch_backtest", include_performance=True, log_params=True
)
@profile_memory(log_results=True, threshold_mb=100.0)
async def run_batch_backtest(
    self,
    batch_configs: list[dict[str, Any]],
    max_workers: int = 6,
    chunk_size: int = 10,
    validate_data: bool = True,
    fail_fast: bool = False,
) -> dict[str, Any]:
    """
    Run multiple backtest strategies in parallel with optimized batch processing.

    Args:
        batch_configs: List of backtest configurations, each containing:
            - symbol: Stock symbol
            - strategy_type: Strategy type name
            - parameters: Strategy parameters dict
            - start_date: Start date string
            - end_date: End date string
            - initial_capital: Starting capital (optional, default 10000)
            - fees: Trading fees (optional, default 0.001)
            - slippage: Slippage factor (optional, default 0.001)
        max_workers: Maximum concurrent workers
        chunk_size: Number of configs to process per batch (must be >= 1)
        validate_data: Whether to validate input data
        fail_fast: Whether to stop on first failure. When set, no further
            results or chunks are processed after the first failed result,
            and a chunk-level exception is re-raised.

    Returns:
        Dictionary containing batch results and summary statistics

    Raises:
        ValueError: If ``chunk_size`` is not positive, or if validation fails
            while ``fail_fast`` is set.
    """
    from maverick_mcp.backtesting.strategy_executor import (
        ExecutionContext,
        ExecutionResult,
        StrategyExecutor,
    )

    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer")

    start_time = time.time()
    batch_id = f"batch_{int(start_time)}"
    logger.info(
        f"Starting batch backtest {batch_id} with {len(batch_configs)} configurations"
    )

    # Validate input data if requested
    if validate_data:
        validation_errors = []
        for i, config in enumerate(batch_configs):
            try:
                self._validate_batch_config(config, f"config_{i}")
            except Exception as e:
                validation_errors.append(f"Config {i}: {str(e)}")

        if validation_errors:
            if fail_fast:
                raise ValueError(
                    f"Batch validation failed: {'; '.join(validation_errors)}"
                )
            logger.warning(
                f"Validation warnings for batch {batch_id}: {validation_errors}"
            )

    # Initialize executor
    executor = StrategyExecutor(
        max_concurrent_strategies=max_workers,
        cache_manager=getattr(self, "cache", None),
    )

    all_results = []
    successful_results = []
    failed_results = []

    # The executor is released in ``finally`` so that a fail-fast re-raise
    # (or any other error below) does not leak it.
    try:
        # Convert configs to execution contexts.
        # NOTE: configs that only produced validation *warnings* above are
        # still converted here; a config missing a required key raises
        # KeyError at this point.
        contexts = [
            ExecutionContext(
                strategy_id=f"{batch_id}_strategy_{i}",
                symbol=config["symbol"],
                strategy_type=config["strategy_type"],
                parameters=config["parameters"],
                start_date=config["start_date"],
                end_date=config["end_date"],
                initial_capital=config.get("initial_capital", 10000.0),
                fees=config.get("fees", 0.001),
                slippage=config.get("slippage", 0.001),
            )
            for i, config in enumerate(batch_configs)
        ]

        # Process in chunks to manage memory
        stop_requested = False
        for chunk_start in range(0, len(contexts), chunk_size):
            chunk_contexts = contexts[chunk_start : chunk_start + chunk_size]
            logger.info(
                f"Processing chunk {chunk_start // chunk_size + 1} ({len(chunk_contexts)} items)"
            )

            # Only the execution itself is guarded: a failure here means the
            # whole chunk produced no results.
            try:
                chunk_results = await executor.execute_strategies(chunk_contexts)
            except Exception as e:
                logger.error(f"Chunk processing failed: {e}")
                if fail_fast:
                    raise
                # Record one failed result per context so the totals and the
                # success rate account for the lost chunk.
                for context in chunk_contexts:
                    failure = ExecutionResult(
                        context=context,
                        success=False,
                        error=f"Chunk processing error: {e}",
                    )
                    failed_results.append(failure)
                    all_results.append(failure)
                continue

            # Process results
            for result in chunk_results:
                all_results.append(result)
                if result.success:
                    successful_results.append(result)
                    continue
                failed_results.append(result)
                if fail_fast:
                    logger.error(f"Batch failed fast on: {result.error}")
                    stop_requested = True
                    break

            # Memory cleanup between chunks
            if getattr(self, "enable_memory_profiling", False):
                cleanup_dataframes()
                gc.collect()

            # Propagate the fail-fast stop to the chunk loop as well.
            if stop_requested:
                break
    finally:
        # Cleanup executor
        await executor.cleanup()

    # Calculate summary statistics
    total_execution_time = time.time() - start_time
    success_rate = (
        len(successful_results) / len(all_results) if all_results else 0.0
    )

    summary = {
        "batch_id": batch_id,
        "total_configs": len(batch_configs),
        "successful": len(successful_results),
        "failed": len(failed_results),
        "success_rate": success_rate,
        "total_execution_time": total_execution_time,
        "avg_execution_time": total_execution_time / len(all_results)
        if all_results
        else 0.0,
        "memory_stats": get_memory_stats()
        if getattr(self, "enable_memory_profiling", False)
        else None,
    }

    logger.info(f"Batch backtest {batch_id} completed: {summary}")

    return {
        "batch_id": batch_id,
        "summary": summary,
        "successful_results": [r.result for r in successful_results if r.result],
        "failed_results": [
            {
                "strategy_id": r.context.strategy_id,
                "symbol": r.context.symbol,
                "strategy_type": r.context.strategy_type,
                "error": r.error,
            }
            for r in failed_results
        ],
        "all_results": all_results,
    }
@with_structured_logging(
    "batch_optimize_parameters", include_performance=True, log_params=True
)
async def batch_optimize_parameters(
    self,
    optimization_configs: list[dict[str, Any]],
    max_workers: int = 4,
    optimization_method: str = "grid_search",
    max_iterations: int = 100,
) -> dict[str, Any]:
    """
    Optimize strategy parameters for multiple symbols/strategies in parallel.

    Args:
        optimization_configs: List of optimization configurations, each containing:
            - symbol: Stock symbol
            - strategy_type: Strategy type name
            - parameter_ranges: Dictionary of parameter ranges to optimize
            - start_date: Start date string
            - end_date: End date string
            - optimization_metric: Metric to optimize (default: sharpe_ratio)
            - initial_capital: Starting capital
        max_workers: Maximum concurrent workers (must be >= 1)
        optimization_method: Optimization method ('grid_search', 'random_search')
        max_iterations: Maximum optimization iterations per config

    Returns:
        Dictionary containing optimization results for all configurations

    Raises:
        ValueError: If ``max_workers`` is less than 1 (a zero-sized semaphore
            would block every optimization forever).
    """
    if max_workers < 1:
        raise ValueError("max_workers must be at least 1")

    start_time = time.time()
    batch_id = f"optimize_batch_{int(start_time)}"
    logger.info(
        f"Starting batch optimization {batch_id} with {len(optimization_configs)} configurations"
    )

    # Execute with concurrency limit
    semaphore = asyncio.BoundedSemaphore(max_workers)

    async def limited_optimization(index: int, config: dict[str, Any]):
        # The coroutine for the real work is only created once a slot is
        # free, so nothing is left un-awaited if the batch is interrupted.
        async with semaphore:
            return await self._run_single_optimization(
                config, f"{batch_id}_opt_{index}", optimization_method, max_iterations
            )

    # Run all optimizations
    optimization_results = await asyncio.gather(
        *(
            limited_optimization(i, config)
            for i, config in enumerate(optimization_configs)
        ),
        return_exceptions=True,
    )

    # Process results
    successful_optimizations = []
    failed_optimizations = []
    for i, result in enumerate(optimization_results):
        # BaseException (not Exception): gather() can also hand back
        # asyncio.CancelledError, which must not be reported as a success.
        if isinstance(result, BaseException):
            failed_optimizations.append(
                {
                    "config_index": i,
                    "config": optimization_configs[i],
                    "error": str(result),
                }
            )
        else:
            successful_optimizations.append(result)

    # Calculate summary
    total_execution_time = time.time() - start_time
    success_rate = (
        len(successful_optimizations) / len(optimization_configs)
        if optimization_configs
        else 0.0
    )

    summary = {
        "batch_id": batch_id,
        "total_optimizations": len(optimization_configs),
        "successful": len(successful_optimizations),
        "failed": len(failed_optimizations),
        "success_rate": success_rate,
        "total_execution_time": total_execution_time,
        "optimization_method": optimization_method,
        "max_iterations": max_iterations,
    }

    logger.info(f"Batch optimization {batch_id} completed: {summary}")

    return {
        "batch_id": batch_id,
        "summary": summary,
        "successful_optimizations": successful_optimizations,
        "failed_optimizations": failed_optimizations,
    }
async def batch_validate_strategies(
    self,
    validation_configs: list[dict[str, Any]],
    validation_start_date: str,
    validation_end_date: str,
    max_workers: int = 6,
) -> dict[str, Any]:
    """
    Validate multiple strategies against out-of-sample data in parallel.

    Each config is re-run over the validation period using its
    ``optimized_parameters`` (falling back to ``parameters``, then ``{}``).
    Successful validation results are paired with the config that produced
    them before metrics are computed, so a failed validation does not shift
    later results onto the wrong strategy.

    Args:
        validation_configs: List of validation configurations with optimized parameters
        validation_start_date: Start date for validation period
        validation_end_date: End date for validation period
        max_workers: Maximum concurrent workers

    Returns:
        Dictionary containing validation results and performance comparison
    """
    start_time = time.time()
    batch_id = f"validate_batch_{int(start_time)}"
    logger.info(
        f"Starting batch validation {batch_id} with {len(validation_configs)} strategies"
    )

    # Create validation backtest configs
    validation_batch_configs = [
        {
            "symbol": config["symbol"],
            "strategy_type": config["strategy_type"],
            "parameters": config.get(
                "optimized_parameters", config.get("parameters", {})
            ),
            "start_date": validation_start_date,
            "end_date": validation_end_date,
            "initial_capital": config.get("initial_capital", 10000.0),
            "fees": config.get("fees", 0.001),
            "slippage": config.get("slippage", 0.001),
        }
        for config in validation_configs
    ]

    # Run validation backtests
    validation_results = await self.run_batch_backtest(
        validation_batch_configs,
        max_workers=max_workers,
        validate_data=True,
        fail_fast=False,
    )

    # Pair every successful result with its originating config. The config
    # index is the numeric suffix that run_batch_backtest puts on each
    # strategy_id ("<batch_id>_strategy_<index>").
    matched_configs = []
    matched_results = []
    for execution in validation_results["all_results"]:
        if not (execution.success and execution.result):
            continue
        strategy_id = execution.context.strategy_id
        try:
            config_index = int(strategy_id.rsplit("_", 1)[-1])
            source_config = validation_configs[config_index]
        except (ValueError, IndexError):
            logger.warning(
                f"Could not match validation result {strategy_id} to a config"
            )
            continue
        matched_configs.append(source_config)
        matched_results.append(execution.result)

    # Calculate validation metrics
    validation_metrics = self._calculate_validation_metrics(
        matched_configs, matched_results
    )

    total_execution_time = time.time() - start_time
    successful = validation_results["successful_results"]

    return {
        "batch_id": batch_id,
        "validation_period": {
            "start_date": validation_start_date,
            "end_date": validation_end_date,
        },
        "summary": {
            "total_strategies": len(validation_configs),
            "validated_strategies": len(successful),
            "validation_success_rate": len(successful) / len(validation_configs)
            if validation_configs
            else 0.0,
            "total_execution_time": total_execution_time,
        },
        "validation_results": successful,
        "validation_metrics": validation_metrics,
        "failed_validations": validation_results["failed_results"],
    }
async def get_batch_results(
    self, batch_id: str, include_detailed_results: bool = False
) -> dict[str, Any] | None:
    """
    Look up the results of a completed batch operation.

    Retrieval is not implemented: the method only logs that fact and
    returns ``None``. ``include_detailed_results`` is currently unused.

    Args:
        batch_id: Batch ID to retrieve results for
        include_detailed_results: Whether to include full result details

    Returns:
        Always ``None`` for now
    """
    # A persistence layer would be queried here; results are instead handed
    # back directly by the batch operations themselves.
    not_implemented_message = (
        f"Batch result retrieval not implemented for {batch_id}"
    )
    guidance_message = (
        "Batch results are currently returned directly from batch operations"
    )
    logger.warning(not_implemented_message)
    logger.info(guidance_message)
    return None
# Alias method for backward compatibility
async def batch_optimize(self, *args, **kwargs):
    """Backward-compatible alias that forwards to ``batch_optimize_parameters``."""
    forwarded_result = await self.batch_optimize_parameters(*args, **kwargs)
    return forwarded_result
# =============================================================================
# BATCH PROCESSING HELPER METHODS
# =============================================================================
def _validate_batch_config(self, config: dict[str, Any], config_name: str) -> None:
"""Validate a single batch configuration."""
required_fields = [
"symbol",
"strategy_type",
"parameters",
"start_date",
"end_date",
]
for field in required_fields:
if field not in config:
raise ValueError(f"Missing required field '{field}' in {config_name}")
# Validate dates
try:
from maverick_mcp.data.validation import DataValidator
DataValidator.validate_date_range(config["start_date"], config["end_date"])
except Exception as e:
raise ValueError(f"Invalid date range in {config_name}: {e}") from e
# Validate symbol
if not isinstance(config["symbol"], str) or len(config["symbol"]) == 0:
raise ValueError(f"Invalid symbol in {config_name}")
# Validate strategy type
if not isinstance(config["strategy_type"], str):
raise ValueError(f"Invalid strategy_type in {config_name}")
# Validate parameters
if not isinstance(config["parameters"], dict):
raise ValueError(f"Parameters must be a dictionary in {config_name}")
async def _run_single_optimization(
self,
config: dict[str, Any],
optimization_id: str,
method: str,
max_iterations: int,
) -> dict[str, Any]:
"""Run optimization for a single configuration."""
try:
# Extract configuration
symbol = config["symbol"]
strategy_type = config["strategy_type"]
parameter_ranges = config["parameter_ranges"]
start_date = config["start_date"]
end_date = config["end_date"]
optimization_metric = config.get("optimization_metric", "sharpe_ratio")
initial_capital = config.get("initial_capital", 10000.0)
# Simple parameter optimization (placeholder - would use actual optimizer)
# For now, return basic result structure
best_params = {}
for param, ranges in parameter_ranges.items():
if isinstance(ranges, list) and len(ranges) >= 2:
# Use middle value as "optimized"
best_params[param] = ranges[len(ranges) // 2]
elif isinstance(ranges, dict):
if "min" in ranges and "max" in ranges:
best_params[param] = (ranges["min"] + ranges["max"]) / 2
# Run a basic backtest with these parameters
backtest_result = await self.run_backtest(
symbol=symbol,
strategy_type=strategy_type,
parameters=best_params,
start_date=start_date,
end_date=end_date,
initial_capital=initial_capital,
)
best_score = backtest_result.get("metrics", {}).get(
optimization_metric, 0.0
)
return {
"optimization_id": optimization_id,
"symbol": symbol,
"strategy_type": strategy_type,
"optimized_parameters": best_params,
"best_score": best_score,
"optimization_history": [
{"parameters": best_params, "score": best_score}
],
"execution_time": 0.0,
}
except Exception as e:
logger.error(f"Optimization failed for {optimization_id}: {e}")
raise
def _calculate_validation_metrics(
self,
original_configs: list[dict[str, Any]],
validation_results: list[dict[str, Any]],
) -> dict[str, Any]:
"""Calculate validation metrics comparing in-sample vs out-of-sample performance."""
metrics = {
"strategy_comparisons": [],
"aggregate_metrics": {
"avg_in_sample_sharpe": 0.0,
"avg_out_of_sample_sharpe": 0.0,
"sharpe_degradation": 0.0,
"strategies_with_positive_validation": 0,
},
}
if not original_configs or not validation_results:
return metrics
sharpe_ratios_in_sample = []
sharpe_ratios_out_of_sample = []
for i, (original, validation) in enumerate(
zip(original_configs, validation_results, strict=False)
):
# Get in-sample performance (from original optimization)
in_sample_sharpe = original.get("best_score", 0.0)
# Get out-of-sample performance
out_of_sample_sharpe = validation.get("metrics", {}).get(
"sharpe_ratio", 0.0
)
strategy_comparison = {
"strategy_index": i,
"symbol": original["symbol"],
"strategy_type": original["strategy_type"],
"in_sample_sharpe": in_sample_sharpe,
"out_of_sample_sharpe": out_of_sample_sharpe,
"sharpe_degradation": in_sample_sharpe - out_of_sample_sharpe,
"validation_success": out_of_sample_sharpe > 0,
}
metrics["strategy_comparisons"].append(strategy_comparison)
sharpe_ratios_in_sample.append(in_sample_sharpe)
sharpe_ratios_out_of_sample.append(out_of_sample_sharpe)
# Calculate aggregate metrics
if sharpe_ratios_in_sample and sharpe_ratios_out_of_sample:
metrics["aggregate_metrics"]["avg_in_sample_sharpe"] = np.mean(
sharpe_ratios_in_sample
)
metrics["aggregate_metrics"]["avg_out_of_sample_sharpe"] = np.mean(
sharpe_ratios_out_of_sample
)
metrics["aggregate_metrics"]["sharpe_degradation"] = (
metrics["aggregate_metrics"]["avg_in_sample_sharpe"]
- metrics["aggregate_metrics"]["avg_out_of_sample_sharpe"]
)
metrics["aggregate_metrics"]["strategies_with_positive_validation"] = sum(
1
for comp in metrics["strategy_comparisons"]
if comp["validation_success"]
)
return metrics
```
--------------------------------------------------------------------------------
/maverick_mcp/workflows/agents/optimizer_agent.py:
--------------------------------------------------------------------------------
```python
"""
Optimizer Agent for intelligent parameter optimization.
This agent performs regime-aware parameter optimization for selected strategies,
using adaptive grid sizes and optimization metrics based on market conditions.
"""
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any
from maverick_mcp.backtesting import StrategyOptimizer, VectorBTEngine
from maverick_mcp.backtesting.strategies.templates import get_strategy_info
from maverick_mcp.workflows.state import BacktestingWorkflowState
logger = logging.getLogger(__name__)
class OptimizerAgent:
"""Intelligent parameter optimizer with regime-aware optimization."""
def __init__(
    self,
    vectorbt_engine: VectorBTEngine | None = None,
    strategy_optimizer: StrategyOptimizer | None = None,
):
    """Set up the engine, the optimizer and the lookup tables.

    Args:
        vectorbt_engine: VectorBT backtesting engine; a new
            ``VectorBTEngine`` is created when omitted
        strategy_optimizer: Strategy optimization engine; a new
            ``StrategyOptimizer`` bound to the engine is created when omitted
    """

    def regime(
        metric: str, grid_size: str, min_trades: int, max_drawdown: float
    ) -> dict[str, Any]:
        # One regime entry: metric to optimize, grid density, quality limits.
        return {
            "optimization_metric": metric,
            "grid_size": grid_size,
            "min_trades": min_trades,
            "max_drawdown_limit": max_drawdown,
        }

    def grid(coarse: list, medium: list, fine: list) -> dict[str, list]:
        # Candidate values for one parameter at each grid density.
        return {"coarse": coarse, "medium": medium, "fine": fine}

    engine = vectorbt_engine if vectorbt_engine else VectorBTEngine()
    self.engine = engine
    self.optimizer = (
        strategy_optimizer if strategy_optimizer else StrategyOptimizer(engine)
    )

    # Optimization configurations for different regimes
    self.REGIME_OPTIMIZATION_CONFIG = {
        # Focus on capturing trends
        "trending": regime("total_return", "medium", 10, 0.25),
        # Focus on risk-adjusted returns; more precision needed for ranging markets
        "ranging": regime("sharpe_ratio", "fine", 15, 0.15),
        # Risk-adjusted for volatility; coarse grid avoids overfitting
        "volatile": regime("calmar_ratio", "coarse", 8, 0.35),
        # Focus on downside risk
        "volatile_trending": regime("sortino_ratio", "medium", 10, 0.30),
        # Consistency important in low volume
        "low_volume": regime("win_rate", "medium", 12, 0.20),
        # Balanced approach
        "unknown": regime("sharpe_ratio", "medium", 10, 0.20),
    }

    # Strategy-specific optimization parameters
    self.STRATEGY_PARAM_RANGES = {
        "sma_cross": {
            "fast_period": grid(
                [5, 10, 15], [5, 8, 10, 12, 15, 20], list(range(5, 21))
            ),
            "slow_period": grid(
                [20, 30, 50], [20, 25, 30, 40, 50], list(range(20, 51, 5))
            ),
        },
        "rsi": {
            "period": grid([10, 14, 21], [10, 12, 14, 16, 21], list(range(8, 25))),
            "oversold": grid([25, 30, 35], [20, 25, 30, 35], list(range(15, 36, 5))),
            "overbought": grid(
                [65, 70, 75], [65, 70, 75, 80], list(range(65, 86, 5))
            ),
        },
        "macd": {
            "fast_period": grid(
                [8, 12, 16], [8, 10, 12, 14, 16], list(range(8, 17))
            ),
            "slow_period": grid(
                [21, 26, 32], [21, 24, 26, 28, 32], list(range(21, 35))
            ),
            "signal_period": grid(
                [6, 9, 12], [6, 8, 9, 10, 12], list(range(6, 15))
            ),
        },
        "bollinger": {
            "period": grid(
                [15, 20, 25], [15, 18, 20, 22, 25], list(range(12, 28))
            ),
            "std_dev": grid(
                [1.5, 2.0, 2.5],
                [1.5, 1.8, 2.0, 2.2, 2.5],
                [1.0, 1.5, 1.8, 2.0, 2.2, 2.5, 3.0],
            ),
        },
        "momentum": {
            "lookback": grid(
                [10, 20, 30], [10, 15, 20, 25, 30], list(range(5, 31, 5))
            ),
            "threshold": grid(
                [0.03, 0.05, 0.08],
                [0.02, 0.03, 0.05, 0.07, 0.10],
                [0.01, 0.02, 0.03, 0.04, 0.05, 0.07, 0.10, 0.15],
            ),
        },
    }

    logger.info("OptimizerAgent initialized")
async def optimize_parameters(
    self, state: BacktestingWorkflowState
) -> BacktestingWorkflowState:
    """Optimize parameters for selected strategies.

    Strategies are optimized one after another. A strategy without a
    parameter grid, or whose optimization raises, receives the default
    parameters reported by ``get_strategy_info``; the latter case is also
    recorded in ``state.fallback_strategies_used``. If the step as a whole
    fails, every selected strategy falls back to its defaults and the error
    is appended to ``state.errors_encountered``.

    Args:
        state: Current workflow state with selected strategies

    Returns:
        Updated state with optimization results
    """
    start_time = datetime.now()

    try:
        logger.info(
            f"Optimizing parameters for {len(state.selected_strategies)} strategies on {state.symbol}"
        )

        # Get optimization configuration based on regime
        optimization_config = self._get_optimization_config(
            state.market_regime, state.regime_confidence
        )

        # Generate parameter grids for each strategy
        parameter_grids = self._generate_parameter_grids(
            state.selected_strategies, optimization_config["grid_size"]
        )

        # Optimize each strategy
        optimization_results = {}
        best_parameters = {}
        total_iterations = 0

        # Use shorter timeframe for optimization to avoid overfitting
        opt_start_date = self._calculate_optimization_window(
            state.start_date, state.end_date
        )

        for strategy in state.selected_strategies:
            try:
                logger.info(f"Optimizing {strategy} strategy...")

                param_grid = parameter_grids.get(strategy, {})
                if not param_grid:
                    logger.warning(
                        f"No parameter grid for {strategy}, using defaults"
                    )
                    # Actually store the defaults the warning promises, so
                    # the strategy is not left without parameters.
                    best_parameters[strategy] = get_strategy_info(strategy).get(
                        "parameters", {}
                    )
                    continue

                # Run optimization
                result = await self.engine.optimize_parameters(
                    symbol=state.symbol,
                    strategy_type=strategy,
                    param_grid=param_grid,
                    start_date=opt_start_date,
                    end_date=state.end_date,
                    optimization_metric=optimization_config["optimization_metric"],
                    initial_capital=state.initial_capital,
                    top_n=min(
                        10, len(state.selected_strategies) * 2
                    ),  # Adaptive top_n
                )

                # Filter results by quality metrics
                filtered_result = self._filter_optimization_results(
                    result, optimization_config
                )

                optimization_results[strategy] = filtered_result
                best_parameters[strategy] = filtered_result.get(
                    "best_parameters", {}
                )
                total_iterations += filtered_result.get("valid_combinations", 0)

                logger.info(
                    f"Optimized {strategy}: {filtered_result.get('best_metric_value', 0):.3f} {optimization_config['optimization_metric']}"
                )

            except Exception as e:
                logger.error(f"Failed to optimize {strategy}: {e}")
                # Use default parameters as fallback
                strategy_info = get_strategy_info(strategy)
                best_parameters[strategy] = strategy_info.get("parameters", {})
                state.fallback_strategies_used.append(
                    f"{strategy}_optimization_fallback"
                )

        # Update state
        state.optimization_config = optimization_config
        state.parameter_grids = parameter_grids
        state.optimization_results = optimization_results
        state.best_parameters = best_parameters
        state.optimization_iterations = total_iterations

        # Record execution time
        execution_time = (datetime.now() - start_time).total_seconds() * 1000
        state.optimization_time_ms = execution_time

        # Update workflow status
        state.workflow_status = "validating"
        state.current_step = "optimization_completed"
        state.steps_completed.append("parameter_optimization")

        logger.info(
            f"Parameter optimization completed for {state.symbol}: "
            f"{total_iterations} combinations tested in {execution_time:.0f}ms"
        )

        return state

    except Exception as e:
        error_info = {
            "step": "parameter_optimization",
            "error": str(e),
            "timestamp": datetime.now().isoformat(),
            "symbol": state.symbol,
        }
        state.errors_encountered.append(error_info)

        # Fallback to default parameters
        state.best_parameters = {
            strategy: get_strategy_info(strategy).get("parameters", {})
            for strategy in state.selected_strategies
        }
        state.fallback_strategies_used.append("optimization_fallback")

        logger.error(f"Parameter optimization failed for {state.symbol}: {e}")
        return state
def _get_optimization_config(
self, regime: str, regime_confidence: float
) -> dict[str, Any]:
"""Get optimization configuration based on market regime."""
base_config = self.REGIME_OPTIMIZATION_CONFIG.get(
regime, self.REGIME_OPTIMIZATION_CONFIG["unknown"]
).copy()
# Adjust grid size based on confidence
if regime_confidence < 0.5:
# Low confidence -> use coarser grid to avoid overfitting
if base_config["grid_size"] == "fine":
base_config["grid_size"] = "medium"
elif base_config["grid_size"] == "medium":
base_config["grid_size"] = "coarse"
return base_config
def _generate_parameter_grids(
self, strategies: list[str], grid_size: str
) -> dict[str, dict[str, list]]:
"""Generate parameter grids for optimization."""
parameter_grids = {}
for strategy in strategies:
if strategy in self.STRATEGY_PARAM_RANGES:
param_ranges = self.STRATEGY_PARAM_RANGES[strategy]
grid = {}
for param_name, size_ranges in param_ranges.items():
if grid_size in size_ranges:
grid[param_name] = size_ranges[grid_size]
else:
# Fallback to medium if requested size not available
grid[param_name] = size_ranges.get(
"medium", size_ranges["coarse"]
)
parameter_grids[strategy] = grid
else:
# For strategies not in our predefined ranges, use default minimal grid
parameter_grids[strategy] = self._generate_default_grid(
strategy, grid_size
)
return parameter_grids
def _generate_default_grid(self, strategy: str, grid_size: str) -> dict[str, list]:
    """Generate default parameter grid for unknown strategies.

    Each numeric default reported by ``get_strategy_info`` is scaled by a
    set of multipliers chosen by ``grid_size`` (any unrecognized size is
    treated as medium). Integer parameters are truncated back to ``int``
    and clamped to at least 1; float parameters are clamped to at least
    0.001. Duplicate candidates produced by truncation/clamping are removed
    (order preserved) so the optimizer does not re-test the same value.
    Booleans and other non-numeric defaults are kept as a single-value grid.

    Args:
        strategy: Strategy name passed to ``get_strategy_info``
        grid_size: ``"coarse"``, ``"medium"`` or ``"fine"``

    Returns:
        Mapping of parameter name to its list of candidate values
    """
    multipliers_by_size = {
        "coarse": (0.8, 1.0, 1.2),
        "medium": (0.8, 0.9, 1.0, 1.1, 1.2),
        "fine": (0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3),
    }
    multipliers = multipliers_by_size.get(grid_size, multipliers_by_size["medium"])

    # Get strategy info to understand default parameters
    default_params = get_strategy_info(strategy).get("parameters", {})

    grid = {}
    for param_name, default_value in default_params.items():
        # bool is a subclass of int, but scaling a flag makes no sense.
        is_numeric = isinstance(default_value, int | float) and not isinstance(
            default_value, bool
        )
        if not is_numeric:
            # For non-numeric parameters, just use the default
            grid[param_name] = [default_value]
            continue

        variations = [default_value * factor for factor in multipliers]

        # Convert back to appropriate type and filter valid values
        if isinstance(default_value, int):
            candidates = [max(1, int(v)) for v in variations]
        else:
            candidates = [max(0.001, v) for v in variations]

        # Drop repeated values while keeping their original order.
        grid[param_name] = list(dict.fromkeys(candidates))

    return grid
def _calculate_optimization_window(self, start_date: str, end_date: str) -> str:
"""Calculate optimization window to prevent overfitting."""
start_dt = datetime.strptime(start_date, "%Y-%m-%d")
end_dt = datetime.strptime(end_date, "%Y-%m-%d")
total_days = (end_dt - start_dt).days
# Use 70% of data for optimization, leaving 30% for validation
opt_days = int(total_days * 0.7)
opt_start = end_dt - timedelta(days=opt_days)
return opt_start.strftime("%Y-%m-%d")
def _filter_optimization_results(
self, result: dict[str, Any], optimization_config: dict[str, Any]
) -> dict[str, Any]:
"""Filter optimization results based on quality criteria."""
if "top_results" not in result or not result["top_results"]:
return result
# Quality filters
min_trades = optimization_config.get("min_trades", 10)
max_drawdown_limit = optimization_config.get("max_drawdown_limit", 0.25)
# Filter results by quality criteria
filtered_results = []
for res in result["top_results"]:
# Check minimum trades
if res.get("total_trades", 0) < min_trades:
continue
# Check maximum drawdown
if abs(res.get("max_drawdown", 0)) > max_drawdown_limit:
continue
filtered_results.append(res)
# If no results pass filters, relax criteria
if not filtered_results and result["top_results"]:
logger.warning("No results passed quality filters, relaxing criteria")
# Take top results but with warning
filtered_results = result["top_results"][:3] # Top 3 regardless of quality
# Update result with filtered data
filtered_result = result.copy()
filtered_result["top_results"] = filtered_results
if filtered_results:
filtered_result["best_parameters"] = filtered_results[0]["parameters"]
filtered_result["best_metric_value"] = filtered_results[0][
optimization_config["optimization_metric"]
]
else:
# Complete fallback
filtered_result["best_parameters"] = {}
filtered_result["best_metric_value"] = 0.0
return filtered_result
def get_optimization_summary(
    self, state: BacktestingWorkflowState
) -> dict[str, Any]:
    """Summarize the optimization results stored on ``state``.

    Returns a placeholder dictionary when there are no optimization
    results. Otherwise returns overall counts, timing and the optimization
    config, plus one entry per strategy that has a non-empty result.
    """
    if not state.optimization_results:
        return {"summary": "No optimization results available"}

    summary: dict[str, Any] = {
        "total_strategies": len(state.selected_strategies),
        "optimized_strategies": len(state.optimization_results),
        "total_iterations": state.optimization_iterations,
        "execution_time_ms": state.optimization_time_ms,
        "optimization_config": state.optimization_config,
        "strategy_results": {},
    }

    per_strategy = summary["strategy_results"]
    for name, outcome in state.optimization_results.items():
        if not outcome:
            continue
        metric_type = state.optimization_config.get("optimization_metric", "unknown")
        per_strategy[name] = {
            "best_metric": outcome.get("best_metric_value", 0),
            "metric_type": metric_type,
            "valid_combinations": outcome.get("valid_combinations", 0),
            "best_parameters": state.best_parameters.get(name, {}),
        }

    return summary
async def parallel_optimization(
    self, state: BacktestingWorkflowState, max_concurrent: int = 3
) -> BacktestingWorkflowState:
    """Run optimization for multiple strategies in parallel.

    With zero or one selected strategy this delegates to
    ``optimize_parameters``. Otherwise each strategy is optimized in its own
    task, at most ``max_concurrent`` at a time, and the state is updated the
    same way the sequential path does it: a strategy without a parameter
    grid, or whose optimization fails, receives the defaults from
    ``get_strategy_info`` (failures are also recorded in
    ``state.fallback_strategies_used``), ``state.parameter_grids`` is
    filled, and the workflow status fields advance.

    Args:
        state: Current workflow state with selected strategies
        max_concurrent: Maximum number of optimizations running at once

    Returns:
        Updated state with optimization results
    """
    if len(state.selected_strategies) <= 1:
        return await self.optimize_parameters(state)

    start_time = datetime.now()
    logger.info(
        f"Running parallel optimization for {len(state.selected_strategies)} strategies"
    )

    # The regime configuration is identical for every strategy.
    optimization_config = self._get_optimization_config(
        state.market_regime, state.regime_confidence
    )

    # Create semaphore to limit concurrent optimizations
    semaphore = asyncio.Semaphore(max_concurrent)

    async def optimize_single_strategy(strategy: str) -> dict[str, Any]:
        # "result" stays None when the strategy was skipped or failed;
        # "error" is set only for failures.
        outcome: dict[str, Any] = {"grid": {}, "result": None, "error": None}
        async with semaphore:
            try:
                param_grid = self._generate_parameter_grids(
                    [strategy], optimization_config["grid_size"]
                ).get(strategy, {})
                outcome["grid"] = param_grid

                if not param_grid:
                    logger.warning(
                        f"No parameter grid for {strategy}, using defaults"
                    )
                    return outcome

                opt_start_date = self._calculate_optimization_window(
                    state.start_date, state.end_date
                )

                result = await self.engine.optimize_parameters(
                    symbol=state.symbol,
                    strategy_type=strategy,
                    param_grid=param_grid,
                    start_date=opt_start_date,
                    end_date=state.end_date,
                    optimization_metric=optimization_config["optimization_metric"],
                    initial_capital=state.initial_capital,
                    top_n=10,
                )

                outcome["result"] = self._filter_optimization_results(
                    result, optimization_config
                )
            except Exception as e:
                logger.error(f"Failed to optimize {strategy}: {e}")
                outcome["error"] = str(e)
        return outcome

    # Run optimizations in parallel; gather() keeps the input order, so
    # outcomes can be matched back to their strategy by position.
    outcomes = await asyncio.gather(
        *(optimize_single_strategy(s) for s in state.selected_strategies),
        return_exceptions=True,
    )

    # Process results
    optimization_results = {}
    best_parameters = {}
    parameter_grids = {}
    total_iterations = 0

    for strategy, outcome in zip(state.selected_strategies, outcomes):
        # BaseException also covers asyncio.CancelledError.
        if isinstance(outcome, BaseException):
            logger.error(f"Parallel optimization failed: {outcome}")
            outcome = {"grid": {}, "result": None, "error": str(outcome)}

        parameter_grids[strategy] = outcome["grid"]
        opt_result = outcome["result"]

        if opt_result is not None:
            optimization_results[strategy] = opt_result
            best_parameters[strategy] = opt_result.get("best_parameters", {})
            total_iterations += opt_result.get("valid_combinations", 0)
            continue

        # Skipped or failed: use default parameters as fallback.
        best_parameters[strategy] = get_strategy_info(strategy).get(
            "parameters", {}
        )
        if outcome["error"] is not None:
            state.fallback_strategies_used.append(
                f"{strategy}_optimization_fallback"
            )

    # Update state
    state.optimization_config = optimization_config
    state.parameter_grids = parameter_grids
    state.optimization_results = optimization_results
    state.best_parameters = best_parameters
    state.optimization_iterations = total_iterations

    # Record execution time
    execution_time = (datetime.now() - start_time).total_seconds() * 1000
    state.optimization_time_ms = execution_time

    # Update workflow status (same transitions as optimize_parameters)
    state.workflow_status = "validating"
    state.current_step = "optimization_completed"
    state.steps_completed.append("parameter_optimization")

    logger.info(f"Parallel optimization completed in {execution_time:.0f}ms")
    return state
```
--------------------------------------------------------------------------------
/scripts/run_stock_screening.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Stock screening script for self-contained Maverick-MCP database.
This script runs various stock screening algorithms and populates the
screening tables with results, making the system completely self-contained.
Usage:
python scripts/run_stock_screening.py --all
python scripts/run_stock_screening.py --maverick
python scripts/run_stock_screening.py --bear
python scripts/run_stock_screening.py --supply-demand
"""
import argparse
import asyncio
import logging
import sys
from datetime import datetime, timedelta
from pathlib import Path
import numpy as np
import pandas as pd
import talib
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from maverick_mcp.config.database_self_contained import (
SelfContainedDatabaseSession,
init_self_contained_database,
)
from maverick_mcp.data.models import (
MaverickBearStocks,
MaverickStocks,
PriceCache,
Stock,
SupplyDemandBreakoutStocks,
bulk_insert_screening_data,
)
# Set up logging
# Configure the root logger once for the whole script: INFO level, with the
# timestamp, logger name and level prefixed to every message.
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Script-wide logger used by the screener classes and by main().
logger = logging.getLogger("stock_screener")
class TechnicalAnalyzer:
    """Stateless helpers that attach technical-indicator columns to a price frame.

    Every helper writes its result columns into the DataFrame it receives
    (the frame is modified in place) and returns that same object.
    """

    @staticmethod
    def calculate_moving_averages(df: pd.DataFrame) -> pd.DataFrame:
        """Add SMA_20, SMA_50, SMA_150, SMA_200 and EMA_21 from the ``close`` column."""
        closes = df["close"].values
        for window in (20, 50, 150, 200):
            df[f"SMA_{window}"] = talib.SMA(closes, timeperiod=window)
        df["EMA_21"] = talib.EMA(closes, timeperiod=21)
        return df

    @staticmethod
    def calculate_rsi(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
        """Add an ``RSI_<period>`` column computed from ``close``."""
        column = f"RSI_{period}"
        df[column] = talib.RSI(df["close"].values, timeperiod=period)
        return df

    @staticmethod
    def calculate_macd(df: pd.DataFrame) -> pd.DataFrame:
        """Add MACD, MACD_Signal and MACD_Histogram columns (talib default periods)."""
        line, signal, histogram = talib.MACD(df["close"].values)
        df["MACD"] = line
        df["MACD_Signal"] = signal
        df["MACD_Histogram"] = histogram
        return df

    @staticmethod
    def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.DataFrame:
        """Add an ``ATR_<period>`` column from the high/low/close columns."""
        highs = df["high"].values
        lows = df["low"].values
        closes = df["close"].values
        df[f"ATR_{period}"] = talib.ATR(highs, lows, closes, timeperiod=period)
        return df

    @staticmethod
    def calculate_relative_strength(
        df: pd.DataFrame, benchmark_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Add a 0-100 ``Momentum_Score`` comparing 252-row returns with a benchmark.

        This is a simplified approximation: rows without a full 252-row
        look-back get a return of 0, so equal performance maps to 50.
        """
        lookback = 252  # roughly one year of trading sessions
        own_return = df["close"].pct_change(periods=lookback).fillna(0)
        bench_return = benchmark_df["close"].pct_change(periods=lookback).fillna(0)
        excess = own_return - bench_return
        # Shift and scale so that 0 excess return sits at 50, then clamp.
        df["Momentum_Score"] = np.clip((excess + 1) * 50, 0, 100)
        return df

    @staticmethod
    def detect_patterns(df: pd.DataFrame) -> pd.DataFrame:
        """Add Pattern/Squeeze/Consolidation/Entry columns and tag the last row.

        All four columns start as the string "None"; only the final row may be
        changed. Requires the SMA_20, SMA_50, SMA_150 and ATR_14 columns.
        """
        for column in ("Pattern", "Squeeze", "Consolidation", "Entry"):
            df[column] = "None"

        latest = df.iloc[-1]
        last_label = df.index[-1]

        # Uptrend: price stacked above the 20/50/150 moving averages.
        if latest["close"] > latest["SMA_20"] > latest["SMA_50"] > latest["SMA_150"]:
            df.loc[last_label, "Pattern"] = "Uptrend"

        # Squeeze: latest ATR below its own 20-row average.
        atr_now = latest["ATR_14"]
        atr_baseline = df["ATR_14"].rolling(20).mean().iloc[-1]
        if atr_now < atr_baseline:
            df.loc[last_label, "Squeeze"] = "Yes"

        return df
class StockScreener:
    """Runs the Maverick, bear and supply/demand screens over cached prices.

    Each ``run_*`` coroutine walks every active ``Stock``, loads about one
    year of ``PriceCache`` rows, computes indicators with
    :class:`TechnicalAnalyzer` and returns one dict per qualifying symbol.
    An error on a single symbol is logged as a warning and the run continues.
    """

    # SMA_200 needs 200 closes before it produces a value.
    _MIN_HISTORY_ROWS = 200
    # Approximate number of trading sessions in one year.
    _TRADING_DAYS_PER_YEAR = 252

    def __init__(self):
        self.analyzer = TechnicalAnalyzer()

    @staticmethod
    def _daily_range_pct(latest: pd.Series) -> float:
        """Return the latest bar's high-low range as a percentage of its close.

        Missing prices are loaded as 0 by ``get_stock_data``, so a zero close
        is possible; 0.0 is returned then instead of dividing by zero.
        """
        close = latest["close"]
        if not close:
            return 0.0
        return (latest["high"] - latest["low"]) / close * 100

    async def _load_history(self, session, symbol: str) -> pd.DataFrame | None:
        """Return a year of prices for *symbol*, or None if history is too short."""
        df = await self.get_stock_data(session, symbol, days=365)
        if df is None or len(df) < self._MIN_HISTORY_ROWS:
            return None
        return df

    async def get_stock_data(
        self, session, symbol: str, days: int = 365
    ) -> pd.DataFrame | None:
        """
        Get stock price data from database.

        Args:
            session: Database session
            symbol: Stock ticker symbol
            days: Number of calendar days of history to load

        Returns:
            DataFrame indexed by date with open/high/low/close/volume columns
            (missing values become 0), or None when no rows match.
        """
        cutoff_date = datetime.now().date() - timedelta(days=days)
        records = (
            session.query(PriceCache)
            .join(Stock)
            .filter(Stock.ticker_symbol == symbol, PriceCache.date >= cutoff_date)
            .order_by(PriceCache.date)
            .all()
        )
        if not records:
            return None

        def as_float(value):
            # Falsy values (None, 0) are normalised to 0.
            return float(value) if value else 0

        rows = [
            {
                "date": record.date,
                "open": as_float(record.open_price),
                "high": as_float(record.high_price),
                "low": as_float(record.low_price),
                "close": as_float(record.close_price),
                "volume": record.volume or 0,
            }
            for record in records
        ]
        df = pd.DataFrame(rows)
        df.set_index("date", inplace=True)
        return df

    async def run_maverick_screening(self, session) -> list[dict]:
        """
        Run Maverick momentum screening algorithm.

        A symbol scores points for trading above its 50/150/200-day SMAs,
        for aligned SMAs, above-average volume, RSI below 80 and an
        "Uptrend" pattern; symbols scoring at least 50 are returned.

        Returns:
            List of screening results
        """
        logger.info("Running Maverick momentum screening...")
        # Get all active stocks
        stocks = session.query(Stock).filter(Stock.is_active).all()
        results = []
        for stock in stocks:
            try:
                df = await self._load_history(session, stock.ticker_symbol)
                if df is None:
                    continue

                # Calculate technical indicators
                df = self.analyzer.calculate_moving_averages(df)
                df = self.analyzer.calculate_rsi(df)
                df = self.analyzer.calculate_atr(df)
                df = self.analyzer.detect_patterns(df)
                latest = df.iloc[-1]
                close = latest["close"]

                score = 0
                # Price above moving averages
                for column in ("SMA_50", "SMA_150", "SMA_200"):
                    if close > latest[column]:
                        score += 25
                # Moving average alignment
                if latest["SMA_50"] > latest["SMA_150"] > latest["SMA_200"]:
                    score += 25
                # Volume above average
                avg_volume = df["volume"].rolling(30).mean().iloc[-1]
                if latest["volume"] > avg_volume * 1.5:
                    score += 10
                # RSI not overbought
                if latest["RSI_14"] < 80:
                    score += 10
                # Pattern detection bonus
                if latest["Pattern"] == "Uptrend":
                    score += 15

                if score < 50:  # Minimum threshold
                    continue

                results.append(
                    {
                        "ticker": stock.ticker_symbol,
                        "open_price": latest["open"],
                        "high_price": latest["high"],
                        "low_price": latest["low"],
                        "close_price": close,
                        "volume": int(latest["volume"]),
                        "ema_21": latest["EMA_21"],
                        "sma_50": latest["SMA_50"],
                        "sma_150": latest["SMA_150"],
                        "sma_200": latest["SMA_200"],
                        # No step above adds a Momentum_Score column, so the
                        # default is what normally gets stored.
                        "momentum_score": latest.get("Momentum_Score", 50),
                        "avg_vol_30d": avg_volume,
                        "adr_pct": self._daily_range_pct(latest),
                        "atr": latest["ATR_14"],
                        "pattern_type": latest["Pattern"],
                        "squeeze_status": latest["Squeeze"],
                        "consolidation_status": latest["Consolidation"],
                        "entry_signal": latest["Entry"],
                        "compression_score": min(score // 10, 10),
                        "pattern_detected": 1 if latest["Pattern"] != "None" else 0,
                        "combined_score": score,
                    }
                )
            except Exception as e:
                logger.warning(f"Error screening {stock.ticker_symbol}: {e}")
                continue
        logger.info(f"Maverick screening found {len(results)} candidates")
        return results

    async def run_bear_screening(self, session) -> list[dict]:
        """
        Run bear market screening algorithm.

        A symbol scores points for trading below its 50/200-day SMAs, low
        RSI, MACD below its signal line, a down day on heavy volume and ATR
        contraction; symbols scoring at least 40 are returned.

        Returns:
            List of screening results
        """
        logger.info("Running bear market screening...")
        stocks = session.query(Stock).filter(Stock.is_active).all()
        results = []
        for stock in stocks:
            try:
                df = await self._load_history(session, stock.ticker_symbol)
                if df is None:
                    continue

                # Calculate technical indicators
                df = self.analyzer.calculate_moving_averages(df)
                df = self.analyzer.calculate_rsi(df)
                df = self.analyzer.calculate_macd(df)
                df = self.analyzer.calculate_atr(df)
                latest = df.iloc[-1]
                close = latest["close"]
                closed_lower = close < df["close"].iloc[-2]

                score = 0
                # Price below moving averages (bearish)
                if close < latest["SMA_50"]:
                    score += 20
                if close < latest["SMA_200"]:
                    score += 20
                # RSI oversold
                if latest["RSI_14"] < 30:
                    score += 15
                elif latest["RSI_14"] < 40:
                    score += 10
                # MACD bearish
                if latest["MACD"] < latest["MACD_Signal"]:
                    score += 15
                # High volume decline
                avg_volume = df["volume"].rolling(30).mean().iloc[-1]
                if latest["volume"] > avg_volume * 1.2 and closed_lower:
                    score += 20
                # ATR contraction (consolidation). Cast to built-in bool:
                # the comparison yields a numpy scalar, which some database
                # drivers cannot adapt when the row is inserted.
                atr_avg = df["ATR_14"].rolling(20).mean().iloc[-1]
                atr_contraction = bool(latest["ATR_14"] < atr_avg * 0.8)
                if atr_contraction:
                    score += 10

                if score < 40:  # Minimum threshold for bear candidates
                    continue

                # Percentage distance of the close from its 20-day SMA
                sma_20 = df["close"].rolling(20).mean().iloc[-1]
                dist_from_sma20 = (close - sma_20) / sma_20 * 100

                results.append(
                    {
                        "ticker": stock.ticker_symbol,
                        "open_price": latest["open"],
                        "high_price": latest["high"],
                        "low_price": latest["low"],
                        "close_price": close,
                        "volume": int(latest["volume"]),
                        "momentum_score": latest.get("Momentum_Score", 50),
                        "ema_21": latest["EMA_21"],
                        "sma_50": latest["SMA_50"],
                        "sma_200": latest["SMA_200"],
                        "rsi_14": latest["RSI_14"],
                        "macd": latest["MACD"],
                        "macd_signal": latest["MACD_Signal"],
                        "macd_histogram": latest["MACD_Histogram"],
                        "dist_days_20": int(abs(dist_from_sma20)),
                        "adr_pct": self._daily_range_pct(latest),
                        "atr_contraction": atr_contraction,
                        "atr": latest["ATR_14"],
                        "avg_vol_30d": avg_volume,
                        "big_down_vol": bool(
                            latest["volume"] > avg_volume * 1.5 and closed_lower
                        ),
                        "squeeze_status": "Contraction" if atr_contraction else "None",
                        "consolidation_status": "None",
                        "score": score,
                    }
                )
            except Exception as e:
                logger.warning(f"Error in bear screening {stock.ticker_symbol}: {e}")
                continue
        logger.info(f"Bear screening found {len(results)} candidates")
        return results

    async def run_supply_demand_screening(self, session) -> list[dict]:
        """
        Run supply/demand breakout screening algorithm.

        A symbol qualifies only when all five trend criteria hold (price
        above the 50/150/200-day SMAs, SMAs stacked 50 > 150 > 200, and the
        200-day SMA higher than 22 rows earlier). Rating fields are added
        for the symbols that qualify.

        Returns:
            List of screening results
        """
        logger.info("Running supply/demand breakout screening...")
        stocks = session.query(Stock).filter(Stock.is_active).all()
        results = []
        for stock in stocks:
            try:
                df = await self._load_history(session, stock.ticker_symbol)
                if df is None:
                    continue

                # Calculate technical indicators
                df = self.analyzer.calculate_moving_averages(df)
                df = self.analyzer.calculate_atr(df)
                df = self.analyzer.detect_patterns(df)
                latest = df.iloc[-1]
                close = latest["close"]
                sma_50 = latest["SMA_50"]
                sma_150 = latest["SMA_150"]
                sma_200 = latest["SMA_200"]
                # _load_history guarantees at least 200 rows, so row -22
                # always exists. It is NaN when SMA_200 has no value that far
                # back, which makes the trend criterion fail.
                sma_200_1m_ago = df["SMA_200"].iloc[-22]

                # Supply/Demand criteria (Technical Breakout Analysis)
                meets_criteria = (
                    # Criteria 1: Current stock price > 150 and 200-day SMA
                    close > sma_150
                    and close > sma_200
                    # Criteria 2: 150-day SMA > 200-day SMA
                    and sma_150 > sma_200
                    # Criteria 3: 200-day SMA trending up for at least 1 month
                    and sma_200 > sma_200_1m_ago
                    # Criteria 4: 50-day SMA > 150 and 200-day SMA
                    and sma_50 > sma_150
                    and sma_50 > sma_200
                    # Criteria 5: Current stock price > 50-day SMA
                    and close > sma_50
                )
                if not meets_criteria:
                    continue

                # Additional scoring for quality
                accumulation_rating = 0
                distribution_rating = 0
                breakout_strength = 0
                # Price above all MAs = accumulation
                if close > sma_50 > sma_150 > sma_200:
                    accumulation_rating = 85
                # Volume above average = institutional interest
                avg_volume = df["volume"].rolling(30).mean().iloc[-1]
                if latest["volume"] > avg_volume * 1.2:
                    breakout_strength += 25
                # Price near 52-week high. 365 calendar days usually hold
                # fewer than 252 rows, so min_periods=1 is needed: without it
                # the rolling max is NaN and this bonus could never be earned.
                high_52w = (
                    df["high"]
                    .rolling(self._TRADING_DAYS_PER_YEAR, min_periods=1)
                    .max()
                    .iloc[-1]
                )
                if close > high_52w * 0.75:  # Within 25% of 52-week high
                    breakout_strength += 25

                results.append(
                    {
                        "ticker": stock.ticker_symbol,
                        "open_price": latest["open"],
                        "high_price": latest["high"],
                        "low_price": latest["low"],
                        "close_price": close,
                        "volume": int(latest["volume"]),
                        "ema_21": latest["EMA_21"],
                        "sma_50": sma_50,
                        "sma_150": sma_150,
                        "sma_200": sma_200,
                        "momentum_score": latest.get(
                            "Momentum_Score", 75
                        ),  # Higher default for qualified stocks
                        "avg_volume_30d": avg_volume,
                        "adr_pct": self._daily_range_pct(latest),
                        "atr": latest["ATR_14"],
                        "pattern_type": latest["Pattern"],
                        "squeeze_status": latest["Squeeze"],
                        "consolidation_status": latest["Consolidation"],
                        "entry_signal": latest["Entry"],
                        "accumulation_rating": accumulation_rating,
                        "distribution_rating": distribution_rating,
                        "breakout_strength": breakout_strength,
                    }
                )
            except Exception as e:
                logger.warning(
                    f"Error in supply/demand screening {stock.ticker_symbol}: {e}"
                )
                continue
        logger.info(f"Supply/demand screening found {len(results)} candidates")
        return results
async def main():
    """Parse the CLI flags, run the selected screens and store their results.

    Exits with status 1 when no screen is selected, when the database cannot
    be initialised, or when at least one selected screen raised. A failing
    screen does not stop the remaining screens from running.
    """
    parser = argparse.ArgumentParser(description="Run stock screening algorithms")
    parser.add_argument(
        "--all", action="store_true", help="Run all screening algorithms"
    )
    parser.add_argument(
        "--maverick", action="store_true", help="Run Maverick momentum screening"
    )
    parser.add_argument("--bear", action="store_true", help="Run bear market screening")
    parser.add_argument(
        "--supply-demand", action="store_true", help="Run supply/demand screening"
    )
    parser.add_argument("--database-url", type=str, help="Override database URL")
    args = parser.parse_args()

    if not any([args.all, args.maverick, args.bear, args.supply_demand]):
        parser.print_help()
        sys.exit(1)

    # Initialize database
    try:
        init_self_contained_database(database_url=args.database_url)
        logger.info("Self-contained database initialized")
    except Exception as e:
        logger.error(f"Database initialization failed: {e}")
        sys.exit(1)

    # Initialize screener
    screener = StockScreener()
    today = datetime.now().date()

    # (selected flag, label used in log messages, coroutine, target model)
    screens = [
        (args.maverick, "Maverick", screener.run_maverick_screening, MaverickStocks),
        (args.bear, "Bear", screener.run_bear_screening, MaverickBearStocks),
        (
            args.supply_demand,
            "Supply/Demand",
            screener.run_supply_demand_screening,
            SupplyDemandBreakoutStocks,
        ),
    ]
    failed_screens: list[str] = []

    with SelfContainedDatabaseSession() as session:
        for selected, label, run_screen, model in screens:
            if not (args.all or selected):
                continue
            try:
                results = await run_screen(session)
                if results:
                    count = bulk_insert_screening_data(session, model, results, today)
                    logger.info(f"Inserted {count} {label} screening results")
            except Exception as e:
                # Best effort: remember the failure and keep going so the
                # other screens still run.
                logger.error(f"{label} screening failed: {e}")
                failed_screens.append(label)

    # Display final stats
    from maverick_mcp.config.database_self_contained import get_self_contained_db_config

    db_config = get_self_contained_db_config()
    stats = db_config.get_database_stats()
    print("\n📊 Final Database Statistics:")
    print(f" Total Records: {stats['total_records']}")
    for table, count in stats["tables"].items():
        if "screening" in table or "maverick" in table or "supply_demand" in table:
            print(f" {table}: {count}")

    # Only claim success when every selected screen actually succeeded.
    if failed_screens:
        print(f"\n⚠️ Stock screening finished with failures: {', '.join(failed_screens)}")
        sys.exit(1)
    print("\n✅ Stock screening completed successfully!")
# Script entry point: drive the async main() to completion on a fresh event
# loop, but only when this file is executed directly (not when imported).
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/debug_utils.py:
--------------------------------------------------------------------------------
```python
"""
Debug utilities for backtesting system troubleshooting.
This module provides comprehensive debugging tools including:
- Request/response logging
- Performance profiling
- Memory analysis
- Error tracking
- Debug mode utilities
"""
import inspect
import json
import time
import traceback
from collections.abc import Callable, Generator
from contextlib import contextmanager
from datetime import UTC, datetime
from functools import wraps
from typing import Any
import psutil
from maverick_mcp.utils.structured_logger import (
CorrelationIDGenerator,
get_logger_manager,
get_performance_logger,
get_structured_logger,
)
class DebugProfiler:
    """Comprehensive debug profiler for performance analysis.

    A profile is started with :meth:`start_profile`, optionally annotated
    with :meth:`checkpoint` calls, and closed with :meth:`end_profile`, which
    returns timing, memory and CPU figures for the whole session.
    """

    # Lazily created psutil.Process shared by all profilers. It must be
    # reused: cpu_percent(interval=None) measures CPU time since the previous
    # call on the *same* Process object, and the first call on a new object
    # always returns a meaningless 0.0.
    _process = None

    def __init__(self):
        self.logger = get_structured_logger("maverick_mcp.debug")
        self.performance_logger = get_performance_logger("debug_profiler")
        # Active (not yet ended) profiles keyed by profile id.
        self._profiles: dict[str, dict[str, Any]] = {}

    def start_profile(self, profile_name: str, **context):
        """Start a debug profiling session.

        Args:
            profile_name: Human-readable name of the profiled operation.
            **context: Extra fields stored with the profile and logged.

        Returns:
            The id to pass to :meth:`checkpoint` and :meth:`end_profile`.
        """
        base_id = f"{profile_name}_{int(time.time() * 1000)}"
        profile_id = base_id
        # Two profiles with the same name started within one millisecond
        # would otherwise share an id and overwrite each other.
        suffix = 1
        while profile_id in self._profiles:
            profile_id = f"{base_id}_{suffix}"
            suffix += 1

        profile_data = {
            "profile_name": profile_name,
            "profile_id": profile_id,
            "start_time": time.time(),
            "start_memory": self._get_memory_usage(),
            "start_cpu": self._get_cpu_usage(),
            "context": context,
            "checkpoints": [],
        }
        self._profiles[profile_id] = profile_data
        self.logger.debug(
            f"Started debug profile: {profile_name}",
            extra={
                "profile_id": profile_id,
                "start_memory_mb": profile_data["start_memory"],
                "start_cpu_percent": profile_data["start_cpu"],
                **context,
            },
        )
        return profile_id

    def checkpoint(self, profile_id: str, checkpoint_name: str, **data):
        """Add a checkpoint to an active profile.

        Unknown profile ids are logged as a warning and otherwise ignored.
        """
        profile = self._profiles.get(profile_id)
        if profile is None:
            self.logger.warning(f"Profile {profile_id} not found for checkpoint")
            return

        current_time = time.time()
        elapsed_ms = (current_time - profile["start_time"]) * 1000
        checkpoint_data = {
            "name": checkpoint_name,
            "timestamp": current_time,
            "elapsed_ms": elapsed_ms,
            "memory_mb": self._get_memory_usage(),
            "cpu_percent": self._get_cpu_usage(),
            "data": data,
        }
        profile["checkpoints"].append(checkpoint_data)
        self.logger.debug(
            f"Profile checkpoint: {checkpoint_name} at {elapsed_ms:.2f}ms",
            extra={
                "profile_id": profile_id,
                "checkpoint": checkpoint_name,
                "elapsed_ms": elapsed_ms,
                "memory_mb": checkpoint_data["memory_mb"],
                **data,
            },
        )

    def end_profile(
        self, profile_id: str, success: bool = True, **final_data
    ) -> dict[str, Any]:
        """End a debug profiling session and return comprehensive results.

        Args:
            profile_id: Id returned by :meth:`start_profile`.
            success: Whether the profiled operation succeeded; selects the
                info or error log level for the completion message.
            **final_data: Extra fields stored under ``final_data``.

        Returns:
            The result dict, or an empty dict when the id is unknown.
        """
        if profile_id not in self._profiles:
            self.logger.warning(f"Profile {profile_id} not found for ending")
            return {}

        profile = self._profiles.pop(profile_id)
        checkpoints = profile["checkpoints"]
        end_time = time.time()
        total_duration_ms = (end_time - profile["start_time"]) * 1000

        # Calculate memory and CPU deltas
        end_memory = self._get_memory_usage()
        end_cpu = self._get_cpu_usage()
        memory_delta = end_memory - profile["start_memory"]
        # The peak is taken over every sample we have, including the start
        # and end readings, so it can never be below end_mb or start_mb.
        peak_memory = max(
            profile["start_memory"],
            end_memory,
            *(cp["memory_mb"] for cp in checkpoints),
        )
        if checkpoints:
            avg_cpu = sum(cp["cpu_percent"] for cp in checkpoints) / len(checkpoints)
        else:
            avg_cpu = end_cpu

        results = {
            "profile_name": profile["profile_name"],
            "profile_id": profile_id,
            "success": success,
            "total_duration_ms": total_duration_ms,
            "start_time": profile["start_time"],
            "end_time": end_time,
            "memory_stats": {
                "start_mb": profile["start_memory"],
                "end_mb": end_memory,
                "delta_mb": memory_delta,
                "peak_usage": peak_memory,
            },
            "cpu_stats": {
                "start_percent": profile["start_cpu"],
                "end_percent": end_cpu,
                "avg_percent": avg_cpu,
            },
            "checkpoints": checkpoints,
            "checkpoint_count": len(checkpoints),
            "context": profile["context"],
            "final_data": final_data,
        }

        # Log profile completion
        log = self.logger.info if success else self.logger.error
        log(
            f"Completed debug profile: {profile['profile_name']} in {total_duration_ms:.2f}ms",
            extra={
                "profile_results": results,
                "performance_summary": {
                    "duration_ms": total_duration_ms,
                    "memory_delta_mb": memory_delta,
                    "checkpoint_count": len(checkpoints),
                    "success": success,
                },
            },
        )
        return results

    @classmethod
    def _current_process(cls):
        """Return the shared psutil.Process for this interpreter, creating it once."""
        if cls._process is None:
            process = psutil.Process()
            # Prime the CPU counter so the next reading is a real value.
            process.cpu_percent(interval=None)
            cls._process = process
        return cls._process

    @staticmethod
    def _get_memory_usage() -> float:
        """Get current resident memory usage in MB (0.0 if it cannot be read)."""
        try:
            process = DebugProfiler._current_process()
            return process.memory_info().rss / 1024 / 1024
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return 0.0

    @staticmethod
    def _get_cpu_usage() -> float:
        """Get process CPU usage percentage since the previous reading.

        Non-blocking; returns 0.0 if the process information cannot be read.
        """
        try:
            process = DebugProfiler._current_process()
            return process.cpu_percent(interval=None)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return 0.0
class RequestResponseLogger:
"""Detailed request/response logging for debugging."""
def __init__(self, max_payload_size: int = 5000):
self.logger = get_structured_logger("maverick_mcp.requests")
self.max_payload_size = max_payload_size
def log_request(self, operation: str, **request_data):
"""Log detailed request information."""
correlation_id = CorrelationIDGenerator.get_correlation_id()
# Sanitize and truncate request data
sanitized_data = self._sanitize_data(request_data)
truncated_data = self._truncate_data(sanitized_data)
self.logger.info(
f"Request: {operation}",
extra={
"operation": operation,
"correlation_id": correlation_id,
"request_data": truncated_data,
"request_size": len(json.dumps(request_data, default=str)),
"timestamp": datetime.now(UTC).isoformat(),
},
)
def log_response(
self, operation: str, success: bool, duration_ms: float, **response_data
):
"""Log detailed response information."""
correlation_id = CorrelationIDGenerator.get_correlation_id()
# Sanitize and truncate response data
sanitized_data = self._sanitize_data(response_data)
truncated_data = self._truncate_data(sanitized_data)
log_method = self.logger.info if success else self.logger.error
log_method(
f"Response: {operation} ({'success' if success else 'failure'}) in {duration_ms:.2f}ms",
extra={
"operation": operation,
"correlation_id": correlation_id,
"success": success,
"duration_ms": duration_ms,
"response_data": truncated_data,
"response_size": len(json.dumps(response_data, default=str)),
"timestamp": datetime.now(UTC).isoformat(),
},
)
def _sanitize_data(self, data: Any) -> Any:
"""Remove sensitive information from data."""
if isinstance(data, dict):
sanitized = {}
for key, value in data.items():
if any(
sensitive in key.lower()
for sensitive in ["password", "token", "key", "secret"]
):
sanitized[key] = "***REDACTED***"
else:
sanitized[key] = self._sanitize_data(value)
return sanitized
elif isinstance(data, list | tuple):
return [self._sanitize_data(item) for item in data]
else:
return data
def _truncate_data(self, data: Any) -> Any:
"""Truncate data to prevent log overflow."""
data_str = json.dumps(data, default=str)
if len(data_str) > self.max_payload_size:
truncated = data_str[: self.max_payload_size]
return f"{truncated}... (truncated, original size: {len(data_str)})"
return data
class ErrorTracker:
"""Comprehensive error tracking and analysis."""
def __init__(self):
self.logger = get_structured_logger("maverick_mcp.errors")
self._error_stats: dict[str, dict[str, Any]] = {}
def track_error(
self,
error: Exception,
operation: str,
context: dict[str, Any],
severity: str = "error",
):
"""Track error with detailed context and statistics."""
error_type = type(error).__name__
error_key = f"{operation}_{error_type}"
# Update error statistics
if error_key not in self._error_stats:
self._error_stats[error_key] = {
"first_seen": datetime.now(UTC),
"last_seen": datetime.now(UTC),
"count": 0,
"operation": operation,
"error_type": error_type,
"contexts": [],
}
stats = self._error_stats[error_key]
stats["last_seen"] = datetime.now(UTC)
stats["count"] += 1
# Keep only recent contexts (last 10)
stats["contexts"].append(
{
"timestamp": datetime.now(UTC).isoformat(),
"context": context,
"error_message": str(error),
}
)
stats["contexts"] = stats["contexts"][-10:] # Keep only last 10
# Get stack trace
stack_trace = traceback.format_exception(
type(error), error, error.__traceback__
)
# Log the error
correlation_id = CorrelationIDGenerator.get_correlation_id()
log_data = {
"operation": operation,
"correlation_id": correlation_id,
"error_type": error_type,
"error_message": str(error),
"error_count": stats["count"],
"first_seen": stats["first_seen"].isoformat(),
"last_seen": stats["last_seen"].isoformat(),
"context": context,
"stack_trace": stack_trace,
"severity": severity,
}
if severity == "critical":
self.logger.critical(
f"Critical error in {operation}: {error}", extra=log_data
)
elif severity == "error":
self.logger.error(f"Error in {operation}: {error}", extra=log_data)
elif severity == "warning":
self.logger.warning(f"Warning in {operation}: {error}", extra=log_data)
def get_error_summary(self) -> dict[str, Any]:
"""Get comprehensive error statistics summary."""
if not self._error_stats:
return {"message": "No errors tracked"}
summary = {
"total_error_types": len(self._error_stats),
"total_errors": sum(stats["count"] for stats in self._error_stats.values()),
"error_breakdown": {},
"most_common_errors": [],
"recent_errors": [],
}
# Error breakdown by type
for _error_key, stats in self._error_stats.items():
summary["error_breakdown"][stats["error_type"]] = (
summary["error_breakdown"].get(stats["error_type"], 0) + stats["count"]
)
# Most common errors
sorted_errors = sorted(
self._error_stats.items(), key=lambda x: x[1]["count"], reverse=True
)
summary["most_common_errors"] = [
{
"operation": stats["operation"],
"error_type": stats["error_type"],
"count": stats["count"],
"first_seen": stats["first_seen"].isoformat(),
"last_seen": stats["last_seen"].isoformat(),
}
for _, stats in sorted_errors[:10]
]
# Recent errors
all_contexts = []
for stats in self._error_stats.values():
for context in stats["contexts"]:
all_contexts.append(
{
"operation": stats["operation"],
"error_type": stats["error_type"],
**context,
}
)
summary["recent_errors"] = sorted(
all_contexts, key=lambda x: x["timestamp"], reverse=True
)[:20]
return summary
class DebugContextManager:
    """Context manager for debug sessions with automatic cleanup.

    On entry it ensures a correlation id exists, starts a profile and logs
    the request; on exit it records any exception, ends the profile and logs
    the response. Exceptions are never suppressed.
    """

    def __init__(
        self,
        operation_name: str,
        enable_profiling: bool = True,
        enable_request_logging: bool = True,
        enable_error_tracking: bool = True,
        **context,
    ):
        """Configure the session.

        Args:
            operation_name: Name used for the profile and log messages.
            enable_profiling: Create a profiler for this session.
            enable_request_logging: Log a request/response pair.
            enable_error_tracking: Record exceptions raised in the body.
            **context: Extra fields attached to the profile and request log.
        """
        self.operation_name = operation_name
        self.enable_profiling = enable_profiling
        self.enable_request_logging = enable_request_logging
        self.enable_error_tracking = enable_error_tracking
        self.context = context

        # Initialize components
        self.profiler = DebugProfiler() if enable_profiling else None
        self.request_logger = (
            RequestResponseLogger() if enable_request_logging else None
        )
        # Use the module-wide tracker rather than a throwaway instance:
        # a private tracker would be discarded with this object, so its
        # counts would never exceed 1 and print_debug_summary() (which reads
        # the global tracker) would never see the error.
        self.error_tracker = get_error_tracker() if enable_error_tracking else None

        self.profile_id = None
        self.start_time = None

    def __enter__(self):
        """Enter debug context and return this manager."""
        self.start_time = time.time()

        # Set correlation ID if not present
        if not CorrelationIDGenerator.get_correlation_id():
            CorrelationIDGenerator.set_correlation_id()

        # Start profiling
        if self.profiler:
            self.profile_id = self.profiler.start_profile(
                self.operation_name, **self.context
            )

        # Log request
        if self.request_logger:
            self.request_logger.log_request(self.operation_name, **self.context)

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit debug context with cleanup.

        Returns None, so an exception raised inside the ``with`` body
        propagates to the caller after it has been recorded.
        """
        if self.start_time is not None:
            duration_ms = (time.time() - self.start_time) * 1000
        else:
            duration_ms = 0
        success = exc_type is None

        # Track error if occurred
        if not success and self.error_tracker and exc_val:
            self.error_tracker.track_error(
                exc_val, self.operation_name, self.context, severity="error"
            )

        # End profiling
        if self.profiler and self.profile_id:
            self.profiler.end_profile(
                self.profile_id,
                success=success,
                exception_type=exc_type.__name__ if exc_type else None,
            )

        # Log response
        if self.request_logger:
            response_data = {"exception": str(exc_val)} if exc_val else {}
            self.request_logger.log_response(
                self.operation_name, success, duration_ms, **response_data
            )

    def checkpoint(self, name: str, **data):
        """Add a checkpoint during debug session (no-op without an active profile)."""
        if self.profiler and self.profile_id:
            self.profiler.checkpoint(self.profile_id, name, **data)
# Decorator for automatic debug wrapping
def debug_operation(
    operation_name: str | None = None,
    enable_profiling: bool = True,
    enable_request_logging: bool = True,
    enable_error_tracking: bool = True,
    **default_context,
):
    """Decorator to automatically wrap operations with debug context.

    Works on both plain and ``async def`` functions. Each call runs inside a
    ``DebugContextManager`` whose context holds ``default_context`` plus the
    call's simple (str/int/float/bool/None) arguments, excluding parameters
    whose names look sensitive.

    Args:
        operation_name: Name for the debug session; defaults to the
            decorated function's ``__name__``.
        enable_profiling: Forwarded to ``DebugContextManager``.
        enable_request_logging: Forwarded to ``DebugContextManager``.
        enable_error_tracking: Forwarded to ``DebugContextManager``.
        **default_context: Fields added to every call's context.
    """
    sensitive_markers = ("password", "token", "key", "secret")
    # Context keys are forwarded as keyword arguments to
    # DebugContextManager.__init__, DebugProfiler.start_profile and
    # RequestResponseLogger.log_request. A key equal to one of their named
    # parameters would raise "got multiple values for argument", so such keys
    # are stored under a "param_" prefix instead.
    reserved_keys = frozenset(
        {
            "self",
            "operation_name",
            "enable_profiling",
            "enable_request_logging",
            "enable_error_tracking",
            "profile_name",
            "operation",
        }
    )

    def decorator(func: Callable) -> Callable:
        actual_operation_name = operation_name or func.__name__
        # The signature does not change between calls; inspect it once.
        sig = inspect.signature(func)

        def build_context(args, kwargs) -> dict[str, Any]:
            """Collect the loggable context for one call of *func*."""
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            context = {**default_context}
            # Add non-sensitive parameters to context
            for param_name, param_value in bound_args.arguments.items():
                lowered = param_name.lower()
                if any(marker in lowered for marker in sensitive_markers):
                    continue
                if param_value is None or isinstance(
                    param_value, str | int | float | bool
                ):
                    context[param_name] = param_value
            return {
                (f"param_{key}" if key in reserved_keys else key): value
                for key, value in context.items()
            }

        def open_debug_context(args, kwargs) -> DebugContextManager:
            """Create the (not yet entered) debug context for one call."""
            return DebugContextManager(
                actual_operation_name,
                enable_profiling,
                enable_request_logging,
                enable_error_tracking,
                **build_context(args, kwargs),
            )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            with open_debug_context(args, kwargs) as debug_ctx:
                result = await func(*args, **kwargs)
                debug_ctx.checkpoint(
                    "function_completed", result_type=type(result).__name__
                )
                return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            with open_debug_context(args, kwargs) as debug_ctx:
                result = func(*args, **kwargs)
                debug_ctx.checkpoint(
                    "function_completed", result_type=type(result).__name__
                )
                return result

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper

    return decorator
@contextmanager
def debug_session(
    session_name: str, **context
) -> Generator[DebugContextManager, None, None]:
    """Open a manually controlled debug session and yield its context manager.

    All features of ``DebugContextManager`` are left at their defaults;
    ``context`` is forwarded to it unchanged.
    """
    manager = DebugContextManager(session_name, **context)
    with manager as debug_ctx:
        yield debug_ctx
# Global debug utilities
# Module-wide singletons created at import time; exposed through
# get_debug_profiler() and get_error_tracker() below.
_debug_profiler = DebugProfiler()
_error_tracker = ErrorTracker()
def get_debug_profiler() -> DebugProfiler:
    """Return the module-wide ``DebugProfiler`` singleton."""
    return _debug_profiler
def get_error_tracker() -> ErrorTracker:
    """Return the module-wide ``ErrorTracker`` singleton."""
    return _error_tracker
def print_debug_summary():
    """Write a human-readable debug report to stdout.

    The report has a performance section (from the logger manager's
    dashboard metrics) and an error section (from the global error tracker).
    A failure while building either section is printed, not raised.
    """
    banner = "=" * 80
    rule = "-" * 40

    print("\n" + banner)
    print("MAVERICK MCP DEBUG SUMMARY")
    print(banner)

    # Performance metrics
    print("\n📊 PERFORMANCE METRICS")
    print(rule)
    try:
        dashboard_data = get_logger_manager().create_dashboard_metrics()
        print(
            f"Log Level Counts: {dashboard_data.get('system_metrics', {}).get('log_level_counts', {})}"
        )
        print(
            f"Active Correlation IDs: {dashboard_data.get('system_metrics', {}).get('active_correlation_ids', 0)}"
        )
        if "memory_stats" in dashboard_data:
            memory_stats = dashboard_data["memory_stats"]
            rss_mb = memory_stats.get("rss_mb", 0)
            cpu_percent = memory_stats.get("cpu_percent", 0)
            print(f"Memory Usage: {rss_mb:.1f}MB RSS, {cpu_percent:.1f}% CPU")
    except Exception as e:
        print(f"Error getting performance metrics: {e}")

    # Error summary
    print("\n🚨 ERROR SUMMARY")
    print(rule)
    try:
        error_summary = _error_tracker.get_error_summary()
        if "message" in error_summary:
            # The tracker returns only a message when nothing was recorded.
            print(error_summary["message"])
        else:
            print(f"Total Error Types: {error_summary['total_error_types']}")
            print(f"Total Errors: {error_summary['total_errors']}")
            top_errors = error_summary["most_common_errors"]
            if top_errors:
                print("\nMost Common Errors:")
                for error in top_errors[:5]:
                    print(
                        f" {error['error_type']} in {error['operation']}: {error['count']} times"
                    )
    except Exception as e:
        print(f"Error getting error summary: {e}")

    print("\n" + banner)
def enable_debug_mode():
    """Turn debug mode on by setting ``MAVERICK_DEBUG=true`` in the environment.

    Also prints a short confirmation banner to stdout.
    """
    import os

    os.environ["MAVERICK_DEBUG"] = "true"
    banner_lines = (
        "🐛 Debug mode enabled",
        " - Verbose logging activated",
        " - Request/response logging enabled",
        " - Performance profiling enabled",
        " - Error tracking enhanced",
    )
    for line in banner_lines:
        print(line)
def disable_debug_mode():
    """Turn debug mode off by removing ``MAVERICK_DEBUG`` from the environment.

    Safe to call when the variable is not set; always prints a confirmation.
    """
    import os

    # pop() with a default is a no-op when the variable is absent.
    os.environ.pop("MAVERICK_DEBUG", None)
    print("🐛 Debug mode disabled")
```
--------------------------------------------------------------------------------
/maverick_mcp/agents/market_analysis.py:
--------------------------------------------------------------------------------
```python
"""
Market Analysis Agent using LangGraph best practices with professional features.
"""
import hashlib
import logging
from datetime import datetime
from typing import Any
from langchain_core.messages import HumanMessage
from langchain_core.tools import BaseTool
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraph
from maverick_mcp.agents.circuit_breaker import circuit_manager
from maverick_mcp.config.settings import get_settings
from maverick_mcp.exceptions import (
AgentInitializationError,
PersonaConfigurationError,
ToolRegistrationError,
)
from maverick_mcp.langchain_tools import get_tool_registry
from maverick_mcp.memory import ConversationStore
from maverick_mcp.tools.risk_management import (
PositionSizeTool,
RiskMetricsTool,
TechnicalStopsTool,
)
from maverick_mcp.tools.sentiment_analysis import (
MarketBreadthTool,
NewsSentimentTool,
SectorSentimentTool,
)
from maverick_mcp.workflows.state import MarketAnalysisState
from .base import PersonaAwareAgent
logger = logging.getLogger(__name__)
settings = get_settings()
class MarketAnalysisAgent(PersonaAwareAgent):
    """
    Professional market analysis agent with advanced screening and risk assessment.

    The agent runs a fixed pipeline (market regime -> sectors -> screening ->
    risk assessment) before handing control to the LLM agent node.

    Features:
    - Multi-strategy screening (momentum, mean reversion, breakout)
    - Sector rotation analysis
    - Market regime detection
    - Risk-adjusted recommendations
    - Real-time sentiment integration
    - Circuit breaker protection for API calls
    """

    # Persona names accepted by __init__; the supplied persona is lower-cased
    # before being checked against this list.
    VALID_PERSONAS = ["conservative", "moderate", "aggressive", "day_trader"]
def __init__(
    self,
    llm,
    persona: str = "moderate",
    ttl_hours: int | None = None,
):
    """
    Initialize market analysis agent.

    Args:
        llm: Language model
        persona: Investor persona; validated case-insensitively against
            VALID_PERSONAS
        ttl_hours: Cache TTL in hours (uses config default if None)

    Raises:
        PersonaConfigurationError: If persona is invalid
        AgentInitializationError: If no tools are available or any other
            error occurs during initialization (the original exception is
            attached as ``__cause__``)
    """
    try:
        # Validate persona
        if persona.lower() not in self.VALID_PERSONAS:
            raise PersonaConfigurationError(
                persona=persona, valid_personas=self.VALID_PERSONAS
            )

        # Store persona temporarily for tool configuration; the tools are
        # built before the base class initializer runs.
        self._temp_persona = persona.lower()

        # Get comprehensive tool set
        tools = self._get_comprehensive_tools()
        if not tools:
            raise AgentInitializationError(
                agent_type="MarketAnalysisAgent",
                reason="No tools available for initialization",
            )

        # Use default TTL from config if not provided
        if ttl_hours is None:
            ttl_hours = settings.agent.conversation_cache_ttl_hours

        # Initialize with MemorySaver
        super().__init__(
            llm=llm,
            tools=tools,
            persona=persona,
            checkpointer=MemorySaver(),
            ttl_hours=ttl_hours,
        )
    except (PersonaConfigurationError, AgentInitializationError):
        # Already the documented error types; let them through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to initialize MarketAnalysisAgent: {str(e)}")
        error = AgentInitializationError(
            agent_type="MarketAnalysisAgent",
            reason=str(e),
        )
        error.context["original_error"] = type(e).__name__
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise error from e

    # Initialize conversation store
    self.conversation_store = ConversationStore(ttl_hours=ttl_hours)

    # Circuit breakers for external APIs (placeholders; the node methods
    # obtain breakers from circuit_manager on demand)
    self.circuit_breakers = {
        "screening": None,
        "sentiment": None,
        "market_data": None,
    }
def _get_comprehensive_tools(self) -> list[BaseTool]:
    """Get comprehensive set of market analysis tools.

    Registry tools are looked up by name and entries that come back as
    ``None`` are dropped; risk and sentiment tools are instantiated
    directly. Every tool exposing ``set_persona`` is configured with
    ``self._temp_persona``; a failure to configure one tool is logged and
    does not affect the others.

    Returns:
        List of configured tools (empty list if none could be loaded)

    Raises:
        ToolRegistrationError: If the tool registry cannot be obtained
    """
    try:
        registry = get_tool_registry()
    except Exception as e:
        logger.error(f"Failed to get tool registry: {str(e)}")
        # Chain the cause so the registry failure is not lost.
        raise ToolRegistrationError(tool_name="registry", reason=str(e)) from e

    registry_tool_names = (
        # Core screening tools
        "get_maverick_stocks",
        "get_maverick_bear_stocks",
        "get_supply_demand_breakouts",
        "get_all_screening_recommendations",
        # Technical analysis tools
        "get_technical_indicators",
        "calculate_support_resistance",
        "detect_chart_patterns",
        # Market data tools
        "get_market_movers",
        "get_sector_performance",
        "get_market_indices",
    )
    registry_tools = [registry.get_tool(name) for name in registry_tool_names]

    # Risk management tools (persona-aware)
    risk_tools = [
        PositionSizeTool(),
        TechnicalStopsTool(),
        RiskMetricsTool(),
    ]

    # Sentiment analysis tools
    sentiment_tools = [
        NewsSentimentTool(),
        MarketBreadthTool(),
        SectorSentimentTool(),
    ]

    # Combine all tools and filter None
    all_tools = registry_tools + risk_tools + sentiment_tools
    tools = [t for t in all_tools if t is not None]

    # Configure persona for PersonaAwareTools
    for tool in tools:
        try:
            if hasattr(tool, "set_persona"):
                tool.set_persona(self._temp_persona)
        except Exception as e:
            # Best effort: continue with the remaining tools.
            logger.warning(
                f"Failed to set persona for tool {tool.__class__.__name__}: {str(e)}"
            )

    if not tools:
        logger.warning("No tools available for market analysis")
        return []

    logger.info(f"Loaded {len(tools)} tools for {self._temp_persona} persona")
    return tools
def get_state_schema(self) -> type:
    """Expose the state type this agent's graph operates on.

    Returns:
        The ``MarketAnalysisState`` class.
    """
    schema = MarketAnalysisState
    return schema
def _build_system_prompt(self) -> str:
    """Build comprehensive system prompt for professional market analysis.

    The prompt is the base class prompt followed by a market-analysis
    section. That section embeds today's date (from ``datetime.now()``) and
    lists screening criteria for all four personas; each persona heading
    shows ``self.persona.name`` when it equals that heading's name and
    "N/A" otherwise.

    Returns:
        The concatenated base prompt and market-analysis prompt.
    """
    base_prompt = super()._build_system_prompt()
    market_prompt = f"""
You are a professional market analyst specializing in systematic screening and analysis.
Current market date: {datetime.now().strftime("%Y-%m-%d")}
## Core Responsibilities:
1. **Multi-Strategy Screening**:
- Momentum: High RS stocks breaking out on volume
- Mean Reversion: Oversold quality stocks at support
- Breakout: Stocks clearing resistance with volume surge
- Trend Following: Stocks in established uptrends
2. **Market Regime Analysis**:
- Identify current market regime (bull/bear/sideways)
- Analyze sector rotation patterns
- Monitor breadth indicators and sentiment
- Detect risk-on vs risk-off environments
3. **Risk-Adjusted Selection**:
- Filter stocks by persona risk tolerance
- Calculate position sizes using Kelly Criterion
- Set appropriate stop losses using ATR
- Consider correlation and portfolio heat
4. **Professional Reporting**:
- Provide actionable entry/exit levels
- Include risk/reward ratios
- Highlight key catalysts and risks
- Suggest portfolio allocation
## Screening Criteria by Persona:
**Conservative ({self.persona.name if self.persona.name == "Conservative" else "N/A"})**:
- Large-cap stocks (>$10B market cap)
- Dividend yield > 2%
- Low debt/equity < 1.5
- Beta < 1.2
- Established uptrends only
**Moderate ({self.persona.name if self.persona.name == "Moderate" else "N/A"})**:
- Mid to large-cap stocks (>$2B)
- Balanced growth/value metrics
- Moderate volatility accepted
- Mix of dividend and growth stocks
**Aggressive ({self.persona.name if self.persona.name == "Aggressive" else "N/A"})**:
- All market caps considered
- High growth rates prioritized
- Momentum and relative strength focus
- Higher volatility tolerated
**Day Trader ({self.persona.name if self.persona.name == "Day Trader" else "N/A"})**:
- High liquidity (>1M avg volume)
- Tight spreads (<0.1%)
- High ATR for movement
- Technical patterns emphasized
## Analysis Framework:
1. Start with market regime assessment
2. Identify leading/lagging sectors
3. Screen for stocks matching criteria
4. Apply technical analysis filters
5. Calculate risk metrics
6. Generate recommendations with specific levels
Remember to:
- Cite specific data points
- Explain your reasoning
- Highlight risks clearly
- Provide actionable insights
- Consider time horizon
"""
    # The market-specific section is appended after the shared base prompt.
    return base_prompt + market_prompt
def _build_graph(self):
    """Assemble and compile the analysis workflow graph.

    The four analysis stages run in a fixed sequence and feed the ``agent``
    node. When tools are available the agent may loop through a ``tools``
    node until ``_should_continue`` answers "end"; otherwise the agent node
    leads straight to END.

    Returns:
        The compiled graph, using ``self.checkpointer`` for checkpointing.
    """
    graph = StateGraph(MarketAnalysisState)

    # Pipeline stages in execution order, ending with the LLM agent node.
    pipeline = (
        ("analyze_market_regime", self._analyze_market_regime),
        ("analyze_sectors", self._analyze_sectors),
        ("run_screening", self._run_screening),
        ("assess_risks", self._assess_risks),
        ("agent", self._agent_node),
    )
    for node_name, handler in pipeline:
        graph.add_node(node_name, handler)

    has_tools = bool(self.tools)
    if has_tools:
        from langgraph.prebuilt import ToolNode

        graph.add_node("tools", ToolNode(self.tools))

    # Chain START -> stage 1 -> ... -> agent.
    previous = START
    for node_name, _handler in pipeline:
        graph.add_edge(previous, node_name)
        previous = node_name

    if has_tools:
        routes = {
            "continue": "tools",
            "end": END,
        }
        graph.add_conditional_edges("agent", self._should_continue, routes)
        graph.add_edge("tools", "agent")
    else:
        graph.add_edge("agent", END)

    return graph.compile(checkpointer=self.checkpointer)
async def _analyze_market_regime(
    self, state: MarketAnalysisState
) -> dict[str, Any]:
    """Analyze current market regime.

    Invokes the ``analyze_market_breadth`` tool (if registered) for SPY
    through the "market_data" circuit breaker and maps its answer to a
    regime label. A string response is searched for "Bullish"/"Bearish";
    a dict response is read from ``market_breadth.sentiment``. Anything
    else, or a missing tool, yields "neutral"; any exception yields
    "unknown".

    Args:
        state: Current workflow state.

    Returns:
        Partial state update containing ``market_regime`` and the
        incremented ``api_calls_made`` counter.
    """
    regime = "neutral"
    try:
        # Use market breadth tool
        breadth_tool = next(
            (t for t in self.tools if t.name == "analyze_market_breadth"), None
        )
        if breadth_tool:
            circuit_breaker = await circuit_manager.get_or_create("market_data")

            async def get_breadth():
                return await breadth_tool.ainvoke({"index": "SPY"})

            breadth_data = await circuit_breaker.call(get_breadth)

            # Handle both string and dict responses
            if isinstance(breadth_data, str):
                if "Bullish" in breadth_data:
                    regime = "bullish"
                elif "Bearish" in breadth_data:
                    regime = "bearish"
            elif isinstance(breadth_data, dict) and "market_breadth" in breadth_data:
                sentiment = breadth_data["market_breadth"].get(
                    "sentiment", "Neutral"
                )
                # An explicit None sentiment is treated as neutral rather
                # than failing on .lower().
                regime = (sentiment or "Neutral").lower()
    except Exception as e:
        logger.error(f"Error analyzing market regime: {e}")
        regime = "unknown"

    api_calls = state.get("api_calls_made", 0) + 1

    # Keep the in-place writes for callers that inspect the passed state.
    state["market_regime"] = regime
    state["api_calls_made"] = api_calls

    # Return the counter as well: only returned keys are applied by the
    # graph, so an in-place increment alone would be dropped.
    return {"market_regime": regime, "api_calls_made": api_calls}
async def _analyze_sectors(self, state: MarketAnalysisState) -> dict[str, Any]:
    """Analyze sector rotation patterns.

    Invokes the ``analyze_sector_sentiment`` tool (if registered) through
    the "market_data" circuit breaker and stores its ``sector_rotation``
    payload. Errors are logged and leave the existing value untouched.

    Args:
        state: Current workflow state.

    Returns:
        Partial state update containing ``sector_rotation`` (empty dict if
        unavailable) and the incremented ``api_calls_made`` counter.
    """
    try:
        # Use sector sentiment tool
        sector_tool = next(
            (t for t in self.tools if t.name == "analyze_sector_sentiment"), None
        )
        if sector_tool:
            circuit_breaker = await circuit_manager.get_or_create("market_data")

            async def get_sectors():
                return await sector_tool.ainvoke({})

            sector_data = await circuit_breaker.call(get_sectors)

            if "sector_rotation" in sector_data:
                rotation = sector_data["sector_rotation"]
                state["sector_rotation"] = rotation

                # Extract leading sectors
                leading = rotation.get("leading_sectors", [])
                # NOTE(review): this only overrides an already-set filter and
                # the new value is not part of the returned update - confirm
                # whether `not state.get("sector_filter")` was intended.
                if leading and state.get("sector_filter"):
                    # Prioritize screening in leading sectors
                    state["sector_filter"] = leading[0].get("name", "")
    except Exception as e:
        logger.error(f"Error analyzing sectors: {e}")

    api_calls = state.get("api_calls_made", 0) + 1
    state["api_calls_made"] = api_calls

    # Return the counter as well: only returned keys are applied by the
    # graph, so an in-place increment alone would be dropped.
    return {
        "sector_rotation": state.get("sector_rotation", {}),
        "api_calls_made": api_calls,
    }
async def _run_screening(self, state: MarketAnalysisState) -> dict[str, Any]:
    """Run multi-strategy screening.

    Picks a screening tool from ``screening_strategy`` (default
    "momentum"), invokes it through the "screening" circuit breaker with
    ``max_results`` as the limit (default 20), and records the returned
    symbols and their ``combined_score`` values.

    Args:
        state: Current workflow state.

    Returns:
        Partial state update with ``screened_symbols``,
        ``screening_scores`` and the ``cache_hits`` / ``cache_misses`` /
        ``api_calls_made`` counters.
    """
    try:
        # Determine which screening strategy based on market regime
        strategy = state.get("screening_strategy", "momentum")

        # Adjust strategy based on regime
        if state.get("market_regime") == "bearish" and strategy == "momentum":
            strategy = "mean_reversion"

        # NOTE(review): "mean_reversion" has no entry in this map, so the
        # bearish adjustment above falls back to "get_maverick_stocks" -
        # confirm whether "get_maverick_bear_stocks" was intended.
        tool_map = {
            "momentum": "get_maverick_stocks",
            "supply_demand_breakout": "get_supply_demand_breakouts",
            "bearish": "get_maverick_bear_stocks",
        }
        tool_name = tool_map.get(strategy, "get_maverick_stocks")
        screening_tool = next((t for t in self.tools if t.name == tool_name), None)

        if screening_tool:
            circuit_breaker = await circuit_manager.get_or_create("screening")

            async def run_screen():
                return await screening_tool.ainvoke(
                    {"limit": state.get("max_results", 20)}
                )

            results = await circuit_breaker.call(run_screen)

            if "stocks" in results:
                stocks = results["stocks"]
                state["screened_symbols"] = [s.get("symbol") for s in stocks]
                state["screening_scores"] = {
                    s.get("symbol"): s.get("combined_score", 0) for s in stocks
                }
                # .get() so a state without the counter cannot raise here.
                state["cache_hits"] = state.get("cache_hits", 0) + 1
    except Exception as e:
        logger.error(f"Error running screening: {e}")
        # Must not raise: a KeyError inside this handler would escape the
        # node and abort the whole workflow.
        state["cache_misses"] = state.get("cache_misses", 0) + 1

    api_calls = state.get("api_calls_made", 0) + 1
    state["api_calls_made"] = api_calls

    # Counters are returned too: only returned keys are applied by the
    # graph, so in-place increments alone would be dropped.
    return {
        "screened_symbols": state.get("screened_symbols", []),
        "screening_scores": state.get("screening_scores", {}),
        "cache_hits": state.get("cache_hits", 0),
        "cache_misses": state.get("cache_misses", 0),
        "api_calls_made": api_calls,
    }
async def _assess_risks(self, state: MarketAnalysisState) -> dict[str, Any]:
    """Assess risks for screened symbols.

    Passes the first five screened symbols to the ``RiskMetricsTool`` with
    a 252-day lookback and stores the result under
    ``conversation_context["risk_metrics"]``. Nothing is computed unless at
    least two symbols are available.

    Args:
        state: Current workflow state.

    Returns:
        ``{"conversation_context": ...}`` when risk metrics were computed,
        otherwise an empty dict. Errors are logged and produce an empty
        dict.
    """
    # `or []` guards against an explicit None value for the key.
    symbols = (state.get("screened_symbols") or [])[:5]  # Top 5 only
    if not symbols:
        return {}

    try:
        # Get risk metrics tool
        risk_tool = next(
            (t for t in self.tools if isinstance(t, RiskMetricsTool)), None
        )
        if risk_tool and len(symbols) > 1:
            # Calculate portfolio risk metrics
            risk_data = await risk_tool.ainvoke(
                {"symbols": symbols, "lookback_days": 252}
            )

            # Create the context if it is missing instead of losing the
            # result to a KeyError.
            context = state.get("conversation_context")
            if context is None:
                context = {}
                state["conversation_context"] = context
            context["risk_metrics"] = risk_data

            # Return the context explicitly rather than relying on the
            # in-place mutation of nested state being observed later.
            return {"conversation_context": context}
    except Exception as e:
        logger.error(f"Error assessing risks: {e}")

    return {}
async def analyze_market(
    self,
    query: str,
    session_id: str,
    screening_strategy: str = "momentum",
    max_results: int = 20,
    **kwargs,
) -> dict[str, Any]:
    """
    Run a market analysis for ``query`` using the given screening settings.

    A cached analysis for the same session, strategy and query is returned
    directly when ``_check_enhanced_cache`` finds one. Otherwise the
    workflow is executed, timed, summarized and stored in the conversation
    store.

    Args:
        query: Natural-language analysis request
        session_id: Conversation/session identifier
        screening_strategy: Name of the screening strategy
        max_results: Maximum number of screening results
        **kwargs: Extra entries merged into the initial state (they
            override the defaults)

    Returns:
        The extracted analysis results
    """
    started_at = datetime.now()

    # Check cache first
    cached_analysis = self._check_enhanced_cache(
        query, session_id, screening_strategy
    )
    if cached_analysis:
        return cached_analysis

    # Prepare initial state; caller-supplied kwargs override the defaults.
    initial_state = {
        "messages": [HumanMessage(content=query)],
        "persona": self.persona.name,
        "session_id": session_id,
        "screening_strategy": screening_strategy,
        "max_results": max_results,
        "timestamp": datetime.now(),
        "token_count": 0,
        "error": None,
        "analyzed_stocks": {},
        "key_price_levels": {},
        "last_analysis_time": {},
        "conversation_context": {},
        "execution_time_ms": None,
        "api_calls_made": 0,
        "cache_hits": 0,
        "cache_misses": 0,
        **kwargs,
    }

    # Run the analysis
    result = await self.ainvoke(query, session_id, initial_state=initial_state)

    # Record wall-clock duration in milliseconds
    elapsed = datetime.now() - started_at
    result["execution_time_ms"] = elapsed.total_seconds() * 1000

    analysis_results = self._extract_enhanced_results(result)

    # Same cache key as used in _check_enhanced_cache
    digest = hashlib.md5(query.lower().encode()).hexdigest()
    self.conversation_store.save_analysis(
        session_id=session_id,
        symbol=f"{screening_strategy}_{digest[:8]}",
        analysis_type=f"{screening_strategy}_analysis",
        data=analysis_results,
    )
    return analysis_results
def _check_enhanced_cache(
    self, query: str, session_id: str, strategy: str
) -> dict[str, Any] | None:
    """Check for cached analysis with strategy awareness.

    The cache key is the strategy name plus the first eight hex characters
    of the MD5 of the lower-cased query. An entry is used only when it is
    younger than the strategy's maximum age (30 minutes for strategies not
    listed below). An entry whose timestamp is missing or cannot be parsed
    is treated as a cache miss.

    Args:
        query: Natural-language analysis request
        session_id: Conversation/session identifier
        strategy: Screening strategy name

    Returns:
        The cached analysis data, or None when there is no fresh entry
    """
    # Create a hash of the query to use as cache key
    query_hash = hashlib.md5(query.lower().encode()).hexdigest()[:8]
    cache_key = f"{strategy}_{query_hash}"

    cached = self.conversation_store.get_analysis(
        session_id=session_id,
        symbol=cache_key,
        analysis_type=f"{strategy}_analysis",
    )
    if not cached or not cached.get("data"):
        return None

    # A broken cache entry must not abort the analysis; fall back to a miss.
    raw_timestamp = cached.get("timestamp")
    try:
        if isinstance(raw_timestamp, datetime):
            timestamp = raw_timestamp
        else:
            timestamp = datetime.fromisoformat(raw_timestamp)
    except (TypeError, ValueError):
        logger.warning(f"Ignoring cached {strategy} analysis with invalid timestamp")
        return None

    # Use "now" in the timestamp's own timezone (naive stays naive) so the
    # subtraction never mixes naive and aware datetimes.
    now = datetime.now(timestamp.tzinfo)
    age_minutes = (now - timestamp).total_seconds() / 60

    # Different cache durations for different strategies
    cache_durations = {
        "momentum": 15,  # 15 minutes for fast-moving
        "trending": 60,  # 1 hour for trend following
        "mean_reversion": 30,  # 30 minutes
    }
    max_age = cache_durations.get(strategy, 30)

    if age_minutes < max_age:
        logger.info(f"Using cached {strategy} analysis")
        return cached["data"]  # type: ignore

    return None
def _extract_enhanced_results(self, result: dict[str, Any]) -> dict[str, Any]:
"""Extract comprehensive results from agent output."""
state = result.get("state", {})
# Get final message content
messages = result.get("messages", [])
content = messages[-1].content if messages else ""
return {
"status": "success",
"timestamp": datetime.now().isoformat(),
"query_type": "professional_market_analysis",
"execution_metrics": {
"execution_time_ms": result.get("execution_time_ms", 0),
"api_calls": state.get("api_calls_made", 0),
"cache_hits": state.get("cache_hits", 0),
"cache_misses": state.get("cache_misses", 0),
},
"market_analysis": {
"regime": state.get("market_regime", "unknown"),
"sector_rotation": state.get("sector_rotation", {}),
"breadth": state.get("market_breadth", {}),
"sentiment": state.get("sentiment_indicators", {}),
},
"screening_results": {
"strategy": state.get("screening_strategy", "momentum"),
"symbols": state.get("screened_symbols", [])[:20],
"scores": state.get("screening_scores", {}),
"count": len(state.get("screened_symbols", [])),
"metadata": state.get("symbol_metadata", {}),
},
"risk_assessment": state.get("conversation_context", {}).get(
"risk_metrics", {}
),
"recommendations": {
"summary": content,
"persona_adjusted": True,
"risk_level": self.persona.name,
"position_sizing": f"Max {self.persona.position_size_max * 100:.1f}% per position",
},
}
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_stock_data_enhanced.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive tests for the enhanced stock data provider with SQLAlchemy integration.
"""
from datetime import datetime, timedelta
from decimal import Decimal
from unittest.mock import MagicMock, patch
import pandas as pd
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from maverick_mcp.data.models import (
Base,
MaverickBearStocks,
MaverickStocks,
PriceCache,
Stock,
SupplyDemandBreakoutStocks,
)
from maverick_mcp.providers.stock_data import EnhancedStockDataProvider
@pytest.fixture(scope="module")
def test_db():
    """Provide a module-scoped in-memory SQLite engine with the schema created."""
    sqlite_engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(sqlite_engine)

    yield sqlite_engine

    # Teardown: release the engine's connections once the module is done.
    sqlite_engine.dispose()
@pytest.fixture
def db_session(test_db):
    """Yield a fresh session bound to the test engine with an empty schema."""
    # Recreate every table so no rows leak between tests.
    Base.metadata.drop_all(bind=test_db)
    Base.metadata.create_all(bind=test_db)

    session_factory = sessionmaker(bind=test_db)
    active_session = session_factory()

    yield active_session

    # Teardown: discard uncommitted work, then release the session.
    active_session.rollback()
    active_session.close()
@pytest.fixture
def provider():
    """Return the enhanced stock data provider under test."""
    enhanced_provider = EnhancedStockDataProvider()
    return enhanced_provider
@pytest.fixture
def sample_stock(db_session):
    """Insert and return an AAPL ``Stock`` row."""
    stock_fields = {
        "ticker_symbol": "AAPL",
        "company_name": "Apple Inc.",
        "sector": "Technology",
        "industry": "Consumer Electronics",
        "exchange": "NASDAQ",
        "currency": "USD",
    }
    apple = Stock(**stock_fields)

    db_session.add(apple)
    db_session.commit()
    return apple
@pytest.fixture
def sample_price_data(db_session, sample_stock):
    """Insert five consecutive daily price rows for the sample stock.

    Dates start at 2024-01-01; prices and volume rise by a fixed step per
    day.
    """
    first_day = datetime(2024, 1, 1).date()

    price_rows = [
        PriceCache(
            stock_id=sample_stock.stock_id,
            date=first_day + timedelta(days=offset),
            open_price=Decimal(f"{150 + offset}.00"),
            high_price=Decimal(f"{155 + offset}.00"),
            low_price=Decimal(f"{149 + offset}.00"),
            close_price=Decimal(f"{152 + offset}.00"),
            volume=1000000 + offset * 10000,
        )
        for offset in range(5)
    ]

    db_session.add_all(price_rows)
    db_session.commit()
    return price_rows
@pytest.fixture
def mock_yfinance():
    """Patch ``yf`` in the stock data provider module with canned responses.

    Every ``Ticker`` returns the same mock: five daily OHLCV rows starting
    2024-01-01, Apple company info, and empty news/earnings/recommendation
    attributes.
    """
    # Canned history data
    history_index = pd.date_range("2024-01-01", periods=5, freq="D")
    history_frame = pd.DataFrame(
        {
            "Open": [150.0, 151.0, 152.0, 153.0, 154.0],
            "High": [155.0, 156.0, 157.0, 158.0, 159.0],
            "Low": [149.0, 150.0, 151.0, 152.0, 153.0],
            "Close": [152.0, 153.0, 154.0, 155.0, 156.0],
            "Volume": [1000000, 1010000, 1020000, 1030000, 1040000],
        },
        index=history_index,
    )

    # Canned company info
    company_info = {
        "longName": "Apple Inc.",
        "sector": "Technology",
        "industry": "Consumer Electronics",
        "exchange": "NASDAQ",
        "currency": "USD",
        "country": "United States",
        "previousClose": 151.0,
        "quoteType": "EQUITY",
    }

    with patch("maverick_mcp.providers.stock_data.yf") as yf_stub:
        ticker_stub = MagicMock()
        yf_stub.Ticker.return_value = ticker_stub

        ticker_stub.history.return_value = history_frame
        ticker_stub.info = company_info

        # Remaining attributes are present but empty
        ticker_stub.news = []
        ticker_stub.earnings = pd.DataFrame()
        ticker_stub.earnings_dates = pd.DataFrame()
        ticker_stub.earnings_trend = {}
        ticker_stub.recommendations = pd.DataFrame()

        yield yf_stub
class TestEnhancedStockDataProvider:
    """Exercise the caching and retrieval paths of the enhanced provider."""

    def test_singleton_pattern(self):
        """Constructing the provider twice yields the very same object."""
        first = EnhancedStockDataProvider()
        second = EnhancedStockDataProvider()
        assert first is second

    def test_get_db_session(self, provider, monkeypatch):
        """_get_db_session returns the session produced by get_db."""
        fake_session = MagicMock()
        fake_get_db = MagicMock(return_value=iter([fake_session]))
        monkeypatch.setattr("maverick_mcp.providers.stock_data.get_db", fake_get_db)

        obtained = provider._get_db_session()

        assert obtained == fake_session

    def test_get_or_create_stock_existing(self, provider, db_session, sample_stock):
        """An already stored ticker is returned rather than recreated."""
        existing = provider._get_or_create_stock(db_session, "AAPL")

        assert existing.stock_id == sample_stock.stock_id
        assert existing.ticker_symbol == "AAPL"

    def test_get_or_create_stock_new(self, provider, db_session, mock_yfinance):
        """An unknown ticker is created from the (mocked) yfinance info."""
        created = provider._get_or_create_stock(db_session, "GOOGL")

        assert created.ticker_symbol == "GOOGL"
        assert created.company_name == "Apple Inc."  # From mock
        assert created.sector == "Technology"

        # The new row must be persisted
        persisted = db_session.query(Stock).filter_by(ticker_symbol="GOOGL").first()
        assert persisted is not None

    def test_get_cached_price_data(
        self, provider, db_session, sample_stock, sample_price_data, monkeypatch
    ):
        """The full cached range comes back as a five-row DataFrame."""

        # Route get_db to the test session
        def fake_get_db():
            yield db_session

        monkeypatch.setattr("maverick_mcp.providers.stock_data.get_db", fake_get_db)

        frame = provider._get_cached_price_data(
            db_session, "AAPL", "2024-01-01", "2024-01-05"
        )

        assert not frame.empty
        assert len(frame) == 5
        assert frame.index[0] == pd.Timestamp("2024-01-01")
        assert frame["Close"].iloc[0] == 152.0

    def test_get_cached_price_data_partial_range(
        self, provider, db_session, sample_stock, sample_price_data
    ):
        """A sub-range request returns only the rows inside that range."""
        frame = provider._get_cached_price_data(
            db_session, "AAPL", "2024-01-02", "2024-01-04"
        )

        assert not frame.empty
        assert len(frame) == 3
        assert frame.index[0] == pd.Timestamp("2024-01-02")
        assert frame.index[-1] == pd.Timestamp("2024-01-04")

    def test_get_cached_price_data_no_data(self, provider, db_session):
        """A ticker without cached rows yields None."""
        frame = provider._get_cached_price_data(
            db_session, "TSLA", "2024-01-01", "2024-01-05"
        )

        assert frame is None

    def test_cache_price_data(self, provider, db_session, sample_stock):
        """Rows written via _cache_price_data can be read back from PriceCache."""
        # Three days of data to cache
        cache_dates = pd.date_range("2024-02-01", periods=3, freq="D")
        frame = pd.DataFrame(
            {
                "Open": [160.0, 161.0, 162.0],
                "High": [165.0, 166.0, 167.0],
                "Low": [159.0, 160.0, 161.0],
                "Close": [162.0, 163.0, 164.0],
                "Volume": [2000000, 2100000, 2200000],
            },
            index=cache_dates,
        )

        provider._cache_price_data(db_session, "AAPL", frame)

        # Read the rows back
        stored_query = db_session.query(PriceCache).filter(
            PriceCache.stock_id == sample_stock.stock_id,
            PriceCache.date >= cache_dates[0].date(),
        )
        stored = stored_query.all()

        assert len(stored) == 3
        assert stored[0].close_price == Decimal("162.00")

    def test_get_stock_data_with_cache(
        self, provider, db_session, sample_stock, sample_price_data, monkeypatch
    ):
        """get_stock_data serves a fully cached range."""

        # Route get_db to the test session
        def fake_get_db():
            yield db_session

        monkeypatch.setattr("maverick_mcp.providers.stock_data.get_db", fake_get_db)

        frame = provider.get_stock_data(
            "AAPL", "2024-01-01", "2024-01-05", use_cache=True
        )

        assert not frame.empty
        assert len(frame) == 5
        assert frame["Close"].iloc[0] == 152.0

    def test_get_stock_data_without_cache(self, provider, mock_yfinance):
        """With use_cache=False the data comes from (mocked) yfinance."""
        frame = provider.get_stock_data(
            "AAPL", "2024-01-01", "2024-01-05", use_cache=False
        )

        assert not frame.empty
        assert len(frame) == 5
        assert frame["Close"].iloc[0] == 152.0

    def test_get_stock_data_cache_miss(
        self, provider, db_session, mock_yfinance, monkeypatch
    ):
        """On a cache miss the provider falls back to (mocked) yfinance."""
        # Hand the provider the test session
        monkeypatch.setattr(provider, "_get_db_session", lambda: db_session)

        frame = provider.get_stock_data(
            "TSLA", "2024-01-01", "2024-01-05", use_cache=True
        )

        assert not frame.empty
        assert len(frame) == 5
        # Values match the mocked yfinance history
        assert frame["Close"].iloc[0] == 152.0

    def test_get_stock_data_non_daily_interval(self, provider, mock_yfinance):
        """Non-daily intervals go straight to yfinance."""
        frame = provider.get_stock_data("AAPL", interval="1wk", period="1mo")

        assert not frame.empty

        # yfinance must have been called with the requested period/interval
        history_mock = mock_yfinance.Ticker.return_value.history
        history_mock.assert_called_with(period="1mo", interval="1wk")
class TestMaverickRecommendations:
    """Cover the provider's screening recommendation methods."""

    @pytest.fixture
    def sample_maverick_stocks(self, db_session):
        """Insert three bullish screening rows with descending scores."""
        rows = [
            MaverickStocks(
                id=i + 1,  # Add explicit ID for SQLite
                stock=f"STOCK{i}",
                close=100.0 + i * 10,
                volume=1000000,
                momentum_score=95.0 - i * 5,
                adr_pct=3.0 + i * 0.5,
                pat="Cup&Handle" if i == 0 else "Base",
                sqz="active" if i < 2 else "neutral",
                consolidation="yes" if i == 0 else "no",
                entry=f"{102.0 + i * 10}",
                combined_score=95 - i * 5,
                compression_score=90 - i * 3,
                pattern_detected=1,
            )
            for i in range(3)
        ]
        db_session.add_all(rows)
        db_session.commit()
        return rows

    def test_get_maverick_recommendations(
        self, provider, db_session, sample_maverick_stocks, monkeypatch
    ):
        """The limit is honored and the top row carries a bullish reason."""
        monkeypatch.setattr(provider, "_get_db_session", lambda: db_session)

        picks = provider.get_maverick_recommendations(limit=2)

        assert len(picks) == 2
        top_pick = picks[0]
        assert top_pick["stock"] == "STOCK0"
        assert top_pick["combined_score"] == 95
        assert top_pick["recommendation_type"] == "maverick_bullish"
        assert "reason" in top_pick
        assert "Exceptional combined score" in top_pick["reason"]

    def test_get_maverick_recommendations_with_min_score(
        self, provider, db_session, sample_maverick_stocks, monkeypatch
    ):
        """min_score filters out rows below the threshold."""
        monkeypatch.setattr(provider, "_get_db_session", lambda: db_session)

        picks = provider.get_maverick_recommendations(limit=10, min_score=90)

        assert len(picks) == 2  # Only STOCK0 and STOCK1 have score >= 90
        assert all(pick["combined_score"] >= 90 for pick in picks)

    @pytest.fixture
    def sample_bear_stocks(self, db_session):
        """Insert three bearish screening rows with descending scores."""
        rows = [
            MaverickBearStocks(
                id=i + 1,  # Add explicit ID for SQLite
                stock=f"BEAR{i}",
                close=50.0 - i * 5,
                volume=500000,
                momentum_score=30.0 - i * 5,
                rsi_14=28.0 - i * 3,
                macd=-0.5 - i * 0.1,
                adr_pct=4.0 + i * 0.5,
                atr_contraction=i < 2,
                big_down_vol=i == 0,
                score=90 - i * 5,
                sqz="red" if i < 2 else "neutral",
            )
            for i in range(3)
        ]
        db_session.add_all(rows)
        db_session.commit()
        return rows

    def test_get_maverick_bear_recommendations(
        self, provider, db_session, sample_bear_stocks, monkeypatch
    ):
        """The limit is honored and the top row carries a bearish reason."""
        monkeypatch.setattr(provider, "_get_db_session", lambda: db_session)

        picks = provider.get_maverick_bear_recommendations(limit=2)

        assert len(picks) == 2
        top_pick = picks[0]
        assert top_pick["stock"] == "BEAR0"
        assert top_pick["score"] == 90
        assert top_pick["recommendation_type"] == "maverick_bearish"
        assert "reason" in top_pick
        assert "Exceptional bear score" in top_pick["reason"]

    @pytest.fixture
    def sample_trending_stocks(self, db_session):
        """Insert three supply/demand breakout rows."""
        rows = [
            SupplyDemandBreakoutStocks(
                id=i + 1,  # Add explicit ID for SQLite
                stock=f"MNRV{i}",
                close=200.0 + i * 10,
                volume=2000000,
                ema_21=195.0 + i * 9,
                sma_50=190.0 + i * 8,
                sma_150=185.0 + i * 7,
                sma_200=180.0 + i * 6,
                momentum_score=92.0 - i * 2,
                adr_pct=2.8 + i * 0.2,
                pat="Base" if i == 0 else "Flag",
                sqz="neutral",
                consolidation="yes" if i < 2 else "no",
                entry=f"{202.0 + i * 10}",
            )
            for i in range(3)
        ]
        db_session.add_all(rows)
        db_session.commit()
        return rows

    def test_get_trending_recommendations(
        self, provider, db_session, sample_trending_stocks, monkeypatch
    ):
        """The limit is honored and the top row carries an uptrend reason."""
        monkeypatch.setattr(provider, "_get_db_session", lambda: db_session)

        picks = provider.get_trending_recommendations(limit=2)

        assert len(picks) == 2
        top_pick = picks[0]
        assert top_pick["stock"] == "MNRV0"
        assert top_pick["momentum_score"] == 92.0
        assert top_pick["recommendation_type"] == "trending_stage2"
        assert "reason" in top_pick
        assert "Uptrend" in top_pick["reason"]

    def test_get_all_screening_recommendations(self, provider, monkeypatch):
        """Each screening bucket is returned and annotated with type and reason."""
        canned_screening = {
            "maverick_stocks": [
                {"stock": "AAPL", "combined_score": 95, "momentum_score": 90}
            ],
            "maverick_bear_stocks": [
                {"stock": "BEAR", "score": 88, "momentum_score": 25}
            ],
            "trending_stocks": [{"stock": "MSFT", "momentum_score": 91}],
        }
        monkeypatch.setattr(
            "maverick_mcp.providers.stock_data.get_latest_maverick_screening",
            lambda: canned_screening,
        )

        results = provider.get_all_screening_recommendations()

        expected_types = (
            ("maverick_stocks", "maverick_bullish"),
            ("maverick_bear_stocks", "maverick_bearish"),
            ("trending_stocks", "trending_stage2"),
        )

        # Every bucket must be present
        for bucket, _expected in expected_types:
            assert bucket in results

        # Every first entry must carry its type and a reason
        for bucket, expected in expected_types:
            first_entry = results[bucket][0]
            assert first_entry["recommendation_type"] == expected
            assert "reason" in first_entry
class TestBackwardCompatibility:
"""Test backward compatibility with original StockDataProvider."""
def test_get_stock_info(self, provider, mock_yfinance):
"""Test get_stock_info method."""
info = provider.get_stock_info("AAPL")
assert info["longName"] == "Apple Inc."
assert info["sector"] == "Technology"
def test_get_realtime_data(self, provider, mock_yfinance):
"""Test get_realtime_data method."""
data = provider.get_realtime_data("AAPL")
assert data is not None
assert data["symbol"] == "AAPL"
assert data["price"] == 156.0 # Last close from mock
assert data["change"] == 5.0 # 156 - 151 (previousClose)
assert data["change_percent"] == pytest.approx(3.31, rel=0.01)
def test_get_all_realtime_data(self, provider, mock_yfinance):
"""Test get_all_realtime_data method."""
results = provider.get_all_realtime_data(["AAPL", "GOOGL"])
assert len(results) == 2
assert "AAPL" in results
assert "GOOGL" in results
assert results["AAPL"]["price"] == 156.0
def test_is_market_open(self, provider, monkeypatch):
"""Test is_market_open method."""
import pytz
# Mock a weekday at 10 AM Eastern
mock_now = datetime(2024, 1, 2, 10, 0, 0) # Tuesday
mock_now = pytz.timezone("US/Eastern").localize(mock_now)
monkeypatch.setattr(
"maverick_mcp.providers.stock_data.datetime",
MagicMock(now=MagicMock(return_value=mock_now)),
)
assert provider.is_market_open() is True
# Mock a weekend
mock_now = datetime(2024, 1, 6, 10, 0, 0) # Saturday
mock_now = pytz.timezone("US/Eastern").localize(mock_now)
monkeypatch.setattr(
"maverick_mcp.providers.stock_data.datetime",
MagicMock(now=MagicMock(return_value=mock_now)),
)
assert provider.is_market_open() is False
def test_get_news(self, provider, mock_yfinance):
"""Test get_news method."""
mock_news = [
{
"title": "Apple News 1",
"publisher": "Reuters",
"link": "https://example.com/1",
"providerPublishTime": 1704134400, # 2024-01-01 timestamp
"type": "STORY",
}
]
mock_yfinance.Ticker.return_value.news = mock_news
df = provider.get_news("AAPL", limit=5)
assert not df.empty
assert len(df) == 1
assert df["title"].iloc[0] == "Apple News 1"
assert isinstance(df["providerPublishTime"].iloc[0], pd.Timestamp)
def test_get_earnings(self, provider, mock_yfinance):
"""Test get_earnings method."""
result = provider.get_earnings("AAPL")
assert "earnings" in result
assert "earnings_dates" in result
assert "earnings_trend" in result
def test_get_recommendations(self, provider, mock_yfinance):
    """Verify get_recommendations returns a DataFrame with the expected columns."""
    expected_columns = ["firm", "toGrade", "fromGrade", "action"]

    recommendations = provider.get_recommendations("AAPL")

    assert isinstance(recommendations, pd.DataFrame)
    assert list(recommendations.columns) == expected_columns
def test_is_etf(self, provider, mock_yfinance):
    """Check is_etf for a regular stock, an ETF quote type, and "QQQ"."""
    # Regular stock
    stock_verdict = provider.is_etf("AAPL")
    assert stock_verdict is False

    # Flip the mocked quote type so the ticker reports itself as an ETF.
    mocked_info = mock_yfinance.Ticker.return_value.info
    mocked_info["quoteType"] = "ETF"
    assert provider.is_etf("SPY") is True

    # Intended to cover detection by symbol pattern.
    # NOTE(review): quoteType is still "ETF" at this point, so this may not
    # exercise the symbol-pattern path on its own -- confirm against is_etf.
    assert provider.is_etf("QQQ") is True
class TestErrorHandling:
    """Failure-path tests: provider calls must not propagate backend errors."""

    def test_get_stock_data_error_handling(self, provider, mock_yfinance, monkeypatch):
        """get_stock_data yields an empty OHLCV frame when every source fails."""

        def failing_session_factory():
            raise Exception("Database error")

        # Every yfinance history call raises ...
        mock_yfinance.Ticker.return_value.history.side_effect = Exception("API Error")
        # ... and no database session can be opened, so no cache is available.
        monkeypatch.setattr(provider, "_get_db_session", failing_session_factory)

        frame = provider.get_stock_data(
            "AAPL", "2024-01-01", "2024-01-05", use_cache=False
        )

        # With both sources failing, an empty but correctly shaped frame is expected.
        expected_columns = ["Open", "High", "Low", "Close", "Volume"]
        assert frame.empty
        assert list(frame.columns) == expected_columns

    def test_get_cached_price_data_error_handling(
        self, provider, db_session, monkeypatch
    ):
        """_get_cached_price_data returns None when the cache lookup raises."""

        def exploding_lookup(*args, **kwargs):
            raise Exception("Database error")

        monkeypatch.setattr(PriceCache, "get_price_data", exploding_lookup)

        cached = provider._get_cached_price_data(
            db_session, "AAPL", "2024-01-01", "2024-01-05"
        )

        assert cached is None

    def test_cache_price_data_error_handling(self, provider, db_session, monkeypatch):
        """_cache_price_data swallows a failure of the bulk insert."""

        def exploding_insert(*args, **kwargs):
            raise Exception("Insert error")

        monkeypatch.setattr(
            "maverick_mcp.providers.stock_data.bulk_insert_price_data",
            exploding_insert,
        )

        trading_days = pd.date_range("2024-01-01", periods=3, freq="D")
        ohlcv = {
            "Open": [150.0, 151.0, 152.0],
            "High": [155.0, 156.0, 157.0],
            "Low": [149.0, 150.0, 151.0],
            "Close": [152.0, 153.0, 154.0],
            "Volume": [1000000, 1010000, 1020000],
        }
        price_frame = pd.DataFrame(ohlcv, index=trading_days)

        # The call itself is the assertion: it must return without raising.
        provider._cache_price_data(db_session, "AAPL", price_frame)

    def test_get_maverick_recommendations_error_handling(self, provider, monkeypatch):
        """get_maverick_recommendations returns [] when the query raises."""
        # A session object whose query() blows up as soon as it is used.
        broken_session = MagicMock()
        broken_session.query.side_effect = Exception("Database query error")
        broken_session.close = MagicMock()

        def session_factory():
            return broken_session

        monkeypatch.setattr(provider, "_get_db_session", session_factory)

        assert provider.get_maverick_recommendations() == []
```
--------------------------------------------------------------------------------
/tests/test_agents_router_mcp.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the new MCP tools in the agents router.
Tests the orchestrated_analysis, deep_research_financial, and
compare_multi_agent_analysis MCP tools for Claude Desktop integration.
"""
import uuid
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from maverick_mcp.api.routers.agents import (
compare_multi_agent_analysis,
deep_research_financial,
get_or_create_agent,
list_available_agents,
orchestrated_analysis,
)
@pytest.fixture
def mock_supervisor_agent():
    """Provide a stand-in supervisor agent with a canned orchestration result."""
    per_agent_results = {
        "market": {"summary": "Strong momentum", "confidence": 0.85},
        "research": {"summary": "Solid fundamentals", "confidence": 0.88},
    }
    canned_result = {
        "status": "success",
        "synthesis": "Comprehensive analysis completed",
        "agents_used": ["market", "research"],
        "execution_time_ms": 4500,
        "synthesis_confidence": 0.87,
        "agent_results": per_agent_results,
        "key_recommendations": ["Focus on large-cap tech", "Monitor earnings"],
        "confidence": 0.87,
    }

    supervisor = MagicMock()
    # orchestrate_analysis is awaited by the code under test, hence AsyncMock.
    supervisor.orchestrate_analysis = AsyncMock(return_value=canned_result)
    return supervisor
@pytest.fixture
def mock_research_agent():
    """Provide a stand-in research agent with a canned research result."""
    findings = [
        {
            "insight": "Strong revenue growth",
            "confidence": 0.9,
            "source": "earnings-report",
        },
        {
            "insight": "Expanding market share",
            "confidence": 0.85,
            "source": "market-analysis",
        },
    ]
    citations = [
        {"url": "https://example.com/report1", "title": "Q3 Earnings Analysis"},
        {"url": "https://example.com/report2", "title": "Market Share Report"},
    ]
    canned_result = {
        "status": "success",
        "research_findings": findings,
        "sources_analyzed": 42,
        "research_confidence": 0.88,
        "validation_checks_passed": 35,
        "total_sources_processed": 50,
        "content_summaries": [
            "Financial performance strong",
            "Market position improving",
        ],
        "citations": citations,
        "execution_time_ms": 6500,
    }

    researcher = MagicMock()
    # conduct_research is awaited by the code under test, hence AsyncMock.
    researcher.conduct_research = AsyncMock(return_value=canned_result)
    return researcher
@pytest.fixture
def mock_market_agent():
    """Provide a stand-in market agent with a canned screening result."""
    screening_details = {
        "screening_scores": {"AAPL": 0.92, "MSFT": 0.88, "NVDA": 0.95},
        "sector_performance": {"Technology": 0.15, "Healthcare": 0.08},
    }
    canned_result = {
        "status": "success",
        "summary": "Top momentum stocks identified",
        "screened_symbols": ["AAPL", "MSFT", "NVDA"],
        "confidence": 0.82,
        "results": screening_details,
        "execution_time_ms": 2100,
    }

    market = MagicMock()
    # analyze_market is awaited by the code under test, hence AsyncMock.
    market.analyze_market = AsyncMock(return_value=canned_result)
    return market
class TestOrchestratedAnalysis:
    """Exercise the orchestrated_analysis MCP tool against a mocked supervisor."""

    # Dotted path of the agent factory that every test here swaps out.
    _FACTORY = "maverick_mcp.api.routers.agents.get_or_create_agent"

    @pytest.mark.asyncio
    async def test_orchestrated_analysis_success(self, mock_supervisor_agent):
        """A fully specified request reports success and echoes its settings."""
        request = {
            "query": "Analyze tech sector opportunities",
            "persona": "moderate",
            "routing_strategy": "llm_powered",
            "max_agents": 3,
            "parallel_execution": True,
        }
        with patch(self._FACTORY, return_value=mock_supervisor_agent):
            result = await orchestrated_analysis(**request)

        assert result["status"] == "success"
        assert result["agent_type"] == "supervisor_orchestrated"
        assert result["persona"] == "moderate"
        assert result["routing_strategy"] == "llm_powered"
        for key in ("agents_used", "synthesis_confidence", "execution_time_ms"):
            assert key in result
        mock_supervisor_agent.orchestrate_analysis.assert_called_once()

    @pytest.mark.asyncio
    async def test_orchestrated_analysis_with_session_id(self, mock_supervisor_agent):
        """A caller-supplied session ID is echoed back and forwarded to the agent."""
        supplied_id = "test-session-123"
        with patch(self._FACTORY, return_value=mock_supervisor_agent):
            result = await orchestrated_analysis(
                query="Market analysis", session_id=supplied_id
            )

        assert result["session_id"] == supplied_id
        forwarded = mock_supervisor_agent.orchestrate_analysis.call_args.kwargs
        assert forwarded["session_id"] == supplied_id

    @pytest.mark.asyncio
    async def test_orchestrated_analysis_generates_session_id(
        self, mock_supervisor_agent
    ):
        """A session ID is generated when the caller does not provide one."""
        with patch(self._FACTORY, return_value=mock_supervisor_agent):
            result = await orchestrated_analysis(query="Market analysis")

        assert "session_id" in result
        # uuid.UUID raises ValueError unless the value is a valid UUID string.
        uuid.UUID(result["session_id"])

    @pytest.mark.asyncio
    async def test_orchestrated_analysis_error_handling(self, mock_supervisor_agent):
        """An exception from the supervisor is reported as an error result."""
        failure = Exception("Orchestration failed")
        mock_supervisor_agent.orchestrate_analysis.side_effect = failure

        with patch(self._FACTORY, return_value=mock_supervisor_agent):
            result = await orchestrated_analysis(query="Test error handling")

        assert result["status"] == "error"
        assert result["agent_type"] == "supervisor_orchestrated"
        assert "error" in result
        assert "Orchestration failed" in result["error"]

    @pytest.mark.asyncio
    async def test_orchestrated_analysis_persona_variations(
        self, mock_supervisor_agent
    ):
        """Each supported persona is echoed back in the result."""
        with patch(self._FACTORY, return_value=mock_supervisor_agent):
            for persona in ("conservative", "moderate", "aggressive", "day_trader"):
                result = await orchestrated_analysis(
                    query="Test persona", persona=persona
                )

                assert result["persona"] == persona
                # Only confirms the supervisor was invoked; the persona handed
                # to the agent factory is not inspected here.
                last_call = mock_supervisor_agent.orchestrate_analysis.call_args
                assert last_call is not None
class TestDeepResearchFinancial:
    """Exercise the deep_research_financial MCP tool against a mocked agent."""

    # Dotted path of the agent factory that every test here swaps out.
    _FACTORY = "maverick_mcp.api.routers.agents.get_or_create_agent"

    @pytest.mark.asyncio
    async def test_deep_research_success(self, mock_research_agent):
        """A fully specified request reports success and echoes its settings."""
        requested_areas = ["fundamentals", "competitive_landscape"]
        with patch(self._FACTORY, return_value=mock_research_agent):
            result = await deep_research_financial(
                research_topic="AAPL competitive analysis",
                persona="moderate",
                research_depth="comprehensive",
                focus_areas=requested_areas,
                timeframe="90d",
            )

        assert result["status"] == "success"
        assert result["agent_type"] == "deep_research"
        assert result["research_topic"] == "AAPL competitive analysis"
        assert result["research_depth"] == "comprehensive"
        for area in ("fundamentals", "competitive_landscape"):
            assert area in result["focus_areas"]
        # These two values come straight from the mocked agent's payload.
        assert result["sources_analyzed"] == 42
        assert result["research_confidence"] == 0.88
        mock_research_agent.conduct_research.assert_called_once()

    @pytest.mark.asyncio
    async def test_deep_research_default_focus_areas(self, mock_research_agent):
        """Passing focus_areas=None falls back to the default focus areas."""
        expected_defaults = [
            "fundamentals",
            "market_sentiment",
            "competitive_landscape",
        ]
        with patch(self._FACTORY, return_value=mock_research_agent):
            result = await deep_research_financial(
                research_topic="Tesla analysis",
                focus_areas=None,  # Should use defaults
            )

        assert result["focus_areas"] == expected_defaults
        forwarded = mock_research_agent.conduct_research.call_args.kwargs
        assert forwarded["focus_areas"] == expected_defaults

    @pytest.mark.asyncio
    async def test_deep_research_depth_variations(self, mock_research_agent):
        """Each research depth level is echoed back in the result."""
        with patch(self._FACTORY, return_value=mock_research_agent):
            for depth in ("basic", "standard", "comprehensive", "exhaustive"):
                result = await deep_research_financial(
                    research_topic="Test research", research_depth=depth
                )
                assert result["research_depth"] == depth

    @pytest.mark.asyncio
    async def test_deep_research_error_handling(self, mock_research_agent):
        """An exception from the research agent is reported as an error result."""
        failure = Exception("Research API failed")
        mock_research_agent.conduct_research.side_effect = failure

        with patch(self._FACTORY, return_value=mock_research_agent):
            result = await deep_research_financial(research_topic="Error test")

        assert result["status"] == "error"
        assert result["agent_type"] == "deep_research"
        assert "Research API failed" in result["error"]

    @pytest.mark.asyncio
    async def test_deep_research_timeframe_handling(self, mock_research_agent):
        """Each timeframe is forwarded to the agent as a keyword argument."""
        with patch(self._FACTORY, return_value=mock_research_agent):
            for timeframe in ("7d", "30d", "90d", "1y"):
                await deep_research_financial(
                    research_topic="Timeframe test", timeframe=timeframe
                )
                # call_args always reflects the most recent call.
                forwarded = mock_research_agent.conduct_research.call_args.kwargs
                assert forwarded["timeframe"] == timeframe
class TestCompareMultiAgentAnalysis:
    """Exercise the compare_multi_agent_analysis MCP tool with mocked agents."""

    # Dotted path of the agent factory that every test here swaps out.
    _FACTORY = "maverick_mcp.api.routers.agents.get_or_create_agent"

    @staticmethod
    def _route_agents(market_agent, supervisor_agent, *, strict=False):
        """Build a factory replacement that hands out the two mocked agents.

        With ``strict=True`` any type other than "market" or "supervisor"
        raises ValueError; otherwise every non-market type gets the supervisor.
        """

        def get_agent_mock(agent_type, persona):
            if agent_type == "market":
                return market_agent
            if strict and agent_type != "supervisor":
                raise ValueError(f"Unknown agent type: {agent_type}")
            return supervisor_agent

        return get_agent_mock

    @pytest.mark.asyncio
    async def test_compare_agents_success(
        self, mock_market_agent, mock_supervisor_agent
    ):
        """Both requested agents run and appear in the comparison."""
        router = self._route_agents(
            mock_market_agent, mock_supervisor_agent, strict=True
        )
        with patch(self._FACTORY, side_effect=router):
            result = await compare_multi_agent_analysis(
                query="Analyze NVDA stock potential",
                agent_types=["market", "supervisor"],
                persona="moderate",
            )

        assert result["status"] == "success"
        assert result["persona"] == "moderate"
        assert "comparison" in result
        for agent_name in ("market", "supervisor"):
            assert agent_name in result["comparison"]
        assert "execution_times_ms" in result
        # Each agent's entry point must have been awaited exactly once.
        mock_market_agent.analyze_market.assert_called_once()
        mock_supervisor_agent.orchestrate_analysis.assert_called_once()

    @pytest.mark.asyncio
    async def test_compare_agents_default_types(
        self, mock_market_agent, mock_supervisor_agent
    ):
        """Passing agent_types=None compares the market and supervisor agents."""
        router = self._route_agents(mock_market_agent, mock_supervisor_agent)
        with patch(self._FACTORY, side_effect=router):
            result = await compare_multi_agent_analysis(
                query="Default comparison test",
                agent_types=None,  # Should use defaults
            )

        # Should use default agent types ["market", "supervisor"]
        compared = result["agents_compared"]
        assert "market" in compared
        assert "supervisor" in compared

    @pytest.mark.asyncio
    async def test_compare_agents_with_failure(
        self, mock_market_agent, mock_supervisor_agent
    ):
        """One failing agent is reported as failed without failing the whole call."""
        mock_market_agent.analyze_market.side_effect = Exception("Market agent failed")
        router = self._route_agents(mock_market_agent, mock_supervisor_agent)

        with patch(self._FACTORY, side_effect=router):
            result = await compare_multi_agent_analysis(
                query="Failure handling test", agent_types=["market", "supervisor"]
            )

        assert result["status"] == "success"  # Overall success
        assert "comparison" in result
        market_entry = result["comparison"]["market"]
        assert "error" in market_entry
        assert market_entry["status"] == "failed"
        # Supervisor should still succeed
        assert "summary" in result["comparison"]["supervisor"]

    @pytest.mark.asyncio
    async def test_compare_agents_session_id_handling(
        self, mock_market_agent, mock_supervisor_agent
    ):
        """Each agent receives the session ID suffixed with its own type."""
        base_session = "compare-test-456"
        router = self._route_agents(mock_market_agent, mock_supervisor_agent)

        with patch(self._FACTORY, side_effect=router):
            await compare_multi_agent_analysis(
                query="Session ID test", session_id=base_session
            )

        # Check session IDs were properly formatted for each agent
        market_kwargs = mock_market_agent.analyze_market.call_args.kwargs
        assert market_kwargs["session_id"] == f"{base_session}_market"
        supervisor_kwargs = mock_supervisor_agent.orchestrate_analysis.call_args.kwargs
        assert supervisor_kwargs["session_id"] == f"{base_session}_supervisor"
class TestGetOrCreateAgent:
    """Test agent factory function.

    NOTE(review): several tests assert on constructor call counts, which
    presumably requires the factory's agent cache to be empty for the
    (agent type, persona) pairs used here -- confirm that the cache is reset
    between tests (e.g. by a conftest fixture).
    """

    @patch.dict("os.environ", {"OPENAI_API_KEY": "test-key"})
    def test_create_supervisor_agent(self):
        """Test creating supervisor agent."""
        with patch("maverick_mcp.api.routers.agents.SupervisorAgent") as mock_class:
            mock_instance = MagicMock()
            mock_class.return_value = mock_instance

            agent = get_or_create_agent("supervisor", "moderate")

            assert agent == mock_instance
            mock_class.assert_called_once()

    @patch.dict(
        "os.environ",
        {
            "OPENAI_API_KEY": "test-key",
            "EXA_API_KEY": "exa-key",
            "TAVILY_API_KEY": "tavily-key",
        },
    )
    def test_create_deep_research_agent_with_api_keys(self):
        """Test creating deep research agent with API keys."""
        with patch("maverick_mcp.api.routers.agents.DeepResearchAgent") as mock_class:
            mock_instance = MagicMock()
            mock_class.return_value = mock_instance

            get_or_create_agent("deep_research", "moderate")

            # Should pass API keys to constructor
            call_args = mock_class.call_args
            assert call_args[1]["exa_api_key"] == "exa-key"
            assert call_args[1]["tavily_api_key"] == "tavily-key"

    @patch.dict("os.environ", {"OPENAI_API_KEY": "test-key"})
    def test_create_deep_research_agent_without_api_keys(self):
        """Test creating deep research agent without optional API keys."""
        import os

        # patch.dict only adds OPENAI_API_KEY; search keys exported in the
        # developer's real environment would otherwise leak into this test.
        # patch.dict restores os.environ on exit, so the removal is test-local.
        os.environ.pop("EXA_API_KEY", None)
        os.environ.pop("TAVILY_API_KEY", None)

        with patch("maverick_mcp.api.routers.agents.DeepResearchAgent") as mock_class:
            mock_instance = MagicMock()
            mock_class.return_value = mock_instance

            get_or_create_agent("deep_research", "moderate")

            # Should pass None for missing API keys
            call_args = mock_class.call_args
            assert call_args[1]["exa_api_key"] is None
            assert call_args[1]["tavily_api_key"] is None

    def test_agent_caching(self):
        """Test agent instance caching."""
        with patch("maverick_mcp.api.routers.agents.MarketAnalysisAgent") as mock_class:
            mock_instance = MagicMock()
            mock_class.return_value = mock_instance

            # First call should create agent
            agent1 = get_or_create_agent("market", "moderate")
            # Second call should return cached agent
            agent2 = get_or_create_agent("market", "moderate")

            assert agent1 == agent2 == mock_instance
            # Constructor should only be called once due to caching
            mock_class.assert_called_once()

    def test_different_personas_create_different_agents(self):
        """Test different personas create separate cached agents."""
        with patch("maverick_mcp.api.routers.agents.MarketAnalysisAgent") as mock_class:
            # A single return_value would hand the same object to both
            # constructor calls, so the inequality below could never hold;
            # side_effect builds a fresh mock per construction instead.
            mock_class.side_effect = lambda *args, **kwargs: MagicMock()

            agent_moderate = get_or_create_agent("market", "moderate")
            agent_aggressive = get_or_create_agent("market", "aggressive")

            # Should create separate instances for different personas
            assert agent_moderate is not agent_aggressive
            assert agent_moderate != agent_aggressive
            assert mock_class.call_count == 2

    def test_invalid_agent_type(self):
        """Test handling of invalid agent type."""
        with pytest.raises(ValueError, match="Unknown agent type"):
            get_or_create_agent("invalid_agent_type", "moderate")
class TestListAvailableAgents:
    """Checks on the catalogue returned by the list_available_agents tool."""

    def test_list_available_agents_structure(self):
        """The catalogue reports success and has its three top-level sections."""
        catalogue = list_available_agents()

        assert catalogue["status"] == "success"
        for section in ("agents", "orchestrated_tools", "features"):
            assert section in catalogue

    def test_active_agents_listed(self):
        """The orchestrated agents are present, active and described."""
        agents = list_available_agents()["agents"]

        # Check new orchestrated agents
        assert "supervisor_orchestrated" in agents
        supervisor = agents["supervisor_orchestrated"]
        assert supervisor["status"] == "active"
        assert "Multi-agent orchestration" in supervisor["description"]

        assert "deep_research" in agents
        research = agents["deep_research"]
        assert research["status"] == "active"
        # Case-insensitive match on the research description.
        assert "comprehensive financial research" in research["description"].lower()

    def test_orchestrated_tools_listed(self):
        """All three orchestrated tools are advertised."""
        tools = list_available_agents()["orchestrated_tools"]

        for tool_name in (
            "orchestrated_analysis",
            "deep_research_financial",
            "compare_multi_agent_analysis",
        ):
            assert tool_name in tools

    def test_personas_supported(self):
        """Both orchestrated agents list every expected persona."""
        agents = list_available_agents()["agents"]
        expected_personas = ["conservative", "moderate", "aggressive", "day_trader"]

        # Supervisor first, then the research agent.
        for agent_key in ("supervisor_orchestrated", "deep_research"):
            supported = agents[agent_key]["personas"]
            for persona in expected_personas:
                assert persona in supported

    def test_capabilities_documented(self):
        """Key capabilities are listed for both orchestrated agents."""
        agents = list_available_agents()["agents"]

        # Supervisor capabilities
        supervisor_caps = agents["supervisor_orchestrated"]["capabilities"]
        for capability in ("Intelligent query routing", "Multi-agent coordination"):
            assert capability in supervisor_caps

        # Research capabilities
        research_caps = agents["deep_research"]["capabilities"]
        for capability in ("Multi-provider web search", "AI-powered content analysis"):
            assert capability in research_caps

    def test_new_features_documented(self):
        """The orchestration features are advertised in the catalogue."""
        features = list_available_agents()["features"]

        for feature in (
            "multi_agent_orchestration",
            "web_search_research",
            "intelligent_routing",
        ):
            assert feature in features
@pytest.mark.integration
class TestAgentRouterIntegration:
    """Placeholder integration tests for the agent router MCP tools.

    Every test below is an empty stub: it passes without exercising any code.
    """

    @pytest.mark.asyncio
    async def test_end_to_end_orchestrated_workflow(self):
        """Placeholder: complete orchestrated analysis workflow."""
        # Intended as a full integration test with real or more sophisticated
        # mocks, covering the complete flow:
        # query -> classification -> agent execution -> synthesis

    @pytest.mark.asyncio
    async def test_research_agent_with_supervisor_integration(self):
        """Placeholder: research agent working with supervisor."""
        # Intended to cover how the research agent integrates with
        # supervisor routing.

    @pytest.mark.asyncio
    async def test_error_propagation_across_agents(self):
        """Placeholder: how errors propagate through the orchestration system."""
if __name__ == "__main__":
    # Run this module's tests directly. pytest.main returns the session exit
    # code; re-raise it so a failing run does not exit with status 0.
    raise SystemExit(pytest.main([__file__, "-v", "--tb=short"]))
```
--------------------------------------------------------------------------------
/examples/complete_speed_validation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Complete Speed Optimization Validation for MaverickMCP
This comprehensive demonstration validates all speed optimization improvements
including LLM optimizations and simulated research workflows to prove
2-3x speed improvements over the previous 138s/129s timeout failures.
Validates:
- Adaptive model selection (Gemini Flash for speed)
- Progressive timeout management
- Token generation speed (100+ tok/s for emergency scenarios)
- Research workflow optimizations
- Early termination strategies
- Overall system performance under time pressure
"""
import asyncio
import os
import sys
import time
from datetime import datetime
from typing import Any
# Add the project root to Python path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from maverick_mcp.providers.openrouter_provider import OpenRouterProvider, TaskType
from maverick_mcp.utils.llm_optimization import AdaptiveModelSelector
class CompleteSpeedValidator:
"""Complete validation of all speed optimization features."""
def __init__(self):
"""Initialize the validation suite."""
api_key = os.getenv("OPENROUTER_API_KEY")
if not api_key:
raise ValueError(
"OPENROUTER_API_KEY environment variable is required. "
"Please set it with your OpenRouter API key."
)
self.openrouter_provider = OpenRouterProvider(api_key=api_key)
self.model_selector = AdaptiveModelSelector(self.openrouter_provider)
# Validation scenarios representing real-world usage
self.validation_scenarios = [
{
"name": "⚡ Emergency Market Alert",
"description": "Critical market alert requiring immediate analysis",
"time_budget": 20.0,
"target": "Sub-20s response with high-speed models",
"phases": [
{
"name": "Quick Analysis",
"prompt": "URGENT: NVIDIA down 8% after hours. Immediate impact assessment for AI sector in 2-3 bullet points.",
"task_type": TaskType.QUICK_ANSWER,
"weight": 1.0,
}
],
},
{
"name": "📊 Technical Analysis Request",
"description": "Standard technical analysis request",
"time_budget": 35.0,
"target": "Sub-35s with comprehensive analysis",
"phases": [
{
"name": "Technical Analysis",
"prompt": "Provide technical analysis for Tesla (TSLA): current RSI, MACD signal, support/resistance levels, and price target.",
"task_type": TaskType.TECHNICAL_ANALYSIS,
"weight": 1.0,
}
],
},
{
"name": "🔍 Multi-Phase Research Simulation",
"description": "Simulated research workflow with multiple phases",
"time_budget": 60.0,
"target": "Sub-60s with intelligent phase management",
"phases": [
{
"name": "Market Context",
"prompt": "Federal Reserve policy impact on tech stocks - key points only.",
"task_type": TaskType.MARKET_ANALYSIS,
"weight": 0.3,
},
{
"name": "Sentiment Analysis",
"prompt": "Current market sentiment for technology sector based on recent earnings.",
"task_type": TaskType.SENTIMENT_ANALYSIS,
"weight": 0.3,
},
{
"name": "Synthesis",
"prompt": "Synthesize: Tech sector outlook considering Fed policy and earnings sentiment.",
"task_type": TaskType.RESULT_SYNTHESIS,
"weight": 0.4,
},
],
},
{
"name": "🧠 Complex Research Challenge",
"description": "Complex multi-factor analysis under time pressure",
"time_budget": 90.0,
"target": "Sub-90s with intelligent optimization",
"phases": [
{
"name": "Sector Analysis",
"prompt": "Renewable energy investment landscape 2025: policy drivers, technology trends, key opportunities.",
"task_type": TaskType.MARKET_ANALYSIS,
"weight": 0.4,
},
{
"name": "Risk Assessment",
"prompt": "Risk factors for renewable energy investments: regulatory, technological, and market risks.",
"task_type": TaskType.RISK_ASSESSMENT,
"weight": 0.3,
},
{
"name": "Investment Synthesis",
"prompt": "Top 3 renewable energy investment themes for 2025 with risk-adjusted outlook.",
"task_type": TaskType.RESULT_SYNTHESIS,
"weight": 0.3,
},
],
},
]
def print_header(self, title: str):
    """Print *title* framed above and below by an 80-character "=" rule.

    A blank line is emitted before the top rule.
    """
    rule = "=" * 80
    print(f"\n{rule}", f" {title}", rule, sep="\n")
def print_phase_header(self, title: str):
    """Print *title* as a "--- title ---" banner preceded by a blank line."""
    banner = "\n--- {} ---".format(title)
    print(banner)
async def validate_system_readiness(self) -> bool:
    """Send one short prompt through the quick-answer model as a smoke test.

    Prints the measured round-trip time, the response length and a rough
    tokens-per-second estimate (response characters // 4), followed by the
    number of entries in MODEL_PROFILES.

    Returns:
        True when the probe completes within 10 seconds; False when any
        step raises (the failure is printed, never propagated).
    """
    self.print_header("🔧 SYSTEM READINESS VALIDATION")
    try:
        # Test OpenRouter connection with fast model
        test_llm = self.openrouter_provider.get_llm(TaskType.QUICK_ANSWER)

        # Import before starting the clock so a cold module import is not
        # counted as model latency.
        from langchain_core.messages import HumanMessage

        start_time = time.time()
        test_response = await asyncio.wait_for(
            test_llm.ainvoke([HumanMessage(content="System ready?")]), timeout=10.0
        )
        response_time = time.time() - start_time

        response_chars = len(test_response.content)
        # A coarse clock can report zero elapsed time; avoid dividing by it.
        estimated_speed = (
            response_chars // 4 / response_time if response_time > 0 else 0
        )

        print("✅ OpenRouter API: Connected and responding")
        print(f" Test Response Time: {response_time:.2f}s")
        print(f" Response Length: {response_chars} chars")
        print(f" Estimated Speed: ~{estimated_speed:.0f} tok/s")

        # Test model selector
        print("\n🧠 Model Selection Intelligence: Active")
        from maverick_mcp.providers.openrouter_provider import MODEL_PROFILES

        print(f" Available models: {len(MODEL_PROFILES)} profiles")
        print(" Speed optimization: Enabled")
        return True
    except Exception as e:
        # Best-effort probe: report and signal "not ready" instead of raising.
        # Timeouts stringify to "", so fall back to the exception class name.
        reason = str(e) or type(e).__name__
        print(f"❌ System readiness check failed: {reason}")
        return False
async def run_validation_scenario(self, scenario: dict[str, Any]) -> dict[str, Any]:
    """Run the phases of *scenario* against its time budget and summarize.

    Each phase is given ``remaining_budget * phase["weight"]`` seconds. A
    model is selected for that budget, the phase prompt is sent to it, and
    the call is bounded by the selected model's own timeout. Time spent on a
    phase, whether it succeeds or fails, is deducted from the remaining
    budget. Once fewer than 5 seconds remain, later phases are skipped.

    Args:
        scenario: Mapping with "name", "description", "time_budget",
            "target" and "phases"; each phase has "name", "prompt",
            "task_type" and "weight".

    Returns:
        Summary dict with keys "scenario_name", "total_execution_time",
        "time_budget", "budget_utilization", "target_achieved",
        "phases_completed", "phases_total", "average_speed",
        "total_response_length", "phase_results" and "early_termination".
        Phase failures are recorded in "phase_results", not raised.
    """
    print(f"\n🚀 Scenario: {scenario['name']}")
    print(f" Description: {scenario['description']}")
    print(f" Time Budget: {scenario['time_budget']}s")
    print(f" Target: {scenario['target']}")

    scenario_start = time.time()
    phases = scenario["phases"]
    phase_results = []
    total_response_length = 0

    # Calculate time budget per phase based on weights
    remaining_budget = scenario["time_budget"]

    for i, phase in enumerate(phases):
        phase_budget = remaining_budget * phase["weight"]
        print(f"\n Phase {i + 1}: {phase['name']} (Budget: {phase_budget:.1f}s)")

        # Covers the whole attempt so a failed phase can still be charged.
        attempt_start = time.time()
        try:
            # Get optimal model for this phase
            complexity = self.model_selector.calculate_task_complexity(
                content=phase["prompt"],
                task_type=phase["task_type"],
            )
            model_config = self.model_selector.select_model_for_time_budget(
                task_type=phase["task_type"],
                time_remaining_seconds=phase_budget,
                complexity_score=complexity,
                content_size_tokens=len(phase["prompt"]) // 4,
            )
            print(f" Selected Model: {model_config.model_id}")
            print(f" Max Timeout: {model_config.timeout_seconds}s")

            # Execute phase
            llm = self.openrouter_provider.get_llm(
                model_override=model_config.model_id,
                temperature=model_config.temperature,
                max_tokens=model_config.max_tokens,
            )

            # Kept inside the try so a missing dependency is recorded as a
            # phase failure rather than aborting the scenario.
            from langchain_core.messages import HumanMessage

            phase_start = time.time()
            response = await asyncio.wait_for(
                llm.ainvoke([HumanMessage(content=phase["prompt"])]),
                timeout=model_config.timeout_seconds,
            )
            phase_time = time.time() - phase_start

            # Calculate metrics (tokens estimated as characters // 4)
            response_length = len(response.content)
            estimated_tokens = response_length // 4
            tokens_per_second = (
                estimated_tokens / phase_time if phase_time > 0 else 0
            )

            phase_result = {
                "name": phase["name"],
                "execution_time": phase_time,
                # Guard against a zero or negative budget slice.
                "budget_used_pct": (phase_time / phase_budget) * 100
                if phase_budget > 0
                else 0.0,
                "model_used": model_config.model_id,
                "tokens_per_second": tokens_per_second,
                "response_length": response_length,
                "success": True,
                "response_preview": response.content[:100] + "..."
                if len(response.content) > 100
                else response.content,
            }
            phase_results.append(phase_result)
            total_response_length += response_length

            print(
                f" ✅ Completed: {phase_time:.2f}s ({phase_result['budget_used_pct']:.1f}% of budget)"
            )
            print(f" Speed: {tokens_per_second:.0f} tok/s")

            # Update remaining budget
            remaining_budget -= phase_time
        except Exception as e:
            # Timeouts stringify to "", so fall back to the exception class name.
            error_text = str(e) or type(e).__name__
            print(f" ❌ Phase failed: {error_text}")
            phase_results.append(
                {
                    "name": phase["name"],
                    "execution_time": 0,
                    "success": False,
                    "error": error_text,
                }
            )
            # A failed attempt (typically a timeout) still consumed
            # wall-clock time; charge it so later phases are not handed a
            # budget that no longer exists.
            remaining_budget -= time.time() - attempt_start

        # Early termination if running out of time
        if remaining_budget < 5 and i < len(phases) - 1:
            print(
                f" ⚠️ Early termination triggered - {remaining_budget:.1f}s remaining"
            )
            break

    # Calculate scenario metrics
    total_execution_time = time.time() - scenario_start
    successful_phases = [p for p in phase_results if p.get("success", False)]

    scenario_result = {
        "scenario_name": scenario["name"],
        "total_execution_time": total_execution_time,
        "time_budget": scenario["time_budget"],
        "budget_utilization": (total_execution_time / scenario["time_budget"])
        * 100,
        "target_achieved": total_execution_time <= scenario["time_budget"],
        "phases_completed": len(successful_phases),
        "phases_total": len(phases),
        "average_speed": sum(
            p.get("tokens_per_second", 0) for p in successful_phases
        )
        / len(successful_phases)
        if successful_phases
        else 0,
        "total_response_length": total_response_length,
        "phase_results": phase_results,
        # True whenever fewer phases succeeded than were defined, whether
        # they were skipped or failed.
        "early_termination": len(successful_phases) < len(phases),
    }

    # Print scenario summary
    status_icon = "✅" if scenario_result["target_achieved"] else "⚠️"
    early_icon = "🔄" if scenario_result["early_termination"] else ""
    print(
        f"\n {status_icon} {early_icon} Scenario Complete: {total_execution_time:.2f}s"
    )
    print(f" Budget Used: {scenario_result['budget_utilization']:.1f}%")
    print(
        f" Phases: {scenario_result['phases_completed']}/{scenario_result['phases_total']}"
    )
    print(f" Avg Speed: {scenario_result['average_speed']:.0f} tok/s")

    return scenario_result
def analyze_validation_results(self, results: list[dict[str, Any]]) -> None:
    """Print an analysis report for a finished validation run.

    The report has four parts: overall counts, speed-up versus the
    historical baseline, a per-scenario breakdown, and per-model averages.

    Args:
        results: One dict per scenario. Every entry is expected to carry
            ``phases_completed`` and ``target_achieved``; entries with at
            least one completed phase must also carry the timing, budget
            and ``phase_results`` keys read below.

    Returns:
        None. All output goes to stdout.
    """
    self.print_header("📊 COMPLETE SPEED VALIDATION ANALYSIS")

    # A scenario counts as successful once at least one phase completed.
    successful_scenarios = [r for r in results if r.get("phases_completed", 0) > 0]
    targets_achieved = [r for r in successful_scenarios if r.get("target_achieved")]

    # Guard the ratio: an empty result list must not raise ZeroDivisionError.
    success_rate = len(targets_achieved) / len(results) * 100 if results else 0.0

    print("📈 Overall Validation Results:")
    print(f" Total Scenarios: {len(results)}")
    print(f" Successful: {len(successful_scenarios)}")
    print(f" Targets Achieved: {len(targets_achieved)}")
    print(f" Success Rate: {success_rate:.1f}%")

    if successful_scenarios:
        # Speed improvement analysis
        historical_baseline = 130.0  # Average of 138s and 129s timeout failures
        execution_times = [r["total_execution_time"] for r in successful_scenarios]
        max_execution_time = max(execution_times)
        avg_execution_time = sum(execution_times) / len(execution_times)

        # The slowest scenario gives the conservative (worst-case) speed-up.
        overall_improvement = (
            historical_baseline / max_execution_time if max_execution_time > 0 else 0
        )
        avg_improvement = (
            historical_baseline / avg_execution_time if avg_execution_time > 0 else 0
        )

        print("\n🎯 Speed Improvement Validation:")
        print(f" Historical Baseline: {historical_baseline}s (timeout failures)")
        print(f" Current Max Time: {max_execution_time:.2f}s")
        print(f" Current Avg Time: {avg_execution_time:.2f}s")
        print(f" Max Speed Improvement: {overall_improvement:.1f}x")
        print(f" Avg Speed Improvement: {avg_improvement:.1f}x")

        # Validation status
        if overall_improvement >= 3.0:
            print(f" 🎉 OUTSTANDING: {overall_improvement:.1f}x speed improvement!")
        elif overall_improvement >= 2.0:
            print(
                f" ✅ SUCCESS: {overall_improvement:.1f}x speed improvement achieved!"
            )
        elif overall_improvement >= 1.5:
            print(f" 👍 GOOD: {overall_improvement:.1f}x improvement")
        else:
            print(f" ⚠️ MARGINAL: {overall_improvement:.1f}x improvement")

    # NOTE(review): the sections below iterate only successful scenarios, so
    # with none they print just their headers - confirm that is intended.

    # Performance breakdown by scenario type
    self.print_phase_header("⚡ PERFORMANCE BY SCENARIO TYPE")
    for result in successful_scenarios:
        print(f" {result['scenario_name']}")
        print(f" Execution Time: {result['total_execution_time']:.2f}s")
        print(f" Budget Used: {result['budget_utilization']:.1f}%")
        print(f" Average Speed: {result['average_speed']:.0f} tok/s")
        print(
            f" Phases Completed: {result['phases_completed']}/{result['phases_total']}"
        )

        # Show fastest phase
        successful_phases = [
            p for p in result["phase_results"] if p.get("success", False)
        ]
        if successful_phases:
            fastest_phase = min(successful_phases, key=lambda p: p["execution_time"])
            print(
                f" Fastest Phase: {fastest_phase['name']} ({fastest_phase['execution_time']:.2f}s, {fastest_phase['tokens_per_second']:.0f} tok/s)"
            )
        print("")

    # Model performance analysis
    self.print_phase_header("🧠 MODEL PERFORMANCE ANALYSIS")
    model_stats: dict[str, dict[str, Any]] = {}
    for result in successful_scenarios:
        for phase in result["phase_results"]:
            if not phase.get("success", False):
                continue
            # Phases that do not record a model are grouped under "unknown".
            stats = model_stats.setdefault(
                phase.get("model_used", "unknown"),
                {"times": [], "speeds": [], "count": 0},
            )
            stats["times"].append(phase["execution_time"])
            stats["speeds"].append(phase["tokens_per_second"])
            stats["count"] += 1

    for model, stats in model_stats.items():
        # Each entry holds at least one sample, so these divisions are safe.
        avg_time = sum(stats["times"]) / len(stats["times"])
        avg_speed = sum(stats["speeds"]) / len(stats["speeds"])
        print(f" {model}:")
        print(f" Uses: {stats['count']} phases")
        print(f" Avg Time: {avg_time:.2f}s")
        print(f" Avg Speed: {avg_speed:.0f} tok/s")

        # Speed category
        if avg_speed >= 100:
            speed_category = "🚀 Ultra-fast"
        elif avg_speed >= 60:
            speed_category = "⚡ Fast"
        elif avg_speed >= 30:
            speed_category = "🔄 Moderate"
        else:
            speed_category = "🐌 Slow"
        print(f" Category: {speed_category}")
        print("")
async def run_complete_validation(self) -> bool:
    """Run the complete speed validation suite.

    Checks system readiness, runs every scenario in
    ``self.validation_scenarios`` (a scenario that raises is recorded as a
    failed result instead of aborting the suite), prints the analysis and a
    final summary, then decides whether the run passed.

    Returns:
        True when at least 75% of the scenarios met their time budget and
        the slowest successful scenario is at least 1.8x faster than the
        130s historical baseline. False otherwise, including when the
        system is not ready or no scenarios were run.
    """
    historical_baseline = 130.0  # seconds; same baseline used in the analysis
    required_success_ratio = 0.75
    required_speedup = 1.8

    print("🚀 MaverickMCP Complete Speed Optimization Validation")
    print(f"⏰ Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(
        "🎯 Goal: Validate 2-3x speed improvements over 138s/129s timeout failures"
    )
    print("📋 Scope: LLM optimizations + research workflow simulations")

    # Step 1: System readiness
    if not await self.validate_system_readiness():
        print("\n❌ System not ready for validation")
        return False

    # Step 2: Run validation scenarios
    self.print_header("🔍 RUNNING VALIDATION SCENARIOS")
    results = []
    total_start_time = time.time()
    for i, scenario in enumerate(self.validation_scenarios, 1):
        print(f"\n{'=' * 60}")
        print(f"SCENARIO {i}/{len(self.validation_scenarios)}")
        print(f"{'=' * 60}")
        try:
            result = await self.run_validation_scenario(scenario)
            results.append(result)
            # Brief pause between scenarios
            await asyncio.sleep(1)
        except Exception as e:
            # Keep going: one broken scenario must not abort the whole suite.
            print(f"💥 Scenario failed: {e}")
            results.append(
                {
                    "scenario_name": scenario["name"],
                    "total_execution_time": 0,
                    "phases_completed": 0,
                    "target_achieved": False,
                    "error": str(e),
                }
            )
    total_validation_time = time.time() - total_start_time

    # Step 3: Analyze results
    self.analyze_validation_results(results)

    # Final validation summary
    self.print_header("🎉 VALIDATION COMPLETE")
    successful_scenarios = [r for r in results if r["phases_completed"] > 0]
    targets_achieved = [r for r in successful_scenarios if r["target_achieved"]]

    # Guard both ratios so an empty scenario list or a zero duration cannot
    # raise ZeroDivisionError.
    success_rate = len(targets_achieved) / len(results) * 100 if results else 0.0
    max_time = max(
        (r["total_execution_time"] for r in successful_scenarios), default=0
    )
    speed_improvement = historical_baseline / max_time if max_time > 0 else 0

    print("✅ Complete Speed Validation Results:")
    print(f" Scenarios Run: {len(results)}")
    print(f" Successful: {len(successful_scenarios)}")
    print(f" Targets Achieved: {len(targets_achieved)}")
    print(f" Success Rate: {success_rate:.1f}%")
    print(f" Total Validation Time: {total_validation_time:.2f}s")
    if successful_scenarios:
        print(f" Speed Improvement Achieved: {speed_improvement:.1f}x")

    print("\n📊 Optimizations Validated:")
    print(" ✅ Adaptive Model Selection (Gemini Flash for speed scenarios)")
    print(" ✅ Progressive Time Budget Management")
    print(" ✅ Early Termination Under Time Pressure")
    print(" ✅ Multi-Phase Workflow Optimization")
    print(" ✅ Token Generation Speed Optimization (100+ tok/s)")
    print(" ✅ Intelligent Timeout Management")

    # Success criteria: 75% of scenarios on target and >= 1.8x improvement.
    # bool(results) keeps the return value a strict bool with no scenarios.
    validation_passed = (
        bool(results)
        and len(targets_achieved) >= len(results) * required_success_ratio
        and speed_improvement >= required_speedup
    )
    return validation_passed
async def main():
    """Run the validation suite and translate its outcome into an exit code.

    Returns:
        0 when the validation passed, 1 when it did not pass or an
        unexpected exception was raised, and 130 when a KeyboardInterrupt
        was caught.
    """
    speed_validator = CompleteSpeedValidator()
    try:
        passed = await speed_validator.run_complete_validation()
        if not passed:
            print(
                "\n⚠️ VALIDATION MIXED RESULTS - Review analysis for improvement areas"
            )
            return 1

        print("\n🎉 VALIDATION PASSED - Speed optimizations successfully validated!")
        print(" System demonstrates 2-3x speed improvements over historical timeouts")
        return 0
    except KeyboardInterrupt:
        print("\n\n⏹️ Validation interrupted by user")
        return 130
    except Exception as exc:
        # Top-level boundary: report the error with its traceback, then fail.
        print(f"\n💥 Validation failed with error: {exc}")
        import traceback

        traceback.print_exc()
        return 1
if __name__ == "__main__":
    # Abort early when OPENROUTER_API_KEY is unset or empty.
    api_key = os.environ.get("OPENROUTER_API_KEY")
    if not api_key:
        for message in (
            "❌ Missing OPENROUTER_API_KEY environment variable",
            "Please check your .env file",
        ):
            print(message)
        sys.exit(1)

    # Run the complete validation and use main()'s return value as the
    # process exit status.
    sys.exit(asyncio.run(main()))
```