This is page 3 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/maverick_mcp/domain/entities/stock_analysis.py:
--------------------------------------------------------------------------------
```python
"""
Stock analysis entity.
This entity represents a complete technical analysis of a stock.
"""
from dataclasses import dataclass
from datetime import datetime
from maverick_mcp.domain.value_objects.technical_indicators import (
BollingerBands,
MACDIndicator,
PriceLevel,
RSIIndicator,
Signal,
StochasticOscillator,
TrendDirection,
VolumeProfile,
)
@dataclass
class StockAnalysis:
    """
    Technical-analysis snapshot of one stock.

    Domain entity that groups the trend reading, the optional indicator
    value objects, support/resistance levels, the volume profile and the
    composite signal recorded for ``symbol`` at ``analysis_date``.
    """

    # Identity of the snapshot
    symbol: str
    analysis_date: datetime
    current_price: float

    # Trend reading
    trend_direction: TrendDirection
    trend_strength: float  # 0-100

    # Indicator value objects (each defaults to None)
    rsi: RSIIndicator | None = None
    macd: MACDIndicator | None = None
    bollinger_bands: BollingerBands | None = None
    stochastic: StochasticOscillator | None = None

    # Support / resistance levels (None is replaced by [] in __post_init__)
    support_levels: list[PriceLevel] | None = None
    resistance_levels: list[PriceLevel] | None = None

    # Volume reading
    volume_profile: VolumeProfile | None = None

    # Combined verdict
    composite_signal: Signal = Signal.NEUTRAL
    confidence_score: float = 0.0  # 0-100

    # Bookkeeping about how the analysis was produced
    analysis_period_days: int = 365
    indicators_used: list[str] | None = None

    def __post_init__(self):
        """Replace any list field left as ``None`` with a fresh empty list."""
        for list_field in ("support_levels", "resistance_levels", "indicators_used"):
            if getattr(self, list_field) is None:
                setattr(self, list_field, [])

    @property
    def has_bullish_setup(self) -> bool:
        """True when the composite signal is BUY or STRONG_BUY."""
        return self.composite_signal in (Signal.BUY, Signal.STRONG_BUY)

    @property
    def has_bearish_setup(self) -> bool:
        """True when the composite signal is SELL or STRONG_SELL."""
        return self.composite_signal in (Signal.SELL, Signal.STRONG_SELL)

    @property
    def nearest_support(self) -> PriceLevel | None:
        """Highest support level priced strictly below ``current_price``, if any."""
        if not self.support_levels:
            return None
        candidates = [
            level for level in self.support_levels if level.price < self.current_price
        ]
        return max(candidates, key=lambda level: level.price) if candidates else None

    @property
    def nearest_resistance(self) -> PriceLevel | None:
        """Lowest resistance level priced strictly above ``current_price``, if any."""
        if not self.resistance_levels:
            return None
        candidates = [
            level
            for level in self.resistance_levels
            if level.price > self.current_price
        ]
        return min(candidates, key=lambda level: level.price) if candidates else None

    @property
    def risk_reward_ratio(self) -> float | None:
        """
        Reward-to-risk ratio derived from the nearest support and resistance.

        Risk is the distance down to the nearest support and reward is the
        distance up to the nearest resistance. Returns ``None`` when either
        level is missing or the risk is not positive.
        """
        floor = self.nearest_support
        ceiling = self.nearest_resistance
        if not (floor and ceiling):
            return None

        downside = self.current_price - floor.price
        upside = ceiling.price - self.current_price
        if downside <= 0:
            return None
        return upside / downside

    def get_indicator_summary(self) -> dict[str, str]:
        """Return a label -> text mapping for every indicator that is set."""
        summary = {}

        rsi = self.rsi
        if rsi:
            summary["RSI"] = f"{rsi.value:.1f} ({rsi.signal.value})"

        macd = self.macd
        if macd:
            summary["MACD"] = macd.signal.value

        bands = self.bollinger_bands
        if bands:
            summary["Bollinger"] = bands.signal.value

        stoch = self.stochastic
        if stoch:
            summary["Stochastic"] = f"{stoch.k_value:.1f} ({stoch.signal.value})"

        volume = self.volume_profile
        if volume:
            summary["Volume"] = f"{volume.relative_volume:.1f}x average"

        return summary

    def get_key_levels(self) -> dict[str, float]:
        """Return the current price plus whichever key price levels are available."""
        levels = {"current_price": self.current_price}

        support = self.nearest_support
        if support:
            levels["nearest_support"] = support.price

        resistance = self.nearest_resistance
        if resistance:
            levels["nearest_resistance"] = resistance.price

        bands = self.bollinger_bands
        if bands:
            levels["bollinger_upper"] = bands.upper_band
            levels["bollinger_lower"] = bands.lower_band

        return levels

    def to_dict(self) -> dict:
        """Build a plain-dict view of the analysis for serialization."""
        return dict(
            symbol=self.symbol,
            analysis_date=self.analysis_date.isoformat(),
            current_price=self.current_price,
            trend=dict(
                direction=self.trend_direction.value,
                strength=self.trend_strength,
            ),
            indicators=self.get_indicator_summary(),
            levels=self.get_key_levels(),
            signal=self.composite_signal.value,
            confidence=self.confidence_score,
            risk_reward_ratio=self.risk_reward_ratio,
        )
```
--------------------------------------------------------------------------------
/server.json:
--------------------------------------------------------------------------------
```json
{
"$schema": "https://static.modelcontextprotocol.io/schemas/2025-07-09/server.schema.json",
"name": "io.github.wshobson/maverick-mcp",
"description": "Stock analysis MCP server with S&P 500 data, technical indicators, and AI research tools.",
"status": "active",
"repository": {
"url": "https://github.com/wshobson/maverick-mcp",
"source": "github"
},
"version": "0.1.0",
"remotes": [
{
"name": "sse",
"description": "SSE transport for web-based clients and remote connections",
"transport": {
"type": "sse",
"url": "http://localhost:8003/sse/"
},
"setup_instructions": [
"Clone repository: git clone https://github.com/wshobson/maverick-mcp.git",
"Install dependencies: uv sync (or pip install -e .)",
"Copy .env.example to .env and add your TIINGO_API_KEY",
"Start server: make dev (or uv run python -m maverick_mcp.api.server --transport sse --port 8003)"
],
"environment_variables": [
{
"name": "TIINGO_API_KEY",
"description": "Required API key for Tiingo stock data provider. Get free key at https://tiingo.com (500 requests/day free tier)",
"is_required": true,
"is_secret": true
},
{
"name": "OPENROUTER_API_KEY",
"description": "Optional API key for OpenRouter (400+ AI models with intelligent cost optimization). Get at https://openrouter.ai",
"is_required": false,
"is_secret": true
},
{
"name": "EXA_API_KEY",
"description": "Optional API key for Exa web search (advanced research features). Get at https://exa.ai",
"is_required": false,
"is_secret": true
},
{
"name": "TAVILY_API_KEY",
"description": "Optional API key for Tavily web search (research features). Get at https://tavily.com",
"is_required": false,
"is_secret": true
},
{
"name": "FRED_API_KEY",
"description": "Optional API key for Federal Reserve Economic Data (macroeconomic indicators). Get at https://fred.stlouisfed.org/docs/api/",
"is_required": false,
"is_secret": true
},
{
"name": "DATABASE_URL",
"description": "Optional database URL. Defaults to SQLite (sqlite:///maverick_mcp.db) if not provided. PostgreSQL supported for better performance.",
"is_required": false,
"is_secret": false
},
{
"name": "REDIS_HOST",
"description": "Optional Redis host for enhanced caching performance. Defaults to in-memory caching if not provided.",
"is_required": false,
"is_secret": false
},
{
"name": "REDIS_PORT",
"description": "Optional Redis port (default: 6379)",
"is_required": false,
"is_secret": false
}
]
},
{
"name": "streamable-http",
"description": "Streamable HTTP transport for remote access via mcp-remote bridge",
"transport": {
"type": "streamable-http",
"url": "http://localhost:8003/mcp/"
},
"setup_instructions": [
"Clone repository: git clone https://github.com/wshobson/maverick-mcp.git",
"Install dependencies: uv sync (or pip install -e .)",
"Copy .env.example to .env and add your TIINGO_API_KEY",
"Start server: make dev (or uv run python -m maverick_mcp.api.server --transport streamable-http --port 8003)",
"Connect via mcp-remote: npx mcp-remote http://localhost:8003/mcp/"
],
"environment_variables": [
{
"name": "TIINGO_API_KEY",
"description": "Required API key for Tiingo stock data provider. Get free key at https://tiingo.com (500 requests/day free tier)",
"is_required": true,
"is_secret": true
},
{
"name": "OPENROUTER_API_KEY",
"description": "Optional API key for OpenRouter (400+ AI models with intelligent cost optimization). Get at https://openrouter.ai",
"is_required": false,
"is_secret": true
},
{
"name": "EXA_API_KEY",
"description": "Optional API key for Exa web search (advanced research features). Get at https://exa.ai",
"is_required": false,
"is_secret": true
},
{
"name": "TAVILY_API_KEY",
"description": "Optional API key for Tavily web search (research features). Get at https://tavily.com",
"is_required": false,
"is_secret": true
},
{
"name": "FRED_API_KEY",
"description": "Optional API key for Federal Reserve Economic Data (macroeconomic indicators). Get at https://fred.stlouisfed.org/docs/api/",
"is_required": false,
"is_secret": true
},
{
"name": "DATABASE_URL",
"description": "Optional database URL. Defaults to SQLite (sqlite:///maverick_mcp.db) if not provided. PostgreSQL supported for better performance.",
"is_required": false,
"is_secret": false
},
{
"name": "REDIS_HOST",
"description": "Optional Redis host for enhanced caching performance. Defaults to in-memory caching if not provided.",
"is_required": false,
"is_secret": false
},
{
"name": "REDIS_PORT",
"description": "Optional Redis port (default: 6379)",
"is_required": false,
"is_secret": false
}
]
}
]
}
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/implementations/macro_data_adapter.py:
--------------------------------------------------------------------------------
```python
"""
Macro data provider adapter.
This module provides adapters that make the existing MacroDataProvider
compatible with the new IMacroDataProvider interface.
"""
import asyncio
import logging
from typing import Any
from maverick_mcp.providers.interfaces.macro_data import (
IMacroDataProvider,
MacroDataConfig,
)
from maverick_mcp.providers.macro_data import MacroDataProvider
# Module-level logger, named after this module so log records can be traced to it.
logger = logging.getLogger(__name__)
class MacroDataAdapter(IMacroDataProvider):
    """
    Adapter that makes the existing MacroDataProvider compatible with IMacroDataProvider interface.

    This adapter wraps the existing provider and exposes it through the new
    interface contracts, enabling gradual migration to the new architecture.
    Every async method forwards to the same-named synchronous method of the
    wrapped provider, executed in the event loop's default executor so the
    loop itself is not blocked.
    """

    def __init__(self, config: MacroDataConfig | None = None):
        """
        Initialize the macro data adapter.

        Args:
            config: Macro data configuration (optional). When omitted, the
                wrapped provider is created with a 365-day window.
        """
        self._config = config

        # Initialize the existing provider with configuration
        window_days = config.window_days if config else 365
        self._provider = MacroDataProvider(window_days=window_days)

        logger.debug("MacroDataAdapter initialized")

    async def _run_sync(self, func) -> Any:
        """
        Run a zero-argument synchronous callable in the default executor.

        Uses ``asyncio.get_running_loop()`` (the documented way to obtain the
        loop from inside a coroutine) instead of the legacy
        ``asyncio.get_event_loop()`` lookup.

        Args:
            func: Callable taking no arguments (a bound provider method).

        Returns:
            Whatever ``func`` returns.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, func)

    async def get_gdp_growth_rate(self) -> dict[str, Any]:
        """
        Get GDP growth rate data (async wrapper).

        Returns:
            Dictionary with current and previous GDP growth rates
        """
        return await self._run_sync(self._provider.get_gdp_growth_rate)

    async def get_unemployment_rate(self) -> dict[str, Any]:
        """
        Get unemployment rate data (async wrapper).

        Returns:
            Dictionary with current and previous unemployment rates
        """
        return await self._run_sync(self._provider.get_unemployment_rate)

    async def get_inflation_rate(self) -> dict[str, Any]:
        """
        Get inflation rate data based on CPI (async wrapper).

        Returns:
            Dictionary with current and previous inflation rates and bounds
        """
        return await self._run_sync(self._provider.get_inflation_rate)

    async def get_vix(self) -> float | None:
        """
        Get VIX (volatility index) data (async wrapper).

        Returns:
            Current VIX value or None if unavailable
        """
        return await self._run_sync(self._provider.get_vix)

    async def get_sp500_performance(self) -> float:
        """
        Get S&P 500 performance over multiple timeframes (async wrapper).

        Returns:
            Weighted performance percentage
        """
        return await self._run_sync(self._provider.get_sp500_performance)

    async def get_nasdaq_performance(self) -> float:
        """
        Get NASDAQ performance over multiple timeframes (async wrapper).

        Returns:
            Weighted performance percentage
        """
        return await self._run_sync(self._provider.get_nasdaq_performance)

    async def get_sp500_momentum(self) -> float:
        """
        Get short-term S&P 500 momentum (async wrapper).

        Returns:
            Momentum percentage over short timeframes
        """
        return await self._run_sync(self._provider.get_sp500_momentum)

    async def get_nasdaq_momentum(self) -> float:
        """
        Get short-term NASDAQ momentum (async wrapper).

        Returns:
            Momentum percentage over short timeframes
        """
        return await self._run_sync(self._provider.get_nasdaq_momentum)

    async def get_usd_momentum(self) -> float:
        """
        Get USD momentum using broad dollar index (async wrapper).

        Returns:
            USD momentum percentage over short timeframes
        """
        return await self._run_sync(self._provider.get_usd_momentum)

    async def get_macro_statistics(self) -> dict[str, Any]:
        """
        Get comprehensive macroeconomic statistics (async wrapper).

        Returns:
            Dictionary with all macro indicators including:
            - gdp_growth_rate: Current and previous GDP growth
            - unemployment_rate: Current and previous unemployment
            - inflation_rate: Current and previous inflation
            - sp500_performance: S&P 500 performance
            - nasdaq_performance: NASDAQ performance
            - vix: Volatility index
            - sentiment_score: Computed sentiment score
            - historical_data: Time series data
        """
        return await self._run_sync(self._provider.get_macro_statistics)

    async def get_historical_data(self) -> dict[str, Any]:
        """
        Get historical data for all indicators (async wrapper).

        Returns:
            Dictionary with time series data for various indicators
        """
        return await self._run_sync(self._provider.get_historical_data)

    def get_sync_provider(self) -> MacroDataProvider:
        """
        Get the underlying synchronous provider for backward compatibility.

        Returns:
            The wrapped MacroDataProvider instance
        """
        return self._provider
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/README_INMEMORY_TESTS.md:
--------------------------------------------------------------------------------
```markdown
# In-Memory Testing Guide for Maverick-MCP
This guide explains the in-memory testing patterns implemented for Maverick-MCP using FastMCP's testing capabilities.
## Overview
In-memory testing allows us to test the MCP server without:
- Starting external processes
- Making network calls
- Managing server lifecycle
- Dealing with port conflicts
This results in faster, more reliable tests that can run in any environment.
## Test Files
### 1. `test_in_memory_server.py`
Basic in-memory server tests covering:
- Health endpoint validation
- Stock data fetching
- Technical analysis tools
- Batch operations
- Input validation
- Error handling
- Resource management
### 2. `test_in_memory_routers.py`
Domain-specific router tests:
- Technical analysis router (RSI, MACD, support/resistance)
- Screening router (Maverick, Trending Breakout)
- Portfolio router (risk analysis, correlation)
- Data router (batch fetching, caching)
- Concurrent router operations
### 3. `test_advanced_patterns.py`
Advanced testing patterns:
- External dependency mocking (yfinance, Redis)
- Performance and load testing
- Error recovery patterns
- Integration scenarios
- Monitoring and metrics
## Running the Tests
### Run all in-memory tests:
```bash
pytest maverick_mcp/tests/test_in_memory*.py -v
```
### Run specific test file:
```bash
pytest maverick_mcp/tests/test_in_memory_server.py -v
```
### Run with coverage:
```bash
pytest maverick_mcp/tests/test_in_memory*.py --cov=maverick_mcp --cov-report=html
```
### Run specific test class:
```bash
pytest maverick_mcp/tests/test_in_memory_routers.py::TestTechnicalRouter -v
```
## Key Testing Patterns
### 1. In-Memory Database
```python
@pytest.fixture
def test_db():
"""Create an in-memory SQLite database for testing."""
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
# Add test data...
yield engine
```
### 2. Mock External Services
```python
@pytest.fixture
def mock_redis():
"""Mock Redis client for testing."""
with patch('maverick_mcp.data.cache.RedisCache') as mock:
cache_instance = Mock()
# Configure mock behavior...
yield cache_instance
```
### 3. FastMCP Client Testing
```python
async with Client(mcp) as client:
result = await client.call_tool("tool_name", {"param": "value"})
assert result.text is not None
```
### 4. Router Isolation
```python
test_mcp = FastMCP("TestServer")
test_mcp.mount("/technical", technical_router)
async with Client(test_mcp) as client:
# Test only technical router
```
## Benefits
### 1. **Speed**
- No process startup overhead
- No network latency
- Instant test execution
### 2. **Reliability**
- No port conflicts
- No external dependencies
- Deterministic results
### 3. **Isolation**
- Each test runs in isolation
- No shared state between tests
- Easy to debug failures
### 4. **Flexibility**
- Easy to mock dependencies
- Test specific scenarios
- Control external service behavior
## Best Practices
### 1. Use Fixtures
Create reusable fixtures for common test setup:
```python
@pytest.fixture
def populated_db(test_db):
"""Database with test data."""
# Add stocks, prices, etc.
return test_db
```
### 2. Mock External APIs
Always mock external services:
```python
with patch('yfinance.download') as mock_yf:
mock_yf.return_value = test_data
# Run tests
```
### 3. Test Error Scenarios
Include tests for failure cases:
```python
mock_yf.side_effect = Exception("API Error")
# Verify graceful handling
```
### 4. Measure Performance
Use timing to ensure performance:
```python
start_time = time.time()
await client.call_tool("tool_name", params)
duration = time.time() - start_time
assert duration < 1.0 # Should complete in under 1 second
```
## Debugging Tests
### Enable logging:
```python
import logging
logging.basicConfig(level=logging.DEBUG)
```
### Use pytest debugging:
```bash
pytest -vv --pdb # Drop into debugger on failure
```
### Capture output:
```bash
pytest -s # Don't capture stdout
```
## CI/CD Integration
These tests are perfect for CI/CD pipelines:
```yaml
# .github/workflows/test.yml
- name: Run in-memory tests
run: |
pytest maverick_mcp/tests/test_in_memory*.py \
--cov=maverick_mcp \
--cov-report=xml \
--junit-xml=test-results.xml
```
## Extending the Tests
To add new test cases:
1. Choose the appropriate test file based on what you're testing
2. Use existing fixtures or create new ones
3. Follow the async pattern with `Client(mcp)`
4. Mock external dependencies
5. Assert both success and failure cases
Example:
```python
@pytest.mark.asyncio
async def test_new_feature(test_db, mock_redis):
"""Test description."""
async with Client(mcp) as client:
result = await client.call_tool("new_tool", {
"param": "value"
})
assert result.text is not None
data = ast.literal_eval(result.text)  # needs `import ast`; parses literals without eval()'s code execution
assert data["expected_key"] == "expected_value"
```
## Troubleshooting
### Common Issues:
1. **Import Errors**: Ensure maverick_mcp is installed: `pip install -e .`
2. **Async Warnings**: Use `pytest-asyncio` for async tests
3. **Mock Not Working**: Check patch path matches actual import
4. **Database Errors**: Ensure models are imported before `create_all()`
### Tips:
- Run tests in isolation first to identify issues
- Check fixture dependencies
- Verify mock configurations
- Use debugger to inspect test state
## Conclusion
These in-memory tests provide comprehensive coverage of Maverick-MCP functionality while maintaining fast execution and reliability. They demonstrate best practices for testing MCP servers and can be easily extended for new features.
```
--------------------------------------------------------------------------------
/maverick_mcp/application/dto/technical_analysis_dto.py:
--------------------------------------------------------------------------------
```python
"""
Data Transfer Objects for technical analysis.
These DTOs are used to transfer data between the application layer
and the API layer, providing a stable contract for API responses.
"""
from datetime import datetime
from pydantic import BaseModel, Field
class RSIAnalysisDTO(BaseModel):
    """API-facing payload carrying the result of an RSI analysis.

    Every field is required; omitting the default in ``Field`` is
    equivalent to passing ``...``.
    """

    # Raw reading and the period it was calculated with.
    current_value: float = Field(description="Current RSI value")
    period: int = Field(description="RSI calculation period")

    # Derived signal and overbought/oversold flags.
    signal: str = Field(description="Trading signal")
    is_overbought: bool = Field(description="Whether RSI indicates overbought")
    is_oversold: bool = Field(description="Whether RSI indicates oversold")

    # Free-text explanation for end users.
    interpretation: str = Field(description="Human-readable interpretation")
class MACDAnalysisDTO(BaseModel):
    """API-facing payload carrying the result of a MACD analysis.

    Every field is required; omitting the default in ``Field`` is
    equivalent to passing ``...``.
    """

    # The three MACD components.
    macd_line: float = Field(description="MACD line value")
    signal_line: float = Field(description="Signal line value")
    histogram: float = Field(description="MACD histogram value")

    # Derived signal and crossover flags.
    signal: str = Field(description="Trading signal")
    is_bullish_crossover: bool = Field(description="Bullish crossover detected")
    is_bearish_crossover: bool = Field(description="Bearish crossover detected")

    # Free-text explanation for end users.
    interpretation: str = Field(description="Human-readable interpretation")
class BollingerBandsDTO(BaseModel):
    """API-facing payload carrying the result of a Bollinger Bands analysis.

    Every field is required; omitting the default in ``Field`` is
    equivalent to passing ``...``.
    """

    # Band values.
    upper_band: float = Field(description="Upper band value")
    middle_band: float = Field(description="Middle band (SMA) value")
    lower_band: float = Field(description="Lower band value")

    # Price and its relation to the bands.
    current_price: float = Field(description="Current stock price")
    bandwidth: float = Field(description="Band width (volatility indicator)")
    percent_b: float = Field(description="Position within bands (0-1)")

    # Derived signal and free-text explanation.
    signal: str = Field(description="Trading signal")
    interpretation: str = Field(description="Human-readable interpretation")
class StochasticDTO(BaseModel):
    """API-facing payload carrying the result of a stochastic oscillator analysis.

    Every field is required; omitting the default in ``Field`` is
    equivalent to passing ``...``.
    """

    # Oscillator readings.
    k_value: float = Field(description="%K value")
    d_value: float = Field(description="%D value")

    # Derived signal and overbought/oversold flags.
    signal: str = Field(description="Trading signal")
    is_overbought: bool = Field(description="Whether indicating overbought")
    is_oversold: bool = Field(description="Whether indicating oversold")

    # Free-text explanation for end users.
    interpretation: str = Field(description="Human-readable interpretation")
class PriceLevelDTO(BaseModel):
    """A single support or resistance price level.

    ``strength`` is validated to the inclusive range 1-5; all fields are
    required.
    """

    price: float = Field(description="Price level")
    strength: int = Field(ge=1, le=5, description="Level strength (1-5)")
    touches: int = Field(description="Number of times tested")
    distance_from_current: float = Field(description="Distance from current price (%)")
class VolumeAnalysisDTO(BaseModel):
    """API-facing payload carrying the result of a volume analysis.

    Every field is required; omitting the default in ``Field`` is
    equivalent to passing ``...``.
    """

    # Absolute and relative volume figures.
    current_volume: int = Field(description="Current trading volume")
    average_volume: float = Field(description="Average volume")
    relative_volume: float = Field(description="Volume relative to average")

    # Derived trend and anomaly flag.
    volume_trend: str = Field(description="Volume trend direction")
    unusual_activity: bool = Field(description="Unusual volume detected")

    # Free-text explanation for end users.
    interpretation: str = Field(description="Human-readable interpretation")
class TrendAnalysisDTO(BaseModel):
    """API-facing payload carrying the result of a trend analysis.

    ``strength`` is validated to the inclusive range 0-100; all fields are
    required.
    """

    direction: str = Field(description="Trend direction")
    strength: float = Field(ge=0, le=100, description="Trend strength (0-100)")
    interpretation: str = Field(description="Human-readable interpretation")
class TechnicalAnalysisRequestDTO(BaseModel):
    """Incoming request parameters for a technical analysis run.

    ``symbol`` is required. ``days`` defaults to 365 and is validated to the
    inclusive range 30-1825. ``indicators`` defaults to ``None``, which the
    field description documents as "all indicators".
    """

    symbol: str = Field(description="Stock ticker symbol")
    days: int = Field(365, ge=30, le=1825, description="Days of historical data")
    indicators: list[str] | None = Field(
        None,
        description="Specific indicators to calculate (default: all)",
    )
class CompleteTechnicalAnalysisDTO(BaseModel):
    """Aggregate response combining every part of a technical analysis.

    Identification, trend, composite signal, confidence, summary and key
    levels are required. Individual indicator sections, volume analysis and
    the risk/reward ratio default to ``None``; the support/resistance lists
    default to empty lists.
    """

    # --- Identification -------------------------------------------------
    symbol: str = Field(description="Stock ticker symbol")
    analysis_date: datetime = Field(description="Analysis timestamp")
    current_price: float = Field(description="Current stock price")

    # --- Trend ----------------------------------------------------------
    trend: TrendAnalysisDTO = Field(description="Trend analysis")

    # --- Indicators (each optional) ------------------------------------
    rsi: RSIAnalysisDTO | None = Field(default=None, description="RSI analysis")
    macd: MACDAnalysisDTO | None = Field(default=None, description="MACD analysis")
    bollinger_bands: BollingerBandsDTO | None = Field(
        default=None, description="Bollinger Bands"
    )
    stochastic: StochasticDTO | None = Field(
        default=None, description="Stochastic oscillator"
    )

    # --- Levels ---------------------------------------------------------
    support_levels: list[PriceLevelDTO] = Field(
        default_factory=list, description="Support levels"
    )
    resistance_levels: list[PriceLevelDTO] = Field(
        default_factory=list, description="Resistance levels"
    )

    # --- Volume ---------------------------------------------------------
    volume_analysis: VolumeAnalysisDTO | None = Field(
        default=None, description="Volume analysis"
    )

    # --- Overall analysis -----------------------------------------------
    composite_signal: str = Field(description="Overall trading signal")
    confidence_score: float = Field(
        ge=0, le=100, description="Analysis confidence (0-100)"
    )
    risk_reward_ratio: float | None = Field(
        default=None, description="Risk/reward ratio"
    )

    # --- Summary --------------------------------------------------------
    summary: str = Field(description="Executive summary of analysis")
    key_levels: dict[str, float] = Field(description="Key price levels for trading")

    # NOTE(review): class-based ``Config`` and ``json_encoders`` are deprecated
    # in pydantic v2 -- confirm the installed version before migrating this to
    # ``model_config``.
    class Config:
        # Serialize datetimes as ISO 8601 strings.
        json_encoders = {datetime: lambda v: v.isoformat()}
```
--------------------------------------------------------------------------------
/scripts/load_example.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Example usage of the Tiingo data loader.
This script demonstrates common usage patterns for loading market data
from Tiingo API into the Maverick-MCP database.
"""
import asyncio
import logging
import os
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from maverick_mcp.data.models import SessionLocal
from scripts.load_tiingo_data import ProgressTracker, TiingoDataLoader
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def load_sample_stocks():
    """Load five well-known tickers and, if any succeed, run the screeners.

    Progress is tracked in ``sample_load_progress.json``. Data is requested
    from 2023-01-01 with indicator calculation and storage enabled.
    """
    sample = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA"]
    print(f"Loading sample stocks: {', '.join(sample)}")

    # Checkpoint file shared with resume_interrupted_load().
    tracker = ProgressTracker("sample_load_progress.json")
    tracker.total_symbols = len(sample)

    loader_cm = TiingoDataLoader(
        batch_size=10, max_concurrent=3, progress_tracker=tracker
    )
    async with loader_cm as loader:
        successful, failed = await loader.load_batch_symbols(
            sample,
            "2023-01-01",
            calculate_indicators=True,
            store_indicators=True,
        )
        print(f"\nCompleted: {successful} successful, {failed} failed")

        # Screening is pointless when nothing was loaded.
        if not successful > 0:
            return

        print("Running screening algorithms...")
        with SessionLocal() as session:
            per_screen_counts = loader.run_screening_algorithms(session)
            print("Screening results:")
            for screen_name, hits in per_screen_counts.items():
                print(f" {screen_name}: {hits} stocks")
async def load_sector_stocks():
    """Load the first ten symbols of the ``technology`` sector.

    Symbols come from ``MARKET_SECTORS`` in ``scripts.tiingo_config``; data
    is requested from 2022-01-01 with indicators calculated and stored.
    """
    from scripts.tiingo_config import MARKET_SECTORS

    sector = "technology"
    # Cap the demo at ten symbols.
    sector_symbols = MARKET_SECTORS[sector][:10]
    print(f"Loading {sector} sector stocks: {len(sector_symbols)} symbols")

    tracker = ProgressTracker(f"{sector}_load_progress.json")
    tracker.total_symbols = len(sector_symbols)

    loader_cm = TiingoDataLoader(
        batch_size=5, max_concurrent=2, progress_tracker=tracker
    )
    async with loader_cm as loader:
        successful, failed = await loader.load_batch_symbols(
            sector_symbols,
            "2022-01-01",
            calculate_indicators=True,
            store_indicators=True,
        )
        print(f"\nSector loading completed: {successful} successful, {failed} failed")
async def resume_interrupted_load():
    """Continue a previous sample load using its checkpoint file.

    Returns early when ``sample_load_progress.json`` does not exist or when
    every symbol in the demo list is already marked as completed.
    """
    checkpoint_file = "sample_load_progress.json"
    if not Path(checkpoint_file).exists():
        print(f"No checkpoint file found: {checkpoint_file}")
        return

    print("Resuming from checkpoint...")
    tracker = ProgressTracker(checkpoint_file)
    tracker.load_checkpoint()

    # In real usage this list would be the original symbol list of the run.
    all_symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "NVDA", "META", "ADBE"]
    pending = []
    for ticker in all_symbols:
        if ticker not in tracker.completed_symbols:
            pending.append(ticker)

    if not pending:
        print("All symbols already completed!")
        return

    print(f"Resuming with {len(pending)} remaining symbols")
    loader_cm = TiingoDataLoader(
        batch_size=3, max_concurrent=2, progress_tracker=tracker
    )
    async with loader_cm as loader:
        successful, failed = await loader.load_batch_symbols(
            pending, "2023-01-01", calculate_indicators=True, store_indicators=True
        )
        print(f"Resume completed: {successful} successful, {failed} failed")
def print_database_stats():
    """Print the record count of each main model, one per line."""
    from maverick_mcp.data.models import (
        MaverickStocks,
        PriceCache,
        Stock,
        TechnicalCache,
    )

    # (label, model) pairs in the order they are reported.
    reported = (
        ("stocks", Stock),
        ("price_records", PriceCache),
        ("technical_indicators", TechnicalCache),
        ("maverick_stocks", MaverickStocks),
    )

    with SessionLocal() as session:
        counts = {label: session.query(model).count() for label, model in reported}

    print("\n📊 Current Database Statistics:")
    for label, total in counts.items():
        print(f" {label}: {total:,}")
async def main():
    """Run the interactive Tiingo loader demo.

    Verifies that ``TIINGO_API_TOKEN`` is set, prints the current database
    statistics, shows a menu and runs the selected example. After a loading
    example finishes, the statistics are printed again so the effect of the
    load is visible. Choosing "Show database stats" prints them once (the
    previous implementation printed them twice in a row).

    ``KeyboardInterrupt`` is reported as a cancellation; any other exception
    is logged together with its traceback instead of propagating.
    """
    print("Tiingo Data Loader Examples")
    print("=" * 40)

    # Check for API token
    if not os.getenv("TIINGO_API_TOKEN"):
        print("❌ TIINGO_API_TOKEN environment variable not set")
        print("Please set your Tiingo API token:")
        print("export TIINGO_API_TOKEN=your_token_here")
        return
    print("✅ Tiingo API token found")

    # Show current database stats
    print_database_stats()

    # Menu of examples
    print("\nSelect an example to run:")
    print("1. Load sample stocks (5 symbols)")
    print("2. Load technology sector stocks (10 symbols)")
    print("3. Resume interrupted load")
    print("4. Show database stats")
    print("0. Exit")

    # Menu options that load data and therefore warrant a stats refresh.
    loading_examples = {
        "1": load_sample_stocks,
        "2": load_sector_stocks,
        "3": resume_interrupted_load,
    }

    try:
        choice = input("\nEnter your choice (0-4): ").strip()

        if choice == "0":
            print("Goodbye!")
            return

        if choice == "4":
            # Stats-only option: one printout is enough.
            print_database_stats()
            return

        example = loading_examples.get(choice)
        if example is None:
            print("Invalid choice")
            return

        await example()

        # Show updated stats
        print_database_stats()
    except KeyboardInterrupt:
        print("\nOperation cancelled")
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error: %s", e)
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------
```markdown
## 📋 Pull Request Summary
**Brief description of changes:**
A concise description of what this PR accomplishes.
**Related issue(s):**
- Fixes #(issue)
- Closes #(issue)
- Addresses #(issue)
## 💰 Financial Disclaimer Acknowledgment
- [ ] I understand this is educational software and not financial advice
- [ ] Any financial analysis features include appropriate disclaimers
- [ ] This PR maintains the educational/personal-use focus of the project
## 🔄 Type of Change
- [ ] 🐛 Bug fix (non-breaking change that fixes an issue)
- [ ] ✨ New feature (non-breaking change that adds functionality)
- [ ] 💥 Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] 📚 Documentation update (improvements to documentation)
- [ ] 🔧 Refactor (code changes that neither fix bugs nor add features)
- [ ] ⚡ Performance improvement
- [ ] 🧹 Chore (dependencies, build tools, etc.)
## 🎯 Component Areas
**Primary areas affected:**
- [ ] Data fetching (Tiingo, Yahoo Finance, FRED)
- [ ] Technical analysis calculations
- [ ] Stock screening strategies
- [ ] Portfolio analysis and optimization
- [ ] MCP server/tools implementation
- [ ] Database operations and models
- [ ] Caching (Redis/in-memory)
- [ ] Claude Desktop integration
- [ ] Development tools and setup
- [ ] Documentation and examples
## 🔧 Implementation Details
**Technical approach:**
Describe the technical approach and any architectural decisions.
**Key changes:**
- Changed X to improve Y
- Added new function Z for feature A
- Refactored B to better handle C
**Dependencies:**
- [ ] No new dependencies added
- [ ] New dependencies added (list below)
- [ ] Dependencies removed (list below)
**New dependencies added:**
- package-name==version (reason for adding)
## 🧪 Testing
**Testing performed:**
- [ ] Unit tests added/updated
- [ ] Integration tests added/updated
- [ ] Manual testing completed
- [ ] Tested with Claude Desktop
- [ ] Tested with different data sources
- [ ] Performance testing completed
**Test scenarios covered:**
- [ ] Happy path functionality
- [ ] Error handling and edge cases
- [ ] Data validation and sanitization
- [ ] API rate limiting compliance
- [ ] Database operations
- [ ] Cache behavior
**Manual testing:**
```bash
# Commands used for testing
make test
make test-integration
# etc.
```
## 📊 Financial Analysis Impact
**Financial calculations:**
- [ ] No financial calculations affected
- [ ] New financial calculations added (validated for accuracy)
- [ ] Existing calculations modified (thoroughly tested)
- [ ] All calculations include appropriate disclaimers
**Data providers:**
- [ ] No data provider changes
- [ ] New data provider integration
- [ ] Existing provider modifications
- [ ] Rate limiting compliance verified
**Market data handling:**
- [ ] Historical data processing
- [ ] Real-time data integration
- [ ] Technical indicator calculations
- [ ] Screening algorithm changes
## 🔒 Security Considerations
**Security checklist:**
- [ ] No hardcoded secrets or credentials
- [ ] Input validation implemented
- [ ] Error handling doesn't leak sensitive information
- [ ] API keys handled securely via environment variables
- [ ] SQL injection prevention verified
- [ ] Rate limiting respected for external APIs
## 📚 Documentation
**Documentation updates:**
- [ ] Code comments added/updated
- [ ] README.md updated (if needed)
- [ ] API documentation updated
- [ ] Examples/tutorials added
- [ ] Financial disclaimers included where appropriate
**Breaking changes documentation:**
- [ ] No breaking changes
- [ ] Breaking changes documented in PR description
- [ ] Migration guide provided
- [ ] CHANGELOG.md updated
## ✅ Pre-submission Checklist
**Code quality:**
- [ ] Code follows the project style guide
- [ ] Self-review of code completed
- [ ] Tests added/updated and passing
- [ ] No linting errors (`make lint`)
- [ ] Type checking passes (`make typecheck`)
- [ ] All tests pass (`make test`)
**Financial software standards:**
- [ ] Financial disclaimers included where appropriate
- [ ] No investment advice or guarantees provided
- [ ] Educational purpose maintained
- [ ] Data accuracy considerations documented
- [ ] Risk warnings included for relevant features
**Community standards:**
- [ ] PR title is descriptive and follows convention
- [ ] Description clearly explains the changes
- [ ] Related issues are linked
- [ ] Screenshots/examples included (if applicable)
- [ ] Ready for review
## 📸 Screenshots/Examples
**Before and after (if applicable):**
<!-- Add screenshots, CLI output, or code examples -->
**New functionality examples:**
```python
# Example of new feature usage
result = new_function(symbol="AAPL", period=20)
print(result)
```
## 🤝 Review Guidance
**Areas needing special attention:**
- Focus on X because of Y
- Pay attention to Z implementation
- Verify A works correctly with B
**Questions for reviewers:**
- Does the implementation approach make sense?
- Are there any security concerns?
- Is the documentation clear and complete?
- Any suggestions for improvement?
## 🚀 Deployment Notes
**Environment considerations:**
- [ ] No environment changes required
- [ ] New environment variables needed (documented)
- [ ] Database migrations required
- [ ] Cache invalidation needed
**Rollback plan:**
- [ ] Changes are fully backward compatible
- [ ] Database migrations are reversible
- [ ] Rollback steps documented below
**Rollback steps (if needed):**
1. Step 1
2. Step 2
## 🎓 Educational Impact
**Learning value:**
- What financial concepts does this help teach?
- How does this improve the developer experience?
- What new capabilities does this enable for users?
**Community benefit:**
- Who will benefit from these changes?
- How does this advance the project's educational mission?
- Any potential for broader community impact?
---
**Additional Notes:**
Any other information that would be helpful for reviewers.
```
--------------------------------------------------------------------------------
/tools/templates/new_router_template.py:
--------------------------------------------------------------------------------
```python
"""
Template for creating new FastAPI routers.
Copy this file and modify it to create new routers quickly.
"""
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from maverick_mcp.data.models import get_db
from maverick_mcp.utils.logging import get_logger
logger = get_logger(__name__)
# Create router instance
router = APIRouter(
prefix="/your_domain", # Change this to your domain
tags=["your_domain"], # Change this to your domain
)
# Request/Response models
class YourRequest(BaseModel):
    """Template request body; rename the fields to fit your domain.

    ``field1`` is required, ``field2`` defaults to 10 (must be >= 0) and
    ``field3`` defaults to ``True``.
    """

    field1: str = Field(description="Description of field1")
    field2: int = Field(default=10, ge=0, description="Description of field2")
    field3: bool = Field(default=True, description="Description of field3")
class YourResponse(BaseModel):
    """Template response body returned by the example endpoints.

    ``status`` and ``result`` are required; ``message`` defaults to ``None``.
    """

    status: str = Field(description="Operation status")
    result: dict[str, Any] = Field(description="Operation result")
    message: str | None = Field(default=None, description="Optional message")
# Endpoints
@router.get("/", response_model=list[YourResponse])
async def list_items(
    limit: int = Query(10, ge=1, le=100, description="Number of items to return"),
    offset: int = Query(0, ge=0, description="Number of items to skip"),
    db: Session = Depends(get_db),
):
    """
    Return a page of items.

    Args:
        limit: Page size, validated to 1-100 (default 10)
        offset: Number of leading items to skip (default 0)
        db: Database session injected by FastAPI

    Returns:
        List of items (the template placeholder returns an empty list)

    Raises:
        HTTPException: 500 when any unexpected error occurs
    """
    try:
        # Your business logic here
        items = []  # Fetch from database

        pagination = {"limit": limit, "offset": offset}
        logger.info(f"Listed {len(items)} items", extra=pagination)
        return items
    except Exception as exc:
        logger.error(f"Error listing items: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{item_id}", response_model=YourResponse)
async def get_item(
    item_id: int,
    db: Session = Depends(get_db),
):
    """
    Look up one item by its ID.

    Args:
        item_id: The item ID
        db: Database session injected by FastAPI

    Returns:
        The requested item wrapped in a YourResponse

    Raises:
        HTTPException: 404 when the item does not exist, 500 on any
            unexpected error
    """
    try:
        # Your business logic here
        item = None  # Fetch from database

        if item:
            return YourResponse(
                status="success",
                result={"id": item_id, "data": "example"},
            )

        # Placeholder lookup found nothing.
        raise HTTPException(status_code=404, detail="Item not found")
    except HTTPException:
        # Let deliberate HTTP errors (such as the 404 above) pass through.
        raise
    except Exception as exc:
        logger.error(f"Error getting item {item_id}: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/", response_model=YourResponse)
async def create_item(
    request: YourRequest,
    db: Session = Depends(get_db),
):
    """
    Create an item from the request body.

    Args:
        request: The item data
        db: Database session injected by FastAPI

    Returns:
        A success response describing the created item

    Raises:
        HTTPException: 500 on any error; the session is rolled back first
    """
    try:
        payload = request.model_dump()
        logger.info("Creating new item", extra={"request": payload})

        # Your business logic here
        # Example: Create in database
        # new_item = YourModel(**request.model_dump())
        # db.add(new_item)
        # db.commit()

        response = YourResponse(
            status="success",
            result={"id": 1, "created": True},
            message="Item created successfully",
        )
        return response
    except Exception as exc:
        logger.error(f"Error creating item: {exc}", exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail="Internal server error")
@router.put("/{item_id}", response_model=YourResponse)
async def update_item(
    item_id: int,
    request: YourRequest,
    db: Session = Depends(get_db),
):
    """
    Replace the data of an existing item.

    Args:
        item_id: The item ID
        request: The updated data
        db: Database session injected by FastAPI

    Returns:
        A success response describing the updated item

    Raises:
        HTTPException: re-raised unchanged when raised by the business
            logic; otherwise 500 after rolling back the session
    """
    try:
        # Your business logic here
        # Example: Update in database
        # item = db.query(YourModel).filter(YourModel.id == item_id).first()
        # if not item:
        #     raise HTTPException(status_code=404, detail="Item not found")

        # Update fields
        # for key, value in request.model_dump().items():
        #     setattr(item, key, value)
        # db.commit()

        response = YourResponse(
            status="success",
            result={"id": item_id, "updated": True},
            message="Item updated successfully",
        )
        return response
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error(f"Error updating item {item_id}: {exc}", exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/{item_id}")
async def delete_item(
    item_id: int,
    db: Session = Depends(get_db),
):
    """
    Remove an item.

    Args:
        item_id: The item ID
        db: Database session injected by FastAPI

    Returns:
        A dict with ``status`` and a confirmation ``message``

    Raises:
        HTTPException: re-raised unchanged when raised by the business
            logic; otherwise 500 after rolling back the session
    """
    try:
        # Your business logic here
        # Example: Delete from database
        # item = db.query(YourModel).filter(YourModel.id == item_id).first()
        # if not item:
        #     raise HTTPException(status_code=404, detail="Item not found")
        # db.delete(item)
        # db.commit()

        confirmation = f"Item {item_id} deleted"
        return {"status": "success", "message": confirmation}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error(f"Error deleting item {item_id}: {exc}", exc_info=True)
        db.rollback()
        raise HTTPException(status_code=500, detail="Internal server error")
# Health check endpoint
# NOTE: FastAPI matches routes in declaration order. A GET "/{item_id}" route
# registered earlier on the same router also matches "/health" and rejects it
# with a validation error because "health" is not an int. When copying this
# template, move this endpoint above any parameterised GET routes.
@router.get("/health")
async def health_check():
    """
    Report that the router is up.

    Returns:
        A dict with ``status``, the ``router`` name and the current UTC time
        as an ISO 8601 ``timestamp`` (previously a hard-coded constant).
    """
    # Local import keeps this template block self-contained.
    from datetime import datetime, timezone

    now_utc = datetime.now(timezone.utc)
    return {
        "status": "healthy",
        "router": "your_domain",
        # Same "YYYY-MM-DDTHH:MM:SSZ" shape as the former placeholder value.
        "timestamp": now_utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
    }
```
--------------------------------------------------------------------------------
/tools/quick_test.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
Quick test runner for Maverick-MCP.
This script allows rapid testing of individual functions or modules
without running the full test suite.
"""
import asyncio
import os
import sys
import time
from pathlib import Path
# Add project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# Set up minimal environment
os.environ.setdefault("AUTH_ENABLED", "false")
os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("DATABASE_URL", "sqlite:///:memory:")
def setup_test_environment():
    """Configure INFO-level logging and quiet the chatty HTTP/async loggers."""
    import logging

    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=log_format)

    # These libraries log at INFO/DEBUG on every request; raise them to WARNING.
    noisy_loggers = ("httpx", "httpcore", "urllib3", "asyncio")
    for name in noisy_loggers:
        logging.getLogger(name).setLevel(logging.WARNING)
async def test_stock_data():
    """Fetch a short AAPL price window and print a summary of the result."""
    from maverick_mcp.providers.stock_data import StockDataProvider

    print("\n🧪 Testing StockDataProvider...")

    # Bypass the cache so the provider itself is exercised.
    provider = StockDataProvider(use_cache=False)
    frame = provider.get_stock_data("AAPL", "2024-01-01", "2024-01-10")

    column_names = list(frame.columns)
    latest_close = frame["Close"].iloc[-1]
    print(f"✅ Got {len(frame)} days of data for AAPL")
    print(f" Columns: {column_names}")
    print(f" Latest close: ${latest_close:.2f}")
async def test_technical_analysis():
    """Compute SMA(5) and RSI(14) on a synthetic 30-point price series."""
    import pandas as pd

    from maverick_mcp.core.technical_analysis import calculate_rsi, calculate_sma

    print("\n🧪 Testing Technical Analysis...")

    # Ten-value pattern repeated three times.
    base_pattern = [100, 102, 101, 103, 105, 104, 106, 108, 107, 109]
    frame = pd.DataFrame({"Close": base_pattern * 3})

    sma_series = calculate_sma(frame, period=5)
    print(f"✅ SMA calculated: {sma_series.iloc[-1]:.2f}")

    rsi_series = calculate_rsi(frame, period=14)
    print(f"✅ RSI calculated: {rsi_series.iloc[-1]:.2f}")
async def test_auth_token():
    """Placeholder for the auth token test; only prints a "skipped" notice."""
    skipped_notice = (
        "\n⚠️ Auth Token Test - Skipped (Authentication system removed for personal use)"
    )
    print(skipped_notice)
async def run_custom_test():
    """
    Scratch test hook for ad-hoc experiments.

    Edit the body to call whatever function or module you want to try out;
    as shipped it only prints a start and a completion line.
    """
    start_banner = "\n🧪 Running custom test..."
    print(start_banner)

    # Example: Test a specific function
    # from maverick_mcp.some_module import some_function
    # result = await some_function()
    # print(f"Result: {result}")

    done_banner = "✅ Custom test completed"
    print(done_banner)
async def test_parallel_screening():
    """
    Compare sequential and parallel momentum screening over eight symbols.

    Runs ``example_momentum_screen`` once per symbol sequentially, then runs
    the same screen through ``ParallelScreener`` with four workers, and
    prints both durations, the speedup factor and the number of passing
    stocks.

    Durations use ``time.perf_counter()`` (monotonic, high resolution). A
    parallel duration of zero is reported as an infinite speedup instead of
    raising ``ZeroDivisionError``, and a speedup below 1x is reported as a
    slowdown rather than as a success.
    """
    from maverick_mcp.utils.parallel_screening import (
        ParallelScreener,
        example_momentum_screen,
    )

    print("\n🧪 Testing Parallel Screening Performance...")

    # Test symbols
    test_symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "TSLA", "META", "NVDA", "JPM"]

    # Sequential baseline
    print("\n📊 Sequential screening (baseline):")
    sequential_start = time.perf_counter()
    for symbol in test_symbols:
        result = example_momentum_screen(symbol)
        print(f" {symbol}: {'✅' if result.get('passed') else '❌'}")
    sequential_time = time.perf_counter() - sequential_start

    # Parallel screening
    print("\n⚡ Parallel screening (4 workers):")
    with ParallelScreener(max_workers=4) as screener:
        parallel_start = time.perf_counter()
        parallel_results = screener.screen_batch(
            test_symbols, example_momentum_screen, batch_size=2
        )
        parallel_time = time.perf_counter() - parallel_start

    # Guard the ratio: a zero-length parallel run must not crash the report.
    speedup = sequential_time / parallel_time if parallel_time > 0 else float("inf")
    passed_count = sum(1 for r in parallel_results if r.get("passed"))

    # Results
    print("\n📈 Performance Results:")
    print(f" Sequential: {sequential_time:.2f}s")
    print(f" Parallel: {parallel_time:.2f}s")
    print(f" Speedup: {speedup:.1f}x")
    print(f" Passed: {passed_count} stocks")

    if speedup > 2:
        print(f"\n🎉 Excellent! Achieved {speedup:.1f}x speedup!")
    elif speedup >= 1:
        print(f"\n✅ Good! Achieved {speedup:.1f}x speedup")
    else:
        # Do not congratulate a run that was slower than the baseline.
        print(f"\n⚠️ Parallel run was slower than sequential ({speedup:.1f}x)")
async def main():
    """
    Parse the command line and run the selected quick test(s).

    ``--test`` picks one test or ``all`` (which skips the custom test),
    ``--times`` repeats the selection, and ``--loop`` inserts a 0.1 s pause
    between repetitions. Total time is always printed; the per-run average
    is printed when more than one run was requested.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Quick test runner for Maverick-MCP")
    cli.add_argument(
        "--test",
        choices=["stock", "technical", "auth", "custom", "parallel", "all"],
        default="custom",
        help="Which test to run",
    )
    cli.add_argument(
        "--loop",
        action="store_true",
        help="Run test in a loop for performance testing",
    )
    cli.add_argument(
        "--times",
        type=int,
        default=1,
        help="Number of times to run the test",
    )
    args = cli.parse_args()

    setup_test_environment()

    print("🚀 Maverick-MCP Quick Test Runner")
    print("=" * 50)

    # Test name -> coroutine function.
    registry = {
        "stock": test_stock_data,
        "technical": test_technical_analysis,
        "auth": test_auth_token,
        "custom": run_custom_test,
        "parallel": test_parallel_screening,
    }

    # Resolve the selection once; "all" deliberately leaves out "custom".
    if args.test == "all":
        selected = [func for name, func in registry.items() if name != "custom"]
    else:
        selected = [registry[args.test]]

    began = time.time()
    last_run = args.times - 1
    for run_index in range(args.times):
        if args.times > 1:
            print(f"\n🔄 Run {run_index + 1}/{args.times}")

        for func in selected:
            await func()

        if args.loop and run_index < last_run:
            await asyncio.sleep(0.1)  # Small delay between runs
    elapsed = time.time() - began

    print(f"\n⏱️ Total time: {elapsed:.2f} seconds")
    if args.times > 1:
        print(f"📊 Average time per run: {elapsed / args.times:.2f} seconds")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/tools/experiments/validation_examples.py:
--------------------------------------------------------------------------------
```python
"""
Validation examples for testing the agentic workflow improvements.
This file demonstrates the 4 validation tasks:
1. Add a new technical indicator tool
2. Debug authentication issues
3. Run tests for stock data provider
4. Create a new screening strategy
"""
import os
import subprocess
import sys
from typing import Any

import pandas as pd

from maverick_mcp.core.technical_analysis import calculate_sma
from maverick_mcp.providers.stock_data import StockDataProvider
from maverick_mcp.utils.agent_errors import agent_friendly_errors
from maverick_mcp.utils.quick_cache import get_cache_stats, quick_cache
print("🎯 Maverick-MCP Validation Examples")
print("=" * 60)
# Validation 1: Add a new technical indicator tool
print("\n📊 1. Adding a new technical indicator (Stochastic Oscillator)...")
def calculate_stochastic(
    df: pd.DataFrame, k_period: int = 14, d_period: int = 3
) -> pd.DataFrame:
    """Calculate the Stochastic Oscillator (%K and %D).

    Args:
        df: Frame with ``High``, ``Low`` and ``Close`` columns.
        k_period: Rolling window used for the highest high / lowest low.
        d_period: Rolling window of the simple moving average applied to %K.

    Returns:
        A DataFrame indexed like ``df`` with columns ``%K`` and ``%D``. Rows
        without a full window are NaN. Rows whose window has a zero
        high-low range are also NaN instead of ``inf``.
    """
    # Compute each rolling extreme once and reuse it.
    lowest_low = df["Low"].rolling(k_period).min()
    highest_high = df["High"].rolling(k_period).max()

    # A flat window has no range; mask it to NaN so the division below can
    # never produce +/-inf.
    price_range = highest_high - lowest_low
    price_range = price_range.where(price_range != 0)

    # %K = (Current Close - Lowest Low) / (Highest High - Lowest Low) * 100
    k_percent = 100 * ((df["Close"] - lowest_low) / price_range)

    # %D = d_period-row SMA of %K
    d_percent = k_percent.rolling(d_period).mean()

    return pd.DataFrame({"%K": k_percent, "%D": d_percent})
# Mock tool registration (would normally use @mcp.tool())
@agent_friendly_errors
def get_stochastic_analysis(symbol: str, period: int = 14) -> dict[str, Any]:
    """
    Build a Stochastic Oscillator summary for one symbol.

    Fetches prices for 2023-10-01 through 2024-01-01 without the cache,
    computes %K/%D with ``calculate_stochastic`` and classifies the latest
    reading. Serves as the example of adding a new technical indicator tool.

    Returns:
        Dict with ``symbol``, ``stochastic_k``, ``stochastic_d`` (both
        rounded to two decimals), ``signal`` and ``period``.
    """
    # Simulate getting data
    provider = StockDataProvider(use_cache=False)
    prices = provider.get_stock_data(symbol, "2023-10-01", "2024-01-01")

    oscillator = calculate_stochastic(prices, k_period=period)
    latest_k = oscillator["%K"].iloc[-1]
    latest_d = oscillator["%D"].iloc[-1]

    # Extreme readings take precedence over the %K / %D comparison.
    if latest_k > 80:
        signal = "overbought"
    elif latest_k < 20:
        signal = "oversold"
    elif latest_k > latest_d:
        signal = "bullish_crossover"
    elif latest_k < latest_d:
        signal = "bearish_crossover"
    else:
        signal = "neutral"

    rounded_k = round(latest_k, 2)
    rounded_d = round(latest_d, 2)
    print(
        f"✅ Stochastic indicator added: {symbol} - %K={rounded_k}, Signal={signal}"
    )

    return {
        "symbol": symbol,
        "stochastic_k": rounded_k,
        "stochastic_d": rounded_d,
        "signal": signal,
        "period": period,
    }
# Test the new indicator
# Exercise the indicator once; a failure is reported without aborting the
# remaining validation steps.
try:
    stoch_result = get_stochastic_analysis("AAPL", period=14)
except Exception as exc:
    print(f"❌ Error testing stochastic: {exc}")
# Validation 2: Debug authentication
print("\n🔐 2. Debugging authentication...")

# Disable for testing
os.environ["AUTH_ENABLED"] = "false"


# Test with agent error handler
@agent_friendly_errors(reraise=False)
def test_auth_error():
    """Raise a fake "401 Unauthorized" error to exercise the error handler."""
    # This would normally raise 401 Unauthorized
    raise ValueError("401 Unauthorized")


auth_result = test_auth_error()

# The handler result is only useful here when it is a dict with a fix hint.
has_fix_hint = isinstance(auth_result, dict) and "fix_suggestion" in auth_result
if not has_fix_hint:
    print("✅ Auth disabled for development")
else:
    print(f"✅ Auth error caught with fix: {auth_result['fix_suggestion']['fix']}")
# Validation 3: Run tests for stock data provider
print("\n🧪 3. Running stock data provider tests...")
# Quick test using our test runner
result = subprocess.run(
["python", "tools/quick_test.py", "--test", "stock"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
print("✅ Stock data tests passed")
# Show last few lines of output
lines = result.stdout.strip().split("\n")
for line in lines[-3:]:
print(f" {line}")
else:
print(f"❌ Stock data tests failed: {result.stderr}")
# Validation 4: Create a new screening strategy
print("\n🔍 4. Creating a new screening strategy (Golden Cross)...")
@quick_cache(ttl_seconds=300)  # Cache for 5 minutes
def screen_golden_cross(symbol: str) -> dict[str, Any]:
    """
    Check a symbol for a recent Golden Cross (50 SMA moving above 200 SMA).

    Args:
        symbol: Ticker to screen.

    Returns:
        Dict with "symbol" and "passed". When fewer than 200 rows are
        available it also carries a "reason"; otherwise it carries the
        latest price, both SMAs and whether the price sits above each.
    """
    history = StockDataProvider(use_cache=False).get_stock_data(
        symbol, "2023-01-01", "2024-01-01"
    )
    if len(history) < 200:
        return {"symbol": symbol, "passed": False, "reason": "Insufficient data"}

    fast_sma = calculate_sma(history, 50)
    slow_sma = calculate_sma(history, 200)

    # A cross on day d means: fast <= slow on d-1 and fast > slow on d.
    # Only the last 10 rows are inspected.
    crossed_recently = any(
        fast_sma.iloc[day - 1] <= slow_sma.iloc[day - 1]
        and fast_sma.iloc[day] > slow_sma.iloc[day]
        for day in range(-10, 0)
    )

    last_close = history["Close"].iloc[-1]
    last_fast = fast_sma.iloc[-1]
    last_slow = slow_sma.iloc[-1]
    return {
        "symbol": symbol,
        "passed": crossed_recently,
        "current_price": round(last_close, 2),
        "sma_50": round(last_fast, 2),
        "sma_200": round(last_slow, 2),
        "above_50": last_close > last_fast,
        "above_200": last_close > last_slow,
    }
# Test the new screening strategy
test_symbols = ["AAPL", "MSFT", "GOOGL"]
golden_cross_results = []
for symbol in test_symbols:
try:
result = screen_golden_cross(symbol)
golden_cross_results.append(result)
status = "✅ Golden Cross" if result["passed"] else "❌ No Golden Cross"
print(f" {symbol}: {status} (Price=${result['current_price']})")
except Exception as e:
print(f" {symbol}: ❌ Error - {e}")
# Summary
print("\n" + "=" * 60)
print("🎉 Validation Summary:")
print(" 1. New Indicator Tool: ✅ Stochastic Oscillator added")
print(" 2. Auth Debugging: ✅ Error handler provides helpful fixes")
print(" 3. Test Running: ✅ Stock data tests executed")
print(" 4. New Screening: ✅ Golden Cross strategy created")
print("\n✨ All validations completed successfully!")
# Cache stats
cache_stats = get_cache_stats()
if cache_stats["total"] > 0:
print(
f"\n📊 Cache Performance: {cache_stats['hit_rate']}% hit rate ({cache_stats['hits']} hits)"
)
```
--------------------------------------------------------------------------------
/maverick_mcp/langchain_tools/registry.py:
--------------------------------------------------------------------------------
```python
"""
Tool registry for managing MCP and LangChain tools.
"""
import logging
from collections.abc import Callable
from typing import Any
from langchain_core.tools import BaseTool
from .adapters import mcp_to_langchain_adapter
logger = logging.getLogger(__name__)
class ToolRegistry:
    """Name-keyed store for tools that come from MCP functions or LangChain."""

    def __init__(self):
        # Three parallel maps keyed by tool name: raw MCP callables,
        # LangChain tool instances, and descriptive metadata for both.
        self.mcp_tools: dict[str, Callable] = {}
        self.langchain_tools: dict[str, BaseTool] = {}
        self.tool_metadata: dict[str, dict[str, Any]] = {}

    def register_mcp_tool(
        self,
        func: Callable,
        name: str | None = None,
        description: str | None = None,
        persona_aware: bool = False,
        categories: list[str] | None = None,
    ) -> None:
        """
        Add an MCP tool function to the registry.

        Args:
            func: The MCP tool function
            name: Name to register under; falls back to ``func.__name__``
            description: Description; falls back to the function docstring
            persona_aware: Whether tool should adapt to personas
            categories: Tool categories for organization
        """
        tool_name = name if name else func.__name__
        self.mcp_tools[tool_name] = func

        self.tool_metadata[tool_name] = {
            "source": "mcp",
            "description": description or func.__doc__,
            "persona_aware": persona_aware,
            "categories": categories or [],
            "original_func": func,
        }
        logger.info(f"Registered MCP tool: {tool_name}")

    def register_langchain_tool(
        self, tool: BaseTool, categories: list[str] | None = None
    ) -> None:
        """
        Add a LangChain tool instance to the registry.

        Args:
            tool: The LangChain tool
            categories: Tool categories for organization
        """
        tool_name = tool.name
        self.langchain_tools[tool_name] = tool

        # persona_aware is read from the tool's own metadata mapping when
        # that attribute exists and is non-empty; otherwise it is False.
        own_metadata = getattr(tool, "metadata", None)
        persona_aware = (
            own_metadata.get("persona_aware", False) if own_metadata else False
        )

        self.tool_metadata[tool_name] = {
            "source": "langchain",
            "description": tool.description,
            "persona_aware": persona_aware,
            "categories": categories or [],
            "tool_instance": tool,
        }
        logger.info(f"Registered LangChain tool: {tool_name}")

    def get_tool(self, name: str, as_langchain: bool = True) -> Any | None:
        """
        Look up a single tool by name.

        Args:
            name: Tool name
            as_langchain: For MCP tools, whether to wrap the function as a
                LangChain tool (True) or return the raw function (False)

        Returns:
            Tool instance or function, or None when the name is unknown
        """
        # LangChain tools win and are returned as stored.
        try:
            return self.langchain_tools[name]
        except KeyError:
            pass

        if name not in self.mcp_tools:
            return None

        mcp_func = self.mcp_tools[name]
        if not as_langchain:
            return mcp_func

        # The adapter is applied on each request; nothing is cached here.
        meta = self.tool_metadata[name]
        return mcp_to_langchain_adapter(
            mcp_func,
            name=name,
            description=meta["description"],
            persona_aware=meta["persona_aware"],
        )

    def get_tools_by_category(
        self, category: str, as_langchain: bool = True
    ) -> list[Any]:
        """
        Collect every tool tagged with the given category.

        Args:
            category: Category name
            as_langchain: Whether to return as LangChain tools

        Returns:
            List of tools
        """
        matching_names = [
            tool_name
            for tool_name, meta in self.tool_metadata.items()
            if category in meta.get("categories", [])
        ]
        resolved = (
            self.get_tool(tool_name, as_langchain=as_langchain)
            for tool_name in matching_names
        )
        return [tool for tool in resolved if tool]

    def get_all_tools(self, as_langchain: bool = True) -> list[Any]:
        """
        Collect the registered tools.

        Args:
            as_langchain: Whether to return as LangChain tools; when False
                only the MCP functions are returned

        Returns:
            List of all tools
        """
        collected: list[Any] = (
            list(self.langchain_tools.values()) if as_langchain else []
        )

        for tool_name in self.mcp_tools:
            if tool_name in self.langchain_tools:
                # Already represented by its LangChain entry.
                continue
            tool = self.get_tool(tool_name, as_langchain=as_langchain)
            if tool:
                collected.append(tool)

        return collected

    def get_persona_aware_tools(self, as_langchain: bool = True) -> list[Any]:
        """Collect every tool whose metadata marks it as persona-aware."""
        flagged_names = [
            tool_name
            for tool_name, meta in self.tool_metadata.items()
            if meta.get("persona_aware", False)
        ]
        resolved = (
            self.get_tool(tool_name, as_langchain=as_langchain)
            for tool_name in flagged_names
        )
        return [tool for tool in resolved if tool]

    def list_tools(self) -> dict[str, dict[str, Any]]:
        """Return a name -> summary mapping (description, source, persona flag, categories)."""
        summary_fields = ("description", "source", "persona_aware", "categories")
        return {
            tool_name: {field: meta[field] for field in summary_fields}
            for tool_name, meta in self.tool_metadata.items()
        }
# Global registry instance (created lazily on first access)
_tool_registry = None


def get_tool_registry() -> ToolRegistry:
    """Return the shared ToolRegistry, creating and initializing it on first use."""
    global _tool_registry
    if _tool_registry is not None:
        return _tool_registry

    # Assign before initializing so that _initialize_default_tools() can call
    # back into this function and receive the instance just created.
    _tool_registry = ToolRegistry()
    _initialize_default_tools()
    return _tool_registry
def _initialize_default_tools():
    """Initialize registry with default MCP tools.

    Currently a placeholder: no tools are registered; the function only logs
    that router tool registration is still pending.
    """
    # The return value is discarded. When this runs from get_tool_registry()
    # the global instance is already assigned, so the call returns it directly.
    get_tool_registry()
    try:
        # TODO: Fix router tool registration
        # The router tools are FastMCP FunctionTool instances, not plain Callables
        # Need to extract the underlying function or adapt the registration approach
        logger.info("Tool registry initialized (router tools registration pending)")
    except ImportError as e:
        # Nothing in the try body imports anything at present; this handler
        # only matters once the tool imports described in the TODO are added.
        logger.warning(f"Could not import default MCP tools: {e}")
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/tool_monitoring.py:
--------------------------------------------------------------------------------
```python
"""
Tool execution monitoring utilities.
This module provides functions for monitoring tool execution,
including timing, error handling, and performance analysis.
"""
from __future__ import annotations
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from typing import Any
from maverick_mcp.utils.logging import get_logger
logger = get_logger("maverick_mcp.utils.tool_monitoring")
@dataclass
class ExecutionResult:
    """Outcome of one monitored tool call, as built by ToolMonitor.execute_with_monitoring."""

    # Value returned by the tool; None when the call raised.
    result: Any
    # Elapsed time of the call, in seconds.
    execution_time: float
    # True when the tool returned normally, False when it raised.
    success: bool
    # The exception raised by the tool; stays None on success.
    error: Exception | None = None
class ToolMonitor:
    """Monitors tool execution for performance and errors."""

    # Executions slower than this many seconds produce a warning log entry.
    LONG_EXECUTION_SECONDS = 30.0

    def __init__(self, tool_name: str, user_id: int | None = None):
        """
        Initialize tool monitor.

        Args:
            tool_name: Name of the tool being monitored
            user_id: ID of the user executing the tool
        """
        self.tool_name = tool_name
        self.user_id = user_id

    async def execute_with_monitoring(
        self,
        func: Callable[..., Awaitable[Any]],
        args: tuple[Any, ...],
        kwargs: dict[str, Any],
        estimation: dict[str, Any] | None = None,
    ) -> ExecutionResult:
        """
        Execute a tool function, timing it and logging the outcome.

        Exceptions raised by ``func`` (subclasses of ``Exception``) are not
        propagated; they are logged and returned inside the result.

        Args:
            func: The async function to execute
            args: Positional arguments for the function
            kwargs: Keyword arguments for the function
            estimation: Optional estimation data for comparison

        Returns:
            ExecutionResult: Contains result, timing, and error information
        """
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed (or turn negative) if the system clock is adjusted mid-call.
        start_time = time.perf_counter()

        try:
            result = await func(*args, **kwargs)
            execution_time = time.perf_counter() - start_time

            self._log_successful_execution(execution_time, result, estimation)
            # A slow run may mean the estimate was too low.
            self._check_for_underestimation(execution_time, estimation)

            return ExecutionResult(
                result=result,
                execution_time=execution_time,
                success=True,
            )

        except Exception as e:
            execution_time = time.perf_counter() - start_time
            self._log_failed_execution(execution_time, e)

            return ExecutionResult(
                result=None,
                execution_time=execution_time,
                success=False,
                error=e,
            )

    def _log_successful_execution(
        self,
        execution_time: float,
        result: Any,
        estimation: dict[str, Any] | None,
    ) -> None:
        """Log a successful execution, with estimate figures when supplied."""
        log_data = {
            "tool_name": self.tool_name,
            "user_id": self.user_id,
            "execution_time_seconds": round(execution_time, 3),
            "has_result": result is not None,
        }

        if estimation:
            log_data.update(
                {
                    "estimated_llm_calls": estimation.get("llm_calls", 0),
                    "estimated_tokens": estimation.get("total_tokens", 0),
                }
            )

        logger.info(f"Tool executed successfully: {self.tool_name}", extra=log_data)

    def _log_failed_execution(self, execution_time: float, error: Exception) -> None:
        """Log a failed execution together with the error text and type."""
        logger.error(
            f"Tool execution failed: {self.tool_name}",
            extra={
                "tool_name": self.tool_name,
                "user_id": self.user_id,
                "execution_time_seconds": round(execution_time, 3),
                "error": str(error),
                "error_type": type(error).__name__,
            },
        )

    def _check_for_underestimation(
        self,
        execution_time: float,
        estimation: dict[str, Any] | None,
    ) -> None:
        """Warn when a run exceeded LONG_EXECUTION_SECONDS."""
        if execution_time <= self.LONG_EXECUTION_SECONDS:
            return

        log_data = {
            "tool_name": self.tool_name,
            "execution_time_seconds": round(execution_time, 3),
            "note": "Consider reviewing estimate if this persists",
        }

        if estimation:
            log_data.update(
                {
                    "estimated_llm_calls": estimation.get("llm_calls", 0),
                    "estimated_tokens": estimation.get("total_tokens", 0),
                    "complexity": estimation.get("complexity", "unknown"),
                    "confidence": estimation.get("confidence", 0.5),
                }
            )

        logger.warning(
            f"Long execution time for tool: {self.tool_name}", extra=log_data
        )

    def add_usage_info_to_result(
        self, result: Any, usage_info: dict[str, Any] | None
    ) -> Any:
        """
        Add usage information to the tool result.

        The result is modified in place, and only when it is a dict and
        ``usage_info`` is non-empty.

        Args:
            result: The tool execution result
            usage_info: Usage information to add

        Returns:
            The same result object, with a "usage" key added if applicable
        """
        if usage_info and isinstance(result, dict):
            result["usage"] = usage_info

        return result
def create_tool_monitor(tool_name: str, user_id: int | None = None) -> ToolMonitor:
    """
    Build a ToolMonitor for the given tool and user.

    Args:
        tool_name: Name of the tool being monitored
        user_id: ID of the user executing the tool

    Returns:
        ToolMonitor: A new monitor holding both values
    """
    monitor = ToolMonitor(tool_name, user_id)
    return monitor
def should_alert_for_performance(
    execution_time: float, threshold: float = 30.0
) -> tuple[bool, str]:
    """
    Decide whether an execution was slow enough to warrant an alert.

    Args:
        execution_time: Execution time in seconds
        threshold: Performance threshold in seconds

    Returns:
        tuple: (should_alert, alert_message); the message is empty when
        no alert is needed
    """
    exceeded = execution_time > threshold
    if not exceeded:
        return False, ""

    alert_message = (
        f"Tool execution took {execution_time:.2f}s (threshold: {threshold}s)"
    )
    return True, alert_message
```
--------------------------------------------------------------------------------
/tools/check_orchestration_config.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Configuration checker for orchestrated agent setup.
Verifies that all required dependencies and configurations are available.
"""
import os
import sys
from typing import Any
def check_dependencies() -> dict[str, Any]:
    """Try to import each expected package and report which ones are missing.

    Returns:
        Dict with per-module details under "dependencies", an overall
        "status" ("success" or "error") and the list of "missing"
        non-optional modules.
    """
    report = {"dependencies": {}, "status": "success", "missing": []}

    # Absence of these is recorded but does not fail the check.
    optional_modules = ["exa_py", "tavily"]

    candidates = [
        ("langchain_core", "LangChain core"),
        ("langgraph", "LangGraph"),
        ("fastmcp", "FastMCP"),
        ("exa_py", "Exa AI search (optional)"),
        ("tavily", "Tavily search (optional)"),
    ]

    for module_name, label in candidates:
        try:
            __import__(module_name)
        except ImportError as exc:
            report["dependencies"][module_name] = {
                "status": "missing",
                "description": label,
                "error": str(exc),
            }
            if module_name in optional_modules:
                continue
            report["missing"].append(module_name)
            report["status"] = "error"
        else:
            report["dependencies"][module_name] = {
                "status": "available",
                "description": label,
            }

    return report
def check_environment_variables() -> dict[str, Any]:
    """Check which API-key environment variables are set.

    A variable counts as configured only when it holds non-whitespace text,
    so a blank or whitespace-only value is reported as not configured.

    Returns:
        Dict with per-variable details under "environment", an overall
        "status" ("error" when a required variable is missing, otherwise
        "success") and a list of "warnings" for missing optional variables.
    """
    results: dict[str, Any] = {"environment": {}, "status": "success", "warnings": []}

    # (name, description, required)
    required_vars = [
        ("TIINGO_API_KEY", "Stock data provider", True),
    ]
    optional_vars = [
        ("OPENAI_API_KEY", "OpenAI LLM provider", False),
        ("ANTHROPIC_API_KEY", "Anthropic LLM provider", False),
        ("EXA_API_KEY", "Exa search provider", False),
        ("TAVILY_API_KEY", "Tavily search provider", False),
        ("FRED_API_KEY", "Economic data provider", False),
    ]

    for var_name, description, required in required_vars + optional_vars:
        # Strip first: a value such as "   " is truthy but unusable as a key.
        value = (os.getenv(var_name) or "").strip()

        if value:
            results["environment"][var_name] = {
                "status": "configured",
                "description": description,
                "has_value": True,
            }
            continue

        results["environment"][var_name] = {
            "status": "not_configured",
            "description": description,
            "required": required,
        }
        if required:
            results["status"] = "error"
        else:
            results["warnings"].append(
                f"{var_name} not configured - {description} will not be available"
            )

    return results
def check_agent_imports() -> dict[str, Any]:
    """Verify that each agent class can be imported from its module.

    Returns:
        Dict with per-class details under "agents", an overall "status"
        and a list of "errors" formatted as "<ClassName>: <message>".
    """
    report = {"agents": {}, "status": "success", "errors": []}

    targets = [
        ("maverick_mcp.agents.market_analysis", "MarketAnalysisAgent"),
        ("maverick_mcp.agents.supervisor", "SupervisorAgent"),
        ("maverick_mcp.agents.deep_research", "DeepResearchAgent"),
    ]

    for module_path, class_name in targets:
        try:
            loaded = __import__(module_path, fromlist=[class_name])
            # Raises AttributeError when the module lacks the class.
            getattr(loaded, class_name)
        except Exception as exc:
            # Any failure while importing is recorded rather than raised.
            detail = str(exc)
            report["agents"][class_name] = {
                "status": "error",
                "module": module_path,
                "error": detail,
            }
            report["errors"].append(f"{class_name}: {detail}")
            report["status"] = "error"
        else:
            report["agents"][class_name] = {
                "status": "importable",
                "module": module_path,
            }

    return report
def main():
    """Run all configuration checks, print a report, and return an exit code.

    Returns:
        0 when every check reports success, otherwise 1.
    """
    divider = "=" * 60
    print("🔍 Checking MaverickMCP Orchestration Configuration...")
    print(divider)

    # Dependencies
    dep_results = check_dependencies()
    print("\n📦 Dependencies:")
    for dep, info in dep_results["dependencies"].items():
        is_available = info["status"] == "available"
        status_icon = "✅" if is_available else "❌"
        print(f" {status_icon} {dep}: {info['description']}")
        if info["status"] == "missing":
            print(f" Error: {info['error']}")

    # Environment variables
    env_results = check_environment_variables()
    print("\n🔧 Environment Variables:")
    for var, info in env_results["environment"].items():
        if info["status"] != "configured":
            icon = "❌" if info.get("required") else "⚠️ "
            print(f" {icon} {var}: {info['description']} (not configured)")
            continue
        print(f" ✅ {var}: {info['description']}")

    # Agent imports
    agent_results = check_agent_imports()
    print("\n🤖 Agent Classes:")
    for agent, info in agent_results["agents"].items():
        is_importable = info["status"] == "importable"
        status_icon = "✅" if is_importable else "❌"
        print(f" {status_icon} {agent}: {info['module']}")
        if info["status"] == "error":
            print(f" Error: {info['error']}")

    # Summary: a single failing section fails the whole check.
    print("\n" + divider)
    section_statuses = (
        dep_results["status"],
        env_results["status"],
        agent_results["status"],
    )
    has_failure = any(status == "error" for status in section_statuses)

    if not has_failure:
        print("✅ Configuration check PASSED!")
        print("\nOrchestrated agents are ready to use.")

        if env_results["warnings"]:
            print("\n⚠️ Warnings:")
            for warning in env_results["warnings"]:
                print(f" • {warning}")
        return 0

    print("❌ Configuration check FAILED!")
    print("\nPlease fix the errors above before using orchestrated agents.")

    if dep_results["missing"]:
        print(f"\nMissing dependencies: {', '.join(dep_results['missing'])}")
        print("Run: uv sync")

    if env_results["status"] == "error":
        print("\nMissing required environment variables.")
        print("Copy .env.example to .env and configure required API keys.")

    if agent_results["errors"]:
        print("\nAgent import errors:")
        for error in agent_results["errors"]:
            print(f" • {error}")

    return 1
# Script entry point: the process exit code is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/interfaces/macro_data.py:
--------------------------------------------------------------------------------
```python
"""
Macroeconomic data provider interface.
This module defines the abstract interface for macroeconomic data operations,
including GDP, inflation, unemployment, and market sentiment indicators.
"""
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class IMacroDataProvider(Protocol):
"""
Interface for macroeconomic data operations.
This interface defines the contract for retrieving economic indicators,
market sentiment data, and related macroeconomic statistics.
"""
async def get_gdp_growth_rate(self) -> dict[str, Any]:
"""
Get GDP growth rate data.
Returns:
Dictionary with current and previous GDP growth rates
"""
...
async def get_unemployment_rate(self) -> dict[str, Any]:
"""
Get unemployment rate data.
Returns:
Dictionary with current and previous unemployment rates
"""
...
async def get_inflation_rate(self) -> dict[str, Any]:
"""
Get inflation rate data based on CPI.
Returns:
Dictionary with current and previous inflation rates and bounds
"""
...
async def get_vix(self) -> float | None:
"""
Get VIX (volatility index) data.
Returns:
Current VIX value or None if unavailable
"""
...
async def get_sp500_performance(self) -> float:
"""
Get S&P 500 performance over multiple timeframes.
Returns:
Weighted performance percentage
"""
...
async def get_nasdaq_performance(self) -> float:
"""
Get NASDAQ performance over multiple timeframes.
Returns:
Weighted performance percentage
"""
...
async def get_sp500_momentum(self) -> float:
"""
Get short-term S&P 500 momentum.
Returns:
Momentum percentage over short timeframes
"""
...
async def get_nasdaq_momentum(self) -> float:
"""
Get short-term NASDAQ momentum.
Returns:
Momentum percentage over short timeframes
"""
...
async def get_usd_momentum(self) -> float:
"""
Get USD momentum using broad dollar index.
Returns:
USD momentum percentage over short timeframes
"""
...
async def get_macro_statistics(self) -> dict[str, Any]:
"""
Get comprehensive macroeconomic statistics.
Returns:
Dictionary with all macro indicators including:
- gdp_growth_rate: Current and previous GDP growth
- unemployment_rate: Current and previous unemployment
- inflation_rate: Current and previous inflation
- sp500_performance: S&P 500 performance
- nasdaq_performance: NASDAQ performance
- vix: Volatility index
- sentiment_score: Computed sentiment score
- historical_data: Time series data
"""
...
async def get_historical_data(self) -> dict[str, Any]:
"""
Get historical data for all indicators.
Returns:
Dictionary with time series data for various indicators
"""
...
class MacroDataConfig:
"""
Configuration class for macroeconomic data providers.
This class encapsulates macro data-related configuration parameters
to reduce coupling between providers and configuration sources.
"""
def __init__(
self,
fred_api_key: str = "",
window_days: int = 365,
lookback_days: int = 30,
request_timeout: int = 30,
max_retries: int = 3,
cache_ttl: int = 3600,
sentiment_weights: dict[str, float] | None = None,
smoothing_factor: float = 0.8,
):
"""
Initialize macro data configuration.
Args:
fred_api_key: API key for FRED (Federal Reserve Economic Data)
window_days: Window for historical data bounds calculation
lookback_days: Lookback period for momentum calculations
request_timeout: Request timeout in seconds
max_retries: Maximum number of retry attempts
cache_ttl: Cache time-to-live in seconds
sentiment_weights: Weights for sentiment score calculation
smoothing_factor: Smoothing factor for sentiment score
"""
self.fred_api_key = fred_api_key
self.window_days = window_days
self.lookback_days = lookback_days
self.request_timeout = request_timeout
self.max_retries = max_retries
self.cache_ttl = cache_ttl
self.smoothing_factor = smoothing_factor
# Default sentiment weights
self.sentiment_weights = sentiment_weights or {
# Short-term signals (60% total)
"vix": 0.20,
"sp500_momentum": 0.20,
"nasdaq_momentum": 0.15,
"usd_momentum": 0.05,
# Macro signals (40% total)
"inflation_rate": 0.15,
"gdp_growth_rate": 0.15,
"unemployment_rate": 0.10,
}
@property
def has_fred_key(self) -> bool:
"""Check if FRED API key is configured."""
return bool(self.fred_api_key.strip())
def get_sentiment_weight(self, indicator: str) -> float:
"""Get sentiment weight for a specific indicator."""
return self.sentiment_weights.get(indicator, 0.0)
def get_total_sentiment_weight(self) -> float:
"""Get total sentiment weight (should sum to 1.0)."""
return sum(self.sentiment_weights.values())
# FRED series IDs for common economic indicators
# Maps an indicator name to the series identifier string used for it.
FRED_SERIES_IDS: dict[str, str] = {
    "gdp_growth_rate": "A191RL1Q225SBEA",
    "unemployment_rate": "UNRATE",
    "core_inflation": "CPILFESL",
    "sp500": "SP500",
    "nasdaq": "NASDAQ100",
    "vix": "VIXCLS",
    "usd_index": "DTWEXBGS",
}
# Default bounds for normalization
# Each indicator name maps to a {"min": ..., "max": ...} pair of floats.
# NOTE(review): the units of these bounds are not stated here — confirm
# against the code that consumes them.
DEFAULT_INDICATOR_BOUNDS: dict[str, dict[str, float]] = {
    "vix": {"min": 10.0, "max": 50.0},
    "sp500_momentum": {"min": -15.0, "max": 15.0},
    "nasdaq_momentum": {"min": -20.0, "max": 20.0},
    "usd_momentum": {"min": -5.0, "max": 5.0},
    "inflation_rate": {"min": 0.0, "max": 10.0},
    "gdp_growth_rate": {"min": -2.0, "max": 6.0},
    "unemployment_rate": {"min": 2.0, "max": 10.0},
}
```
--------------------------------------------------------------------------------
/maverick_mcp/infrastructure/health/health_checker.py:
--------------------------------------------------------------------------------
```python
"""
Health Checker Service
Extracts health checking logic from the main server to follow
Single Responsibility Principle.
"""
import logging
from datetime import datetime
from typing import Any
from maverick_mcp.config.settings import settings
from maverick_mcp.data.session_management import get_db_session
logger = logging.getLogger(__name__)
class HealthStatus:
    """Health status enumeration.

    Implemented as plain string class attributes rather than an Enum, so
    each value is an ordinary ``str``.
    """

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"
    UNKNOWN = "unknown"
class HealthChecker:
    """Service for checking system health status."""

    def __init__(self):
        self.logger = logger

    def check_all(self) -> dict[str, Any]:
        """
        Perform comprehensive health check.

        Returns:
            Dict containing the overall status, a naive-UTC ISO timestamp,
            per-component health and basic system information
        """
        from datetime import timezone

        components = {
            "database": self._check_database(),
            "configuration": self._check_configuration(),
            "redis": self._check_redis(),
        }

        overall_status = self._determine_overall_status(components)

        # datetime.utcnow() is deprecated since Python 3.12. Taking an aware
        # UTC "now" and dropping tzinfo yields the same naive timestamp, so
        # the emitted string format is unchanged.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

        return {
            "status": overall_status,
            "timestamp": timestamp,
            "components": components,
            "system": self._get_system_info(),
        }

    def _check_database(self) -> dict[str, Any]:
        """Check database connectivity by running a trivial query."""
        try:
            with get_db_session() as session:
                # SQLAlchemy 2.x rejects a bare string passed to
                # Session.execute(); exec_driver_sql() takes plain SQL text
                # on both 1.4 and 2.x without a new import in this module.
                # NOTE(review): assumes get_db_session() yields a SQLAlchemy
                # Session — confirm against session_management.
                result = (
                    session.connection()
                    .exec_driver_sql("SELECT 1 as test")
                    .fetchone()
                )

                if result and result[0] == 1:
                    return {
                        "status": HealthStatus.HEALTHY,
                        "message": "Database connection successful",
                        "response_time_ms": 0,  # Could add timing if needed
                    }
                else:
                    return {
                        "status": HealthStatus.UNHEALTHY,
                        "message": "Database query failed",
                        "error": "Unexpected query result",
                    }
        except Exception as e:
            self.logger.error(f"Database health check failed: {e}")
            return {
                "status": HealthStatus.UNHEALTHY,
                "message": "Database connection failed",
                "error": str(e),
            }

    def _check_redis(self) -> dict[str, Any]:
        """Check Redis connectivity if configured.

        A missing Redis configuration is reported as healthy; a missing
        client library or a failed connection is reported as degraded.
        """
        if not hasattr(settings, "redis") or not settings.redis.host:
            return {
                "status": HealthStatus.HEALTHY,
                "message": "Redis not configured (using in-memory cache)",
                "note": "This is normal for personal use setup",
            }

        try:
            import redis

            redis_client = redis.Redis(
                host=settings.redis.host,
                port=settings.redis.port,
                db=settings.redis.db,
                decode_responses=True,
                socket_timeout=5.0,
            )
            try:
                # Test Redis connection
                redis_client.ping()
            finally:
                # This client exists only for the probe; close it so each
                # health check does not leave a connection behind.
                redis_client.close()

            return {
                "status": HealthStatus.HEALTHY,
                "message": "Redis connection successful",
                "host": settings.redis.host,
                "port": settings.redis.port,
            }
        except ImportError:
            return {
                "status": HealthStatus.DEGRADED,
                "message": "Redis library not available",
                "note": "Falling back to in-memory cache",
            }
        except Exception as e:
            self.logger.warning(f"Redis health check failed: {e}")
            return {
                "status": HealthStatus.DEGRADED,
                "message": "Redis connection failed, using in-memory cache",
                "error": str(e),
            }

    def _check_configuration(self) -> dict[str, Any]:
        """Check application configuration status.

        Missing API keys produce warnings (degraded); a missing database
        URL produces an error (unhealthy).
        """
        warnings = []
        errors = []

        # A missing Tiingo key is reported as a warning, not an error.
        if not getattr(settings, "tiingo_api_key", None):
            warnings.append("TIINGO_API_KEY not configured")

        # Check optional API keys
        optional_keys = ["fred_api_key", "openai_api_key", "anthropic_api_key"]
        missing_optional = [
            key.upper() for key in optional_keys if not getattr(settings, key, None)
        ]

        if missing_optional:
            warnings.append(
                f"Optional API keys not configured: {', '.join(missing_optional)}"
            )

        # Check database configuration
        if not settings.database_url:
            errors.append("DATABASE_URL not configured")

        # Errors outrank warnings when picking the status.
        if errors:
            status = HealthStatus.UNHEALTHY
            message = f"Configuration errors: {'; '.join(errors)}"
        elif warnings:
            status = HealthStatus.DEGRADED
            message = f"Configuration warnings: {'; '.join(warnings)}"
        else:
            status = HealthStatus.HEALTHY
            message = "Configuration is valid"

        result = {
            "status": status,
            "message": message,
        }

        if warnings:
            result["warnings"] = warnings
        if errors:
            result["errors"] = errors

        return result

    def _determine_overall_status(self, components: dict[str, dict[str, Any]]) -> str:
        """Determine overall system status from component statuses.

        The worst component wins: any unhealthy component makes the system
        unhealthy, otherwise any degraded one makes it degraded.
        """
        statuses = [comp["status"] for comp in components.values()]

        if HealthStatus.UNHEALTHY in statuses:
            return HealthStatus.UNHEALTHY
        elif HealthStatus.DEGRADED in statuses:
            return HealthStatus.DEGRADED
        elif all(status == HealthStatus.HEALTHY for status in statuses):
            return HealthStatus.HEALTHY
        else:
            return HealthStatus.UNKNOWN

    def _get_system_info(self) -> dict[str, Any]:
        """Get basic system information (app name, version, environment, Python version)."""
        import sys

        return {
            "app_name": settings.app_name,
            "version": getattr(settings, "version", "0.1.0"),
            "environment": getattr(settings, "environment", "development"),
            "python_version": f"{sys.version_info.major}.{sys.version_info.minor}",
        }
```
--------------------------------------------------------------------------------
/maverick_mcp/validation/responses.py:
--------------------------------------------------------------------------------
```python
"""
Base response models for API standardization.
This module provides standard response formats for all API endpoints
to ensure consistency across the application.
"""
from datetime import UTC, datetime
from typing import Any, TypeVar
from pydantic import BaseModel, Field
T = TypeVar("T")
class BaseResponse(BaseModel):
    """Base response model with standard fields.

    ``success`` is required. ``message`` and ``request_id`` default to None,
    and ``timestamp`` defaults to the current UTC time at creation.
    """

    success: bool = Field(..., description="Whether the request was successful")
    message: str | None = Field(None, description="Human-readable message")
    timestamp: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="Response timestamp",
    )
    request_id: str | None = Field(None, description="Request tracking ID")
class DataResponse[T](BaseResponse):
    """Response model with data payload.

    Generic over ``T``, the type of the required ``data`` field.
    """

    data: T = Field(..., description="Response data")
class ListResponse[T](BaseResponse):
    """Response model for paginated lists.

    Generic over ``T``, the item type. All pagination fields are required.
    """

    data: list[T] = Field(..., description="List of items")
    total: int = Field(..., description="Total number of items")
    limit: int = Field(..., description="Number of items per page")
    offset: int = Field(..., description="Number of items skipped")
    has_more: bool = Field(..., description="Whether more items are available")
class ErrorDetail(BaseModel):
    """Detailed error information.

    ``code`` and ``message`` are required; ``field`` and ``context`` are
    optional and default to None.
    """

    code: str = Field(..., description="Error code")
    field: str | None = Field(None, description="Field that caused the error")
    message: str = Field(..., description="Error message")
    context: dict[str, Any] | None = Field(None, description="Additional context")
class ErrorResponse(BaseResponse):
    """Standard error response model.

    Overrides ``success`` so that it defaults to False; ``error`` and
    ``status_code`` are required.
    """

    success: bool = Field(default=False, description="Always false for errors")
    error: ErrorDetail = Field(..., description="Error details")
    status_code: int = Field(..., description="HTTP status code")
    trace_id: str | None = Field(None, description="Error trace ID for debugging")
class ValidationErrorResponse(ErrorResponse):
    """Response for validation errors.

    Adds a required ``errors`` list on top of the inherited ErrorResponse
    fields.
    """

    errors: list[ErrorDetail] = Field(..., description="List of validation errors")
class BatchOperationResult(BaseModel):
    """Result of a batch operation on a single item.

    ``id`` and ``success`` are required; ``error`` and ``data`` default to
    None.
    """

    id: str = Field(..., description="Item identifier")
    success: bool = Field(..., description="Whether the operation succeeded")
    error: ErrorDetail | None = Field(None, description="Error if operation failed")
    data: dict[str, Any] | None = Field(None, description="Operation result data")
class BatchResponse(BaseResponse):
    """Response for batch operations.

    Carries one BatchOperationResult per item plus summary counters; all of
    these fields are required.
    """

    results: list[BatchOperationResult] = Field(
        ..., description="Results for each item"
    )
    successful: int = Field(..., description="Number of successful operations")
    failed: int = Field(..., description="Number of failed operations")
    partial: bool = Field(..., description="Whether some operations failed")
class HealthStatus(BaseModel):
    """Health check status for a component.

    ``name`` and ``status`` are required; ``latency_ms`` and ``details``
    default to None.
    """

    name: str = Field(..., description="Component name")
    status: str = Field(..., description="Status (healthy, unhealthy, degraded)")
    latency_ms: float | None = Field(None, description="Response time in milliseconds")
    details: dict[str, Any] | None = Field(None, description="Additional details")
class HealthResponse(BaseResponse):
    """Health check response.

    Requires the overall ``status`` and the per-component list; ``version``
    and ``uptime_seconds`` default to None.
    """

    status: str = Field(..., description="Overall status")
    components: list[HealthStatus] = Field(
        ..., description="Status of individual components"
    )
    version: str | None = Field(None, description="Application version")
    uptime_seconds: int | None = Field(None, description="Uptime in seconds")
class RateLimitInfo(BaseModel):
    """Rate limit information."""

    # Required quota figures and the moment the quota resets.
    limit: int = Field(default=..., description="Request limit")
    remaining: int = Field(default=..., description="Remaining requests")
    reset: datetime = Field(default=..., description="When the limit resets")
    # Optional back-off hint for the client.
    retry_after: int | None = Field(
        default=None, description="Seconds to wait before retrying"
    )
class RateLimitResponse(ErrorResponse):
    """Response when rate limit is exceeded."""

    # Quota details accompanying the inherited error envelope.
    rate_limit: RateLimitInfo = Field(default=..., description="Rate limit details")
class WebhookEvent(BaseModel):
    """Webhook event payload."""

    # Required event identity, classification, time and body.
    event_id: str = Field(default=..., description="Unique event identifier")
    event_type: str = Field(default=..., description="Type of event")
    timestamp: datetime = Field(default=..., description="When the event occurred")
    data: dict[str, Any] = Field(default=..., description="Event data")
    # Optional signature supplied by the sender.
    signature: str | None = Field(
        default=None, description="Event signature for verification"
    )
class WebhookResponse(BaseResponse):
    """Response for webhook endpoints."""

    # Both fields are required: which event was handled and how it went.
    event_id: str = Field(default=..., description="Processed event ID")
    status: str = Field(default=..., description="Processing status")
# Helper functions for creating responses
def success_response(
data: Any = None,
message: str | None = None,
request_id: str | None = None,
) -> dict[str, Any]:
"""Create a successful response."""
response = {"success": True, "timestamp": datetime.now(UTC).isoformat()}
if message:
response["message"] = message
if request_id:
response["request_id"] = request_id
if data is not None:
response["data"] = data
return response
def error_response(
code: str,
message: str,
status_code: int,
field: str | None = None,
context: dict[str, Any] | None = None,
trace_id: str | None = None,
) -> dict[str, Any]:
"""Create an error response."""
return {
"success": False,
"timestamp": datetime.now(UTC).isoformat(),
"error": {
"code": code,
"message": message,
"field": field,
"context": context,
},
"status_code": status_code,
"trace_id": trace_id,
}
def validation_error_response(
errors: list[dict[str, Any]], trace_id: str | None = None
) -> dict[str, Any]:
"""Create a validation error response."""
return {
"success": False,
"timestamp": datetime.now(UTC).isoformat(),
"error": {
"code": "VALIDATION_ERROR",
"message": "Validation failed",
},
"errors": errors,
"status_code": 422,
"trace_id": trace_id,
}
```
--------------------------------------------------------------------------------
/maverick_mcp/domain/value_objects/technical_indicators.py:
--------------------------------------------------------------------------------
```python
"""
Value objects for technical indicators.
These are immutable objects representing technical analysis concepts
in the domain layer. They contain no infrastructure dependencies.
"""
from dataclasses import dataclass
from enum import Enum
class Signal(Enum):
    """Discrete trade recommendation produced by an indicator."""

    # Bullish side.
    STRONG_BUY = "strong_buy"
    BUY = "buy"
    # No actionable bias.
    NEUTRAL = "neutral"
    # Bearish side.
    SELL = "sell"
    STRONG_SELL = "strong_sell"
class TrendDirection(Enum):
    """Direction and strength classification of a market trend."""

    # Rising.
    STRONG_UPTREND = "strong_uptrend"
    UPTREND = "uptrend"
    # Range-bound.
    SIDEWAYS = "sideways"
    # Falling.
    DOWNTREND = "downtrend"
    STRONG_DOWNTREND = "strong_downtrend"
@dataclass(frozen=True)
class RSIIndicator:
    """Immutable Relative Strength Index reading.

    Raises:
        ValueError: If ``value`` is outside 0..100 or ``period`` is not
            positive.
    """

    value: float
    period: int = 14

    def __post_init__(self):
        in_range = 0 <= self.value <= 100
        if not in_range:
            raise ValueError("RSI must be between 0 and 100")
        if self.period <= 0:
            raise ValueError("Period must be positive")

    @property
    def is_overbought(self) -> bool:
        """Whether the reading is at or above the overbought threshold (70)."""
        return self.value >= 70

    @property
    def is_oversold(self) -> bool:
        """Whether the reading is at or below the oversold threshold (30)."""
        return self.value <= 30

    @property
    def signal(self) -> Signal:
        """Translate the reading into a trading signal.

        High readings map to sell signals and low readings to buy signals;
        the extreme bands (>= 80, <= 20) produce the "strong" variants.
        """
        reading = self.value
        if reading >= 80:
            return Signal.STRONG_SELL
        if reading >= 70:
            return Signal.SELL
        if reading <= 20:
            return Signal.STRONG_BUY
        if reading <= 30:
            return Signal.BUY
        return Signal.NEUTRAL
@dataclass(frozen=True)
class MACDIndicator:
    """Immutable MACD (Moving Average Convergence Divergence) reading."""

    macd_line: float
    signal_line: float
    histogram: float
    fast_period: int = 12
    slow_period: int = 26
    signal_period: int = 9

    @property
    def is_bullish_crossover(self) -> bool:
        """Whether the MACD line is above the signal line with a positive histogram."""
        above_signal = self.macd_line > self.signal_line
        return above_signal and self.histogram > 0

    @property
    def is_bearish_crossover(self) -> bool:
        """Whether the MACD line is below the signal line with a negative histogram."""
        below_signal = self.macd_line < self.signal_line
        return below_signal and self.histogram < 0

    @property
    def signal(self) -> Signal:
        """Translate the reading into a trading signal.

        A histogram magnitude above 0.5 upgrades a buy/sell to its "strong"
        variant; readings that are neither bullish nor bearish are neutral.
        """
        if self.is_bullish_crossover:
            return Signal.STRONG_BUY if self.histogram > 0.5 else Signal.BUY
        if self.is_bearish_crossover:
            return Signal.STRONG_SELL if self.histogram < -0.5 else Signal.SELL
        return Signal.NEUTRAL
@dataclass(frozen=True)
class BollingerBands:
    """Immutable Bollinger Bands reading together with the current price."""

    upper_band: float
    middle_band: float
    lower_band: float
    current_price: float
    period: int = 20
    std_dev: int = 2

    @property
    def bandwidth(self) -> float:
        """Band width relative to the middle band (volatility indicator).

        Returns 0.0 when the middle band is zero instead of raising
        ZeroDivisionError, mirroring the zero-denominator guard in
        ``percent_b``.
        """
        if self.middle_band == 0:
            return 0.0
        return (self.upper_band - self.lower_band) / self.middle_band

    @property
    def percent_b(self) -> float:
        """Position of the price within the bands (%B).

        0.0 is the lower band and 1.0 the upper band; values outside that
        range mean the price is outside the bands. Flat bands yield 0.5.
        """
        denominator = self.upper_band - self.lower_band
        if denominator == 0:
            return 0.5  # Return middle if bands are flat
        return (self.current_price - self.lower_band) / denominator

    @property
    def is_squeeze(self) -> bool:
        """Whether the bands are in a squeeze (bandwidth below 0.1)."""
        return self.bandwidth < 0.1

    @property
    def signal(self) -> Signal:
        """Translate the price position into a trading signal.

        A price outside the bands, or within the outer 20% of the band
        range, yields SELL (upper side) or BUY (lower side).
        """
        if self.current_price > self.upper_band:
            return Signal.SELL
        if self.current_price < self.lower_band:
            return Signal.BUY
        position = self.percent_b
        if position > 0.8:
            return Signal.SELL
        if position < 0.2:
            return Signal.BUY
        return Signal.NEUTRAL
@dataclass(frozen=True)
class StochasticOscillator:
    """Immutable Stochastic Oscillator reading (%K and %D).

    Raises:
        ValueError: If either ``k_value`` or ``d_value`` is outside 0..100.
    """

    k_value: float
    d_value: float
    period: int = 14

    def __post_init__(self):
        k_in_range = 0 <= self.k_value <= 100
        if not k_in_range:
            raise ValueError("%K must be between 0 and 100")
        d_in_range = 0 <= self.d_value <= 100
        if not d_in_range:
            raise ValueError("%D must be between 0 and 100")

    @property
    def is_overbought(self) -> bool:
        """Whether %K is at or above the overbought threshold (80)."""
        return self.k_value >= 80

    @property
    def is_oversold(self) -> bool:
        """Whether %K is at or below the oversold threshold (20)."""
        return self.k_value <= 20

    @property
    def signal(self) -> Signal:
        """Translate the reading into a trading signal.

        Oversold readings yield BUY, overbought readings yield SELL, and
        everything in between is NEUTRAL.
        """
        # NOTE: the %K/%D crossover direction does not influence the result:
        # any %K below 20 is already oversold (BUY) and any %K above 80 is
        # already overbought (SELL), whichever side of %D it is on.
        if self.is_oversold:
            return Signal.BUY
        if self.is_overbought:
            return Signal.SELL
        return Signal.NEUTRAL
@dataclass(frozen=True)
class PriceLevel:
    """Immutable support or resistance price level.

    Raises:
        ValueError: If ``price`` is not positive, ``strength`` is outside
            1..5, or ``touches`` is negative.
    """

    price: float
    strength: int  # 1-5, with 5 being strongest
    touches: int  # Number of times price touched this level

    def __post_init__(self):
        if self.price <= 0:
            raise ValueError("Price must be positive")
        strength_ok = 1 <= self.strength <= 5
        if not strength_ok:
            raise ValueError("Strength must be between 1 and 5")
        if self.touches < 0:
            raise ValueError("Touches must be non-negative")
@dataclass(frozen=True)
class VolumeProfile:
    """Immutable snapshot of volume relative to its average."""

    current_volume: int
    average_volume: float
    volume_trend: TrendDirection
    unusual_activity: bool

    @property
    def relative_volume(self) -> float:
        """Current volume as a multiple of the average volume.

        Returns 0.0 when the average is zero or negative, so the result is
        always a float as annotated and never divides by zero.
        """
        if self.average_volume > 0:
            return self.current_volume / self.average_volume
        return 0.0

    @property
    def is_high_volume(self) -> bool:
        """Whether volume exceeds 1.5 times the average."""
        return self.relative_volume > 1.5

    @property
    def is_low_volume(self) -> bool:
        """Whether volume is below half the average."""
        return self.relative_volume < 0.5
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/logging_example.py:
--------------------------------------------------------------------------------
```python
"""
Example usage of structured logging in Maverick-MCP.
This file demonstrates how to use the structured logging system
in different parts of the application.
"""
import asyncio
from maverick_mcp.utils.logging import (
PerformanceMonitor,
get_logger,
log_cache_operation,
log_database_query,
log_external_api_call,
setup_structured_logging,
)
from maverick_mcp.utils.mcp_logging import LoggingStockDataProvider, with_logging
async def example_basic_logging():
    """Show info, warning and error logging with structured ``extra`` fields."""
    # Loggers are looked up by dotted name, one per module.
    log = get_logger("maverick_mcp.example")

    # Structured fields travel in ``extra`` rather than in the message text.
    request_fields = {"ticker": "AAPL", "action": "fetch_data", "user_id": "user123"}
    log.info("Processing stock request", extra=request_fields)

    # Warnings carry the numbers that explain why they fired.
    rate_limit_fields = {
        "current_requests": 95,
        "limit": 100,
        "reset_time": "2024-01-15T10:00:00Z",
    }
    log.warning("Rate limit approaching", extra=rate_limit_fields)

    # Errors logged from inside the handler can attach the active traceback.
    failure_fields = {"ticker": "INVALID", "error_code": "INVALID_TICKER"}
    try:
        # Stand-in for an operation that fails.
        raise ValueError("Invalid ticker symbol")
    except Exception:
        log.error(
            "Failed to process request",
            exc_info=True,  # Includes full traceback
            extra=failure_fields,
        )
async def example_performance_monitoring():
    """Show ``PerformanceMonitor`` around a single block and around nested steps."""
    # A single monitored block mixing an await with CPU work.
    with PerformanceMonitor("data_processing"):
        await asyncio.sleep(0.1)  # simulated I/O
        _ = [i**2 for i in range(10000)]  # Creating data for performance test

    # Monitors nest: the outer one spans the three inner stages.
    stages = (
        ("fetch_data", 0.05),
        ("calculate_indicators", 0.03),
        ("generate_report", 0.02),
    )
    with PerformanceMonitor("full_analysis"):
        for stage_name, simulated_seconds in stages:
            with PerformanceMonitor(stage_name):
                await asyncio.sleep(simulated_seconds)
async def example_specialized_logging():
    """Show the cache, database and external-API logging helpers."""
    # Cache operations: a miss, a hit, then a write for the same key.
    cache_key = "stock:AAPL:2024-01-01:2024-01-31"
    cache_calls = (
        ("get", {"hit": False, "duration_ms": 5}),
        ("get", {"hit": True, "duration_ms": 2}),
        ("set", {"duration_ms": 10}),
    )
    for operation, details in cache_calls:
        log_cache_operation(operation, cache_key, **details)

    # Database query with its bound parameters and timing.
    log_database_query(
        "SELECT * FROM stocks WHERE ticker = :ticker",
        {"ticker": "AAPL"},
        duration_ms=45,
    )

    # Successful external API call.
    log_external_api_call(
        service="yfinance",
        endpoint="/quote/AAPL",
        method="GET",
        status_code=200,
        duration_ms=150,
    )

    # Failed external API call: an error message instead of status/timing.
    log_external_api_call(
        service="alphavantage",
        endpoint="/time_series",
        method="GET",
        error="Rate limit exceeded",
    )
# Example FastMCP tool with logging
@with_logging("example_tool")
async def example_mcp_tool(context, ticker: str, period: int = 20):
    """
    Example MCP tool wrapped with the logging decorator.

    The @with_logging decorator automatically logs:
    - Tool invocation with parameters
    - Execution time
    - Success/failure status
    - Context information

    Args:
        context: Tool context; progress is reported through it when it
            exposes ``report_progress``.
        ticker: Symbol being analysed.
        period: Analysis period.

    Returns:
        Dict echoing ``ticker`` and ``period`` with a completion marker.
    """
    tool_log = get_logger("maverick_mcp.tools.example")

    # Tool-specific log line in addition to what the decorator records.
    analysis_fields = {
        "ticker": ticker,
        "period": period,
        "analysis_type": "comprehensive",
    }
    tool_log.info("Processing advanced analysis", extra=analysis_fields)

    # Progress reporting is optional: not every context supports it.
    if hasattr(context, "report_progress"):
        await context.report_progress(50, 100, "Analyzing data...")

    outcome = {"ticker": ticker, "period": period, "result": "analysis_complete"}
    return outcome
# Example of wrapping existing providers with logging
async def example_provider_logging():
    """Show how an existing data provider is wrapped to log its calls."""
    from maverick_mcp.providers.stock_data import StockDataProvider

    # The logging wrapper takes the plain provider and is used in its place.
    wrapped_provider = LoggingStockDataProvider(StockDataProvider())

    # Calls go through the wrapper; the result itself is not needed here.
    request = {"ticker": "AAPL", "start_date": "2024-01-01", "end_date": "2024-01-31"}
    _ = await wrapped_provider.get_stock_data(**request)
# Example configuration for different environments
def setup_logging_for_environment(environment: str):
"""Configure logging based on environment."""
if environment == "development":
setup_structured_logging(
log_level="DEBUG",
log_format="text", # Human-readable
log_file="dev.log",
)
elif environment == "production":
setup_structured_logging(
log_level="INFO",
log_format="json", # Machine-readable for log aggregation
log_file="/var/log/maverick_mcp/app.log",
)
elif environment == "testing":
setup_structured_logging(
log_level="WARNING",
log_format="json",
log_file=None, # Console only
)
# Example of custom log analysis
def analyze_logs_example(log_path: str = "app.log"):
    """Scan a JSON-lines log file and print notable entries.

    Prints slow operations (``duration_ms`` above 1000), ERROR-level
    entries, and tool usage. Lines that are not valid JSON, or that do not
    decode to a JSON object, are skipped. Missing optional fields are
    printed as ``unknown`` instead of raising KeyError.

    Args:
        log_path: Path of the log file to read. Defaults to ``app.log``.

    Raises:
        OSError: If the log file cannot be opened.
    """
    import json

    with open(log_path, encoding="utf-8") as log_file:
        for line in log_file:
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A bare number, string or list is valid JSON but not a log entry.
            if not isinstance(entry, dict):
                continue

            # Analyze slow queries; ignore durations that are not numeric.
            duration = entry.get("duration_ms", 0)
            if isinstance(duration, (int, float)) and duration > 1000:
                print(
                    f"Slow operation: {entry.get('operation', 'unknown')} - {duration}ms"
                )

            # Find errors
            if entry.get("level") == "ERROR":
                print(
                    f"Error: {entry.get('message', 'unknown')} at {entry.get('timestamp', 'unknown')}"
                )

            # Track API usage
            if entry.get("tool_name"):
                print(
                    f"Tool used: {entry['tool_name']} by {entry.get('user_id', 'unknown')}"
                )
if __name__ == "__main__":
    # Configure JSON output at DEBUG level before any example logs.
    setup_structured_logging(log_level="DEBUG", log_format="json")

    # Each example coroutine gets its own asyncio.run call, in order.
    for example in (
        example_basic_logging,
        example_performance_monitoring,
        example_specialized_logging,
    ):
        asyncio.run(example())

    print("\nLogging examples completed. Check the console output for structured logs.")
```
--------------------------------------------------------------------------------
/tests/test_exception_hierarchy.py:
--------------------------------------------------------------------------------
```python
"""
Test the consolidated exception hierarchy.
"""
from maverick_mcp.exceptions import (
AuthenticationError,
AuthorizationError,
CacheConnectionError,
CircuitBreakerError,
ConfigurationError,
ConflictError,
DataIntegrityError,
DataNotFoundError,
DataProviderError,
ExternalServiceError,
MaverickException,
MaverickMCPError,
NotFoundError,
RateLimitError,
ValidationError,
WebhookError,
get_error_message,
)
class TestExceptionHierarchy:
    """Checks for the exception hierarchy rooted at MaverickException."""

    def test_base_exception(self):
        """A bare MaverickException carries the default attributes."""
        err = MaverickException("Test error")

        assert err.message == "Test error"
        assert err.error_code == "INTERNAL_ERROR"
        assert err.status_code == 500
        assert err.field is None
        assert err.context == {}
        assert err.recoverable is True

    def test_base_exception_with_params(self):
        """Every constructor argument is stored on the instance."""
        err = MaverickException(
            "Custom error",
            error_code="CUSTOM_ERROR",
            status_code=400,
            field="test_field",
            context={"key": "value"},
            recoverable=False,
        )

        assert err.message == "Custom error"
        assert err.error_code == "CUSTOM_ERROR"
        assert err.status_code == 400
        assert err.field == "test_field"
        assert err.context == {"key": "value"}
        assert err.recoverable is False

    def test_validation_error(self):
        """ValidationError reports 422 and keeps the offending field."""
        err = ValidationError("Invalid email format", field="email")

        assert err.message == "Invalid email format"
        assert err.error_code == "VALIDATION_ERROR"
        assert err.status_code == 422
        assert err.field == "email"

    def test_authentication_error(self):
        """AuthenticationError has a default message and reports 401."""
        err = AuthenticationError()

        assert err.message == "Authentication failed"
        assert err.error_code == "AUTHENTICATION_ERROR"
        assert err.status_code == 401

    def test_authorization_error(self):
        """AuthorizationError reports 403 and records resource and action."""
        err = AuthorizationError(resource="portfolio", action="rebalance")

        assert "Unauthorized access to portfolio for action 'rebalance'" in err.message
        assert err.error_code == "AUTHORIZATION_ERROR"
        assert err.status_code == 403
        assert err.context["resource"] == "portfolio"
        assert err.context["action"] == "rebalance"

    def test_not_found_error(self):
        """NotFoundError reports 404 and names the missing resource."""
        err = NotFoundError("Stock", identifier="AAPL")

        assert err.message == "Stock not found: AAPL"
        assert err.error_code == "NOT_FOUND"
        assert err.status_code == 404
        assert err.context["resource"] == "Stock"
        assert err.context["identifier"] == "AAPL"

    def test_rate_limit_error(self):
        """RateLimitError reports 429 and exposes retry_after in context."""
        err = RateLimitError(retry_after=60)

        assert err.message == "Rate limit exceeded"
        assert err.error_code == "RATE_LIMIT_EXCEEDED"
        assert err.status_code == 429
        assert err.context["retry_after"] == 60

    def test_external_service_error(self):
        """ExternalServiceError reports 503 and records service details."""
        err = ExternalServiceError(
            "MarketDataAPI",
            "Service request failed",
            original_error="Connection timeout",
        )

        assert err.message == "Service request failed"
        assert err.error_code == "EXTERNAL_SERVICE_ERROR"
        assert err.status_code == 503
        assert err.context["service"] == "MarketDataAPI"
        assert err.context["original_error"] == "Connection timeout"

    def test_data_provider_error(self):
        """DataProviderError reports 503 and records the provider name."""
        err = DataProviderError("yfinance", "API request failed")

        assert err.message == "API request failed"
        assert err.error_code == "DATA_PROVIDER_ERROR"
        assert err.status_code == 503
        assert err.context["provider"] == "yfinance"

    def test_data_not_found_error(self):
        """DataNotFoundError reports 404 and describes symbol and date range."""
        requested_range = ("2024-01-01", "2024-01-31")
        err = DataNotFoundError("AAPL", date_range=requested_range)

        assert "Data not found for symbol 'AAPL'" in err.message
        assert "in range 2024-01-01 to 2024-01-31" in err.message
        assert err.error_code == "DATA_NOT_FOUND"
        assert err.status_code == 404
        assert err.context["symbol"] == "AAPL"
        assert err.context["date_range"] == ("2024-01-01", "2024-01-31")

    def test_exception_to_dict(self):
        """to_dict serialises code, message, field and context."""
        err = ValidationError("Invalid value", field="age")
        err.context["min_value"] = 0
        err.context["max_value"] = 120

        expected = {
            "code": "VALIDATION_ERROR",
            "message": "Invalid value",
            "field": "age",
            "context": {"min_value": 0, "max_value": 120},
        }
        assert err.to_dict() == expected

    def test_backward_compatibility(self):
        """The legacy MaverickMCPError name is an alias of MaverickException."""
        assert MaverickMCPError is MaverickException

        # Code written against the old name keeps working.
        legacy = MaverickMCPError("Legacy error")
        assert isinstance(legacy, MaverickException)
        assert legacy.message == "Legacy error"

    def test_get_error_message(self):
        """Known codes map to their message; unknown codes get a fallback."""
        assert get_error_message("VALIDATION_ERROR") == "Request validation failed"
        assert get_error_message("NOT_FOUND") == "Resource not found"
        assert get_error_message("UNKNOWN_CODE") == "Unknown error"

    def test_inheritance_chain(self):
        """Every custom exception derives from MaverickException."""
        instances = [
            ValidationError("test"),
            AuthenticationError(),
            AuthorizationError(),
            NotFoundError("test"),
            ConflictError("test"),
            RateLimitError(),
            ExternalServiceError("test", "test"),
            DataProviderError("test", "test"),
            DataNotFoundError("test"),
            DataIntegrityError("test"),
            CacheConnectionError("test", "test"),
            ConfigurationError("test"),
            WebhookError("test"),
            CircuitBreakerError("test", 5, 10),
        ]
        shared_attributes = (
            "error_code",
            "status_code",
            "message",
            "context",
            "to_dict",
        )

        for instance in instances:
            assert isinstance(instance, MaverickException)
            for attribute in shared_attributes:
                assert hasattr(instance, attribute)
```
--------------------------------------------------------------------------------
/maverick_mcp/langchain_tools/adapters.py:
--------------------------------------------------------------------------------
```python
"""
Adapters to convert MCP tools to LangChain tools.
"""
import inspect
import logging
from collections.abc import Callable
from typing import Any
from langchain_core.tools import BaseTool, StructuredTool
from pydantic import BaseModel, Field, create_model
logger = logging.getLogger(__name__)
def extract_tool_schema(func: Callable) -> type[BaseModel]:
    """
    Extract parameter schema from a function's signature and annotations.

    ``self``/``cls`` and variadic parameters (``*args``/``**kwargs``) are
    skipped: they cannot be expressed as named schema fields. Parameters
    without an annotation are typed ``Any``; parameters without a default
    are required.

    Descriptions are taken from docstring lines of the form
    ``name: text`` or ``name (type): text`` (optionally bulleted). When no
    such line exists the description falls back to ``Parameter <name>``.

    Args:
        func: Function to extract schema from

    Returns:
        Pydantic model representing the function's parameters
    """
    empty = inspect.Parameter.empty
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )

    # Parse the docstring once instead of re-scanning it for every parameter.
    # The documented name must match exactly, so a parameter called "id" is
    # not described by an unrelated line that merely contains "id".
    documented: dict[str, str] = {}
    for raw_line in (func.__doc__ or "").splitlines():
        head, separator, tail = raw_line.strip().partition(":")
        if not separator:
            continue
        doc_name = head.split("(", 1)[0].strip(" \t-*`")
        text = tail.strip()
        if doc_name and text:
            # First matching line wins.
            documented.setdefault(doc_name, text)

    fields: dict[str, Any] = {}
    for param_name, param in inspect.signature(func).parameters.items():
        if param_name in ("self", "cls") or param.kind in variadic_kinds:
            continue

        # Identity checks: `empty` is a sentinel, and `==` on arbitrary
        # defaults (e.g. arrays) may not return a plain bool.
        param_type = Any if param.annotation is empty else param.annotation
        default = ... if param.default is empty else param.default
        description = documented.get(param_name, f"Parameter {param_name}")

        fields[param_name] = (
            param_type,
            Field(default=default, description=description),
        )

    # Create dynamic model
    model_name = f"{func.__name__.title()}Schema"
    return create_model(model_name, **fields)
def mcp_to_langchain_adapter(
    mcp_tool: Callable,
    name: str | None = None,
    description: str | None = None,
    args_schema: type[BaseModel] | None = None,
    return_direct: bool = False,
    persona_aware: bool = False,
) -> StructuredTool:
    """
    Wrap an MCP tool function as a LangChain StructuredTool.

    Exceptions raised by the wrapped function are logged and converted
    into an ``{"error": ..., "status": "error"}`` dict instead of
    propagating.

    Args:
        mcp_tool: The MCP tool function to convert
        name: Optional custom name; defaults to the function's name
        description: Optional custom description; defaults to the
            function's docstring, then to ``Tool: <name>``
        args_schema: Optional Pydantic model for arguments; derived from
            the function signature when omitted
        return_direct: Whether to return tool output directly
        persona_aware: Whether to tag the tool's metadata as persona-aware

    Returns:
        LangChain StructuredTool
    """
    tool_name = name or mcp_tool.__name__
    tool_description = description or mcp_tool.__doc__ or f"Tool: {tool_name}"

    if args_schema is None:
        args_schema = extract_tool_schema(mcp_tool)

    def _failure(exc: Exception) -> dict[str, str]:
        # Shared error path for both wrappers: log, then return an error dict.
        logger.error(f"Error in tool {tool_name}: {str(exc)}")
        return {"error": str(exc), "status": "error"}

    async def async_wrapper(**kwargs):
        """Async wrapper for MCP tool."""
        try:
            return _format_tool_result(await mcp_tool(**kwargs))
        except Exception as e:
            return _failure(e)

    def sync_wrapper(**kwargs):
        """Sync wrapper for MCP tool."""
        try:
            return _format_tool_result(mcp_tool(**kwargs))
        except Exception as e:
            return _failure(e)

    # Everything except the entry point is identical for both variants.
    shared_options = {
        "name": tool_name,
        "description": tool_description,
        "args_schema": args_schema,
        "return_direct": return_direct,
    }
    if inspect.iscoroutinefunction(mcp_tool):
        tool = StructuredTool(coroutine=async_wrapper, **shared_options)
    else:
        tool = StructuredTool(func=sync_wrapper, **shared_options)

    if persona_aware:
        tool.metadata = {"persona_aware": True}

    return tool
def _format_tool_result(result: Any) -> str | dict[str, Any]:
"""
Format tool result for LangChain compatibility.
Args:
result: Raw tool result
Returns:
Formatted result
"""
if isinstance(result, dict):
return result
elif isinstance(result, str):
return result
elif hasattr(result, "dict"):
# Pydantic model
return result.dict()
else:
# Convert to string as fallback
return str(result)
def create_langchain_tool(
    func: Callable | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    args_schema: type[BaseModel] | None = None,
    return_direct: bool = False,
    persona_aware: bool = False,
):
    """
    Decorator that turns a function into a LangChain tool.

    Works both bare (``@create_langchain_tool``) and with options.

    Usage:
        @create_langchain_tool(name="stock_screener", persona_aware=True)
        def screen_stocks(strategy: str, limit: int = 20) -> dict:
            ...
    """

    def _convert(target: Callable) -> StructuredTool:
        # Forward the captured options unchanged to the adapter.
        return mcp_to_langchain_adapter(
            target,
            name=name,
            description=description,
            args_schema=args_schema,
            return_direct=return_direct,
            persona_aware=persona_aware,
        )

    # Called with options: hand back the decorator. Called bare: convert now.
    return _convert if func is None else _convert(func)
# Example persona-aware tool wrapper
class PersonaAwareToolWrapper(BaseTool):
    """Wrapper to make any tool persona-aware.

    Before delegating to the wrapped tool, keyword arguments are passed
    through ``persona_adjuster(kwargs, persona)`` when an adjuster is set.
    """

    wrapped_tool: BaseTool
    persona_adjuster: Callable | None = None
    persona: str | None = None

    def __init__(
        self,
        tool: BaseTool,
        adjuster: Callable | None = None,
        persona: str | None = None,
    ):
        """Wrap ``tool`` under the name ``persona_aware_<tool.name>``.

        Args:
            tool: Tool to delegate to.
            adjuster: Optional callable ``(kwargs, persona) -> kwargs``.
            persona: Optional persona passed to the adjuster.
        """
        # All declared fields are handed to the model constructor:
        # `wrapped_tool` has no default, so omitting it here fails model
        # validation before any later attribute assignment could run.
        super().__init__(
            name=f"persona_aware_{tool.name}",
            description=tool.description,
            wrapped_tool=tool,
            persona_adjuster=adjuster,
            persona=persona,
        )

    def _run(self, *args, **kwargs):
        """Run tool with persona adjustments."""
        # `persona` is a declared field and therefore always present, so
        # only the adjuster decides whether adjustments apply.
        if self.persona_adjuster:
            kwargs = self.persona_adjuster(kwargs, self.persona)
        return self.wrapped_tool._run(*args, **kwargs)

    async def _arun(self, *args, **kwargs):
        """Async run tool with persona adjustments."""
        if self.persona_adjuster:
            kwargs = self.persona_adjuster(kwargs, self.persona)
        return await self.wrapped_tool._arun(*args, **kwargs)
```
--------------------------------------------------------------------------------
/maverick_mcp/logging_config.py:
--------------------------------------------------------------------------------
```python
"""
Structured logging configuration with correlation IDs and error tracking.
"""
import json
import logging
import sys
import traceback
import uuid
from contextvars import ContextVar
from datetime import datetime
from functools import wraps
from typing import Any
# Context variable for correlation ID
correlation_id_var: ContextVar[str | None] = ContextVar("correlation_id", default=None) # type: ignore[assignment]
class StructuredFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON with additional metadata."""
# Get correlation ID from context
correlation_id = correlation_id_var.get()
# Build structured log entry
log_entry = {
"timestamp": datetime.now().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"correlation_id": correlation_id,
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Add exception info if present
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info),
}
# Add extra fields
for key, value in record.__dict__.items():
if key not in [
"name",
"msg",
"args",
"created",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"pathname",
"process",
"processName",
"relativeCreated",
"thread",
"threadName",
"exc_info",
"exc_text",
"stack_info",
]:
log_entry[key] = value
return json.dumps(log_entry)
class CorrelationIDMiddleware:
"""Middleware to inject correlation IDs into requests."""
@staticmethod
def generate_correlation_id() -> str:
"""Generate a unique correlation ID."""
return f"mcp-{uuid.uuid4().hex[:8]}"
@staticmethod
def set_correlation_id(correlation_id: str | None = None) -> str:
"""Set correlation ID in context."""
if not correlation_id:
correlation_id = CorrelationIDMiddleware.generate_correlation_id()
correlation_id_var.set(correlation_id)
return correlation_id
@staticmethod
def get_correlation_id() -> str | None:
"""Get current correlation ID from context."""
return correlation_id_var.get()
def with_correlation_id(func):
    """Decorator to ensure correlation ID exists for function execution.

    If no correlation ID is set in the current context when the decorated
    callable runs, a new one is generated first. Coroutine functions get a
    coroutine wrapper; everything else gets a plain wrapper.
    """
    # Imported locally: this module does not import `inspect` at the top.
    # `inspect.iscoroutinefunction` replaces `asyncio.iscoroutinefunction`,
    # which is deprecated and which this module only obtained through a
    # guarded import at the bottom of the file (with a `None` fallback).
    import inspect

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            if not correlation_id_var.get():
                CorrelationIDMiddleware.set_correlation_id()
            return await func(*args, **kwargs)

        return async_wrapper

    @wraps(func)
    def wrapper(*args, **kwargs):
        if not correlation_id_var.get():
            CorrelationIDMiddleware.set_correlation_id()
        return func(*args, **kwargs)

    return wrapper
class ErrorLogger:
    """Enhanced error logging with context and metrics.

    Keeps a per-instance count of logged errors by exception type name and
    can mask sensitive values in the logged context.
    """

    # Key fragments (matched case-insensitively as substrings) whose values
    # are replaced before logging.
    _SENSITIVE_FIELDS = frozenset(
        {
            "password",
            "token",
            "api_key",
            "secret",
            "card_number",
            "ssn",
            "email",
            "phone",
            "address",
            "bearer",
            "authorization",
            "x-api-key",
        }
    )

    def __init__(self, logger: logging.Logger):
        self.logger = logger
        self._error_counts: dict[str, int] = {}

    def log_error(
        self,
        error: Exception,
        context: dict[str, Any],
        level: int = logging.ERROR,
        mask_sensitive: bool = True,
    ):
        """Log error with full context and tracking.

        Args:
            error: The exception to report.
            context: Extra data describing where/why the error occurred.
            level: Logging level to emit at.
            mask_sensitive: Whether to mask sensitive keys in ``context``.
        """
        error_type = type(error).__name__
        self._error_counts[error_type] = self._error_counts.get(error_type, 0) + 1

        if mask_sensitive:
            context = self._mask_sensitive_data(context)

        # Prefer the traceback carried by `error` itself so the trace always
        # belongs to the reported exception; fall back to the exception
        # currently being handled, if any.
        if error.__traceback__ is not None:
            stack_trace = "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        elif sys.exc_info()[0]:
            stack_trace = traceback.format_exc()
        else:
            stack_trace = None

        self.logger.log(
            level,
            f"{error_type}: {str(error)}",
            extra={
                "error_type": error_type,
                "error_message": str(error),
                "error_count": self._error_counts[error_type],
                "context": context,
                "stack_trace": stack_trace,
            },
        )

    def _mask_sensitive_data(self, data: dict[str, Any]) -> dict[str, Any]:
        """Return a copy of ``data`` with sensitive values masked.

        Nested dicts, and dicts inside lists or tuples, are masked too.
        Non-string keys are compared by their string form.
        """
        masked_data = {}
        for key, value in data.items():
            key_text = str(key).lower()
            if any(marker in key_text for marker in self._SENSITIVE_FIELDS):
                masked_data[key] = "***MASKED***"
            else:
                masked_data[key] = self._mask_value(value)
        return masked_data

    def _mask_value(self, value: Any) -> Any:
        """Mask dicts found in ``value``, descending into lists and tuples."""
        if isinstance(value, dict):
            return self._mask_sensitive_data(value)
        if isinstance(value, list):
            return [self._mask_value(item) for item in value]
        # Exact type check so namedtuples and other subclasses are left alone.
        if type(value) is tuple:
            return tuple(self._mask_value(item) for item in value)
        return value

    def get_error_stats(self) -> dict[str, int]:
        """Get a copy of the error counts keyed by exception type name."""
        return self._error_counts.copy()
def setup_logging(
level: int = logging.INFO, use_json: bool = True, log_file: str | None = None
):
"""Configure application logging with structured output."""
root_logger = logging.getLogger()
root_logger.setLevel(level)
# Remove existing handlers
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
if use_json:
console_handler.setFormatter(StructuredFormatter())
else:
console_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
root_logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(StructuredFormatter())
root_logger.addHandler(file_handler)
# Set specific logger levels
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
return root_logger
# Import guard for asyncio
try:
import asyncio
except ImportError:
asyncio = None # type: ignore[assignment]
```
--------------------------------------------------------------------------------
/maverick_mcp/api/services/market_service.py:
--------------------------------------------------------------------------------
```python
"""
Market data service for MaverickMCP API.
Handles market overview, economic calendar, and market-related data operations.
Extracted from server.py to improve code organization and maintainability.
"""
from typing import Any
from .base_service import BaseService
class MarketService(BaseService):
"""
Service class for market data operations.
Provides market overview, economic calendar, and related market data functionality.
"""
def register_tools(self):
    """Register market data tools with MCP.

    Defines two tool coroutines and registers each through
    ``self.mcp.tool()``. Both are thin wrappers that delegate to the
    private implementation methods on this service.
    """

    # Tool wrapper: takes no arguments and forwards to the implementation.
    @self.mcp.tool()
    async def get_market_overview() -> dict[str, Any]:
        """
        Get comprehensive market overview including major indices, sectors, and market statistics.
        Provides real-time market data for major indices (S&P 500, NASDAQ, Dow Jones),
        sector performance, market breadth indicators, and key market statistics.
        Enhanced features available for authenticated users.
        Returns:
        Dictionary containing comprehensive market overview data
        """
        return await self._get_market_overview()

    # Tool wrapper: `days_ahead` is passed through unchanged; range
    # validation happens in the implementation method.
    @self.mcp.tool()
    async def get_economic_calendar(days_ahead: int = 7) -> dict[str, Any]:
        """
        Get upcoming economic events and earnings announcements.
        Args:
        days_ahead: Number of days ahead to fetch events (1-30, default: 7)
        Returns:
        Dictionary containing economic calendar data with upcoming events
        """
        return await self._get_economic_calendar(days_ahead)
async def _get_market_overview(self) -> dict[str, Any]:
    """Get market overview implementation.

    Fetches indices, sector performance, market breadth and top movers
    from ``MarketDataProvider`` and combines them with placeholder
    sentiment figures and highlights.

    Returns:
        The overview dict on success. On any failure, a dict with an
        ``error`` message and a ``timestamp`` (the exception is logged,
        not raised).
    """
    try:
        from maverick_mcp.providers.market_data import MarketDataProvider

        market_provider = MarketDataProvider()

        # Fetched one after another, in this order.
        indices_data = await market_provider.get_major_indices_async()
        sector_data = await market_provider.get_sector_performance_async()
        breadth_data = await market_provider.get_market_breadth_async()
        movers_data = await market_provider.get_top_movers_async()

        overview = {
            # Use this service's own timestamp helper, as the error path
            # below does, rather than a private method on the provider.
            "timestamp": self._get_current_timestamp(),
            "market_status": "open",  # This would be determined by market hours
            "indices": indices_data,
            "sectors": sector_data,
            "market_breadth": breadth_data,
            "top_movers": movers_data,
            "market_sentiment": {
                "fear_greed_index": 45,  # Placeholder - would integrate with actual data
                "vix": 18.5,
                "put_call_ratio": 0.85,
            },
            "economic_highlights": [
                "Fed meeting next week",
                "Earnings season continues",
                "GDP data released",
            ],
        }

        self.log_tool_usage("get_market_overview")
        return overview
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        self.logger.error(f"Failed to get market overview: {e}")
        return {
            "error": f"Failed to fetch market overview: {str(e)}",
            "timestamp": self._get_current_timestamp(),
        }
async def _get_economic_calendar(self, days_ahead: int = 7) -> dict[str, Any]:
"""Get economic calendar implementation."""
# Validate input
if not isinstance(days_ahead, int) or days_ahead < 1 or days_ahead > 30:
return {
"error": "days_ahead must be an integer between 1 and 30",
"provided_value": days_ahead,
}
try:
from datetime import UTC, datetime, timedelta
from maverick_mcp.providers.market_data import MarketDataProvider
market_provider = MarketDataProvider()
# Calculate date range
start_date = datetime.now(UTC)
end_date = start_date + timedelta(days=days_ahead)
# Get economic events
economic_events = await market_provider.get_economic_events_async(
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
)
# Get earnings calendar
earnings_events = await market_provider.get_earnings_calendar_async(
start_date=start_date.strftime("%Y-%m-%d"),
end_date=end_date.strftime("%Y-%m-%d"),
)
calendar_data = {
"period": {
"start_date": start_date.strftime("%Y-%m-%d"),
"end_date": end_date.strftime("%Y-%m-%d"),
"days_ahead": days_ahead,
},
"economic_events": economic_events,
"earnings_events": earnings_events,
"key_highlights": self._extract_key_highlights(
economic_events, earnings_events
),
"timestamp": market_provider._get_current_timestamp(),
}
self.log_tool_usage("get_economic_calendar", days_ahead=days_ahead)
return calendar_data
except Exception as e:
self.logger.error(f"Failed to get economic calendar: {e}")
return {
"error": f"Failed to fetch economic calendar: {str(e)}",
"days_ahead": days_ahead,
"timestamp": self._get_current_timestamp(),
}
def _extract_key_highlights(
self, economic_events: list, earnings_events: list
) -> list[str]:
"""
Extract key highlights from economic and earnings events.
Args:
economic_events: List of economic events
earnings_events: List of earnings events
Returns:
List of key highlight strings
"""
highlights = []
# Extract high-impact economic events
high_impact_events = [
event
for event in economic_events
if event.get("impact", "").lower() in ["high", "critical"]
]
for event in high_impact_events[:3]: # Top 3 high-impact events
highlights.append(
f"{event.get('name', 'Economic event')} - {event.get('date', 'TBD')}"
)
# Extract major earnings announcements
major_earnings = [
event
for event in earnings_events
if event.get("market_cap", 0) > 100_000_000_000 # $100B+ companies
]
for event in major_earnings[:2]: # Top 2 major earnings
highlights.append(
f"{event.get('symbol', 'Unknown')} earnings - {event.get('date', 'TBD')}"
)
return highlights
def _get_current_timestamp(self) -> str:
"""Get current timestamp in ISO format."""
from datetime import UTC, datetime
return datetime.now(UTC).isoformat()
```
--------------------------------------------------------------------------------
/maverick_mcp/infrastructure/connection_manager.py:
--------------------------------------------------------------------------------
```python
"""
Enhanced connection management for FastMCP server stability.
Provides session persistence, connection monitoring, and tool registration consistency
to prevent tools from disappearing in Claude Desktop.
"""
import asyncio
import logging
import time
import uuid
from typing import Any
from fastmcp import FastMCP
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class ConnectionSession:
    """Bookkeeping record for one MCP connection session."""

    def __init__(self, session_id: str):
        """Start a new, active session whose tools are not yet registered."""
        self.session_id = session_id
        self.is_active = True
        self.tools_registered = False
        # Wall-clock timestamps (seconds since the epoch).
        self.created_at = time.time()
        self.last_activity = time.time()

    def update_activity(self):
        """Record that the session was used just now."""
        self.last_activity = time.time()

    def is_expired(self, timeout: float = 300.0) -> bool:
        """Return True once the session has been idle for more than ``timeout`` seconds (default 5 minutes)."""
        idle_seconds = time.time() - self.last_activity
        return idle_seconds > timeout
class MCPConnectionManager:
    """
    Enhanced connection manager for FastMCP server stability.

    Features:
    - Single connection initialization pattern
    - Session persistence across reconnections
    - Tool registration consistency
    - Connection monitoring and debugging
    - Automatic cleanup of stale sessions
    """

    def __init__(self, mcp_server: FastMCP):
        """
        Create a manager bound to ``mcp_server``.

        Args:
            mcp_server: Server instance that tools are registered on.
        """
        self.mcp_server = mcp_server
        self.active_sessions: dict[str, ConnectionSession] = {}
        self.tools_registered = False
        self.initialization_lock = asyncio.Lock()
        self._cleanup_task: asyncio.Task | None = None

        # Connection monitoring
        self.connection_count = 0
        self.total_connections = 0
        self.failed_connections = 0
        logger.info("MCP Connection Manager initialized")

    async def handle_new_connection(self, session_id: str | None = None) -> str:
        """
        Handle new MCP connection with single initialization pattern.

        Args:
            session_id: Optional session ID, generates new one if not provided

        Returns:
            Session ID for the connection

        Raises:
            Exception: Whatever tool registration raises on the first connection.
        """
        if session_id is None:
            session_id = str(uuid.uuid4())

        async with self.initialization_lock:
            # Create new session (replaces any previous entry with the same id)
            session = ConnectionSession(session_id)
            self.active_sessions[session_id] = session
            self.connection_count += 1
            self.total_connections += 1
            logger.info(
                f"New MCP connection: {session_id[:8]} "
                f"(active: {self.connection_count}, total: {self.total_connections})"
            )

            # Ensure tools are registered only once
            if not self.tools_registered:
                await self._register_tools_once()
                self.tools_registered = True
                session.tools_registered = True
                logger.info("Tools registered for first connection")
            else:
                logger.info("Tools already registered, skipping registration")

            # Start cleanup task if not already running
            if self._cleanup_task is None or self._cleanup_task.done():
                self._cleanup_task = asyncio.create_task(self._cleanup_loop())

        return session_id

    async def handle_connection_close(self, session_id: str):
        """
        Handle connection close event.

        The session is marked inactive immediately and removed after a short
        grace period. If the same ``session_id`` reconnects during that
        period, the new session is left in place.
        """
        session = self.active_sessions.get(session_id)
        if session is None:
            return

        # Only count the close once, even if the event is delivered twice.
        if session.is_active:
            session.is_active = False
            self.connection_count = max(0, self.connection_count - 1)
        logger.info(
            f"Connection closed: {session_id[:8]} (active: {self.connection_count})"
        )

        # Remove session after delay to handle quick reconnections
        await asyncio.sleep(5.0)
        # A reconnect stores a fresh ConnectionSession under the same key;
        # only drop the entry if it is still the one that was closed.
        if self.active_sessions.get(session_id) is session:
            self.active_sessions.pop(session_id, None)

    async def update_session_activity(self, session_id: str):
        """Update session activity timestamp (no-op for unknown sessions)."""
        session = self.active_sessions.get(session_id)
        if session is not None:
            session.update_activity()

    async def _register_tools_once(self):
        """
        Register tools only once to prevent conflicts.

        Raises:
            Exception: Re-raised after being logged and counted in
                ``failed_connections``.
        """
        try:
            from maverick_mcp.api.routers.tool_registry import register_all_router_tools

            register_all_router_tools(self.mcp_server)
            logger.info("Successfully registered all MCP tools")
        except Exception as e:
            logger.error(f"Failed to register tools: {e}")
            self.failed_connections += 1
            raise

    async def _cleanup_loop(self):
        """Background cleanup of expired sessions; runs until cancelled."""
        while True:
            try:
                await asyncio.sleep(60)  # Check every minute
                expired_sessions = [
                    sid
                    for sid, session in self.active_sessions.items()
                    if session.is_expired()
                ]
                for session_id in expired_sessions:
                    logger.info(f"Cleaning up expired session: {session_id[:8]}")
                    removed = self.active_sessions.pop(session_id, None)
                    # Keep the active counter in step when a session expires
                    # without ever receiving a close event.
                    if removed is not None and removed.is_active:
                        removed.is_active = False
                        self.connection_count = max(0, self.connection_count - 1)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the loop alive; a single failed sweep is not fatal.
                logger.error(f"Error in cleanup loop: {e}")

    def get_connection_status(self) -> dict[str, Any]:
        """
        Get current connection status for monitoring.

        Returns:
            Dictionary with aggregate counters and one entry per tracked
            session (truncated id, active flag, age and idle time in seconds).
        """
        active_count = sum(1 for s in self.active_sessions.values() if s.is_active)
        return {
            "active_connections": active_count,
            "total_sessions": len(self.active_sessions),
            "total_connections": self.total_connections,
            "failed_connections": self.failed_connections,
            "tools_registered": self.tools_registered,
            "sessions": [
                {
                    "id": sid[:8],
                    "active": session.is_active,
                    "age_seconds": time.time() - session.created_at,
                    "last_activity": time.time() - session.last_activity,
                }
                for sid, session in self.active_sessions.items()
            ],
        }

    async def shutdown(self):
        """Cleanup on server shutdown: cancel the cleanup task and wait for it."""
        if self._cleanup_task:
            self._cleanup_task.cancel()
            try:
                await self._cleanup_task
            except asyncio.CancelledError:
                pass
        logger.info("MCP Connection Manager shutdown complete")
# Global connection manager instance; stays None until
# get_connection_manager() creates it on first use.
_connection_manager: MCPConnectionManager | None = None
def get_connection_manager(mcp_server: FastMCP) -> MCPConnectionManager:
    """
    Return the module-wide connection manager, creating it on first use.

    ``mcp_server`` is only consulted when the manager does not exist yet;
    later calls return the existing instance unchanged.
    """
    global _connection_manager
    manager = _connection_manager
    if manager is None:
        manager = MCPConnectionManager(mcp_server)
        _connection_manager = manager
    return manager
async def initialize_connection_management(mcp_server: FastMCP) -> MCPConnectionManager:
    """Obtain the shared connection manager for ``mcp_server`` and log that it is ready."""
    connection_manager = get_connection_manager(mcp_server)
    logger.info("Enhanced MCP connection management initialized")
    return connection_manager
```
--------------------------------------------------------------------------------
/scripts/migrate_db.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Database migration script for MaverickMCP.
This script initializes the SQLite database with all necessary tables
and ensures the schema is properly set up for the application.
"""
import logging
import os
import sys
from pathlib import Path
# Add the project root (the parent of this script's directory) to the front
# of the Python path before importing the maverick_mcp package below.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
# The imports below intentionally follow the sys.path modification (E402).
from sqlalchemy import create_engine, text  # noqa: E402
from maverick_mcp.data.models import Base  # noqa: E402
# Set up logging: INFO level, with timestamp, logger name and level on each line.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Dedicated logger used by every function in this script.
logger = logging.getLogger("maverick_mcp.migrate")
def get_database_url() -> str:
    """
    Get the database URL from the environment, defaulting to local SQLite.

    The URL is logged with any password replaced by ``****`` so credentials
    never end up in log output; the returned value is the unmodified URL.

    Returns:
        Value of ``DATABASE_URL`` if set and non-empty, otherwise
        ``sqlite:///maverick_mcp.db``.
    """
    # Use environment variable if set, otherwise default to SQLite
    database_url = os.getenv("DATABASE_URL") or "sqlite:///maverick_mcp.db"

    # Mask "user:password@" for logging. Splitting on the LAST "@" keeps a
    # password that itself contains "@" fully hidden.
    scheme, sep, rest = database_url.partition("://")
    userinfo, at, location = rest.rpartition("@")
    user, colon, _password = userinfo.partition(":")
    if sep and at and colon:
        loggable_url = f"{scheme}://{user}:****@{location}"
    else:
        loggable_url = database_url

    logger.info(f"Using database URL: {loggable_url}")
    return database_url
def create_database_if_not_exists(database_url: str) -> None:
    """
    Create the SQLite database file (and parent directory) if it is missing.

    Does nothing for non-``sqlite:///`` URLs and for in-memory databases
    (``sqlite:///:memory:`` or an empty path), which have no backing file.

    Args:
        database_url: Database connection URL.
    """
    prefix = "sqlite:///"
    if not database_url.startswith(prefix):
        return

    # Extract the file path from the URL; anything after "?" is connection
    # options, not part of the path.
    db_path = database_url[len(prefix):].split("?", 1)[0]

    # In-memory databases must not produce a file on disk.
    if db_path in ("", ":memory:"):
        return

    # Path() normalises a leading "./", so relative and absolute paths are
    # handled the same way.
    db_file = Path(db_path)

    # Create directory if it doesn't exist
    db_file.parent.mkdir(parents=True, exist_ok=True)

    if not db_file.exists():
        logger.info(f"Creating SQLite database file: {db_file}")
        # Create empty file
        db_file.touch()
    else:
        logger.info(f"SQLite database already exists: {db_file}")
def test_database_connection(database_url: str) -> bool:
    """
    Test database connection.

    Opens a connection, runs a trivial version/`SELECT 1` query and logs the
    outcome. The engine is always disposed, including on failure.

    Args:
        database_url: Database connection URL.

    Returns:
        True if the query succeeded, False if any step raised.
    """
    engine = None
    try:
        logger.info("Testing database connection...")
        engine = create_engine(database_url, echo=False)
        with engine.connect() as conn:
            if database_url.startswith("sqlite"):
                version = conn.execute(text("SELECT sqlite_version()")).scalar()
                logger.info(f"Connected to SQLite version: {version}")
            elif database_url.startswith("postgresql"):
                version = conn.execute(text("SELECT version()")).scalar()
                # str() guards the slice against a NULL result.
                logger.info(f"Connected to PostgreSQL: {str(version)[:50]}...")
            else:
                conn.execute(text("SELECT 1"))
                logger.info("Database connection successful")
        return True
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        return False
    finally:
        # Release pooled connections on both the success and failure paths.
        if engine is not None:
            engine.dispose()
def create_tables(database_url: str) -> bool:
    """
    Create all tables using SQLAlchemy and report which ``mcp_*`` tables exist.

    Missing expected tables are logged as a warning but do not cause a
    failure. The engine is always disposed, including on failure.

    Args:
        database_url: Database connection URL.

    Returns:
        True if table creation and the follow-up listing query succeeded,
        False if any step raised.
    """
    engine = None
    try:
        logger.info("Creating database tables...")
        engine = create_engine(database_url, echo=False)

        # Create all tables
        Base.metadata.create_all(bind=engine)

        # Verify tables were created
        with engine.connect() as conn:
            if database_url.startswith("sqlite"):
                result = conn.execute(
                    text("""
                        SELECT name FROM sqlite_master
                        WHERE type='table' AND name LIKE 'mcp_%'
                        ORDER BY name
                    """)
                )
            else:
                result = conn.execute(
                    text("""
                        SELECT table_name FROM information_schema.tables
                        WHERE table_schema='public' AND table_name LIKE 'mcp_%'
                        ORDER BY table_name
                    """)
                )
            tables = [row[0] for row in result.fetchall()]

        logger.info(f"Created {len(tables)} tables: {', '.join(tables)}")

        # Verify expected tables exist
        expected_tables = {
            "mcp_stocks",
            "mcp_price_cache",
            "mcp_maverick_stocks",
            "mcp_maverick_bear_stocks",
            "mcp_supply_demand_breakouts",
            "mcp_technical_cache",
        }
        missing_tables = expected_tables - set(tables)
        if missing_tables:
            logger.warning(f"Missing expected tables: {missing_tables}")
        else:
            logger.info("All expected tables created successfully")
        return True
    except Exception as e:
        logger.error(f"Table creation failed: {e}")
        return False
    finally:
        # Release pooled connections on both the success and failure paths.
        if engine is not None:
            engine.dispose()
def verify_schema(database_url: str) -> bool:
    """
    Verify the database schema by running a COUNT(*) against each main table.

    Stops at the first table that cannot be queried. The engine is always
    disposed, including on the early-exit and failure paths.

    Args:
        database_url: Database connection URL.

    Returns:
        True if every table could be queried, False otherwise.
    """
    engine = None
    try:
        logger.info("Verifying database schema...")
        engine = create_engine(database_url, echo=False)
        with engine.connect() as conn:
            # Check that we can query each main table
            test_queries = [
                ("mcp_stocks", "SELECT COUNT(*) FROM mcp_stocks"),
                ("mcp_price_cache", "SELECT COUNT(*) FROM mcp_price_cache"),
                ("mcp_maverick_stocks", "SELECT COUNT(*) FROM mcp_maverick_stocks"),
                (
                    "mcp_maverick_bear_stocks",
                    "SELECT COUNT(*) FROM mcp_maverick_bear_stocks",
                ),
                (
                    "mcp_supply_demand_breakouts",
                    "SELECT COUNT(*) FROM mcp_supply_demand_breakouts",
                ),
                ("mcp_technical_cache", "SELECT COUNT(*) FROM mcp_technical_cache"),
            ]
            for table_name, query in test_queries:
                try:
                    count = conn.execute(text(query)).scalar()
                    logger.info(f"✓ {table_name}: {count} records")
                except Exception as e:
                    logger.error(f"✗ {table_name}: {e}")
                    return False
        logger.info("Schema verification completed successfully")
        return True
    except Exception as e:
        logger.error(f"Schema verification failed: {e}")
        return False
    finally:
        # Runs for every return above, so the early exit no longer leaks the engine.
        if engine is not None:
            engine.dispose()
def main():
    """
    Main migration function.

    Resolves the database URL, creates the SQLite file if needed, then runs
    the connection test, table creation and schema verification in order,
    stopping at the first failing step.

    Returns:
        True if every step succeeded, False otherwise.
    """
    logger.info("Starting MaverickMCP database migration...")

    # Get database URL
    database_url = get_database_url()

    # Create database file if needed (SQLite)
    create_database_if_not_exists(database_url)

    # Each step takes the URL and returns a bool; the message is logged on failure.
    steps = (
        (test_database_connection, "Database connection failed. Exiting."),
        (create_tables, "Table creation failed. Exiting."),
        (verify_schema, "Schema verification failed. Exiting."),
    )
    for step, failure_message in steps:
        if not step(database_url):
            logger.error(failure_message)
            return False

    # Mask "user:password@" before logging so credentials stay out of logs.
    # Splitting on the LAST "@" also hides passwords that contain "@".
    scheme, sep, rest = database_url.partition("://")
    userinfo, at, location = rest.rpartition("@")
    user, colon, _password = userinfo.partition(":")
    if sep and at and colon:
        loggable_url = f"{scheme}://{user}:****@{location}"
    else:
        loggable_url = database_url

    logger.info("✅ Database migration completed successfully!")
    logger.info(f"Database ready at: {loggable_url}")
    return True
# Script entry point: run the migration and exit with status 1 if it failed.
if __name__ == "__main__":
    success = main()
    if not success:
        sys.exit(1)
```
--------------------------------------------------------------------------------
/scripts/validate_setup.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Validate setup for the Tiingo data loader.
This script checks that all required dependencies are installed
and provides setup instructions if anything is missing.
"""
import os
import sys
from pathlib import Path
def check_dependency(module_name, package_name=None, description=""):
    """
    Report whether ``module_name`` can be imported.

    Prints a success line, or a failure line followed by a pip install hint
    that uses ``package_name`` (falling back to ``module_name``).

    Returns:
        True if the import succeeded, False on ImportError.
    """
    install_target = module_name if package_name is None else package_name
    try:
        __import__(module_name)
    except ImportError:
        print(f"❌ {module_name}: Missing - {description}")
        print(f" Install with: pip install {install_target}")
        return False
    print(f"✅ {module_name}: Available")
    return True
def check_environment():
    """
    Print interpreter details and check required and optional dependencies.

    Returns:
        True only if every required dependency could be imported; optional
        dependencies are reported but do not affect the result.
    """
    print("🐍 Python Environment Check")
    print("=" * 40)
    print(f"Python version: {sys.version}")
    print(f"Python executable: {sys.executable}")
    print()

    # Core dependencies
    print("📦 Core Dependencies")
    print("-" * 20)
    required_deps = (
        # Required for async operations
        ("aiohttp", "aiohttp>=3.8.0", "HTTP client for async operations"),
        # Data processing
        ("pandas", "pandas>=2.0.0", "Data manipulation and analysis"),
        ("numpy", "numpy>=1.24.0", "Numerical computing"),
        # Technical indicators
        ("pandas_ta", "pandas-ta>=0.3.14b0", "Technical analysis indicators"),
        # Database
        ("sqlalchemy", "sqlalchemy>=2.0.0", "SQL toolkit and ORM"),
        ("psycopg2", "psycopg2-binary>=2.9.0", "PostgreSQL adapter"),
    )
    # A list (not a generator) so every dependency is checked and reported
    # even after an earlier one is found missing.
    required_results = [check_dependency(*spec) for spec in required_deps]
    print()

    # Optional dependencies
    print("🔧 Optional Dependencies")
    print("-" * 25)
    optional_deps = (
        ("requests", "requests>=2.28.0", "HTTP library for fallback operations"),
        ("pytest", "pytest>=7.0.0", "Testing framework"),
        ("asynctest", "asynctest>=0.13.0", "Async testing utilities"),
    )
    for spec in optional_deps:
        check_dependency(*spec)
    print()

    return all(required_results)
def check_api_token():
    """
    Report whether the ``TIINGO_API_TOKEN`` environment variable is set.

    Only the token length is printed, never the token itself.

    Returns:
        True if the variable is set to a non-empty value, False otherwise.
    """
    print("🔑 API Configuration")
    print("-" * 20)
    token = os.getenv("TIINGO_API_TOKEN")
    if not token:
        print("❌ TIINGO_API_TOKEN: Not set")
        print(" Get your free API token at: https://www.tiingo.com")
        print(" Set with: export TIINGO_API_TOKEN=your_token_here")
        return False
    print(f"✅ TIINGO_API_TOKEN: Set (length: {len(token)})")
    return True
def check_database():
    """
    Check database configuration and, if possible, connectivity.

    Reads ``DATABASE_URL`` (falling back to ``POSTGRES_URL``), prints it with
    the password masked, and tries a ``SELECT 1`` through SQLAlchemy.

    Returns:
        True if the connection succeeded or could not be tested because of an
        ImportError; False if no URL is configured or the connection failed.
    """
    print("\n🗄️ Database Configuration")
    print("-" * 26)
    db_url = os.getenv("DATABASE_URL") or os.getenv("POSTGRES_URL")
    if not db_url:
        print("❌ DATABASE_URL: Not set")
        print(
            " Set with: export DATABASE_URL=postgresql://user:pass@localhost/maverick_mcp"
        )
        return False

    # Mask password in URL for display. Splitting on the LAST "@" keeps a
    # password that itself contains "@" fully hidden.
    scheme, sep, rest = db_url.partition("://")
    userinfo, at, location = rest.rpartition("@")
    user, colon, _password = userinfo.partition(":")
    if sep and at and colon:
        masked_url = f"{scheme}://{user}:****@{location}"
    else:
        masked_url = db_url
    print("✅ DATABASE_URL: Set")
    print(f" URL: {masked_url}")

    # Try to connect if SQLAlchemy is available
    engine = None
    try:
        import sqlalchemy

        engine = sqlalchemy.create_engine(db_url)
        with engine.connect() as conn:
            conn.execute(sqlalchemy.text("SELECT 1")).fetchone()
        print("✅ Database connection: Success")
        return True
    except ImportError:
        print("⚠️ Database connection: Cannot test (SQLAlchemy not installed)")
        return True
    except Exception as e:
        print(f"❌ Database connection: Failed - {e}")
        return False
    finally:
        # Release pooled connections whether or not the check succeeded.
        if engine is not None:
            engine.dispose()
def check_project_structure():
    """
    Check that the expected loader and project files exist.

    All paths are resolved relative to the directory containing this script.

    Returns:
        True if every expected file was found, False if any is missing.
    """
    print("\n📁 Project Structure")
    print("-" * 20)
    script_dir = Path(__file__).parent
    print(f"Current directory: {Path.cwd()}")
    print(f"Script directory: {script_dir}")

    def _report(relative_name):
        """Print the found/missing line for one file and return whether it exists."""
        found = (script_dir / relative_name).exists()
        if found:
            print(f"✅ {relative_name}: Found")
        else:
            print(f"❌ {relative_name}: Missing")
        return found

    # Check for expected files
    loader_files = (
        "load_tiingo_data.py",
        "tiingo_config.py",
        "load_example.py",
        "requirements_tiingo.txt",
    )
    # Lists, not generators: every file is reported even after a miss.
    loader_results = [_report(name) for name in loader_files]

    # Check for parent project structure
    parent_files = (
        "../maverick_mcp/__init__.py",
        "../maverick_mcp/data/models.py",
        "../maverick_mcp/core/technical_analysis.py",
    )
    print("\nParent project files:")
    parent_results = [_report(name) for name in parent_files]

    return all(loader_results) and all(parent_results)
def provide_setup_instructions():
    """Print the five-step setup guide for the Tiingo data loader."""
    # One entry per printed line; an empty string produces a blank line.
    instruction_lines = (
        "\n🚀 Setup Instructions",
        "=" * 21,
        "1. Install Python dependencies:",
        " pip install -r scripts/requirements_tiingo.txt",
        "",
        "2. Get Tiingo API token:",
        " - Sign up at https://www.tiingo.com",
        " - Get your free API token from the dashboard",
        " - export TIINGO_API_TOKEN=your_token_here",
        "",
        "3. Configure database:",
        " - Ensure PostgreSQL is running",
        " - Create maverick_mcp database",
        " - export DATABASE_URL=postgresql://user:pass@localhost/maverick_mcp",
        "",
        "4. Test the setup:",
        " python3 scripts/validate_setup.py",
        "",
        "5. Run a sample load:",
        " python3 scripts/load_tiingo_data.py --symbols AAPL,MSFT --years 1",
    )
    for line in instruction_lines:
        print(line)
def main():
    """
    Run every setup check and print an overall verdict.

    Returns:
        0 if all checks passed, 1 otherwise (suitable as a process exit code).
    """
    print("Tiingo Data Loader Setup Validation")
    print("=" * 38)

    # Run all checks unconditionally so each one prints its own report.
    checks = (
        check_environment,
        check_api_token,
        check_database,
        check_project_structure,
    )
    outcomes = [check() for check in checks]

    print("\n" + "=" * 40)
    if not all(outcomes):
        print("❌ Setup validation FAILED!")
        print("Please fix the issues above before proceeding.")
        print()
        provide_setup_instructions()
        return 1

    print("🎉 Setup validation PASSED!")
    print("You can now use the Tiingo data loader.")
    print()
    print("Quick start:")
    print(" python3 scripts/load_tiingo_data.py --help")
    print(" python3 scripts/load_example.py")
    return 0
# Script entry point: main() returns 0 or 1, which becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/interfaces/persistence.py:
--------------------------------------------------------------------------------
```python
"""
Data persistence interface.
This module defines the abstract interface for database operations,
enabling different persistence implementations to be used interchangeably.
"""
from typing import Any, Protocol, runtime_checkable
import pandas as pd
from sqlalchemy.orm import Session
@runtime_checkable
class IDataPersistence(Protocol):
    """
    Interface for data persistence operations.

    This interface abstracts database operations to enable different
    implementations (SQLAlchemy, MongoDB, etc.) to be used interchangeably.

    Every method is a coroutine stub. Because the protocol is
    ``runtime_checkable``, ``isinstance`` checks only confirm that the
    methods exist, not that their signatures match.
    """

    async def get_session(self) -> Session:
        """
        Get a database session.

        Returns:
            Database session for operations
        """
        ...

    async def get_read_only_session(self) -> Session:
        """
        Get a read-only database session.

        Returns:
            Read-only database session for queries
        """
        ...

    async def save_price_data(
        self, session: Session, symbol: str, data: pd.DataFrame
    ) -> int:
        """
        Save stock price data to persistence layer.

        Args:
            session: Database session
            symbol: Stock ticker symbol
            data: DataFrame with OHLCV data

        Returns:
            Number of records saved
        """
        ...

    async def get_price_data(
        self,
        session: Session,
        symbol: str,
        start_date: str,
        end_date: str,
    ) -> pd.DataFrame:
        """
        Retrieve stock price data from persistence layer.

        Args:
            session: Database session
            symbol: Stock ticker symbol
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format

        Returns:
            DataFrame with historical price data
        """
        ...

    async def get_or_create_stock(self, session: Session, symbol: str) -> Any:
        """
        Get or create a stock record.

        Args:
            session: Database session
            symbol: Stock ticker symbol

        Returns:
            Stock entity/record (concrete type is left to the implementation)
        """
        ...

    async def save_screening_results(
        self,
        session: Session,
        screening_type: str,
        results: list[dict[str, Any]],
    ) -> int:
        """
        Save stock screening results.

        Args:
            session: Database session
            screening_type: Type of screening (e.g., 'maverick', 'bearish', 'trending')
            results: List of screening results

        Returns:
            Number of records saved
        """
        ...

    async def get_screening_results(
        self,
        session: Session,
        screening_type: str,
        limit: int | None = None,
        min_score: float | None = None,
    ) -> list[dict[str, Any]]:
        """
        Retrieve stock screening results.

        Args:
            session: Database session
            screening_type: Type of screening
            limit: Maximum number of results (None for no explicit limit)
            min_score: Minimum score filter (None for no filter)

        Returns:
            List of screening results
        """
        ...

    async def get_latest_screening_data(self) -> dict[str, list[dict[str, Any]]]:
        """
        Get the latest screening data for all types.

        Returns:
            Dictionary with all screening types and their latest results
        """
        ...

    async def check_data_freshness(self, symbol: str, max_age_hours: int = 24) -> bool:
        """
        Check if cached data for a symbol is fresh enough.

        Args:
            symbol: Stock ticker symbol
            max_age_hours: Maximum age in hours before data is considered stale

        Returns:
            True if data is fresh, False if stale or missing
        """
        ...

    async def bulk_save_price_data(
        self, session: Session, symbol: str, data: pd.DataFrame
    ) -> int:
        """
        Bulk save price data for better performance.

        Args:
            session: Database session
            symbol: Stock ticker symbol
            data: DataFrame with OHLCV data

        Returns:
            Number of records saved
        """
        ...

    async def get_symbols_with_data(
        self, session: Session, limit: int | None = None
    ) -> list[str]:
        """
        Get list of symbols that have price data.

        Args:
            session: Database session
            limit: Maximum number of symbols to return (None for no explicit limit)

        Returns:
            List of stock symbols
        """
        ...

    async def cleanup_old_data(self, session: Session, days_to_keep: int = 365) -> int:
        """
        Clean up old data beyond retention period.

        Args:
            session: Database session
            days_to_keep: Number of days of data to retain

        Returns:
            Number of records deleted
        """
        ...
class DatabaseConfig:
    """
    Plain settings holder for a database connection.

    Keeps connection, pooling and session options in one object so that
    persistence implementations do not depend on where configuration
    comes from.
    """

    def __init__(
        self,
        database_url: str = "sqlite:///maverick_mcp.db",
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: int = 30,
        pool_recycle: int = 3600,
        echo: bool = False,
        autocommit: bool = False,
        autoflush: bool = True,
        expire_on_commit: bool = True,
    ):
        """
        Store the given settings unchanged on the instance.

        Args:
            database_url: Database connection URL
            pool_size: Connection pool size
            max_overflow: Maximum connection overflow
            pool_timeout: Pool checkout timeout in seconds
            pool_recycle: Connection recycle time in seconds
            echo: Whether to echo SQL statements
            autocommit: Whether to autocommit transactions
            autoflush: Whether to autoflush sessions
            expire_on_commit: Whether to expire objects on commit
        """
        # Connection target
        self.database_url = database_url
        self.echo = echo

        # Pool tuning
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_timeout = pool_timeout
        self.pool_recycle = pool_recycle

        # Session behaviour
        self.autocommit = autocommit
        self.autoflush = autoflush
        self.expire_on_commit = expire_on_commit

    @property
    def is_sqlite(self) -> bool:
        """True when the URL starts with ``sqlite``."""
        url = self.database_url
        return url.startswith("sqlite")

    @property
    def is_postgresql(self) -> bool:
        """True when the URL starts with ``postgresql``."""
        url = self.database_url
        return url.startswith("postgresql")

    @property
    def supports_pooling(self) -> bool:
        """True for every database except SQLite, which doesn't benefit from pooling."""
        if self.is_sqlite:
            return False
        return True
class PersistenceError(Exception):
    """Root of the persistence exception hierarchy; catch this to handle any persistence failure."""
class DataNotFoundError(PersistenceError):
    """Signals that the requested data does not exist in the persistence layer."""
class DataValidationError(PersistenceError):
    """Signals that data failed validation."""
# NOTE: this name shadows Python's built-in ConnectionError inside this
# module and in any module that imports it by name.
class ConnectionError(PersistenceError):
    """Signals that a database connection could not be established."""
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/mocks/mock_market_data.py:
--------------------------------------------------------------------------------
```python
"""
Mock market data provider implementation for testing.
"""
from datetime import UTC, datetime, timedelta
from typing import Any
class MockMarketDataProvider:
"""
Mock implementation of IMarketDataProvider for testing.
"""
def __init__(self, test_data: dict[str, Any] | None = None):
"""
Initialize the mock market data provider.
Args:
test_data: Optional test data to return
"""
self._test_data = test_data or {}
self._call_log: list[dict[str, Any]] = []
async def get_market_summary(self) -> dict[str, Any]:
"""Get mock market summary."""
self._log_call("get_market_summary", {})
if "market_summary" in self._test_data:
return self._test_data["market_summary"]
return {
"^GSPC": {
"name": "S&P 500",
"symbol": "^GSPC",
"price": 4500.25,
"change": 15.75,
"change_percent": 0.35,
},
"^DJI": {
"name": "Dow Jones",
"symbol": "^DJI",
"price": 35000.50,
"change": -50.25,
"change_percent": -0.14,
},
"^IXIC": {
"name": "NASDAQ",
"symbol": "^IXIC",
"price": 14000.75,
"change": 25.30,
"change_percent": 0.18,
},
}
async def get_top_gainers(self, limit: int = 10) -> list[dict[str, Any]]:
"""Get mock top gainers."""
self._log_call("get_top_gainers", {"limit": limit})
if "top_gainers" in self._test_data:
return self._test_data["top_gainers"][:limit]
gainers = [
{
"symbol": "GAINER1",
"price": 150.25,
"change": 15.50,
"change_percent": 11.50,
"volume": 2500000,
},
{
"symbol": "GAINER2",
"price": 85.75,
"change": 8.25,
"change_percent": 10.65,
"volume": 1800000,
},
{
"symbol": "GAINER3",
"price": 45.30,
"change": 4.15,
"change_percent": 10.08,
"volume": 3200000,
},
]
return gainers[:limit]
async def get_top_losers(self, limit: int = 10) -> list[dict[str, Any]]:
"""Get mock top losers."""
self._log_call("get_top_losers", {"limit": limit})
if "top_losers" in self._test_data:
return self._test_data["top_losers"][:limit]
losers = [
{
"symbol": "LOSER1",
"price": 25.50,
"change": -5.75,
"change_percent": -18.38,
"volume": 4500000,
},
{
"symbol": "LOSER2",
"price": 67.20,
"change": -12.80,
"change_percent": -16.00,
"volume": 2100000,
},
{
"symbol": "LOSER3",
"price": 120.45,
"change": -18.55,
"change_percent": -13.35,
"volume": 1600000,
},
]
return losers[:limit]
async def get_most_active(self, limit: int = 10) -> list[dict[str, Any]]:
"""Get mock most active stocks."""
self._log_call("get_most_active", {"limit": limit})
if "most_active" in self._test_data:
return self._test_data["most_active"][:limit]
active = [
{
"symbol": "ACTIVE1",
"price": 200.75,
"change": 5.25,
"change_percent": 2.68,
"volume": 15000000,
},
{
"symbol": "ACTIVE2",
"price": 95.30,
"change": -2.15,
"change_percent": -2.21,
"volume": 12500000,
},
{
"symbol": "ACTIVE3",
"price": 155.80,
"change": 1.85,
"change_percent": 1.20,
"volume": 11200000,
},
]
return active[:limit]
async def get_sector_performance(self) -> dict[str, float]:
"""Get mock sector performance."""
self._log_call("get_sector_performance", {})
if "sector_performance" in self._test_data:
return self._test_data["sector_performance"]
return {
"Technology": 1.25,
"Healthcare": 0.85,
"Financials": -0.45,
"Consumer Discretionary": 0.65,
"Industrials": 0.35,
"Energy": -1.15,
"Utilities": 0.15,
"Materials": -0.25,
"Consumer Staples": 0.55,
"Real Estate": -0.75,
"Communication Services": 0.95,
}
async def get_earnings_calendar(self, days: int = 7) -> list[dict[str, Any]]:
"""Get mock earnings calendar."""
self._log_call("get_earnings_calendar", {"days": days})
if "earnings_calendar" in self._test_data:
return self._test_data["earnings_calendar"]
base_date = datetime.now(UTC).date()
return [
{
"ticker": "EARN1",
"name": "Earnings Corp 1",
"earnings_date": (base_date + timedelta(days=1)).strftime("%Y-%m-%d"),
"eps_estimate": 1.25,
},
{
"ticker": "EARN2",
"name": "Earnings Corp 2",
"earnings_date": (base_date + timedelta(days=3)).strftime("%Y-%m-%d"),
"eps_estimate": 0.85,
},
{
"ticker": "EARN3",
"name": "Earnings Corp 3",
"earnings_date": (base_date + timedelta(days=5)).strftime("%Y-%m-%d"),
"eps_estimate": 2.15,
},
]
async def get_market_overview(self) -> dict[str, Any]:
"""Get mock comprehensive market overview."""
self._log_call("get_market_overview", {})
return {
"timestamp": datetime.now(UTC).isoformat(),
"market_summary": await self.get_market_summary(),
"top_gainers": await self.get_top_gainers(5),
"top_losers": await self.get_top_losers(5),
"sector_performance": await self.get_sector_performance(),
}
# Testing utilities
def _log_call(self, method: str, args: dict[str, Any]) -> None:
    """Append a record of one method invocation to the call log.

    Each record holds the method name, the argument mapping that was passed
    in, and the moment of the call as returned by ``datetime.now()``
    (a naive datetime, since no timezone is supplied).
    """
    entry = {
        "method": method,
        "args": args,
        "timestamp": datetime.now(),
    }
    self._call_log.append(entry)
def get_call_log(self) -> list[dict[str, Any]]:
    """Return a shallow copy of the recorded method calls.

    The returned list is a new object, so adding or removing items on it
    does not affect the provider's log; the entry dicts themselves are the
    same objects held by the log.
    """
    snapshot = list(self._call_log)
    return snapshot
def clear_call_log(self) -> None:
    """Remove every recorded call, emptying the log in place."""
    recorded_calls = self._call_log
    recorded_calls.clear()
def set_test_data(self, key: str, data: Any) -> None:
    """Register ``data`` under ``key``, replacing any value already stored there."""
    overrides = self._test_data
    overrides[key] = data
def clear_test_data(self) -> None:
    """Drop every registered test-data entry, emptying the store in place."""
    overrides = self._test_data
    overrides.clear()
```
--------------------------------------------------------------------------------
/maverick_mcp/tests/test_mcp_tool_fixes_pytest.py:
--------------------------------------------------------------------------------
```python
"""
Pytest-compatible test suite for MCP tool fixes.
This test validates that the fixes for:
1. Research returning empty results (API keys not passed to DeepResearchAgent)
2. Portfolio risk analysis cryptic "'high'" error (DataFrame validation and column case)
3. External API key hard dependency (graceful degradation)
All continue to work correctly after code changes.
"""
import os
from datetime import UTC, datetime, timedelta
import pytest
from maverick_mcp.api.routers.data import get_stock_info
from maverick_mcp.api.routers.portfolio import risk_adjusted_analysis, stock_provider
from maverick_mcp.validation.data import GetStockInfoRequest
@pytest.mark.integration
@pytest.mark.external
def test_portfolio_risk_analysis_fix():
    """
    Regression test for Issue #2: portfolio risk analysis DataFrame handling.

    Checks that:
    - the data provider returns a populated DataFrame with capitalised
      OHLCV column names
    - a date window ending a week ago still yields substantial history
    - risk_adjusted_analysis returns a well-formed, error-free result
    """
    date_format = "%Y-%m-%d"

    # Exercise the data provider directly before the analysis function.
    end_date = (datetime.now(UTC) - timedelta(days=7)).strftime(date_format)
    start_date = (datetime.now(UTC) - timedelta(days=365)).strftime(date_format)
    df = stock_provider.get_stock_data("MSFT", start_date=start_date, end_date=end_date)

    # Shape and column checks on the raw price history.
    assert not df.empty, "DataFrame should not be empty"
    assert df.shape[0] > 200, "Should have substantial historical data"
    for col in ("Open", "High", "Low", "Close", "Volume"):
        assert col in df.columns, f"Missing expected column: {col}"

    # Now run the portfolio risk analysis itself.
    result = risk_adjusted_analysis("MSFT", 75.0)

    # The result must be error-free and contain every expected section.
    assert "error" not in result, f"Should not have error: {result}"
    required_sections = (
        ("current_price", "Should include current price"),
        ("risk_level", "Should include risk level"),
        ("position_sizing", "Should include position sizing"),
        ("analysis", "Should include analysis"),
    )
    for section, message in required_sections:
        assert section in result, message

    # Type and range checks on the individual values.
    current_price = result["current_price"]
    assert isinstance(current_price, (int, float)), "Current price should be numeric"
    assert current_price > 0, "Current price should be positive"
    assert result["risk_level"] == 75.0, "Risk level should match input"

    position_size = result["position_sizing"]["suggested_position_size"]
    assert isinstance(position_size, (int, float)), "Position size should be numeric"
    assert position_size > 0, "Position size should be positive"
@pytest.mark.integration
@pytest.mark.database
def test_stock_info_external_api_graceful_fallback():
    """
    Regression test for Issue #3: graceful fallback without the external API.

    Checks that:
    - a missing EXTERNAL_DATA_API_KEY does not surface as a hard
      "Invalid API key" error
    - company information and market data are still returned
    - a reported current price, when present, is a positive number
    """
    result = get_stock_info(GetStockInfoRequest(ticker="MSFT"))

    # An error entry is tolerated as long as it is not a hard API-key failure.
    if "error" in result:
        error_text = str(result.get("error"))
        assert "Invalid API key" not in error_text, (
            f"Should not have hard API key error: {result}"
        )

    # Core sections must be present regardless of the external API.
    assert "company" in result, "Should include company information"
    assert "market_data" in result, "Should include market data"

    company_section = result.get("company", {})
    assert company_section.get("name"), "Should have company name"

    # The price is optional; validate it only when one was reported.
    market_section = result.get("market_data", {})
    current_price = market_section.get("current_price")
    if current_price:
        assert isinstance(current_price, (int, float)), "Price should be numeric"
        assert current_price > 0, "Price should be positive"
@pytest.mark.integration
@pytest.mark.external
@pytest.mark.asyncio
async def test_research_agent_api_key_configuration():
    """
    Regression test for Issue #1: research agent API key configuration.

    Checks that:
    - the research agent exposes a non-empty ``search_providers`` collection
    - at least two of those providers carry a configured API key

    Skipped unless both EXA_API_KEY and TAVILY_API_KEY are set.
    """
    from maverick_mcp.api.routers.research import get_research_agent

    # Both provider keys are needed for the assertions below to be meaningful.
    exa_key = os.getenv("EXA_API_KEY")
    tavily_key = os.getenv("TAVILY_API_KEY")
    if not exa_key or not tavily_key:
        pytest.skip("EXA_API_KEY and TAVILY_API_KEY required for research test")

    agent = get_research_agent()

    # The agent must expose at least one search provider.
    assert hasattr(agent, "search_providers"), "Agent should have search_providers"
    assert len(agent.search_providers) > 0, "Should have at least one search provider"

    # Count the providers that ended up with a non-empty API key.
    providers_configured = sum(
        1 for provider in agent.search_providers if getattr(provider, "api_key", None)
    )

    assert providers_configured > 0, (
        "At least one search provider should have API key configured"
    )
    assert providers_configured >= 2, (
        "Should have both EXA and Tavily providers configured"
    )
@pytest.mark.unit
def test_llm_configuration_compatibility():
    """
    Regression test for the LLM configuration fixes.

    Checks that:
    - get_llm() returns an LLM instance
    - when OPENAI_API_KEY is set, a trivial arithmetic query is answered
      through ``invoke`` without errors

    The query step is skipped when OPENAI_API_KEY is not set.
    """
    from maverick_mcp.providers.llm_factory import get_llm

    # Creating the LLM must succeed whether or not a key is available.
    llm = get_llm()
    assert llm is not None, "LLM should be created successfully"

    # The live query needs an OpenAI key; bail out here when it is absent.
    if not os.getenv("OPENAI_API_KEY"):
        pytest.skip("OPENAI_API_KEY required for LLM test")

    response = llm.invoke("What is 2+2?")
    assert response is not None, "LLM should return a response"
    assert hasattr(response, "content"), "Response should have content attribute"
    assert "4" in response.content, "LLM should correctly answer 2+2=4"
@pytest.mark.integration
@pytest.mark.external
@pytest.mark.database
def test_all_mcp_fixes_integration():
    """
    End-to-end check that the MCP tool fixes work in combination.

    Runs portfolio analysis, stock info lookup, research agent creation
    (only when both search API keys are set) and LLM creation in sequence.
    """
    # Step 1: portfolio analysis
    portfolio_result = risk_adjusted_analysis("AAPL", 50.0)
    assert "error" not in portfolio_result, "Portfolio analysis should work"

    # Step 2: stock info
    stock_info_result = get_stock_info(GetStockInfoRequest(ticker="AAPL"))
    assert "company" in stock_info_result, "Stock info should work"

    # Step 3: research agent, exercised only when both API keys are available
    exa_key = os.getenv("EXA_API_KEY")
    tavily_key = os.getenv("TAVILY_API_KEY")
    if exa_key and tavily_key:
        from maverick_mcp.api.routers.research import get_research_agent

        research_agent = get_research_agent()
        provider_count = len(research_agent.search_providers)
        assert provider_count >= 2, "Research agent should have providers"

    # Step 4: LLM configuration
    from maverick_mcp.providers.llm_factory import get_llm

    llm = get_llm()
    assert llm is not None, "LLM should be configured correctly"
```
--------------------------------------------------------------------------------
/alembic/versions/009_rename_to_supply_demand.py:
--------------------------------------------------------------------------------
```python
"""Rename tables to Supply/Demand terminology
Revision ID: 009_rename_to_supply_demand
Revises: 008_performance_optimization_indexes
Create Date: 2025-01-27
This migration renames all database objects to use
supply/demand market structure terminology, removing trademarked references.
"""
import sqlalchemy as sa
from alembic import op
# revision identifiers
revision = "009_rename_to_supply_demand"
down_revision = "008_performance_optimization_indexes"
branch_labels = None
depends_on = None
def upgrade():
    """Rename tables and indexes to supply/demand terminology.

    PostgreSQL: the table, its indexes and (if present) the symbol foreign
    key constraint are renamed in place.

    SQLite: a new table is created, rows are copied across with
    ``INSERT ... SELECT *`` (which relies on both tables having the same
    column order), the old table is dropped and the indexes are recreated.

    Any other dialect: nothing is changed. A warning is printed instead of
    the success message so the log does not claim a rename that never ran.
    """
    # Check if we're using PostgreSQL or SQLite
    bind = op.get_bind()
    dialect_name = bind.dialect.name

    if dialect_name == "postgresql":
        # PostgreSQL supports proper RENAME operations
        # 1. Rename the main table
        op.rename_table("stocks_minervinistocks", "stocks_supply_demand_breakouts")

        # 2. Rename indexes (IF EXISTS tolerates indexes that were never created)
        index_renames = (
            (
                "idx_stocks_minervinistocks_rs_rating_desc",
                "idx_stocks_supply_demand_breakouts_rs_rating_desc",
            ),
            (
                "idx_stocks_minervinistocks_date_analyzed",
                "idx_stocks_supply_demand_breakouts_date_analyzed",
            ),
            (
                "idx_stocks_minervinistocks_rs_date",
                "idx_stocks_supply_demand_breakouts_rs_date",
            ),
            (
                "idx_minervini_stocks_rs_rating",
                "idx_supply_demand_breakouts_rs_rating",
            ),
        )
        for old_name, new_name in index_renames:
            op.execute(f"ALTER INDEX IF EXISTS {old_name} RENAME TO {new_name}")

        # 3. Update any foreign key constraints if they exist
        # Note: Adjust these based on your actual foreign key relationships
        op.execute("""
            DO $$
            BEGIN
            -- Check if constraint exists before trying to rename
            IF EXISTS (
            SELECT 1 FROM information_schema.table_constraints
            WHERE constraint_name = 'fk_minervinistocks_symbol'
            ) THEN
            ALTER TABLE stocks_supply_demand_breakouts
            RENAME CONSTRAINT fk_minervinistocks_symbol TO fk_supply_demand_breakouts_symbol;
            END IF;
            END $$;
        """)
    elif dialect_name == "sqlite":
        # SQLite doesn't support RENAME operations well, need to recreate
        # 1. Create new table with same structure
        op.create_table(
            "stocks_supply_demand_breakouts",
            sa.Column("id", sa.Integer(), nullable=False),
            sa.Column("symbol", sa.String(10), nullable=False),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            sa.Column("rs_rating", sa.Integer(), nullable=True),
            sa.Column("price", sa.Float(), nullable=True),
            sa.Column("volume", sa.BigInteger(), nullable=True),
            sa.Column("meets_criteria", sa.Boolean(), nullable=True),
            sa.Column("created_at", sa.DateTime(), nullable=True),
            sa.Column("updated_at", sa.DateTime(), nullable=True),
            sa.PrimaryKeyConstraint("id"),
            sa.UniqueConstraint(
                "symbol", "date_analyzed", name="uq_supply_demand_breakouts_symbol_date"
            ),
        )

        # 2. Copy data from old table to new
        op.execute("""
            INSERT INTO stocks_supply_demand_breakouts
            SELECT * FROM stocks_minervinistocks
        """)

        # 3. Drop old table
        op.drop_table("stocks_minervinistocks")

        # 4. Create indexes on new table
        # NOTE(review): the postgresql_* options are PostgreSQL dialect
        # arguments; they are presumably ignored on this SQLite-only path,
        # which would leave a plain index on rs_rating -- confirm.
        op.create_index(
            "idx_stocks_supply_demand_breakouts_rs_rating_desc",
            "stocks_supply_demand_breakouts",
            ["rs_rating"],
            postgresql_using="btree",
            postgresql_ops={"rs_rating": "DESC"},
        )
        op.create_index(
            "idx_stocks_supply_demand_breakouts_date_analyzed",
            "stocks_supply_demand_breakouts",
            ["date_analyzed"],
        )
        op.create_index(
            "idx_stocks_supply_demand_breakouts_rs_date",
            "stocks_supply_demand_breakouts",
            ["symbol", "date_analyzed"],
        )
    else:
        # No rename logic exists for this backend, so nothing was changed.
        # Say so explicitly rather than printing the success message below.
        print(
            f"⚠️ Skipped Supply/Demand rename: unsupported dialect '{dialect_name}', "
            "no tables or indexes were changed"
        )
        return

    # Log successful migration
    print("✅ Successfully renamed tables to Supply/Demand Breakout terminology")
    print(" - stocks_minervinistocks → stocks_supply_demand_breakouts")
    print(" - All related indexes have been renamed")
def downgrade():
    """Restore the original table, index and constraint names.

    PostgreSQL objects are renamed in place. On SQLite the old table is
    recreated, rows are copied back with ``INSERT ... SELECT *``, the
    supply/demand table is dropped and the old indexes are recreated.
    Other dialects are left untouched.
    """
    dialect_name = op.get_bind().dialect.name

    if dialect_name == "postgresql":
        # Table first, so the constraint rename below targets the old name.
        op.rename_table("stocks_supply_demand_breakouts", "stocks_minervinistocks")

        # Index renames, in the same order as the upgrade applied them.
        index_renames = (
            (
                "idx_stocks_supply_demand_breakouts_rs_rating_desc",
                "idx_stocks_minervinistocks_rs_rating_desc",
            ),
            (
                "idx_stocks_supply_demand_breakouts_date_analyzed",
                "idx_stocks_minervinistocks_date_analyzed",
            ),
            (
                "idx_stocks_supply_demand_breakouts_rs_date",
                "idx_stocks_minervinistocks_rs_date",
            ),
            (
                "idx_supply_demand_breakouts_rs_rating",
                "idx_minervini_stocks_rs_rating",
            ),
        )
        for current_name, original_name in index_renames:
            op.execute(
                f"ALTER INDEX IF EXISTS {current_name} RENAME TO {original_name}"
            )

        # Constraint rename, guarded so a missing constraint is not an error.
        op.execute("""
            DO $$
            BEGIN
            IF EXISTS (
            SELECT 1 FROM information_schema.table_constraints
            WHERE constraint_name = 'fk_supply_demand_breakouts_symbol'
            ) THEN
            ALTER TABLE stocks_minervinistocks
            RENAME CONSTRAINT fk_supply_demand_breakouts_symbol TO fk_minervinistocks_symbol;
            END IF;
            END $$;
        """)
    elif dialect_name == "sqlite":
        old_table = "stocks_minervinistocks"

        # Recreate the table under its original name.
        op.create_table(
            old_table,
            sa.Column("id", sa.Integer(), nullable=False),
            sa.Column("symbol", sa.String(10), nullable=False),
            sa.Column("date_analyzed", sa.Date(), nullable=False),
            sa.Column("rs_rating", sa.Integer(), nullable=True),
            sa.Column("price", sa.Float(), nullable=True),
            sa.Column("volume", sa.BigInteger(), nullable=True),
            sa.Column("meets_criteria", sa.Boolean(), nullable=True),
            sa.Column("created_at", sa.DateTime(), nullable=True),
            sa.Column("updated_at", sa.DateTime(), nullable=True),
            sa.PrimaryKeyConstraint("id"),
            sa.UniqueConstraint(
                "symbol", "date_analyzed", name="uq_minervinistocks_symbol_date"
            ),
        )

        # Move the rows back, then remove the supply/demand table.
        op.execute("""
            INSERT INTO stocks_minervinistocks
            SELECT * FROM stocks_supply_demand_breakouts
        """)
        op.drop_table("stocks_supply_demand_breakouts")

        # Recreate the original indexes.
        original_indexes = (
            ("idx_stocks_minervinistocks_rs_rating_desc", ["rs_rating"]),
            ("idx_stocks_minervinistocks_date_analyzed", ["date_analyzed"]),
            ("idx_stocks_minervinistocks_rs_date", ["symbol", "date_analyzed"]),
        )
        for index_name, indexed_columns in original_indexes:
            op.create_index(index_name, old_table, indexed_columns)
```
--------------------------------------------------------------------------------
/maverick_mcp/providers/interfaces/cache.py:
--------------------------------------------------------------------------------
```python
"""
Cache manager interface.
This module defines the abstract interface for caching operations,
enabling different caching implementations (Redis, in-memory, etc.)
to be used interchangeably throughout the application.
"""
from typing import Any, Protocol, runtime_checkable
@runtime_checkable
class ICacheManager(Protocol):
"""
Interface for cache management operations.
This interface abstracts caching operations to enable different
implementations (Redis, in-memory, etc.) to be used interchangeably.
All methods should be async-compatible to support non-blocking operations.
"""
async def get(self, key: str) -> Any:
"""
Get data from cache.
Args:
key: Cache key to retrieve
Returns:
Cached data or None if not found or expired
"""
...
async def set(self, key: str, value: Any, ttl: int | None = None) -> bool:
"""
Store data in cache.
Args:
key: Cache key
value: Data to cache (must be JSON serializable)
ttl: Time-to-live in seconds (None for default TTL)
Returns:
True if successfully cached, False otherwise
"""
...
async def delete(self, key: str) -> bool:
"""
Delete a key from cache.
Args:
key: Cache key to delete
Returns:
True if key was deleted, False if key didn't exist
"""
...
async def exists(self, key: str) -> bool:
"""
Check if a key exists in cache.
Args:
key: Cache key to check
Returns:
True if key exists and hasn't expired, False otherwise
"""
...
async def clear(self, pattern: str | None = None) -> int:
"""
Clear cache entries.
Args:
pattern: Pattern to match keys (e.g., "stock:*")
If None, clears all cache entries
Returns:
Number of entries cleared
"""
...
async def get_many(self, keys: list[str]) -> dict[str, Any]:
"""
Get multiple values at once for better performance.
Args:
keys: List of cache keys to retrieve
Returns:
Dictionary mapping keys to their cached values
(missing keys will not be in the result)
"""
...
async def set_many(self, items: list[tuple[str, Any, int | None]]) -> int:
"""
Set multiple values at once for better performance.
Args:
items: List of tuples (key, value, ttl)
Returns:
Number of items successfully cached
"""
...
async def delete_many(self, keys: list[str]) -> int:
"""
Delete multiple keys for better performance.
Args:
keys: List of keys to delete
Returns:
Number of keys successfully deleted
"""
...
async def exists_many(self, keys: list[str]) -> dict[str, bool]:
"""
Check existence of multiple keys for better performance.
Args:
keys: List of keys to check
Returns:
Dictionary mapping keys to their existence status
"""
...
async def count_keys(self, pattern: str) -> int:
"""
Count keys matching a pattern.
Args:
pattern: Pattern to match (e.g., "stock:*")
Returns:
Number of matching keys
"""
...
async def get_or_set(
self, key: str, default_value: Any, ttl: int | None = None
) -> Any:
"""
Get value from cache, setting it if it doesn't exist.
Args:
key: Cache key
default_value: Value to set if key doesn't exist
ttl: Time-to-live for the default value
Returns:
Either the existing cached value or the default value
"""
...
async def increment(self, key: str, amount: int = 1) -> int:
"""
Increment a numeric value in cache.
Args:
key: Cache key
amount: Amount to increment by
Returns:
New value after increment
Raises:
ValueError: If the key exists but doesn't contain a numeric value
"""
...
async def set_if_not_exists(
self, key: str, value: Any, ttl: int | None = None
) -> bool:
"""
Set a value only if the key doesn't already exist.
Args:
key: Cache key
value: Value to set
ttl: Time-to-live in seconds
Returns:
True if the value was set, False if key already existed
"""
...
async def get_ttl(self, key: str) -> int | None:
"""
Get the remaining time-to-live for a key.
Args:
key: Cache key
Returns:
Remaining TTL in seconds, None if key doesn't exist or has no TTL
"""
...
async def expire(self, key: str, ttl: int) -> bool:
"""
Set expiration time for an existing key.
Args:
key: Cache key
ttl: Time-to-live in seconds
Returns:
True if expiration was set, False if key doesn't exist
"""
...
class CacheConfig:
"""
Configuration class for cache implementations.
This class encapsulates cache-related configuration parameters
to reduce coupling between cache implementations and configuration sources.
"""
def __init__(
self,
enabled: bool = True,
default_ttl: int = 3600,
max_memory_size: int = 1000,
redis_host: str = "localhost",
redis_port: int = 6379,
redis_db: int = 0,
redis_password: str | None = None,
redis_ssl: bool = False,
connection_pool_size: int = 20,
socket_timeout: int = 5,
socket_connect_timeout: int = 5,
):
"""
Initialize cache configuration.
Args:
enabled: Whether caching is enabled
default_ttl: Default time-to-live in seconds
max_memory_size: Maximum in-memory cache size
redis_host: Redis server host
redis_port: Redis server port
redis_db: Redis database number
redis_password: Redis password (if required)
redis_ssl: Whether to use SSL for Redis connection
connection_pool_size: Redis connection pool size
socket_timeout: Socket timeout in seconds
socket_connect_timeout: Socket connection timeout in seconds
"""
self.enabled = enabled
self.default_ttl = default_ttl
self.max_memory_size = max_memory_size
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_db = redis_db
self.redis_password = redis_password
self.redis_ssl = redis_ssl
self.connection_pool_size = connection_pool_size
self.socket_timeout = socket_timeout
self.socket_connect_timeout = socket_connect_timeout
def get_redis_url(self) -> str:
"""
Get Redis connection URL.
Returns:
Redis connection URL string
"""
scheme = "rediss" if self.redis_ssl else "redis"
auth = f":{self.redis_password}@" if self.redis_password else ""
return f"{scheme}://{auth}{self.redis_host}:{self.redis_port}/{self.redis_db}"
```