This is page 7 of 29. Use http://codebase.md/wshobson/maverick-mcp?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── FUNDING.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── feature_request.md
│ │ ├── question.md
│ │ └── security_report.md
│ ├── pull_request_template.md
│ └── workflows
│ ├── claude-code-review.yml
│ └── claude.yml
├── .gitignore
├── .python-version
├── .vscode
│ ├── launch.json
│ └── settings.json
├── alembic
│ ├── env.py
│ ├── script.py.mako
│ └── versions
│ ├── 001_initial_schema.py
│ ├── 003_add_performance_indexes.py
│ ├── 006_rename_metadata_columns.py
│ ├── 008_performance_optimization_indexes.py
│ ├── 009_rename_to_supply_demand.py
│ ├── 010_self_contained_schema.py
│ ├── 011_remove_proprietary_terms.py
│ ├── 013_add_backtest_persistence_models.py
│ ├── 014_add_portfolio_models.py
│ ├── 08e3945a0c93_merge_heads.py
│ ├── 9374a5c9b679_merge_heads_for_testing.py
│ ├── abf9b9afb134_merge_multiple_heads.py
│ ├── adda6d3fd84b_merge_proprietary_terms_removal_with_.py
│ ├── e0c75b0bdadb_fix_financial_data_precision_only.py
│ ├── f0696e2cac15_add_essential_performance_indexes.py
│ └── fix_database_integrity_issues.py
├── alembic.ini
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── DATABASE_SETUP.md
├── docker-compose.override.yml.example
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── api
│ │ └── backtesting.md
│ ├── BACKTESTING.md
│ ├── COST_BASIS_SPECIFICATION.md
│ ├── deep_research_agent.md
│ ├── exa_research_testing_strategy.md
│ ├── PORTFOLIO_PERSONALIZATION_PLAN.md
│ ├── PORTFOLIO.md
│ ├── SETUP_SELF_CONTAINED.md
│ └── speed_testing_framework.md
├── examples
│ ├── complete_speed_validation.py
│ ├── deep_research_integration.py
│ ├── llm_optimization_example.py
│ ├── llm_speed_demo.py
│ ├── monitoring_example.py
│ ├── parallel_research_example.py
│ ├── speed_optimization_demo.py
│ └── timeout_fix_demonstration.py
├── LICENSE
├── Makefile
├── MANIFEST.in
├── maverick_mcp
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── circuit_breaker.py
│ │ ├── deep_research.py
│ │ ├── market_analysis.py
│ │ ├── optimized_research.py
│ │ ├── supervisor.py
│ │ └── technical_analysis.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── api_server.py
│ │ ├── connection_manager.py
│ │ ├── dependencies
│ │ │ ├── __init__.py
│ │ │ ├── stock_analysis.py
│ │ │ └── technical_analysis.py
│ │ ├── error_handling.py
│ │ ├── inspector_compatible_sse.py
│ │ ├── inspector_sse.py
│ │ ├── middleware
│ │ │ ├── error_handling.py
│ │ │ ├── mcp_logging.py
│ │ │ ├── rate_limiting_enhanced.py
│ │ │ └── security.py
│ │ ├── openapi_config.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── agents.py
│ │ │ ├── backtesting.py
│ │ │ ├── data_enhanced.py
│ │ │ ├── data.py
│ │ │ ├── health_enhanced.py
│ │ │ ├── health_tools.py
│ │ │ ├── health.py
│ │ │ ├── intelligent_backtesting.py
│ │ │ ├── introspection.py
│ │ │ ├── mcp_prompts.py
│ │ │ ├── monitoring.py
│ │ │ ├── news_sentiment_enhanced.py
│ │ │ ├── performance.py
│ │ │ ├── portfolio.py
│ │ │ ├── research.py
│ │ │ ├── screening_ddd.py
│ │ │ ├── screening_parallel.py
│ │ │ ├── screening.py
│ │ │ ├── technical_ddd.py
│ │ │ ├── technical_enhanced.py
│ │ │ ├── technical.py
│ │ │ └── tool_registry.py
│ │ ├── server.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ ├── base_service.py
│ │ │ ├── market_service.py
│ │ │ ├── portfolio_service.py
│ │ │ ├── prompt_service.py
│ │ │ └── resource_service.py
│ │ ├── simple_sse.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── insomnia_export.py
│ │ └── postman_export.py
│ ├── application
│ │ ├── __init__.py
│ │ ├── commands
│ │ │ └── __init__.py
│ │ ├── dto
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_dto.py
│ │ ├── queries
│ │ │ ├── __init__.py
│ │ │ └── get_technical_analysis.py
│ │ └── screening
│ │ ├── __init__.py
│ │ ├── dtos.py
│ │ └── queries.py
│ ├── backtesting
│ │ ├── __init__.py
│ │ ├── ab_testing.py
│ │ ├── analysis.py
│ │ ├── batch_processing_stub.py
│ │ ├── batch_processing.py
│ │ ├── model_manager.py
│ │ ├── optimization.py
│ │ ├── persistence.py
│ │ ├── retraining_pipeline.py
│ │ ├── strategies
│ │ │ ├── __init__.py
│ │ │ ├── base.py
│ │ │ ├── ml
│ │ │ │ ├── __init__.py
│ │ │ │ ├── adaptive.py
│ │ │ │ ├── ensemble.py
│ │ │ │ ├── feature_engineering.py
│ │ │ │ └── regime_aware.py
│ │ │ ├── ml_strategies.py
│ │ │ ├── parser.py
│ │ │ └── templates.py
│ │ ├── strategy_executor.py
│ │ ├── vectorbt_engine.py
│ │ └── visualization.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── constants.py
│ │ ├── database_self_contained.py
│ │ ├── database.py
│ │ ├── llm_optimization_config.py
│ │ ├── logging_settings.py
│ │ ├── plotly_config.py
│ │ ├── security_utils.py
│ │ ├── security.py
│ │ ├── settings.py
│ │ ├── technical_constants.py
│ │ ├── tool_estimation.py
│ │ └── validation.py
│ ├── core
│ │ ├── __init__.py
│ │ ├── technical_analysis.py
│ │ └── visualization.py
│ ├── data
│ │ ├── __init__.py
│ │ ├── cache_manager.py
│ │ ├── cache.py
│ │ ├── django_adapter.py
│ │ ├── health.py
│ │ ├── models.py
│ │ ├── performance.py
│ │ ├── session_management.py
│ │ └── validation.py
│ ├── database
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── optimization.py
│ ├── dependencies.py
│ ├── domain
│ │ ├── __init__.py
│ │ ├── entities
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis.py
│ │ ├── events
│ │ │ └── __init__.py
│ │ ├── portfolio.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ ├── entities.py
│ │ │ ├── services.py
│ │ │ └── value_objects.py
│ │ ├── services
│ │ │ ├── __init__.py
│ │ │ └── technical_analysis_service.py
│ │ ├── stock_analysis
│ │ │ ├── __init__.py
│ │ │ └── stock_analysis_service.py
│ │ └── value_objects
│ │ ├── __init__.py
│ │ └── technical_indicators.py
│ ├── exceptions.py
│ ├── infrastructure
│ │ ├── __init__.py
│ │ ├── cache
│ │ │ └── __init__.py
│ │ ├── caching
│ │ │ ├── __init__.py
│ │ │ └── cache_management_service.py
│ │ ├── connection_manager.py
│ │ ├── data_fetching
│ │ │ ├── __init__.py
│ │ │ └── stock_data_service.py
│ │ ├── health
│ │ │ ├── __init__.py
│ │ │ └── health_checker.py
│ │ ├── persistence
│ │ │ ├── __init__.py
│ │ │ └── stock_repository.py
│ │ ├── providers
│ │ │ └── __init__.py
│ │ ├── screening
│ │ │ ├── __init__.py
│ │ │ └── repositories.py
│ │ └── sse_optimizer.py
│ ├── langchain_tools
│ │ ├── __init__.py
│ │ ├── adapters.py
│ │ └── registry.py
│ ├── logging_config.py
│ ├── memory
│ │ ├── __init__.py
│ │ └── stores.py
│ ├── monitoring
│ │ ├── __init__.py
│ │ ├── health_check.py
│ │ ├── health_monitor.py
│ │ ├── integration_example.py
│ │ ├── metrics.py
│ │ ├── middleware.py
│ │ └── status_dashboard.py
│ ├── providers
│ │ ├── __init__.py
│ │ ├── dependencies.py
│ │ ├── factories
│ │ │ ├── __init__.py
│ │ │ ├── config_factory.py
│ │ │ └── provider_factory.py
│ │ ├── implementations
│ │ │ ├── __init__.py
│ │ │ ├── cache_adapter.py
│ │ │ ├── macro_data_adapter.py
│ │ │ ├── market_data_adapter.py
│ │ │ ├── persistence_adapter.py
│ │ │ └── stock_data_adapter.py
│ │ ├── interfaces
│ │ │ ├── __init__.py
│ │ │ ├── cache.py
│ │ │ ├── config.py
│ │ │ ├── macro_data.py
│ │ │ ├── market_data.py
│ │ │ ├── persistence.py
│ │ │ └── stock_data.py
│ │ ├── llm_factory.py
│ │ ├── macro_data.py
│ │ ├── market_data.py
│ │ ├── mocks
│ │ │ ├── __init__.py
│ │ │ ├── mock_cache.py
│ │ │ ├── mock_config.py
│ │ │ ├── mock_macro_data.py
│ │ │ ├── mock_market_data.py
│ │ │ ├── mock_persistence.py
│ │ │ └── mock_stock_data.py
│ │ ├── openrouter_provider.py
│ │ ├── optimized_screening.py
│ │ ├── optimized_stock_data.py
│ │ └── stock_data.py
│ ├── README.md
│ ├── tests
│ │ ├── __init__.py
│ │ ├── README_INMEMORY_TESTS.md
│ │ ├── test_cache_debug.py
│ │ ├── test_fixes_validation.py
│ │ ├── test_in_memory_routers.py
│ │ ├── test_in_memory_server.py
│ │ ├── test_macro_data_provider.py
│ │ ├── test_mailgun_email.py
│ │ ├── test_market_calendar_caching.py
│ │ ├── test_mcp_tool_fixes_pytest.py
│ │ ├── test_mcp_tool_fixes.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_models_functional.py
│ │ ├── test_server.py
│ │ ├── test_stock_data_enhanced.py
│ │ ├── test_stock_data_provider.py
│ │ └── test_technical_analysis.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── performance_monitoring.py
│ │ ├── portfolio_manager.py
│ │ ├── risk_management.py
│ │ └── sentiment_analysis.py
│ ├── utils
│ │ ├── __init__.py
│ │ ├── agent_errors.py
│ │ ├── batch_processing.py
│ │ ├── cache_warmer.py
│ │ ├── circuit_breaker_decorators.py
│ │ ├── circuit_breaker_services.py
│ │ ├── circuit_breaker.py
│ │ ├── data_chunking.py
│ │ ├── database_monitoring.py
│ │ ├── debug_utils.py
│ │ ├── fallback_strategies.py
│ │ ├── llm_optimization.py
│ │ ├── logging_example.py
│ │ ├── logging_init.py
│ │ ├── logging.py
│ │ ├── mcp_logging.py
│ │ ├── memory_profiler.py
│ │ ├── monitoring_middleware.py
│ │ ├── monitoring.py
│ │ ├── orchestration_logging.py
│ │ ├── parallel_research.py
│ │ ├── parallel_screening.py
│ │ ├── quick_cache.py
│ │ ├── resource_manager.py
│ │ ├── shutdown.py
│ │ ├── stock_helpers.py
│ │ ├── structured_logger.py
│ │ ├── tool_monitoring.py
│ │ ├── tracing.py
│ │ └── yfinance_pool.py
│ ├── validation
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── data.py
│ │ ├── middleware.py
│ │ ├── portfolio.py
│ │ ├── responses.py
│ │ ├── screening.py
│ │ └── technical.py
│ └── workflows
│ ├── __init__.py
│ ├── agents
│ │ ├── __init__.py
│ │ ├── market_analyzer.py
│ │ ├── optimizer_agent.py
│ │ ├── strategy_selector.py
│ │ └── validator_agent.py
│ ├── backtesting_workflow.py
│ └── state.py
├── PLANS.md
├── pyproject.toml
├── pyrightconfig.json
├── README.md
├── scripts
│ ├── dev.sh
│ ├── INSTALLATION_GUIDE.md
│ ├── load_example.py
│ ├── load_market_data.py
│ ├── load_tiingo_data.py
│ ├── migrate_db.py
│ ├── README_TIINGO_LOADER.md
│ ├── requirements_tiingo.txt
│ ├── run_stock_screening.py
│ ├── run-migrations.sh
│ ├── seed_db.py
│ ├── seed_sp500.py
│ ├── setup_database.sh
│ ├── setup_self_contained.py
│ ├── setup_sp500_database.sh
│ ├── test_seeded_data.py
│ ├── test_tiingo_loader.py
│ ├── tiingo_config.py
│ └── validate_setup.py
├── SECURITY.md
├── server.json
├── setup.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── core
│ │ └── test_technical_analysis.py
│ ├── data
│ │ └── test_portfolio_models.py
│ ├── domain
│ │ ├── conftest.py
│ │ ├── test_portfolio_entities.py
│ │ └── test_technical_analysis_service.py
│ ├── fixtures
│ │ └── orchestration_fixtures.py
│ ├── integration
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── README.md
│ │ ├── run_integration_tests.sh
│ │ ├── test_api_technical.py
│ │ ├── test_chaos_engineering.py
│ │ ├── test_config_management.py
│ │ ├── test_full_backtest_workflow_advanced.py
│ │ ├── test_full_backtest_workflow.py
│ │ ├── test_high_volume.py
│ │ ├── test_mcp_tools.py
│ │ ├── test_orchestration_complete.py
│ │ ├── test_portfolio_persistence.py
│ │ ├── test_redis_cache.py
│ │ ├── test_security_integration.py.disabled
│ │ └── vcr_setup.py
│ ├── performance
│ │ ├── __init__.py
│ │ ├── test_benchmarks.py
│ │ ├── test_load.py
│ │ ├── test_profiling.py
│ │ └── test_stress.py
│ ├── providers
│ │ └── test_stock_data_simple.py
│ ├── README.md
│ ├── test_agents_router_mcp.py
│ ├── test_backtest_persistence.py
│ ├── test_cache_management_service.py
│ ├── test_cache_serialization.py
│ ├── test_circuit_breaker.py
│ ├── test_database_pool_config_simple.py
│ ├── test_database_pool_config.py
│ ├── test_deep_research_functional.py
│ ├── test_deep_research_integration.py
│ ├── test_deep_research_parallel_execution.py
│ ├── test_error_handling.py
│ ├── test_event_loop_integrity.py
│ ├── test_exa_research_integration.py
│ ├── test_exception_hierarchy.py
│ ├── test_financial_search.py
│ ├── test_graceful_shutdown.py
│ ├── test_integration_simple.py
│ ├── test_langgraph_workflow.py
│ ├── test_market_data_async.py
│ ├── test_market_data_simple.py
│ ├── test_mcp_orchestration_functional.py
│ ├── test_ml_strategies.py
│ ├── test_optimized_research_agent.py
│ ├── test_orchestration_integration.py
│ ├── test_orchestration_logging.py
│ ├── test_orchestration_tools_simple.py
│ ├── test_parallel_research_integration.py
│ ├── test_parallel_research_orchestrator.py
│ ├── test_parallel_research_performance.py
│ ├── test_performance_optimizations.py
│ ├── test_production_validation.py
│ ├── test_provider_architecture.py
│ ├── test_rate_limiting_enhanced.py
│ ├── test_runner_validation.py
│ ├── test_security_comprehensive.py.disabled
│ ├── test_security_cors.py
│ ├── test_security_enhancements.py.disabled
│ ├── test_security_headers.py
│ ├── test_security_penetration.py
│ ├── test_session_management.py
│ ├── test_speed_optimization_validation.py
│ ├── test_stock_analysis_dependencies.py
│ ├── test_stock_analysis_service.py
│ ├── test_stock_data_fetching_service.py
│ ├── test_supervisor_agent.py
│ ├── test_supervisor_functional.py
│ ├── test_tool_estimation_config.py
│ ├── test_visualization.py
│ └── utils
│ ├── test_agent_errors.py
│ ├── test_logging.py
│ ├── test_parallel_screening.py
│ └── test_quick_cache.py
├── tools
│ ├── check_orchestration_config.py
│ ├── experiments
│ │ ├── validation_examples.py
│ │ └── validation_fixed.py
│ ├── fast_dev.sh
│ ├── hot_reload.py
│ ├── quick_test.py
│ └── templates
│ ├── new_router_template.py
│ ├── new_tool_template.py
│ ├── screening_strategy_template.py
│ └── test_template.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/examples/parallel_research_example.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Example demonstrating the new parallel research capabilities of DeepResearchAgent.
This example shows how to:
1. Initialize DeepResearchAgent with parallel execution
2. Use both parallel and sequential modes
3. Configure parallel execution parameters
4. Access specialized research results from parallel agents
"""
import asyncio
import logging
from datetime import datetime
from typing import Any
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.utils.parallel_research import ParallelResearchConfig
# Set up logging to see parallel execution in action
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
class MockChatModel(BaseChatModel):
"""Mock chat model for testing that extends BaseChatModel properly."""
def __init__(self, responses: list[str]):
super().__init__()
self.responses = responses
self._call_count = 0
@property
def _llm_type(self) -> str:
return "mock"
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
response = self.responses[self._call_count % len(self.responses)]
self._call_count += 1
message = AIMessage(content=response)
return ChatResult(generations=[ChatGeneration(message=message)])
async def _agenerate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: AsyncCallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
return self._generate(messages, stop, **kwargs)
async def main():
"""Demonstrate parallel research capabilities."""
# Create a mock LLM for testing (in real usage, use Claude/GPT)
llm = MockChatModel(
responses=[
'{"KEY_INSIGHTS": ["Strong earnings growth", "Market expansion"], "SENTIMENT": {"direction": "bullish", "confidence": 0.8}, "RISK_FACTORS": ["Market volatility"], "OPPORTUNITIES": ["AI adoption"], "CREDIBILITY": 0.7, "RELEVANCE": 0.9, "SUMMARY": "Positive outlook for tech company"}',
"Comprehensive research synthesis shows positive trends across multiple analysis areas with strong fundamentals and technical indicators supporting continued growth.",
"Technical analysis indicates strong upward momentum with key resistance levels broken.",
"Market sentiment is predominantly bullish with institutional support.",
"Competitive analysis shows strong market position with sustainable advantages.",
]
)
print("🔬 DeepResearchAgent Parallel Execution Demo")
print("=" * 50)
# 1. Create agent with parallel execution enabled (default)
print("\n1. Creating DeepResearchAgent with parallel execution...")
parallel_config = ParallelResearchConfig(
max_concurrent_agents=3, # Run 3 agents in parallel
timeout_per_agent=120, # 2 minutes per agent
enable_fallbacks=True, # Enable fallback to sequential if parallel fails
rate_limit_delay=0.5, # 0.5 second delay between agent starts
)
agent = DeepResearchAgent(
llm=llm,
persona="moderate",
enable_parallel_execution=True,
parallel_config=parallel_config,
# Note: In real usage, provide API keys:
# exa_api_key="your-exa-key",
# tavily_api_key="your-tavily-key"
)
print("✅ Agent created with parallel execution enabled")
print(f" Max concurrent agents: {agent.parallel_config.max_concurrent_agents}")
print(f" Timeout per agent: {agent.parallel_config.timeout_per_agent}s")
# 2. Demonstrate parallel research
print("\n2. Running parallel research...")
# This will automatically use parallel execution
start_time = datetime.now()
try:
# Note: This requires actual search providers (Exa/Tavily API keys) to work fully
# For demo purposes, we'll show the structure
topic = "AAPL stock analysis and investment outlook"
session_id = "demo_session_001"
print(f" Topic: {topic}")
print(f" Session: {session_id}")
print(" 🚀 Starting parallel research execution...")
# In a real environment with API keys, this would work:
# result = await agent.research_comprehensive(
# topic=topic,
# session_id=session_id,
# depth="standard",
# focus_areas=["fundamentals", "technical_analysis", "market_sentiment"],
# use_parallel_execution=True # Explicitly enable (default)
# )
# For demo, we'll simulate the expected response structure
result = {
"status": "success",
"agent_type": "deep_research",
"execution_mode": "parallel",
"persona": "Moderate",
"research_topic": topic,
"research_depth": "standard",
"findings": {
"synthesis": "Comprehensive analysis from multiple specialized agents shows strong fundamentals...",
"key_insights": [
"Strong earnings growth trajectory",
"Positive technical indicators",
"Bullish market sentiment",
"Competitive market position",
],
"overall_sentiment": {"direction": "bullish", "confidence": 0.75},
"risk_assessment": ["Market volatility", "Regulatory risks"],
"investment_implications": {
"opportunities": ["AI growth", "Market expansion"],
"threats": ["Competition", "Economic headwinds"],
"recommended_action": "Consider position building with appropriate risk management",
},
"confidence_score": 0.78,
},
"sources_analyzed": 24,
"confidence_score": 0.78,
"execution_time_ms": 15000, # 15 seconds (faster than sequential)
"parallel_execution_stats": {
"total_tasks": 3,
"successful_tasks": 3,
"failed_tasks": 0,
"parallel_efficiency": 2.8, # 2.8x faster than sequential
"task_breakdown": {
"demo_session_001_fundamental": {
"type": "fundamental",
"status": "completed",
"execution_time": 5.2,
},
"demo_session_001_sentiment": {
"type": "sentiment",
"status": "completed",
"execution_time": 4.8,
},
"demo_session_001_competitive": {
"type": "competitive",
"status": "completed",
"execution_time": 5.5,
},
},
},
}
execution_time = (datetime.now() - start_time).total_seconds()
print(f" ✅ Parallel research completed in {execution_time:.1f}s")
print(" 📊 Results from parallel execution:")
print(f" • Sources analyzed: {result['sources_analyzed']}")
print(
f" • Overall sentiment: {result['findings']['overall_sentiment']['direction']} ({result['findings']['overall_sentiment']['confidence']:.2f} confidence)"
)
print(f" • Key insights: {len(result['findings']['key_insights'])}")
print(
f" • Parallel efficiency: {result['parallel_execution_stats']['parallel_efficiency']:.1f}x speedup"
)
print(
f" • Tasks: {result['parallel_execution_stats']['successful_tasks']}/{result['parallel_execution_stats']['total_tasks']} successful"
)
# Show task breakdown
print("\n 📋 Task Breakdown:")
for _task_id, task_info in result["parallel_execution_stats"][
"task_breakdown"
].items():
task_type = task_info["type"].title()
status = task_info["status"].title()
exec_time = task_info["execution_time"]
print(f" • {task_type} Research: {status} ({exec_time:.1f}s)")
except Exception as e:
print(f" ❌ Parallel research failed (expected without API keys): {e}")
# 3. Demonstrate sequential fallback
print("\n3. Testing sequential fallback...")
_sequential_agent = DeepResearchAgent(
llm=llm,
persona="moderate",
enable_parallel_execution=False, # Force sequential mode
)
print(" ✅ Sequential-only agent created")
print(" 📝 This would use traditional LangGraph workflow for compatibility")
# 4. Show configuration options
print("\n4. Configuration Options:")
print(" 📋 Parallel Execution Configuration:")
print(f" • Max concurrent agents: {parallel_config.max_concurrent_agents}")
print(f" • Timeout per agent: {parallel_config.timeout_per_agent}s")
print(f" • Enable fallbacks: {parallel_config.enable_fallbacks}")
print(f" • Rate limit delay: {parallel_config.rate_limit_delay}s")
print("\n 🎛️ Available Research Types:")
print(" • Fundamental: Financial statements, earnings, valuation")
print(" • Technical: Chart patterns, indicators, price action")
print(" • Sentiment: News analysis, analyst ratings, social sentiment")
print(" • Competitive: Industry analysis, market position, competitors")
# 5. Usage recommendations
print("\n5. Usage Recommendations:")
print(" 💡 When to use parallel execution:")
print(" • Comprehensive research requiring multiple analysis types")
print(" • Time-sensitive research with tight deadlines")
print(" • Research topics requiring diverse data sources")
print(" • When you have sufficient API rate limits")
print("\n ⚠️ When to use sequential execution:")
print(" • Limited API rate limits")
print(" • Simple, focused research queries")
print(" • Debugging and development")
print(" • When consistency with legacy behavior is required")
print("\n6. API Integration Requirements:")
print(" 🔑 For full functionality, provide:")
print(" • EXA_API_KEY: High-quality research content")
print(" • TAVILY_API_KEY: Comprehensive web search")
print(" • Both are optional but recommended for best results")
print("\n" + "=" * 50)
print("🎉 Demo completed! The enhanced DeepResearchAgent now supports:")
print(" ✅ Parallel execution with specialized subagents")
print(" ✅ Automatic fallback to sequential execution")
print(" ✅ Configurable concurrency and timeouts")
print(" ✅ Full backward compatibility")
print(" ✅ Detailed execution statistics and monitoring")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/tools/portfolio_manager.py:
--------------------------------------------------------------------------------
```python
"""
Portfolio manager for financial portfolio analysis and management.
This module provides a portfolio management interface for tracking and analyzing investment portfolios.
"""
import asyncio
import json
import logging
import os
from datetime import UTC, datetime
from typing import Any
from dotenv import load_dotenv
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("maverick_mcp.portfolio_manager")
# Load environment variables
load_dotenv()
class PortfolioManager:
"""
Portfolio manager for tracking and analyzing investment portfolios.
"""
def __init__(
self,
portfolio_name: str,
risk_profile: str = "moderate",
portfolio_file: str | None = None,
):
"""
Initialize the portfolio manager
Args:
portfolio_name: Name of the portfolio
risk_profile: Risk profile of the portfolio ('conservative', 'moderate', 'aggressive')
portfolio_file: Path to a JSON file containing portfolio data
"""
self.portfolio_name = portfolio_name
self.risk_profile = risk_profile
self.portfolio_file = portfolio_file
# Load portfolio from file if provided
self.portfolio = []
if portfolio_file and os.path.exists(portfolio_file):
with open(portfolio_file) as f:
data = json.load(f)
self.portfolio = data.get("holdings", [])
self.risk_profile = data.get("risk_profile", risk_profile)
self.portfolio_name = data.get("name", portfolio_name)
self.transaction_history: list[dict[str, Any]] = []
async def add_to_portfolio(self, symbol: str, shares: float, price: float):
"""
Add a stock to the portfolio
Args:
symbol: Stock ticker symbol
shares: Number of shares to add
price: Purchase price per share
"""
# Check if stock already exists in portfolio
for holding in self.portfolio:
if holding["symbol"] == symbol:
# Update existing holding
old_shares = holding["shares"]
old_price = holding["avg_price"]
total_cost = (old_shares * old_price) + (shares * price)
total_shares = old_shares + shares
holding["shares"] = total_shares
holding["avg_price"] = total_cost / total_shares
holding["last_update"] = datetime.now(UTC).isoformat()
# Record transaction
self.transaction_history.append(
{
"type": "buy",
"symbol": symbol,
"shares": shares,
"price": price,
"timestamp": datetime.now(UTC).isoformat(),
}
)
return
# Add new holding
self.portfolio.append(
{
"symbol": symbol,
"shares": shares,
"avg_price": price,
"purchase_date": datetime.now(UTC).isoformat(),
"last_update": datetime.now(UTC).isoformat(),
}
)
# Record transaction
self.transaction_history.append(
{
"type": "buy",
"symbol": symbol,
"shares": shares,
"price": price,
"timestamp": datetime.now(UTC).isoformat(),
}
)
async def remove_from_portfolio(
self, symbol: str, shares: float | None = None, price: float | None = None
):
"""
Remove a stock from the portfolio
Args:
symbol: Stock ticker symbol
shares: Number of shares to remove (if None, remove all shares)
price: Selling price per share
"""
for i, holding in enumerate(self.portfolio):
if holding["symbol"] == symbol:
if shares is None or shares >= holding["shares"]:
# Remove entire holding
removed_holding = self.portfolio.pop(i)
# Record transaction
self.transaction_history.append(
{
"type": "sell",
"symbol": symbol,
"shares": removed_holding["shares"],
"price": price,
"timestamp": datetime.now(UTC).isoformat(),
}
)
else:
# Partially remove holding
holding["shares"] -= shares
holding["last_update"] = datetime.now(UTC).isoformat()
# Record transaction
self.transaction_history.append(
{
"type": "sell",
"symbol": symbol,
"shares": shares,
"price": price,
"timestamp": datetime.now(UTC).isoformat(),
}
)
return True
return False
async def get_portfolio_value(self) -> dict[str, Any]:
"""
Get the current value of the portfolio
Returns:
Dictionary with portfolio value information
"""
if not self.portfolio:
return {
"total_value": 0,
"holdings": [],
"timestamp": datetime.now(UTC).isoformat(),
}
total_value = 0
holdings_data = []
for holding in self.portfolio:
symbol = holding["symbol"]
shares = holding["shares"]
avg_price = holding["avg_price"]
current_price = avg_price # In a real implementation, fetch current price from market data API
# Calculate values
position_value = shares * current_price
cost_basis = shares * avg_price
gain_loss = position_value - cost_basis
gain_loss_pct = (gain_loss / cost_basis) * 100 if cost_basis > 0 else 0
holdings_data.append(
{
"symbol": symbol,
"shares": shares,
"avg_price": avg_price,
"current_price": current_price,
"position_value": position_value,
"cost_basis": cost_basis,
"gain_loss": gain_loss,
"gain_loss_pct": gain_loss_pct,
}
)
total_value += position_value
return {
"total_value": total_value,
"holdings": holdings_data,
"timestamp": datetime.now(UTC).isoformat(),
}
async def get_portfolio_analysis(self) -> dict[str, Any]:
"""
Get a comprehensive analysis of the portfolio
Returns:
Dictionary with portfolio analysis information
"""
if not self.portfolio:
return {
"analysis": "Portfolio is empty. No analysis available.",
"timestamp": datetime.now(UTC).isoformat(),
}
# Get current portfolio value
portfolio_value = await self.get_portfolio_value()
# In a real implementation, perform portfolio analysis here
analysis = "Portfolio analysis not implemented"
return {
"portfolio_data": portfolio_value,
"analysis": analysis,
"risk_profile": self.risk_profile,
"timestamp": datetime.now(UTC).isoformat(),
}
async def get_rebalance_recommendations(self) -> dict[str, Any]:
"""
Get recommendations for rebalancing the portfolio
Returns:
Dictionary with rebalance recommendations
"""
if not self.portfolio:
return {
"recommendations": "Portfolio is empty. No rebalance recommendations available.",
"timestamp": datetime.now(UTC).isoformat(),
}
# Get current portfolio value
portfolio_value = await self.get_portfolio_value()
# In a real implementation, generate rebalancing recommendations here
recommendations = "Rebalance recommendations not implemented"
return {
"portfolio_data": portfolio_value,
"recommendations": recommendations,
"risk_profile": self.risk_profile,
"timestamp": datetime.now(UTC).isoformat(),
}
def save_portfolio(self, filepath: str | None = None):
"""
Save the portfolio to a file
Args:
filepath: Path to save the portfolio to (if None, use the portfolio file path)
"""
if not filepath:
filepath = (
self.portfolio_file
or f"{self.portfolio_name.replace(' ', '_').lower()}_portfolio.json"
)
data = {
"name": self.portfolio_name,
"risk_profile": self.risk_profile,
"holdings": self.portfolio,
"transaction_history": self.transaction_history,
"last_update": datetime.now(UTC).isoformat(),
}
with open(filepath, "w") as f:
json.dump(data, f, indent=2)
logger.info(f"Portfolio saved to {filepath}")
return filepath
async def main():
"""Example usage of the portfolio manager"""
# Create a sample portfolio
portfolio = [
{"symbol": "AAPL", "shares": 10, "avg_price": 170.50},
{"symbol": "MSFT", "shares": 5, "avg_price": 325.25},
{"symbol": "GOOGL", "shares": 2, "avg_price": 140.75},
{"symbol": "AMZN", "shares": 3, "avg_price": 178.30},
{"symbol": "TSLA", "shares": 8, "avg_price": 185.60},
]
# Create the portfolio manager
manager = PortfolioManager(
portfolio_name="Tech Growth Portfolio",
risk_profile="moderate",
)
# Add the sample stocks to the portfolio
for holding in portfolio:
await manager.add_to_portfolio(
symbol=str(holding["symbol"]),
shares=float(holding["shares"]), # type: ignore[arg-type]
price=float(holding["avg_price"]), # type: ignore[arg-type]
)
try:
# Get portfolio value
print("Getting portfolio value...")
portfolio_value = await manager.get_portfolio_value()
print(f"Total portfolio value: ${portfolio_value['total_value']:.2f}")
# Get portfolio analysis
print("\nAnalyzing portfolio...")
analysis = await manager.get_portfolio_analysis()
print("\nPortfolio Analysis:")
print(analysis["analysis"])
# Get rebalance recommendations
print("\nGetting rebalance recommendations...")
rebalance = await manager.get_rebalance_recommendations()
print("\nRebalance Recommendations:")
print(rebalance["recommendations"])
# Save the portfolio
filepath = manager.save_portfolio()
print(f"\nPortfolio saved to {filepath}")
finally:
pass
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/maverick_mcp/application/screening/queries.py:
--------------------------------------------------------------------------------
```python
"""
Screening application queries.
This module contains application service queries that orchestrate
domain services and infrastructure adapters for screening operations.
"""
from datetime import datetime
from typing import Any
from maverick_mcp.domain.screening.entities import (
ScreeningResultCollection,
)
from maverick_mcp.domain.screening.services import IStockRepository, ScreeningService
from maverick_mcp.domain.screening.value_objects import (
ScreeningCriteria,
ScreeningStrategy,
SortingOptions,
)
class GetScreeningResultsQuery:
"""
Application query for retrieving screening results.
This query orchestrates the domain service and infrastructure
to provide a complete screening operation.
"""
def __init__(self, stock_repository: IStockRepository):
"""
Initialize the query with required dependencies.
Args:
stock_repository: Repository for accessing stock data
"""
self._stock_repository = stock_repository
self._screening_service = ScreeningService()
async def execute(
self,
strategy: ScreeningStrategy,
limit: int = 20,
criteria: ScreeningCriteria | None = None,
sorting: SortingOptions | None = None,
) -> ScreeningResultCollection:
"""
Execute the screening query.
Args:
strategy: The screening strategy to use
limit: Maximum number of results to return
criteria: Optional filtering criteria
sorting: Optional sorting configuration
Returns:
ScreeningResultCollection with results and metadata
"""
# Validate and adjust limit
validated_limit = self._screening_service.validate_screening_limits(limit)
# Get raw data from repository based on strategy
raw_data = await self._get_raw_data_for_strategy(
strategy, validated_limit, criteria
)
# Convert raw data to domain entities
screening_results = []
for raw_result in raw_data:
try:
result = self._screening_service.create_screening_result_from_raw_data(
raw_result, datetime.utcnow()
)
screening_results.append(result)
except Exception as e:
# Log and skip invalid results
# In a real application, we'd use proper logging
print(
f"Warning: Skipped invalid result for {raw_result.get('stock', 'unknown')}: {e}"
)
continue
# Apply additional filtering if criteria provided
if criteria and criteria.has_any_filters():
screening_results = self._screening_service.apply_screening_criteria(
screening_results, criteria
)
# Apply sorting
if sorting is None:
sorting = SortingOptions.for_strategy(strategy)
screening_results = self._screening_service.sort_screening_results(
screening_results, sorting
)
# Limit results after filtering and sorting
screening_results = screening_results[:validated_limit]
# Create and return collection
return self._screening_service.create_screening_collection(
screening_results,
strategy,
len(raw_data), # Total candidates before filtering
)
async def _get_raw_data_for_strategy(
self,
strategy: ScreeningStrategy,
limit: int,
criteria: ScreeningCriteria | None,
) -> list[dict[str, Any]]:
"""
Get raw data from repository based on strategy.
This method handles the strategy-specific repository calls
and basic filtering that can be done at the data layer.
"""
if strategy == ScreeningStrategy.MAVERICK_BULLISH:
min_score = None
if criteria and criteria.min_combined_score:
min_score = criteria.min_combined_score
return self._stock_repository.get_maverick_stocks(
limit=limit * 2, # Get more to allow for filtering
min_score=min_score,
)
elif strategy == ScreeningStrategy.MAVERICK_BEARISH:
min_score = None
if criteria and criteria.min_bear_score:
min_score = criteria.min_bear_score
return self._stock_repository.get_maverick_bear_stocks(
limit=limit * 2, # Get more to allow for filtering
min_score=min_score,
)
elif strategy == ScreeningStrategy.TRENDING_STAGE2:
min_momentum_score = None
if criteria and criteria.min_momentum_score:
min_momentum_score = criteria.min_momentum_score
# Check if we need moving average filtering
filter_ma = criteria and (
criteria.require_above_sma50
or criteria.require_above_sma150
or criteria.require_above_sma200
or criteria.require_ma_alignment
)
return self._stock_repository.get_trending_stocks(
limit=limit * 2, # Get more to allow for filtering
min_momentum_score=min_momentum_score,
filter_moving_averages=filter_ma,
)
else:
raise ValueError(f"Unsupported screening strategy: {strategy}")
class GetAllScreeningResultsQuery:
"""
Application query for retrieving results from all screening strategies.
This query provides a comprehensive view across all available
screening strategies.
"""
def __init__(self, stock_repository: IStockRepository):
"""
Initialize the query with required dependencies.
Args:
stock_repository: Repository for accessing stock data
"""
self._stock_repository = stock_repository
self._screening_service = ScreeningService()
async def execute(
self, limit_per_strategy: int = 10, criteria: ScreeningCriteria | None = None
) -> dict[str, ScreeningResultCollection]:
"""
Execute screening across all strategies.
Args:
limit_per_strategy: Number of results per strategy
criteria: Optional filtering criteria (applied to all strategies)
Returns:
Dictionary mapping strategy names to their result collections
"""
results = {}
# Execute each strategy
for strategy in ScreeningStrategy:
try:
query = GetScreeningResultsQuery(self._stock_repository)
collection = await query.execute(
strategy=strategy, limit=limit_per_strategy, criteria=criteria
)
results[strategy.value] = collection
except Exception as e:
# Log and continue with other strategies
print(f"Warning: Failed to get results for {strategy.value}: {e}")
# Create empty collection for failed strategy
results[strategy.value] = (
self._screening_service.create_screening_collection([], strategy, 0)
)
return results
class GetScreeningStatisticsQuery:
"""
Application query for retrieving screening statistics and analytics.
This query provides business intelligence and analytical insights
across screening results.
"""
def __init__(self, stock_repository: IStockRepository):
"""
Initialize the query with required dependencies.
Args:
stock_repository: Repository for accessing stock data
"""
self._stock_repository = stock_repository
self._screening_service = ScreeningService()
async def execute(
self, strategy: ScreeningStrategy | None = None, limit: int = 100
) -> dict[str, Any]:
"""
Execute the statistics query.
Args:
strategy: Optional specific strategy to analyze (None for all)
limit: Maximum results to analyze per strategy
Returns:
Comprehensive statistics and analytics
"""
if strategy:
# Single strategy analysis
query = GetScreeningResultsQuery(self._stock_repository)
collection = await query.execute(strategy, limit)
return {
"strategy": strategy.value,
"statistics": self._screening_service.calculate_screening_statistics(
collection
),
"timestamp": datetime.utcnow().isoformat(),
}
else:
# All strategies analysis
all_query = GetAllScreeningResultsQuery(self._stock_repository)
all_collections = await all_query.execute(limit)
combined_stats = {
"overall_summary": {
"strategies_analyzed": len(all_collections),
"total_results": sum(
len(c.results) for c in all_collections.values()
),
"timestamp": datetime.utcnow().isoformat(),
},
"by_strategy": {},
}
# Calculate stats for each strategy
for strategy_name, collection in all_collections.items():
combined_stats["by_strategy"][strategy_name] = (
self._screening_service.calculate_screening_statistics(collection)
)
# Calculate cross-strategy insights
combined_stats["cross_strategy_analysis"] = (
self._calculate_cross_strategy_insights(all_collections)
)
return combined_stats
def _calculate_cross_strategy_insights(
self, collections: dict[str, ScreeningResultCollection]
) -> dict[str, Any]:
"""
Calculate insights that span across multiple strategies.
This provides valuable business intelligence by comparing
and contrasting results across different screening approaches.
"""
all_symbols = set()
strategy_overlaps = {}
# Collect all symbols and calculate overlaps
for strategy_name, collection in collections.items():
symbols = {r.stock_symbol for r in collection.results}
all_symbols.update(symbols)
strategy_overlaps[strategy_name] = symbols
# Find intersections
bullish_symbols = strategy_overlaps.get(
ScreeningStrategy.MAVERICK_BULLISH.value, set()
)
bearish_symbols = strategy_overlaps.get(
ScreeningStrategy.MAVERICK_BEARISH.value, set()
)
trending_symbols = strategy_overlaps.get(
ScreeningStrategy.TRENDING_STAGE2.value, set()
)
return {
"total_unique_symbols": len(all_symbols),
"strategy_overlaps": {
"bullish_and_trending": len(bullish_symbols & trending_symbols),
"conflicting_signals": len(bullish_symbols & bearish_symbols),
"trending_exclusive": len(
trending_symbols - bullish_symbols - bearish_symbols
),
},
"market_sentiment": {
"bullish_bias": len(bullish_symbols) > len(bearish_symbols),
"trend_strength": len(trending_symbols) / max(len(all_symbols), 1),
"conflict_ratio": len(bullish_symbols & bearish_symbols)
/ max(len(all_symbols), 1),
},
}
```
--------------------------------------------------------------------------------
/maverick_mcp/backtesting/strategies/templates.py:
--------------------------------------------------------------------------------
```python
"""Pre-built strategy templates for VectorBT."""
from typing import Any
import pandas as pd
class SimpleMovingAverageStrategy:
"""Simple Moving Average crossover strategy for ML integration."""
def __init__(
self, parameters: dict = None, fast_period: int = 10, slow_period: int = 20
):
"""
Initialize SMA strategy.
Args:
parameters: Optional dict with fast_period and slow_period
fast_period: Period for fast moving average
slow_period: Period for slow moving average
"""
if parameters:
self.fast_period = parameters.get("fast_period", fast_period)
self.slow_period = parameters.get("slow_period", slow_period)
else:
self.fast_period = fast_period
self.slow_period = slow_period
self.name = "SMA Crossover"
self.parameters = {
"fast_period": self.fast_period,
"slow_period": self.slow_period,
}
def generate_signals(self, data: pd.DataFrame) -> tuple:
"""
Generate buy/sell signals based on SMA crossover.
Args:
data: DataFrame with at least 'close' column
Returns:
Tuple of (entries, exits) as boolean Series
"""
close = data["close"] if "close" in data.columns else data["Close"]
# Calculate SMAs
fast_sma = close.rolling(window=self.fast_period).mean()
slow_sma = close.rolling(window=self.slow_period).mean()
# Generate signals
entries = (fast_sma > slow_sma) & (fast_sma.shift(1) <= slow_sma.shift(1))
exits = (fast_sma < slow_sma) & (fast_sma.shift(1) >= slow_sma.shift(1))
# Handle NaN values
entries = entries.fillna(False)
exits = exits.fillna(False)
return entries, exits
def get_parameters(self) -> dict[str, Any]:
"""Get strategy parameters."""
return {"fast_period": self.fast_period, "slow_period": self.slow_period}
STRATEGY_TEMPLATES = {
"sma_cross": {
"name": "SMA Crossover",
"description": "Buy when fast SMA crosses above slow SMA, sell when it crosses below",
"parameters": {
"fast_period": 10,
"slow_period": 20,
},
"optimization_ranges": {
"fast_period": [5, 10, 15, 20],
"slow_period": [20, 30, 50, 100],
},
"code": """
# SMA Crossover Strategy
fast_sma = vbt.MA.run(close, {fast_period}).ma.squeeze()
slow_sma = vbt.MA.run(close, {slow_period}).ma.squeeze()
entries = (fast_sma > slow_sma) & (fast_sma.shift(1) <= slow_sma.shift(1))
exits = (fast_sma < slow_sma) & (fast_sma.shift(1) >= slow_sma.shift(1))
""",
},
"rsi": {
"name": "RSI Mean Reversion",
"description": "Buy oversold (RSI < 30), sell overbought (RSI > 70)",
"parameters": {
"period": 14,
"oversold": 30,
"overbought": 70,
},
"optimization_ranges": {
"period": [7, 14, 21],
"oversold": [20, 25, 30, 35],
"overbought": [65, 70, 75, 80],
},
"code": """
# RSI Mean Reversion Strategy
rsi = vbt.RSI.run(close, {period}).rsi.squeeze()
entries = (rsi < {oversold}) & (rsi.shift(1) >= {oversold})
exits = (rsi > {overbought}) & (rsi.shift(1) <= {overbought})
""",
},
"macd": {
"name": "MACD Signal",
"description": "Buy when MACD crosses above signal line, sell when crosses below",
"parameters": {
"fast_period": 12,
"slow_period": 26,
"signal_period": 9,
},
"optimization_ranges": {
"fast_period": [8, 10, 12, 14],
"slow_period": [21, 24, 26, 30],
"signal_period": [7, 9, 11],
},
"code": """
# MACD Signal Strategy
macd = vbt.MACD.run(close,
fast_window={fast_period},
slow_window={slow_period},
signal_window={signal_period}
)
macd_line = macd.macd.squeeze()
signal_line = macd.signal.squeeze()
entries = (macd_line > signal_line) & (macd_line.shift(1) <= signal_line.shift(1))
exits = (macd_line < signal_line) & (macd_line.shift(1) >= signal_line.shift(1))
""",
},
"bollinger": {
"name": "Bollinger Bands",
"description": "Buy at lower band (oversold), sell at upper band (overbought)",
"parameters": {
"period": 20,
"std_dev": 2.0,
},
"optimization_ranges": {
"period": [10, 15, 20, 25],
"std_dev": [1.5, 2.0, 2.5, 3.0],
},
"code": """
# Bollinger Bands Strategy
bb = vbt.BBANDS.run(close, window={period}, alpha={std_dev})
upper = bb.upper.squeeze()
lower = bb.lower.squeeze()
# Buy when price touches lower band, sell when touches upper
entries = (close <= lower) & (close.shift(1) > lower.shift(1))
exits = (close >= upper) & (close.shift(1) < upper.shift(1))
""",
},
"momentum": {
"name": "Momentum",
"description": "Buy strong momentum, sell weak momentum based on returns threshold",
"parameters": {
"lookback": 20,
"threshold": 0.05,
},
"optimization_ranges": {
"lookback": [10, 15, 20, 25, 30],
"threshold": [0.02, 0.03, 0.05, 0.07, 0.10],
},
"code": """
# Momentum Strategy
returns = close.pct_change({lookback})
entries = returns > {threshold}
exits = returns < -{threshold}
""",
},
"ema_cross": {
"name": "EMA Crossover",
"description": "Exponential moving average crossover with faster response than SMA",
"parameters": {
"fast_period": 12,
"slow_period": 26,
},
"optimization_ranges": {
"fast_period": [8, 12, 16, 20],
"slow_period": [20, 26, 35, 50],
},
"code": """
# EMA Crossover Strategy
fast_ema = vbt.MA.run(close, {fast_period}, ewm=True).ma.squeeze()
slow_ema = vbt.MA.run(close, {slow_period}, ewm=True).ma.squeeze()
entries = (fast_ema > slow_ema) & (fast_ema.shift(1) <= slow_ema.shift(1))
exits = (fast_ema < slow_ema) & (fast_ema.shift(1) >= slow_ema.shift(1))
""",
},
"mean_reversion": {
"name": "Mean Reversion",
"description": "Buy when price is below moving average by threshold",
"parameters": {
"ma_period": 20,
"entry_threshold": 0.02, # 2% below MA
"exit_threshold": 0.01, # 1% above MA
},
"optimization_ranges": {
"ma_period": [15, 20, 30, 50],
"entry_threshold": [0.01, 0.02, 0.03, 0.05],
"exit_threshold": [0.00, 0.01, 0.02],
},
"code": """
# Mean Reversion Strategy
ma = vbt.MA.run(close, {ma_period}).ma.squeeze()
deviation = (close - ma) / ma
entries = deviation < -{entry_threshold}
exits = deviation > {exit_threshold}
""",
},
"breakout": {
"name": "Channel Breakout",
"description": "Buy on breakout above rolling high, sell on breakdown below rolling low",
"parameters": {
"lookback": 20,
"exit_lookback": 10,
},
"optimization_ranges": {
"lookback": [10, 20, 30, 50],
"exit_lookback": [5, 10, 15, 20],
},
"code": """
# Channel Breakout Strategy
upper_channel = close.rolling({lookback}).max()
lower_channel = close.rolling({exit_lookback}).min()
entries = close > upper_channel.shift(1)
exits = close < lower_channel.shift(1)
""",
},
"volume_momentum": {
"name": "Volume-Weighted Momentum",
"description": "Momentum strategy filtered by volume surge",
"parameters": {
"momentum_period": 20,
"volume_period": 20,
"momentum_threshold": 0.05,
"volume_multiplier": 1.5,
},
"optimization_ranges": {
"momentum_period": [10, 20, 30],
"volume_period": [10, 20, 30],
"momentum_threshold": [0.03, 0.05, 0.07],
"volume_multiplier": [1.2, 1.5, 2.0],
},
"code": """
# Volume-Weighted Momentum Strategy
returns = close.pct_change({momentum_period})
avg_volume = volume.rolling({volume_period}).mean()
volume_surge = volume > (avg_volume * {volume_multiplier})
# Entry: positive momentum with volume surge
entries = (returns > {momentum_threshold}) & volume_surge
# Exit: negative momentum or volume dry up
exits = (returns < -{momentum_threshold}) | (volume < avg_volume * 0.8)
""",
},
"online_learning": {
"name": "Online Learning Strategy",
"description": "Adaptive strategy using online learning to predict price movements",
"parameters": {
"lookback": 20,
"learning_rate": 0.01,
"update_frequency": 5,
},
"optimization_ranges": {
"lookback": [10, 20, 30, 50],
"learning_rate": [0.001, 0.01, 0.1],
"update_frequency": [1, 5, 10, 20],
},
"code": """
# Online Learning Strategy (ML-based)
# Uses streaming updates to adapt to market conditions
# Implements SGD classifier with technical features
""",
},
"regime_aware": {
"name": "Regime-Aware Strategy",
"description": "Adapts strategy based on detected market regime (trending/ranging)",
"parameters": {
"regime_window": 50,
"threshold": 0.02,
"trend_strategy": "momentum",
"range_strategy": "mean_reversion",
},
"optimization_ranges": {
"regime_window": [20, 50, 100],
"threshold": [0.01, 0.02, 0.05],
},
"code": """
# Regime-Aware Strategy
# Detects market regime and switches between strategies
# Uses volatility and trend strength indicators
""",
},
"ensemble": {
"name": "Ensemble Strategy",
"description": "Combines multiple strategies with weighted voting",
"parameters": {
"fast_period": 10,
"slow_period": 20,
"rsi_period": 14,
"weight_method": "equal",
},
"optimization_ranges": {
"fast_period": [5, 10, 15],
"slow_period": [20, 30, 50],
"rsi_period": [7, 14, 21],
},
"code": """
# Ensemble Strategy
# Combines SMA, RSI, and MACD signals
# Uses voting or weighted average for final signal
""",
},
}
def get_strategy_template(strategy_type: str) -> dict[str, Any]:
"""Get a strategy template by type.
Args:
strategy_type: Type of strategy
Returns:
Strategy template dictionary
Raises:
ValueError: If strategy type not found
"""
if strategy_type not in STRATEGY_TEMPLATES:
available = ", ".join(STRATEGY_TEMPLATES.keys())
raise ValueError(
f"Unknown strategy type: {strategy_type}. Available: {available}"
)
return STRATEGY_TEMPLATES[strategy_type]
def list_available_strategies() -> list[str]:
"""List all available strategy types.
Returns:
List of strategy type names
"""
return list(STRATEGY_TEMPLATES.keys())
def get_strategy_info(strategy_type: str) -> dict[str, Any]:
"""Get information about a strategy.
Args:
strategy_type: Type of strategy
Returns:
Strategy information including name, description, and parameters
"""
template = get_strategy_template(strategy_type)
return {
"type": strategy_type,
"name": template["name"],
"description": template["description"],
"default_parameters": template["parameters"],
"optimization_ranges": template["optimization_ranges"],
}
```
--------------------------------------------------------------------------------
/maverick_mcp/api/services/prompt_service.py:
--------------------------------------------------------------------------------
```python
"""
Prompt service for MaverickMCP API.
Handles trading and investing prompts for technical analysis and stock screening.
Extracted from server.py to improve code organization and maintainability.
"""
from .base_service import BaseService
class PromptService(BaseService):
"""
Service class for prompt operations.
Provides trading and investing prompts for technical analysis and stock screening.
"""
def register_tools(self):
"""Register prompt tools with MCP."""
@self.mcp.prompt()
def technical_analysis(ticker: str, timeframe: str = "daily") -> str:
"""
Generate a comprehensive technical analysis prompt for a given stock.
Args:
ticker: Stock ticker symbol (e.g., "AAPL", "MSFT")
timeframe: Analysis timeframe - "daily", "weekly", or "monthly"
Returns:
Formatted prompt for technical analysis
"""
return self._technical_analysis_prompt(ticker, timeframe)
@self.mcp.prompt()
def stock_screening_report(strategy: str = "momentum") -> str:
"""
Generate a stock screening analysis prompt based on specified strategy.
Args:
strategy: Screening strategy - "momentum", "value", "growth", "quality", or "dividend"
Returns:
Formatted prompt for stock screening analysis
"""
return self._stock_screening_prompt(strategy)
def _technical_analysis_prompt(self, ticker: str, timeframe: str = "daily") -> str:
"""Generate technical analysis prompt implementation."""
# Validate inputs
valid_timeframes = ["daily", "weekly", "monthly"]
if timeframe not in valid_timeframes:
timeframe = "daily"
ticker = ticker.upper().strip()
prompt = f"""
# Technical Analysis Request for {ticker}
Please provide a comprehensive technical analysis for **{ticker}** using {timeframe} timeframe data.
## Analysis Requirements:
### 1. Price Action Analysis
- Current price level and recent price movement
- Key support and resistance levels
- Trend direction (bullish, bearish, or sideways)
- Chart patterns (if any): triangles, flags, head & shoulders, etc.
### 2. Technical Indicators Analysis
Please analyze these key indicators:
**Moving Averages:**
- 20, 50, 200-period moving averages
- Price position relative to moving averages
- Moving average convergence/divergence signals
**Momentum Indicators:**
- RSI (14-period): overbought/oversold conditions
- MACD: signal line crossovers and histogram
- Stochastic oscillator: %K and %D levels
**Volume Analysis:**
- Recent volume trends
- Volume confirmation of price moves
- On-balance volume (OBV) trend
### 3. Market Context
- Overall market trend and {ticker}'s correlation
- Sector performance and relative strength
- Recent news or events that might impact the stock
### 4. Trading Recommendations
Based on the technical analysis, please provide:
- **Entry points**: Optimal buy/sell levels
- **Stop loss**: Risk management levels
- **Target prices**: Profit-taking levels
- **Time horizon**: Short-term, medium-term, or long-term outlook
- **Risk assessment**: High, medium, or low risk trade
### 5. Alternative Scenarios
- Bull case: What would drive the stock higher?
- Bear case: What are the key risks or downside catalysts?
- Base case: Most likely scenario given current technicals
## Additional Context:
- Timeframe: {timeframe.title()} analysis
- Analysis date: {self._get_current_date()}
- Please use the most recent market data available
- Consider both technical and fundamental factors if relevant
Please structure your analysis clearly and provide actionable insights for traders and investors.
"""
self.log_tool_usage(
"technical_analysis_prompt", ticker=ticker, timeframe=timeframe
)
return prompt.strip()
def _stock_screening_prompt(self, strategy: str = "momentum") -> str:
"""Generate stock screening prompt implementation."""
# Validate strategy
valid_strategies = ["momentum", "value", "growth", "quality", "dividend"]
if strategy not in valid_strategies:
strategy = "momentum"
strategy_configs = {
"momentum": {
"title": "Momentum Stock Screening",
"description": "Identify stocks with strong price momentum and technical strength",
"criteria": [
"Strong relative strength (RS rating > 80)",
"Price above 50-day and 200-day moving averages",
"Recent breakout from consolidation pattern",
"Volume surge on breakout",
"Positive earnings growth",
"Strong sector performance",
],
"metrics": [
"Relative Strength Index (RSI)",
"Price rate of change (ROC)",
"Volume relative to average",
"Distance from moving averages",
"Earnings growth rate",
"Revenue growth rate",
],
},
"value": {
"title": "Value Stock Screening",
"description": "Find undervalued stocks with strong fundamentals",
"criteria": [
"Low P/E ratio relative to industry",
"P/B ratio below 2.0",
"Debt-to-equity ratio below industry average",
"Positive free cash flow",
"Dividend yield above market average",
"Strong return on equity (ROE > 15%)",
],
"metrics": [
"Price-to-Earnings (P/E) ratio",
"Price-to-Book (P/B) ratio",
"Price-to-Sales (P/S) ratio",
"Enterprise Value/EBITDA",
"Free cash flow yield",
"Return on equity (ROE)",
],
},
"growth": {
"title": "Growth Stock Screening",
"description": "Identify companies with accelerating growth metrics",
"criteria": [
"Revenue growth > 20% annually",
"Earnings growth acceleration",
"Strong profit margins",
"Expanding market share",
"Innovation and competitive advantages",
"Strong management execution",
],
"metrics": [
"Revenue growth rate",
"Earnings per share (EPS) growth",
"Profit margin trends",
"Return on invested capital (ROIC)",
"Price/Earnings/Growth (PEG) ratio",
"Market share metrics",
],
},
"quality": {
"title": "Quality Stock Screening",
"description": "Find high-quality companies with sustainable competitive advantages",
"criteria": [
"Consistent earnings growth (5+ years)",
"Strong balance sheet (low debt)",
"High return on equity (ROE > 20%)",
"Wide economic moat",
"Stable or growing market share",
"Strong management track record",
],
"metrics": [
"Return on equity (ROE)",
"Return on assets (ROA)",
"Debt-to-equity ratio",
"Interest coverage ratio",
"Earnings consistency",
"Free cash flow stability",
],
},
"dividend": {
"title": "Dividend Stock Screening",
"description": "Identify stocks with attractive and sustainable dividend yields",
"criteria": [
"Dividend yield between 3-8%",
"Dividend growth history (5+ years)",
"Payout ratio below 60%",
"Strong free cash flow coverage",
"Stable or growing earnings",
"Defensive business model",
],
"metrics": [
"Dividend yield",
"Dividend growth rate",
"Payout ratio",
"Free cash flow coverage",
"Dividend aristocrat status",
"Earnings stability",
],
},
}
config = strategy_configs[strategy]
prompt = f"""
# {config["title"]} Analysis Request
Please conduct a comprehensive {strategy} stock screening analysis to {config["description"]}.
## Screening Criteria:
### Primary Filters:
{chr(10).join(f"- {criteria}" for criteria in config["criteria"])}
### Key Metrics to Analyze:
{chr(10).join(f"- {metric}" for metric in config["metrics"])}
## Analysis Framework:
### 1. Market Environment Assessment
- Current market conditions and {strategy} stock performance
- Sector rotation trends favoring {strategy} strategies
- Economic factors supporting {strategy} investing
- Historical performance of {strategy} strategies in similar conditions
### 2. Stock Screening Process
Please apply the following methodology:
- **Universe**: Focus on large and mid-cap stocks (market cap > $2B)
- **Liquidity**: Average daily volume > 1M shares
- **Fundamental Screening**: Apply the primary filters listed above
- **Technical Validation**: Confirm with technical analysis
- **Risk Assessment**: Evaluate potential risks and catalysts
### 3. Top Stock Recommendations
For each recommended stock, provide:
- **Company overview**: Business model and competitive position
- **Why it fits the {strategy} criteria**: Specific metrics and rationale
- **Risk factors**: Key risks to monitor
- **Price targets**: Entry points and target prices
- **Position sizing**: Recommended allocation (1-5% portfolio weight)
### 4. Portfolio Construction
- **Diversification**: Spread across sectors and industries
- **Risk management**: Position sizing and stop-loss levels
- **Rebalancing**: When and how to adjust positions
- **Performance monitoring**: Key metrics to track
### 5. Implementation Strategy
- **Entry strategy**: Best practices for building positions
- **Timeline**: Short-term vs. long-term holding periods
- **Market timing**: Consider current market cycle
- **Tax considerations**: Tax-efficient implementation
## Additional Requirements:
- Screen date: {self._get_current_date()}
- Market cap focus: Large and mid-cap stocks
- Geographic focus: US markets (can include international if compelling)
- Minimum liquidity: $10M average daily volume
- Exclude recent IPOs (< 6 months) unless exceptionally compelling
## Output Format:
1. **Executive Summary**: Key findings and market outlook
2. **Top 10 Stock Recommendations**: Detailed analysis for each
3. **Sector Allocation**: Recommended sector weights
4. **Risk Assessment**: Portfolio-level risks and mitigation
5. **Performance Expectations**: Expected returns and timeline
Please provide actionable insights that can be immediately implemented in a {strategy}-focused investment strategy.
"""
self.log_tool_usage("stock_screening_prompt", strategy=strategy)
return prompt.strip()
def _get_current_date(self) -> str:
"""Get current date in readable format."""
from datetime import UTC, datetime
return datetime.now(UTC).strftime("%B %d, %Y")
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/orchestration_logging.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive Orchestration Logging System
Provides structured logging for research agent orchestration with:
- Request ID tracking across all components
- Performance timing and metrics
- Parallel execution visibility
- Agent communication tracking
- Resource usage monitoring
"""
import functools
import logging
import time
import uuid
from contextlib import contextmanager
from typing import Any
# Color codes for better readability in terminal
class LogColors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKCYAN = "\033[96m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
class OrchestrationLogger:
"""Enhanced logger for orchestration components with structured output."""
def __init__(self, component_name: str):
self.component_name = component_name
self.logger = logging.getLogger(f"maverick_mcp.orchestration.{component_name}")
self.request_id: str | None = None
self.session_context: dict[str, Any] = {}
def set_request_context(
self, request_id: str | None = None, session_id: str | None = None, **kwargs
):
"""Set context for this request that will be included in all logs."""
self.request_id = request_id or str(uuid.uuid4())[:8]
self.session_context = {
"session_id": session_id,
"request_id": self.request_id,
**kwargs,
}
def _format_message(self, level: str, message: str, **kwargs) -> str:
"""Format log message with consistent structure and colors."""
color = {
"DEBUG": LogColors.OKCYAN,
"INFO": LogColors.OKGREEN,
"WARNING": LogColors.WARNING,
"ERROR": LogColors.FAIL,
}.get(level, "")
# Build context string
context_parts = []
if self.request_id:
context_parts.append(f"req:{self.request_id}")
if self.session_context.get("session_id"):
context_parts.append(f"session:{self.session_context['session_id']}")
context_str = f"[{' | '.join(context_parts)}]" if context_parts else ""
# Add component and extra info
extra_info = " | ".join(f"{k}:{v}" for k, v in kwargs.items() if v is not None)
extra_str = f" | {extra_info}" if extra_info else ""
return f"{color}🔧 {self.component_name}{LogColors.ENDC} {context_str}: {message}{extra_str}"
def debug(self, message: str, **kwargs):
"""Log debug message with context."""
self.logger.debug(self._format_message("DEBUG", message, **kwargs))
def info(self, message: str, **kwargs):
"""Log info message with context."""
self.logger.info(self._format_message("INFO", message, **kwargs))
def warning(self, message: str, **kwargs):
"""Log warning message with context."""
self.logger.warning(self._format_message("WARNING", message, **kwargs))
def error(self, message: str, **kwargs):
"""Log error message with context."""
self.logger.error(self._format_message("ERROR", message, **kwargs))
# Global registry of component loggers
_component_loggers: dict[str, OrchestrationLogger] = {}
def get_orchestration_logger(component_name: str) -> OrchestrationLogger:
"""Get or create an orchestration logger for a component."""
if component_name not in _component_loggers:
_component_loggers[component_name] = OrchestrationLogger(component_name)
return _component_loggers[component_name]
def log_method_call(
component: str | None = None,
include_params: bool = True,
include_timing: bool = True,
):
"""
Decorator to log method entry/exit with timing and parameters.
Args:
component: Component name override
include_params: Whether to log method parameters
include_timing: Whether to log execution timing
"""
def decorator(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
# Determine component name
comp_name = component
if not comp_name and args and hasattr(args[0], "__class__"):
comp_name = args[0].__class__.__name__
if not comp_name:
comp_name = func.__module__.split(".")[-1]
logger = get_orchestration_logger(comp_name)
# Log method entry
params_str = ""
if include_params:
# Sanitize parameters for logging
safe_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")}
if safe_kwargs:
params_str = f" | params: {safe_kwargs}"
logger.info(f"🚀 START {func.__name__}{params_str}")
start_time = time.time()
try:
result = await func(*args, **kwargs)
# Log successful completion
duration = time.time() - start_time
timing_str = f" | duration: {duration:.3f}s" if include_timing else ""
# Include result summary if available
result_summary = ""
if isinstance(result, dict):
if "execution_mode" in result:
result_summary += f" | mode: {result['execution_mode']}"
if "research_confidence" in result:
result_summary += (
f" | confidence: {result['research_confidence']:.2f}"
)
if "parallel_execution_stats" in result:
stats = result["parallel_execution_stats"]
result_summary += f" | tasks: {stats.get('successful_tasks', 0)}/{stats.get('total_tasks', 0)}"
logger.info(f"✅ SUCCESS {func.__name__}{timing_str}{result_summary}")
return result
except Exception as e:
# Log error
duration = time.time() - start_time
timing_str = f" | duration: {duration:.3f}s" if include_timing else ""
logger.error(f"❌ ERROR {func.__name__}{timing_str} | error: {str(e)}")
raise
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
# Handle synchronous functions
comp_name = component
if not comp_name and args and hasattr(args[0], "__class__"):
comp_name = args[0].__class__.__name__
if not comp_name:
comp_name = func.__module__.split(".")[-1]
logger = get_orchestration_logger(comp_name)
# Log method entry
params_str = ""
if include_params:
safe_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")}
if safe_kwargs:
params_str = f" | params: {safe_kwargs}"
logger.info(f"🚀 START {func.__name__}{params_str}")
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
timing_str = f" | duration: {duration:.3f}s" if include_timing else ""
logger.info(f"✅ SUCCESS {func.__name__}{timing_str}")
return result
except Exception as e:
duration = time.time() - start_time
timing_str = f" | duration: {duration:.3f}s" if include_timing else ""
logger.error(f"❌ ERROR {func.__name__}{timing_str} | error: {str(e)}")
raise
# Return appropriate wrapper based on function type
if hasattr(func, "_is_coroutine") or "async" in str(func):
return async_wrapper
else:
return sync_wrapper
return decorator
@contextmanager
def log_parallel_execution(component: str, task_description: str, task_count: int):
"""Context manager for logging parallel execution blocks."""
logger = get_orchestration_logger(component)
logger.info(f"🔄 PARALLEL_START {task_description} | tasks: {task_count}")
start_time = time.time()
try:
yield logger
duration = time.time() - start_time
logger.info(
f"🎯 PARALLEL_SUCCESS {task_description} | duration: {duration:.3f}s | tasks: {task_count}"
)
except Exception as e:
duration = time.time() - start_time
logger.error(
f"💥 PARALLEL_ERROR {task_description} | duration: {duration:.3f}s | error: {str(e)}"
)
raise
@contextmanager
def log_agent_execution(
agent_type: str, task_id: str, focus_areas: list[str] | None = None
):
"""Context manager for logging individual agent execution."""
logger = get_orchestration_logger(f"{agent_type}Agent")
focus_str = f" | focus: {focus_areas}" if focus_areas else ""
logger.info(f"🤖 AGENT_START {task_id}{focus_str}")
start_time = time.time()
try:
yield logger
duration = time.time() - start_time
logger.info(f"🎉 AGENT_SUCCESS {task_id} | duration: {duration:.3f}s")
except Exception as e:
duration = time.time() - start_time
logger.error(
f"🔥 AGENT_ERROR {task_id} | duration: {duration:.3f}s | error: {str(e)}"
)
raise
def log_tool_invocation(tool_name: str, request_data: dict[str, Any] | None = None):
"""Log MCP tool invocation with request details."""
logger = get_orchestration_logger("MCPToolRegistry")
request_summary = ""
if request_data:
if "query" in request_data:
request_summary += f" | query: '{request_data['query'][:50]}...'"
if "research_scope" in request_data:
request_summary += f" | scope: {request_data['research_scope']}"
if "persona" in request_data:
request_summary += f" | persona: {request_data['persona']}"
logger.info(f"🔧 TOOL_INVOKE {tool_name}{request_summary}")
def log_synthesis_operation(
operation: str, input_count: int, output_summary: str | None = None
):
"""Log result synthesis operations."""
logger = get_orchestration_logger("ResultSynthesis")
summary_str = f" | output: {output_summary}" if output_summary else ""
logger.info(f"🧠 SYNTHESIS {operation} | inputs: {input_count}{summary_str}")
def log_fallback_trigger(component: str, reason: str, fallback_action: str):
"""Log when fallback mechanisms are triggered."""
logger = get_orchestration_logger(component)
logger.warning(f"⚠️ FALLBACK_TRIGGER {reason} | action: {fallback_action}")
def log_performance_metrics(component: str, metrics: dict[str, Any]):
"""Log performance metrics for monitoring."""
logger = get_orchestration_logger(component)
metrics_str = " | ".join(f"{k}: {v}" for k, v in metrics.items())
logger.info(f"📊 PERFORMANCE_METRICS | {metrics_str}")
def log_resource_usage(
component: str,
api_calls: int | None = None,
cache_hits: int | None = None,
memory_mb: float | None = None,
):
"""Log resource usage statistics."""
logger = get_orchestration_logger(component)
usage_parts = []
if api_calls is not None:
usage_parts.append(f"api_calls: {api_calls}")
if cache_hits is not None:
usage_parts.append(f"cache_hits: {cache_hits}")
if memory_mb is not None:
usage_parts.append(f"memory_mb: {memory_mb:.1f}")
if usage_parts:
usage_str = " | ".join(usage_parts)
logger.info(f"📈 RESOURCE_USAGE | {usage_str}")
# Export key functions
__all__ = [
"OrchestrationLogger",
"get_orchestration_logger",
"log_method_call",
"log_parallel_execution",
"log_agent_execution",
"log_tool_invocation",
"log_synthesis_operation",
"log_fallback_trigger",
"log_performance_metrics",
"log_resource_usage",
]
```
--------------------------------------------------------------------------------
/maverick_mcp/utils/circuit_breaker_decorators.py:
--------------------------------------------------------------------------------
```python
"""
Decorators for easy circuit breaker integration.
Provides convenient decorators for common external service patterns.
"""
import asyncio
import functools
import logging
from collections.abc import Callable
from typing import TypeVar, cast
from maverick_mcp.config.settings import get_settings
from maverick_mcp.utils.circuit_breaker_services import (
economic_data_breaker,
http_breaker,
market_data_breaker,
news_data_breaker,
stock_data_breaker,
)
logger = logging.getLogger(__name__)
settings = get_settings()
T = TypeVar("T")
def with_stock_data_circuit_breaker(
use_fallback: bool = True, fallback_on_open: bool = True
) -> Callable:
"""
Decorator for stock data fetching functions.
Args:
use_fallback: Whether to use fallback strategies on failure
fallback_on_open: Whether to use fallback when circuit is open
Example:
@with_stock_data_circuit_breaker()
def get_stock_data(symbol: str, start: str, end: str) -> pd.DataFrame:
return yf.download(symbol, start=start, end=end)
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if use_fallback and len(args) >= 3:
# Extract symbol, start, end from args
symbol = args[0] if args else kwargs.get("symbol", "UNKNOWN")
start_date = (
args[1] if len(args) > 1 else kwargs.get("start_date", "")
)
end_date = args[2] if len(args) > 2 else kwargs.get("end_date", "")
return await stock_data_breaker.fetch_with_fallback_async(
func, symbol, start_date, end_date, **kwargs
)
else:
return await stock_data_breaker.call_async(func, *args, **kwargs)
return cast(Callable[..., T], async_wrapper)
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if use_fallback and len(args) >= 3:
# Extract symbol, start, end from args
symbol = args[0] if args else kwargs.get("symbol", "UNKNOWN")
start_date = (
args[1] if len(args) > 1 else kwargs.get("start_date", "")
)
end_date = args[2] if len(args) > 2 else kwargs.get("end_date", "")
return stock_data_breaker.fetch_with_fallback(
func, symbol, start_date, end_date, **kwargs
)
else:
return stock_data_breaker.call_sync(func, *args, **kwargs)
return cast(Callable[..., T], sync_wrapper)
return decorator
def with_market_data_circuit_breaker(
use_fallback: bool = True, service: str = "finviz"
) -> Callable:
"""
Decorator for market data fetching functions.
Args:
use_fallback: Whether to use fallback strategies on failure
service: Service name (finviz, external_api)
Example:
@with_market_data_circuit_breaker(service="finviz")
def get_top_gainers() -> dict:
return fetch_finviz_gainers()
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
# Get appropriate breaker
if service == "external_api":
from maverick_mcp.utils.circuit_breaker_services import (
MarketDataCircuitBreaker,
)
breaker = MarketDataCircuitBreaker("external_api")
else:
breaker = market_data_breaker
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if use_fallback:
# Try to extract mover_type
mover_type = kwargs.get("mover_type", "market_data")
try:
return await breaker.call_async(func, *args, **kwargs)
except Exception as e:
logger.warning(f"Market data fetch failed: {e}, using fallback")
return breaker.fallback.execute_sync(mover_type)
else:
return await breaker.call_async(func, *args, **kwargs)
return cast(Callable[..., T], async_wrapper)
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if use_fallback:
# Try to extract mover_type
mover_type = kwargs.get("mover_type", "market_data")
return breaker.fetch_with_fallback(func, mover_type, **kwargs)
else:
return breaker.call_sync(func, *args, **kwargs)
return cast(Callable[..., T], sync_wrapper)
return decorator
def with_economic_data_circuit_breaker(use_fallback: bool = True) -> Callable:
"""
Decorator for economic data fetching functions.
Args:
use_fallback: Whether to use fallback strategies on failure
Example:
@with_economic_data_circuit_breaker()
def get_gdp_data(start: str, end: str) -> pd.Series:
return fred.get_series("GDP", start, end)
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if use_fallback and (args or "series_id" in kwargs):
# Extract series_id and dates
series_id = args[0] if args else kwargs.get("series_id", "UNKNOWN")
start_date = (
args[1] if len(args) > 1 else kwargs.get("start_date", "")
)
end_date = args[2] if len(args) > 2 else kwargs.get("end_date", "")
try:
return await economic_data_breaker.call_async(
func, *args, **kwargs
)
except Exception as e:
logger.warning(
f"Economic data fetch failed: {e}, using fallback"
)
return economic_data_breaker.fallback.execute_sync(
series_id, start_date, end_date
)
else:
return await economic_data_breaker.call_async(func, *args, **kwargs)
return cast(Callable[..., T], async_wrapper)
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if use_fallback and (args or "series_id" in kwargs):
# Extract series_id and dates
series_id = args[0] if args else kwargs.get("series_id", "UNKNOWN")
start_date = (
args[1] if len(args) > 1 else kwargs.get("start_date", "")
)
end_date = args[2] if len(args) > 2 else kwargs.get("end_date", "")
return economic_data_breaker.fetch_with_fallback(
func, series_id, start_date, end_date, **kwargs
)
else:
return economic_data_breaker.call_sync(func, *args, **kwargs)
return cast(Callable[..., T], sync_wrapper)
return decorator
def with_news_circuit_breaker(use_fallback: bool = True) -> Callable:
"""
Decorator for news/sentiment API calls.
Args:
use_fallback: Whether to use fallback strategies on failure
Example:
@with_news_circuit_breaker()
def get_stock_news(symbol: str) -> dict:
return fetch_news_api(symbol)
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
if use_fallback and (args or "symbol" in kwargs):
symbol = args[0] if args else kwargs.get("symbol", "UNKNOWN")
try:
return await news_data_breaker.call_async(func, *args, **kwargs)
except Exception as e:
logger.warning(f"News data fetch failed: {e}, using fallback")
return news_data_breaker.fallback.execute_sync(symbol)
else:
return await news_data_breaker.call_async(func, *args, **kwargs)
return cast(Callable[..., T], async_wrapper)
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
if use_fallback and (args or "symbol" in kwargs):
symbol = args[0] if args else kwargs.get("symbol", "UNKNOWN")
return news_data_breaker.fetch_with_fallback(func, symbol, **kwargs)
else:
return news_data_breaker.call_sync(func, *args, **kwargs)
return cast(Callable[..., T], sync_wrapper)
return decorator
def with_http_circuit_breaker(
timeout: float | None = None, use_session: bool = False
) -> Callable:
"""
Decorator for general HTTP requests.
Args:
timeout: Override default timeout
use_session: Whether the function uses a requests Session
Example:
@with_http_circuit_breaker(timeout=10.0)
def fetch_api_data(url: str) -> dict:
response = requests.get(url)
return response.json()
"""
def decorator(func: Callable[..., T]) -> Callable[..., T]:
if asyncio.iscoroutinefunction(func):
@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
# Override timeout if specified
if timeout is not None:
kwargs["timeout"] = timeout
return await http_breaker.call_async(func, *args, **kwargs)
return cast(Callable[..., T], async_wrapper)
else:
@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
# Override timeout if specified
if timeout is not None:
kwargs["timeout"] = timeout
return http_breaker.call_sync(func, *args, **kwargs)
return cast(Callable[..., T], sync_wrapper)
return decorator
def circuit_breaker_method(
service: str = "http", use_fallback: bool = True, **breaker_kwargs
) -> Callable:
"""
Generic circuit breaker decorator for class methods.
Args:
service: Service type (yfinance, finviz, fred, news, http)
use_fallback: Whether to use fallback strategies
**breaker_kwargs: Additional arguments for the circuit breaker
Example:
class DataProvider:
@circuit_breaker_method(service="yfinance")
def get_stock_data(self, symbol: str) -> pd.DataFrame:
return yf.download(symbol)
"""
# Map service names to decorators
service_decorators = {
"yfinance": with_stock_data_circuit_breaker,
"stock": with_stock_data_circuit_breaker,
"finviz": lambda **kw: with_market_data_circuit_breaker(service="finviz", **kw),
"external_api": lambda **kw: with_market_data_circuit_breaker(
service="external_api", **kw
),
"market": with_market_data_circuit_breaker,
"fred": with_economic_data_circuit_breaker,
"economic": with_economic_data_circuit_breaker,
"news": with_news_circuit_breaker,
"sentiment": with_news_circuit_breaker,
"http": with_http_circuit_breaker,
}
decorator_func = service_decorators.get(service, with_http_circuit_breaker)
return decorator_func(use_fallback=use_fallback, **breaker_kwargs)
```
--------------------------------------------------------------------------------
/scripts/load_market_data.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Market data loading script for self-contained Maverick-MCP database.
This script loads stock and price data from Tiingo API into the self-contained
mcp_ prefixed tables, making Maverick-MCP completely independent.
Usage:
python scripts/load_market_data.py --symbols AAPL,MSFT,GOOGL
python scripts/load_market_data.py --file symbols.txt
python scripts/load_market_data.py --sp500 # Load S&P 500 stocks
"""
import argparse
import asyncio
import logging
import os
import sys
from datetime import datetime, timedelta
from pathlib import Path
import aiohttp
import pandas as pd
# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))
from maverick_mcp.config.database_self_contained import (
SelfContainedDatabaseSession,
init_self_contained_database,
)
from maverick_mcp.data.models import (
Stock,
bulk_insert_price_data,
)
# Set up logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("market_data_loader")
class TiingoDataLoader:
"""Loads market data from Tiingo API into self-contained database."""
def __init__(self, api_token: str | None = None):
"""
Initialize Tiingo data loader.
Args:
api_token: Tiingo API token. If None, will use TIINGO_API_TOKEN env var
"""
self.api_token = api_token or os.getenv("TIINGO_API_TOKEN")
if not self.api_token:
raise ValueError("Tiingo API token required. Set TIINGO_API_TOKEN env var.")
self.base_url = "https://api.tiingo.com/tiingo"
self.session: aiohttp.ClientSession | None = None
async def __aenter__(self):
"""Async context manager entry."""
self.session = aiohttp.ClientSession(
headers={"Authorization": f"Token {self.api_token}"}
)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
if self.session:
await self.session.close()
async def get_stock_metadata(self, symbol: str) -> dict | None:
"""
Get stock metadata from Tiingo.
Args:
symbol: Stock ticker symbol
Returns:
Stock metadata dict or None if not found
"""
url = f"{self.base_url}/daily/{symbol}"
try:
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
return data
elif response.status == 404:
logger.warning(f"Stock {symbol} not found in Tiingo")
return None
else:
logger.error(
f"Error fetching metadata for {symbol}: {response.status}"
)
return None
except Exception as e:
logger.error(f"Exception fetching metadata for {symbol}: {e}")
return None
async def get_price_data(
self, symbol: str, start_date: str, end_date: str | None = None
) -> pd.DataFrame | None:
"""
Get historical price data from Tiingo.
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format (default: today)
Returns:
DataFrame with OHLCV data or None if not found
"""
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
url = f"{self.base_url}/daily/{symbol}/prices"
params = {"startDate": start_date, "endDate": end_date, "format": "json"}
try:
async with self.session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
if not data:
return None
df = pd.DataFrame(data)
# Convert date column and set as index
df["date"] = pd.to_datetime(df["date"]).dt.date
df.set_index("date", inplace=True)
# Rename columns to match our model
column_mapping = {
"open": "open",
"high": "high",
"low": "low",
"close": "close",
"volume": "volume",
"adjOpen": "adj_open",
"adjHigh": "adj_high",
"adjLow": "adj_low",
"adjClose": "adj_close",
"adjVolume": "adj_volume",
}
df = df.rename(columns=column_mapping)
df["symbol"] = symbol.upper()
logger.info(f"Loaded {len(df)} price records for {symbol}")
return df
elif response.status == 404:
logger.warning(f"Price data for {symbol} not found")
return None
else:
logger.error(
f"Error fetching prices for {symbol}: {response.status}"
)
return None
except Exception as e:
logger.error(f"Exception fetching prices for {symbol}: {e}")
return None
async def load_stock_data(self, symbols: list[str]) -> int:
"""
Load stock metadata and price data for multiple symbols.
Args:
symbols: List of stock ticker symbols
Returns:
Number of stocks successfully loaded
"""
loaded_count = 0
with SelfContainedDatabaseSession() as session:
for symbol in symbols:
logger.info(f"Loading data for {symbol}...")
# Get stock metadata
metadata = await self.get_stock_metadata(symbol)
if not metadata:
continue
# Create or update stock record
Stock.get_or_create(
session,
symbol,
company_name=metadata.get("name", ""),
description=metadata.get("description", ""),
exchange=metadata.get("exchangeCode", ""),
currency="USD", # Tiingo uses USD
)
# Load price data (last 2 years)
start_date = (datetime.now() - timedelta(days=730)).strftime("%Y-%m-%d")
price_df = await self.get_price_data(symbol, start_date)
if price_df is not None and not price_df.empty:
# Insert price data
records_inserted = bulk_insert_price_data(session, symbol, price_df)
logger.info(
f"Inserted {records_inserted} price records for {symbol}"
)
loaded_count += 1
# Rate limiting - Tiingo allows 2400 requests/hour
await asyncio.sleep(1.5) # ~2400 requests/hour limit
return loaded_count
def get_sp500_symbols() -> list[str]:
"""Get S&P 500 stock symbols from a predefined list."""
# Top 100 S&P 500 stocks for initial loading
return [
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"TSLA",
"META",
"NVDA",
"BRK.B",
"UNH",
"JNJ",
"V",
"PG",
"JPM",
"HD",
"CVX",
"MA",
"PFE",
"ABBV",
"BAC",
"KO",
"AVGO",
"PEP",
"TMO",
"COST",
"WMT",
"DIS",
"ABT",
"ACN",
"NFLX",
"ADBE",
"CRM",
"VZ",
"DHR",
"INTC",
"NKE",
"T",
"TXN",
"BMY",
"QCOM",
"PM",
"UPS",
"HON",
"ORCL",
"WFC",
"LOW",
"LIN",
"AMD",
"SBUX",
"IBM",
"GE",
"CAT",
"MDT",
"BA",
"AXP",
"GILD",
"RTX",
"GS",
"BLK",
"MMM",
"CVS",
"ISRG",
"NOW",
"AMT",
"SPGI",
"PLD",
"SYK",
"TJX",
"MDLZ",
"ZTS",
"MO",
"CB",
"CI",
"PYPL",
"SO",
"EL",
"DE",
"REGN",
"CCI",
"USB",
"BSX",
"DUK",
"AON",
"CSX",
"CL",
"ITW",
"PNC",
"FCX",
"SCHW",
"EMR",
"NSC",
"GM",
"FDX",
"MU",
"BDX",
"TGT",
"EOG",
"SLB",
"ICE",
"EQIX",
"APD",
]
def load_symbols_from_file(file_path: str) -> list[str]:
"""
Load stock symbols from a text file.
Args:
file_path: Path to file containing stock symbols (one per line)
Returns:
List of stock symbols
"""
symbols = []
try:
with open(file_path) as f:
for line in f:
symbol = line.strip().upper()
if symbol and not symbol.startswith("#"):
symbols.append(symbol)
logger.info(f"Loaded {len(symbols)} symbols from {file_path}")
except FileNotFoundError:
logger.error(f"Symbol file not found: {file_path}")
sys.exit(1)
except Exception as e:
logger.error(f"Error reading symbol file {file_path}: {e}")
sys.exit(1)
return symbols
async def main():
"""Main function to load market data."""
parser = argparse.ArgumentParser(
description="Load market data into self-contained database"
)
parser.add_argument(
"--symbols",
type=str,
help="Comma-separated list of stock symbols (e.g., AAPL,MSFT,GOOGL)",
)
parser.add_argument(
"--file", type=str, help="Path to file containing stock symbols (one per line)"
)
parser.add_argument(
"--sp500", action="store_true", help="Load top 100 S&P 500 stocks"
)
parser.add_argument(
"--create-tables",
action="store_true",
help="Create database tables if they don't exist",
)
parser.add_argument("--database-url", type=str, help="Override database URL")
args = parser.parse_args()
# Determine symbols to load
symbols = []
if args.symbols:
symbols = [s.strip().upper() for s in args.symbols.split(",")]
elif args.file:
symbols = load_symbols_from_file(args.file)
elif args.sp500:
symbols = get_sp500_symbols()
else:
parser.print_help()
sys.exit(1)
logger.info(f"Will load data for {len(symbols)} symbols")
# Initialize self-contained database
try:
init_self_contained_database(
database_url=args.database_url, create_tables=args.create_tables
)
logger.info("Self-contained database initialized")
except Exception as e:
logger.error(f"Database initialization failed: {e}")
sys.exit(1)
# Load market data
try:
async with TiingoDataLoader() as loader:
loaded_count = await loader.load_stock_data(symbols)
logger.info(
f"Successfully loaded data for {loaded_count}/{len(symbols)} stocks"
)
except Exception as e:
logger.error(f"Data loading failed: {e}")
sys.exit(1)
# Display database stats
from maverick_mcp.config.database_self_contained import get_self_contained_db_config
db_config = get_self_contained_db_config()
stats = db_config.get_database_stats()
print("\n📊 Database Statistics:")
print(f" Total Records: {stats['total_records']}")
for table, count in stats["tables"].items():
print(f" {table}: {count}")
print("\n✅ Market data loading completed successfully!")
if __name__ == "__main__":
asyncio.run(main())
```
--------------------------------------------------------------------------------
/tests/test_orchestration_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for the orchestration system.
Tests the end-to-end functionality of SupervisorAgent and DeepResearchAgent
to verify the orchestration system works correctly.
"""
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage
from langchain_core.outputs import ChatGeneration, ChatResult
from maverick_mcp.agents.base import INVESTOR_PERSONAS, PersonaAwareAgent
from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.agents.supervisor import ROUTING_MATRIX, SupervisorAgent
class MockChatModel(BaseChatModel):
"""Mock chat model for testing that extends BaseChatModel properly."""
def __init__(self, responses: list[str]):
super().__init__()
self.responses = responses
self._call_count = 0
@property
def _llm_type(self) -> str:
return "mock"
def _generate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: CallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
response = self.responses[self._call_count % len(self.responses)]
self._call_count += 1
message = AIMessage(content=response)
return ChatResult(generations=[ChatGeneration(message=message)])
async def _agenerate(
self,
messages: list[BaseMessage],
stop: list[str] | None = None,
run_manager: AsyncCallbackManagerForLLMRun | None = None,
**kwargs: Any,
) -> ChatResult:
return self._generate(messages, stop, **kwargs)
class TestOrchestrationSystemIntegration:
"""Test the complete orchestration system integration."""
@pytest.fixture
def mock_llm(self):
"""Create a mock LLM for testing."""
llm = MagicMock()
llm.ainvoke = AsyncMock()
llm.bind_tools = MagicMock(return_value=llm)
llm.invoke = MagicMock()
return llm
@pytest.fixture
def mock_market_agent(self):
"""Create a mock market analysis agent."""
agent = MagicMock(spec=PersonaAwareAgent)
agent.analyze_market = AsyncMock(
return_value={
"status": "success",
"summary": "Market analysis completed",
"screened_symbols": ["AAPL", "MSFT", "NVDA"],
"confidence": 0.85,
"execution_time_ms": 1500,
}
)
return agent
def test_agent_imports_successful(self):
"""Test that all agent classes can be imported successfully."""
# These imports should not raise exceptions
assert SupervisorAgent is not None
assert DeepResearchAgent is not None
assert ROUTING_MATRIX is not None
assert INVESTOR_PERSONAS is not None
def test_routing_matrix_structure(self):
"""Test that routing matrix has expected structure."""
assert isinstance(ROUTING_MATRIX, dict)
assert len(ROUTING_MATRIX) > 0
# Check each routing entry has required fields
for _category, routing_info in ROUTING_MATRIX.items():
assert "primary" in routing_info
assert isinstance(routing_info["primary"], str)
assert "agents" in routing_info
assert isinstance(routing_info["agents"], list)
def test_personas_structure(self):
"""Test that investor personas have expected structure."""
expected_personas = ["conservative", "moderate", "aggressive"]
for persona_name in expected_personas:
assert persona_name in INVESTOR_PERSONAS
persona = INVESTOR_PERSONAS[persona_name]
# Check persona has required attributes
assert hasattr(persona, "name")
assert hasattr(persona, "risk_tolerance")
assert hasattr(persona, "position_size_max")
@pytest.mark.asyncio
async def test_supervisor_agent_instantiation(self, mock_llm, mock_market_agent):
"""Test SupervisorAgent can be instantiated properly."""
agents = {"market": mock_market_agent}
supervisor = SupervisorAgent(
llm=mock_llm, agents=agents, persona="moderate", ttl_hours=1
)
assert supervisor is not None
assert supervisor.persona.name == "Moderate"
assert "market" in supervisor.agents
@pytest.mark.asyncio
async def test_deep_research_agent_instantiation(self, mock_llm):
"""Test DeepResearchAgent can be instantiated properly."""
# Test without API keys (should still work)
research_agent = DeepResearchAgent(
llm=mock_llm,
persona="moderate",
ttl_hours=1,
exa_api_key=None,
)
assert research_agent is not None
assert research_agent.persona.name == "Moderate"
@pytest.mark.asyncio
async def test_deep_research_agent_with_api_keys(self, mock_llm):
"""Test DeepResearchAgent instantiation with API keys."""
# Test with mock API keys
research_agent = DeepResearchAgent(
llm=mock_llm,
persona="aggressive",
ttl_hours=2,
exa_api_key="test-exa-key",
)
assert research_agent is not None
assert research_agent.persona.name == "Aggressive"
# Should have initialized search providers
assert hasattr(research_agent, "search_providers")
@pytest.mark.asyncio
async def test_supervisor_with_research_agent(self, mock_llm, mock_market_agent):
"""Test supervisor working with research agent."""
# Create research agent
research_agent = DeepResearchAgent(
llm=mock_llm, persona="moderate", ttl_hours=1
)
# Create supervisor with both agents
agents = {"market": mock_market_agent, "research": research_agent}
supervisor = SupervisorAgent(
llm=mock_llm, agents=agents, persona="moderate", ttl_hours=1
)
assert len(supervisor.agents) == 2
assert "market" in supervisor.agents
assert "research" in supervisor.agents
def test_configuration_completeness(self):
"""Test that configuration system is complete."""
from maverick_mcp.config.settings import get_settings
settings = get_settings()
# Check that research settings exist
assert hasattr(settings, "research")
assert hasattr(settings.research, "exa_api_key")
assert hasattr(settings.research, "tavily_api_key")
# Check that data limits exist
assert hasattr(settings, "data_limits")
assert hasattr(settings.data_limits, "max_agent_iterations")
def test_exception_hierarchy(self):
"""Test that exception hierarchy is properly set up."""
from maverick_mcp.exceptions import (
AgentExecutionError,
MaverickException,
ResearchError,
WebSearchError,
)
# Test exception hierarchy
assert issubclass(AgentExecutionError, MaverickException)
assert issubclass(ResearchError, MaverickException)
assert issubclass(WebSearchError, ResearchError)
# Test exception instantiation
error = AgentExecutionError("Test error")
assert error.message == "Test error"
assert error.error_code == "AGENT_EXECUTION_ERROR"
def test_state_classes_structure(self):
"""Test that state classes have proper structure."""
from maverick_mcp.workflows.state import DeepResearchState, SupervisorState
# These should be TypedDict classes
assert hasattr(SupervisorState, "__annotations__")
assert hasattr(DeepResearchState, "__annotations__")
# Check key fields exist
supervisor_fields = SupervisorState.__annotations__.keys()
assert "query_classification" in supervisor_fields
assert "agent_results" in supervisor_fields
assert "workflow_status" in supervisor_fields
research_fields = DeepResearchState.__annotations__.keys()
assert "research_topic" in research_fields
assert "search_results" in research_fields
assert "research_findings" in research_fields
@pytest.mark.asyncio
async def test_circuit_breaker_integration(self):
"""Test that circuit breaker integration works."""
from maverick_mcp.agents.circuit_breaker import circuit_breaker, circuit_manager
# Test circuit breaker manager
assert circuit_manager is not None
# Test circuit breaker decorator
@circuit_breaker("test_breaker", failure_threshold=2)
async def test_function():
return "success"
result = await test_function()
assert result == "success"
def test_mcp_router_structure(self):
"""Test that MCP router is properly structured."""
from maverick_mcp.api.routers.agents import agents_router
# Should be a FastMCP instance
assert agents_router is not None
assert hasattr(agents_router, "name")
assert agents_router.name == "Financial_Analysis_Agents"
def test_agent_factory_function(self):
"""Test agent factory function structure."""
from maverick_mcp.api.routers.agents import get_or_create_agent
# Should be a callable function
assert callable(get_or_create_agent)
# Test with invalid agent type
with pytest.raises(ValueError, match="Unknown agent type"):
get_or_create_agent("invalid_type", "moderate")
class TestOrchestrationWorkflow:
"""Test orchestration workflow components."""
def test_persona_compatibility(self):
"""Test that all agents support all personas."""
expected_personas = ["conservative", "moderate", "aggressive"]
for persona_name in expected_personas:
assert persona_name in INVESTOR_PERSONAS
# All personas should have required attributes
persona = INVESTOR_PERSONAS[persona_name]
assert hasattr(persona, "name")
assert hasattr(persona, "risk_tolerance")
assert hasattr(persona, "position_size_max")
assert hasattr(persona, "stop_loss_multiplier")
def test_routing_categories_completeness(self):
"""Test that routing covers expected analysis categories."""
expected_categories = {
"market_screening",
"company_research",
"technical_analysis",
"sentiment_analysis",
}
routing_categories = set(ROUTING_MATRIX.keys())
# Should contain the key categories we care about
for category in expected_categories:
if category in routing_categories:
routing_info = ROUTING_MATRIX[category]
assert "primary" in routing_info
assert "agents" in routing_info
@pytest.mark.asyncio
async def test_end_to_end_mock_workflow(self):
"""Test a complete mock workflow from query to response."""
# Create mock LLM for testing
fake_llm = MockChatModel(
responses=[
"Mock analysis complete",
"Mock research findings",
"Mock synthesis result",
]
)
# Create mock agents
mock_market_agent = MagicMock()
mock_market_agent.analyze_market = AsyncMock(
return_value={
"status": "success",
"summary": "Market screening complete",
"confidence": 0.8,
}
)
# Create supervisor with mock agents
supervisor = SupervisorAgent(
llm=fake_llm, agents={"market": mock_market_agent}, persona="moderate"
)
# This would normally call the orchestration method
# For now, just verify the supervisor was created properly
assert supervisor is not None
assert len(supervisor.agents) == 1
if __name__ == "__main__":
# Run tests
pytest.main([__file__, "-v", "--tb=short"])
```
--------------------------------------------------------------------------------
/examples/deep_research_integration.py:
--------------------------------------------------------------------------------
```python
"""
DeepResearchAgent Integration Example
This example demonstrates how to use the DeepResearchAgent with the SupervisorAgent
for comprehensive financial research capabilities.
"""
import asyncio
import logging
from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.agents.market_analysis import MarketAnalysisAgent
from maverick_mcp.agents.supervisor import SupervisorAgent
from maverick_mcp.agents.technical_analysis import TechnicalAnalysisAgent
from maverick_mcp.providers.llm_factory import get_llm
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def example_standalone_research():
"""Example of using DeepResearchAgent standalone."""
print("🔍 DeepResearchAgent Standalone Example")
print("=" * 50)
# Initialize LLM and agent
llm = get_llm()
research_agent = DeepResearchAgent(
llm=llm,
persona="moderate", # Conservative, Moderate, Aggressive, Day Trader
max_sources=30,
research_depth="comprehensive",
)
# Example 1: Company Research
print("\n📊 Example 1: Comprehensive Company Research")
print("-" * 40)
try:
result = await research_agent.research_company_comprehensive(
symbol="AAPL",
session_id="company_research_demo",
include_competitive_analysis=True,
)
print("✅ Research completed for AAPL")
print(f"📈 Confidence Score: {result.get('research_confidence', 0):.2f}")
print(f"📰 Sources Analyzed: {result.get('sources_found', 0)}")
if "persona_insights" in result:
insights = result["persona_insights"]
print(
f"🎯 Persona Insights: {len(insights.get('prioritized_insights', []))} relevant insights"
)
print(
f"⚠️ Risk Assessment: {insights.get('risk_assessment', {}).get('risk_acceptable', 'Unknown')}"
)
print(
f"💡 Recommended Action: {insights.get('recommended_action', 'No recommendation')}"
)
except Exception as e:
print(f"❌ Error in company research: {e}")
# Example 2: Market Sentiment Analysis
print("\n📈 Example 2: Market Sentiment Analysis")
print("-" * 40)
try:
result = await research_agent.analyze_market_sentiment(
topic="artificial intelligence stocks",
session_id="sentiment_analysis_demo",
timeframe="1w",
)
print("✅ Sentiment analysis completed")
if "content_analysis" in result:
analysis = result["content_analysis"]
consensus = analysis.get("consensus_view", {})
themes = analysis.get("key_themes", [])
print(
f"📊 Overall Sentiment: {consensus.get('direction', 'neutral').title()}"
)
print(f"🔒 Confidence: {consensus.get('confidence', 0):.2f}")
print(f"🔑 Key Themes: {len(themes)} themes identified")
if themes:
for i, theme in enumerate(themes[:3], 1):
print(
f" {i}. {theme.get('theme', 'Unknown')} (relevance: {theme.get('relevance', 0):.2f})"
)
except Exception as e:
print(f"❌ Error in sentiment analysis: {e}")
# Example 3: Custom Research Query
print("\n🔍 Example 3: Custom Research Query")
print("-" * 40)
try:
result = await research_agent.research_topic(
query="impact of Federal Reserve interest rate decisions on tech stocks",
session_id="custom_research_demo",
research_scope="comprehensive",
max_sources=25,
timeframe="1m",
)
print("✅ Custom research completed")
print(f"📊 Research Confidence: {result.get('research_confidence', 0):.2f}")
if "content_analysis" in result:
analysis = result["content_analysis"]
insights = analysis.get("insights", [])
print(f"💡 Insights Generated: {len(insights)}")
# Show top 3 insights
for i, insight in enumerate(insights[:3], 1):
insight_text = insight.get("insight", "No insight text")[:100] + "..."
confidence = insight.get("confidence", 0)
print(f" {i}. {insight_text} (confidence: {confidence:.2f})")
except Exception as e:
print(f"❌ Error in custom research: {e}")
async def example_supervisor_integration():
"""Example of using DeepResearchAgent with SupervisorAgent."""
print("\n🎛️ SupervisorAgent Integration Example")
print("=" * 50)
# Initialize LLM
llm = get_llm()
# Create specialized agents
market_agent = MarketAnalysisAgent(llm=llm, persona="moderate")
technical_agent = TechnicalAnalysisAgent(llm=llm, persona="moderate")
research_agent = DeepResearchAgent(llm=llm, persona="moderate")
# Create supervisor with all agents
supervisor = SupervisorAgent(
llm=llm,
agents={
"market": market_agent,
"technical": technical_agent,
"research": research_agent, # Key integration point
},
persona="moderate",
routing_strategy="llm_powered",
synthesis_mode="weighted",
)
# Example coordination scenarios
test_queries = [
{
"query": "Should I invest in MSFT? I want comprehensive analysis including recent news and competitive position",
"expected_routing": ["technical", "research"],
"description": "Investment decision requiring technical + research",
},
{
"query": "What's the current market sentiment on renewable energy stocks?",
"expected_routing": ["research"],
"description": "Pure sentiment analysis research",
},
{
"query": "Find me high-momentum stocks with strong fundamentals",
"expected_routing": ["market", "research"],
"description": "Screening + fundamental research",
},
]
for i, test_case in enumerate(test_queries, 1):
print(f"\n📋 Test Case {i}: {test_case['description']}")
print(f"Query: '{test_case['query']}'")
print(f"Expected Routing: {test_case['expected_routing']}")
print("-" * 60)
try:
result = await supervisor.coordinate_agents(
query=test_case["query"], session_id=f"supervisor_demo_{i}"
)
if result.get("status") == "success":
agents_used = result.get("agents_used", [])
confidence = result.get("confidence_score", 0)
execution_time = result.get("execution_time_ms", 0)
conflicts_resolved = result.get("conflicts_resolved", 0)
print("✅ Coordination successful")
print(f"🤖 Agents Used: {agents_used}")
print(f"📊 Confidence Score: {confidence:.2f}")
print(f"⏱️ Execution Time: {execution_time:.0f}ms")
print(f"🔧 Conflicts Resolved: {conflicts_resolved}")
# Show synthesis result
synthesis = (
result.get("synthesis", "No synthesis available")[:200] + "..."
)
print(f"📝 Synthesis Preview: {synthesis}")
else:
print(f"❌ Coordination failed: {result.get('error', 'Unknown error')}")
except Exception as e:
print(f"❌ Error in coordination: {e}")
async def example_persona_adaptation():
"""Example showing how research adapts to different investor personas."""
print("\n👥 Persona Adaptation Example")
print("=" * 50)
llm = get_llm()
personas = ["conservative", "moderate", "aggressive", "day_trader"]
query = "Should I invest in Tesla (TSLA)?"
for persona in personas:
print(f"\n🎭 Persona: {persona.title()}")
print("-" * 30)
try:
research_agent = DeepResearchAgent(
llm=llm,
persona=persona,
max_sources=20, # Smaller sample for demo
research_depth="standard",
)
result = await research_agent.research_topic(
query=query,
session_id=f"persona_demo_{persona}",
research_scope="standard",
timeframe="2w",
)
if "persona_insights" in result:
insights = result["persona_insights"]
risk_assessment = insights.get("risk_assessment", {})
action = insights.get("recommended_action", "No action")
alignment = insights.get("persona_alignment_score", 0)
print(f"📊 Persona Alignment: {alignment:.2f}")
print(
f"⚠️ Risk Acceptable: {risk_assessment.get('risk_acceptable', 'Unknown')}"
)
print(f"💡 Recommended Action: {action}")
# Show risk factors for conservative investors
if persona == "conservative" and risk_assessment.get("risk_factors"):
print(f"🚨 Risk Factors ({len(risk_assessment['risk_factors'])}):")
for factor in risk_assessment["risk_factors"][:2]:
print(f" • {factor[:80]}...")
else:
print("⚠️ No persona insights available")
except Exception as e:
print(f"❌ Error for {persona}: {e}")
async def example_research_tools_mcp():
"""Example showing MCP tool integration."""
print("\n🔧 MCP Tools Integration Example")
print("=" * 50)
# Note: This is a conceptual example - actual MCP tool usage would be through Claude Desktop
print("📚 Available Research Tools:")
print("1. comprehensive_research - Deep research on any financial topic")
print("2. analyze_market_sentiment - Market sentiment analysis")
print("3. research_company_comprehensive - Company fundamental analysis")
print("4. search_financial_news - News search and analysis")
print("5. validate_research_claims - Fact-checking and validation")
# Example tool configurations for Claude Desktop
print("\n📋 Claude Desktop Configuration Example:")
print("```json")
print("{")
print(' "mcpServers": {')
print(' "maverick-research": {')
print(' "command": "npx",')
print(' "args": ["-y", "mcp-remote", "http://localhost:8000/research"]')
print(" }")
print(" }")
print("}")
print("```")
print("\n💬 Example Claude Desktop Prompts:")
examples = [
"Research Tesla's competitive position in the EV market with comprehensive analysis",
"Analyze current market sentiment for renewable energy stocks over the past week",
"Perform fundamental analysis of Apple (AAPL) including business model and growth prospects",
"Search for recent financial news about Federal Reserve policy changes",
"Validate the claim that 'AI stocks outperformed the market by 20% this quarter'",
]
for i, example in enumerate(examples, 1):
print(f"{i}. {example}")
async def main():
"""Run all examples."""
print("🚀 DeepResearchAgent Comprehensive Examples")
print("=" * 60)
print("This demo showcases the DeepResearchAgent capabilities")
print("including standalone usage, SupervisorAgent integration,")
print("persona adaptation, and MCP tool integration.")
print("=" * 60)
try:
# Run examples
await example_standalone_research()
await example_supervisor_integration()
await example_persona_adaptation()
await example_research_tools_mcp()
print("\n✅ All examples completed successfully!")
print("\n📖 Next Steps:")
print("1. Set up EXA_API_KEY and TAVILY_API_KEY environment variables")
print("2. Configure Claude Desktop with the research MCP server")
print("3. Test with real queries through Claude Desktop")
print("4. Customize personas and research parameters as needed")
except Exception as e:
print(f"\n❌ Demo failed: {e}")
logger.exception("Demo execution failed")
if __name__ == "__main__":
# Run the examples
asyncio.run(main())
```
--------------------------------------------------------------------------------
/tests/test_error_handling.py:
--------------------------------------------------------------------------------
```python
"""
Comprehensive test suite for error handling and recovery mechanisms.
"""
from unittest.mock import Mock, patch
import pytest
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from maverick_mcp.agents.market_analysis import MarketAnalysisAgent
from maverick_mcp.exceptions import (
AgentInitializationError,
APIRateLimitError,
CircuitBreakerError,
PersonaConfigurationError,
ValidationError,
)
from maverick_mcp.logging_config import CorrelationIDMiddleware, ErrorLogger
# Mock tool input model
class MockToolInput(BaseModel):
"""Input for mock tool."""
query: str = Field(default="test", description="Test query")
# Create a proper mock tool that LangChain can work with
class MockTool(BaseTool):
"""Mock tool for testing."""
name: str = "mock_tool"
description: str = "A mock tool for testing"
args_schema: type[BaseModel] = MockToolInput
def _run(self, query: str = "test") -> str:
"""Run the tool."""
return f"Mock result for: {query}"
async def _arun(self, query: str = "test") -> str:
"""Run the tool asynchronously."""
return f"Mock result for: {query}"
# Create a mock tool with configurable set_persona method
class MockPersonaAwareTool(BaseTool):
"""Mock tool that can have a set_persona method."""
name: str = "mock_persona_tool"
description: str = "A mock persona-aware tool for testing"
args_schema: type[BaseModel] = MockToolInput
_fail_on_set_persona: bool = False # Private attribute using underscore
def __init__(self, fail_on_set_persona: bool = False, **kwargs):
"""Initialize with option to fail on set_persona."""
super().__init__(**kwargs)
# Use object.__setattr__ to bypass Pydantic validation
object.__setattr__(self, "_fail_on_set_persona", fail_on_set_persona)
def set_persona(self, persona: str) -> None:
"""Set the persona for the tool."""
if self._fail_on_set_persona:
raise Exception("Tool configuration failed")
def _run(self, query: str = "test") -> str:
"""Run the tool."""
return f"Mock result for: {query}"
async def _arun(self, query: str = "test") -> str:
"""Run the tool asynchronously."""
return f"Mock result for: {query}"
class TestAgentErrorHandling:
"""Test error handling in agent initialization and operation."""
@pytest.mark.asyncio
async def test_invalid_persona_error(self):
"""Test that invalid persona raises PersonaConfigurationError."""
mock_llm = Mock()
with pytest.raises(PersonaConfigurationError) as exc_info:
MarketAnalysisAgent(llm=mock_llm, persona="invalid_persona")
assert "Invalid persona 'invalid_persona'" in str(exc_info.value)
assert exc_info.value.context["invalid_persona"] == "invalid_persona"
assert "conservative" in exc_info.value.context["valid_personas"]
@pytest.mark.asyncio
async def test_no_tools_initialization_error(self):
"""Test that agent initialization fails gracefully with no tools."""
mock_llm = Mock()
with patch(
"maverick_mcp.agents.market_analysis.get_tool_registry"
) as mock_registry:
# Mock registry to return no tools
mock_registry.return_value.get_tool.return_value = None
# Also need to mock the directly instantiated tools
with (
patch(
"maverick_mcp.agents.market_analysis.PositionSizeTool",
return_value=None,
),
patch(
"maverick_mcp.agents.market_analysis.RiskMetricsTool",
return_value=None,
),
patch(
"maverick_mcp.agents.market_analysis.TechnicalStopsTool",
return_value=None,
),
patch(
"maverick_mcp.agents.market_analysis.NewsSentimentTool",
return_value=None,
),
patch(
"maverick_mcp.agents.market_analysis.MarketBreadthTool",
return_value=None,
),
patch(
"maverick_mcp.agents.market_analysis.SectorSentimentTool",
return_value=None,
),
):
with pytest.raises(AgentInitializationError) as exc_info:
MarketAnalysisAgent(llm=mock_llm, persona="moderate")
assert "No tools available" in str(exc_info.value)
assert exc_info.value.context["agent_type"] == "MarketAnalysisAgent"
@pytest.mark.asyncio
async def test_tool_registry_failure(self):
"""Test handling of tool registry failures."""
mock_llm = Mock()
with patch(
"maverick_mcp.agents.market_analysis.get_tool_registry"
) as mock_registry:
# Simulate registry failure
mock_registry.side_effect = Exception("Registry connection failed")
with pytest.raises(AgentInitializationError) as exc_info:
MarketAnalysisAgent(llm=mock_llm, persona="moderate")
assert "Registry connection failed" in str(exc_info.value)
@pytest.mark.asyncio
async def test_successful_initialization_with_retry(self):
"""Test successful initialization after transient failure."""
mock_llm = Mock()
attempts = 0
def mock_get_tool(name):
nonlocal attempts
attempts += 1
if attempts < 2:
return None # First attempt fails
return MockTool() # Second attempt succeeds with proper tool
with patch(
"maverick_mcp.agents.market_analysis.get_tool_registry"
) as mock_registry:
mock_registry.return_value.get_tool = mock_get_tool
# Should succeed on retry
agent = MarketAnalysisAgent(llm=mock_llm, persona="moderate")
assert agent is not None
class TestDataProviderErrorHandling:
"""Test error handling in data providers."""
def test_api_rate_limit_error(self):
"""Test API rate limit error handling."""
error = APIRateLimitError(provider="yahoo_finance", retry_after=60)
assert error.recoverable is True
assert error.context["retry_after"] == 60
assert "Rate limit exceeded" in str(error)
# Test error dictionary conversion
error_dict = error.to_dict()
assert error_dict["code"] == "RATE_LIMIT_EXCEEDED"
assert (
error_dict["message"]
== "Rate limit exceeded for yahoo_finance. Retry after 60 seconds"
)
assert error_dict["context"]["retry_after"] == 60
def test_data_not_found_error(self):
"""Test data not found error with date range."""
from maverick_mcp.exceptions import DataNotFoundError
error = DataNotFoundError(
symbol="INVALID", date_range=("2024-01-01", "2024-01-31")
)
assert "INVALID" in str(error)
assert "2024-01-01" in str(error)
assert error.context["symbol"] == "INVALID"
class TestCircuitBreakerIntegration:
"""Test circuit breaker error handling."""
def test_circuit_breaker_open_error(self):
"""Test circuit breaker open error."""
error = CircuitBreakerError(
service="stock_data_api", failure_count=5, threshold=3
)
assert error.recoverable is True
assert error.context["failure_count"] == 5
assert error.context["threshold"] == 3
assert "Circuit breaker open" in str(error)
class TestValidationErrors:
"""Test validation error handling."""
def test_parameter_validation_error(self):
"""Test parameter validation error."""
from maverick_mcp.exceptions import ParameterValidationError
error = ParameterValidationError(
param_name="start_date", expected_type="datetime", actual_type="str"
)
assert error.recoverable is True # Default is True in new implementation
assert "Expected datetime, got str" in str(error)
assert (
error.field == "start_date"
) # ParameterValidationError inherits from ValidationError which uses "field"
assert error.context["expected_type"] == "datetime"
assert error.context["actual_type"] == "str"
def test_validation_error_with_details(self):
"""Test validation error with detailed context."""
error = ValidationError(message="Invalid ticker format", field="ticker")
error.context["value"] = "ABC123"
assert error.recoverable is True # Default is True now
assert "Invalid ticker format" in str(error)
assert error.field == "ticker"
assert error.context["value"] == "ABC123"
class TestErrorLogging:
"""Test structured error logging functionality."""
def test_error_logger_masking(self):
"""Test that sensitive data is masked in logs."""
logger = Mock()
error_logger = ErrorLogger(logger)
sensitive_context = {
"api_key": "secret123",
"user_data": {"email": "[email protected]", "password": "password123"},
"safe_field": "visible_data",
}
error = ValueError("Test error")
error_logger.log_error(error, sensitive_context)
# Check that log was called
assert logger.log.called
# Get the extra data passed to logger
call_args = logger.log.call_args
extra_data = call_args[1]["extra"]
# Verify sensitive data was masked
assert extra_data["context"]["api_key"] == "***MASKED***"
assert extra_data["context"]["user_data"]["password"] == "***MASKED***"
assert extra_data["context"]["safe_field"] == "visible_data"
def test_error_counting(self):
"""Test error count tracking."""
logger = Mock()
error_logger = ErrorLogger(logger)
# Log same error type multiple times
for _i in range(3):
error_logger.log_error(ValueError("Test"), {})
# Log different error type
error_logger.log_error(TypeError("Test"), {})
stats = error_logger.get_error_stats()
assert stats["ValueError"] == 3
assert stats["TypeError"] == 1
class TestCorrelationIDMiddleware:
"""Test correlation ID tracking."""
def test_correlation_id_generation(self):
"""Test correlation ID generation and retrieval."""
# Generate new ID
correlation_id = CorrelationIDMiddleware.set_correlation_id()
assert correlation_id.startswith("mcp-")
assert len(correlation_id) == 12 # "mcp-" + 8 hex chars
# Retrieve same ID
retrieved_id = CorrelationIDMiddleware.get_correlation_id()
assert retrieved_id == correlation_id
def test_correlation_id_persistence(self):
"""Test that correlation ID persists across function calls."""
correlation_id = CorrelationIDMiddleware.set_correlation_id()
def inner_function():
return CorrelationIDMiddleware.get_correlation_id()
assert inner_function() == correlation_id
# Integration test for complete error flow
class TestErrorFlowIntegration:
"""Test complete error handling flow from agent to logging."""
@pytest.mark.asyncio
async def test_complete_error_flow(self):
"""Test error propagation from tool through agent to logging."""
mock_llm = Mock()
with patch(
"maverick_mcp.agents.market_analysis.get_tool_registry"
) as mock_registry:
# Create a proper mock tool that will fail on set_persona
mock_tool = MockPersonaAwareTool(fail_on_set_persona=True)
mock_registry.return_value.get_tool.return_value = mock_tool
# Agent should still initialize but log warning
with patch("maverick_mcp.agents.market_analysis.logger") as mock_logger:
MarketAnalysisAgent(llm=mock_llm, persona="moderate")
# Verify warning was logged
assert mock_logger.warning.called
warning_msg = mock_logger.warning.call_args[0][0]
assert "Failed to set persona" in warning_msg
```
--------------------------------------------------------------------------------
/docs/deep_research_agent.md:
--------------------------------------------------------------------------------
```markdown
# DeepResearchAgent Documentation
## Overview
The DeepResearchAgent provides comprehensive financial research capabilities using web search, content analysis, and AI-powered insights. It integrates seamlessly with the existing maverick-mcp architecture and adapts research depth and focus based on investor personas.
## Key Features
### 🔍 Comprehensive Research
- **Multi-Source Web Search**: Integrates Exa AI and Tavily for comprehensive coverage
- **Content Analysis**: AI-powered extraction of insights, sentiment, and key themes
- **Source Credibility**: Automatic scoring and validation of information sources
- **Citation Management**: Proper citations and reference tracking
- **Fact Validation**: Cross-referencing and validation of research claims
### 🎯 Persona-Aware Research
- **Conservative**: Focus on stability, dividends, risk factors, established companies
- **Moderate**: Balanced approach with growth and value considerations
- **Aggressive**: Emphasis on growth opportunities, momentum, high-return potential
- **Day Trader**: Short-term focus, liquidity, technical factors, immediate opportunities
### 🏗️ LangGraph 2025 Integration
- **State Management**: Comprehensive state tracking with `DeepResearchState`
- **Workflow Orchestration**: Multi-step research process with error handling
- **Streaming Support**: Real-time progress updates and streaming responses
- **Circuit Breaker**: Automatic failover and rate limiting protection
## Architecture
### Core Components
```
DeepResearchAgent
├── ResearchQueryAnalyzer # Query analysis and strategy planning
├── WebSearchProvider # Multi-provider search (Exa, Tavily)
├── ContentAnalyzer # AI-powered content analysis
├── PersonaAdapter # Persona-specific result filtering
└── CacheManager # Intelligent caching and performance
```
### State Management
The `DeepResearchState` extends `BaseAgentState` with comprehensive tracking:
```python
class DeepResearchState(BaseAgentState):
# Research parameters
research_query: str
research_scope: str
research_depth: str
timeframe: str
# Source management
raw_sources: list[dict]
processed_sources: list[dict]
source_credibility: dict[str, float]
# Content analysis
extracted_content: dict[str, str]
key_insights: list[dict]
sentiment_analysis: dict
# Research findings
research_themes: list[dict]
consensus_view: dict
contrarian_views: list[dict]
# Persona adaptation
persona_focus_areas: list[str]
actionable_insights: list[dict]
```
## Usage Examples
### Standalone Usage
```python
from maverick_mcp.agents.deep_research import DeepResearchAgent
from maverick_mcp.providers.llm_factory import get_llm
# Initialize agent
llm = get_llm()
research_agent = DeepResearchAgent(
llm=llm,
persona="moderate",
max_sources=50,
research_depth="comprehensive"
)
# Company research
result = await research_agent.research_company_comprehensive(
symbol="AAPL",
session_id="research_session",
include_competitive_analysis=True
)
# Market sentiment analysis
sentiment = await research_agent.analyze_market_sentiment(
topic="artificial intelligence stocks",
session_id="sentiment_session",
timeframe="1w"
)
# Custom research
custom = await research_agent.research_topic(
query="impact of Federal Reserve policy on tech stocks",
session_id="custom_session",
research_scope="comprehensive",
timeframe="1m"
)
```
### SupervisorAgent Integration
```python
from maverick_mcp.agents.supervisor import SupervisorAgent
# Create supervisor with research agent
supervisor = SupervisorAgent(
llm=llm,
agents={
"market": market_agent,
"technical": technical_agent,
"research": research_agent # DeepResearchAgent
},
persona="moderate"
)
# Coordinated analysis
result = await supervisor.coordinate_agents(
query="Should I invest in MSFT? I want comprehensive analysis",
session_id="coordination_session"
)
```
### MCP Tools Integration
Available MCP tools for Claude Desktop:
1. **`comprehensive_research`** - Deep research on any financial topic
2. **`analyze_market_sentiment`** - Market sentiment analysis
3. **`research_company_comprehensive`** - Company fundamental analysis
4. **`search_financial_news`** - News search and analysis
5. **`validate_research_claims`** - Fact-checking and validation
#### Claude Desktop Configuration
```json
{
"mcpServers": {
"maverick-research": {
"command": "npx",
"args": ["-y", "mcp-remote", "http://localhost:8003/research"]
}
}
}
```
#### Example Prompts
- "Research Tesla's competitive position in the EV market with comprehensive analysis"
- "Analyze current market sentiment for renewable energy stocks"
- "Perform fundamental analysis of Apple (AAPL) including competitive advantages"
- "Search for recent news about Federal Reserve interest rate decisions"
## Configuration
### Environment Variables
```bash
# Required API Keys
EXA_API_KEY=your_exa_api_key
TAVILY_API_KEY=your_tavily_api_key
# Optional Configuration
RESEARCH_MAX_SOURCES=50
RESEARCH_CACHE_TTL_HOURS=4
RESEARCH_DEPTH=comprehensive
```
### Settings
```python
from maverick_mcp.config.settings import get_settings
settings = get_settings()
# Research settings
research_config = settings.research
print(f"Max sources: {research_config.default_max_sources}")
print(f"Cache TTL: {research_config.cache_ttl_hours} hours")
print(f"Trusted domains: {research_config.trusted_domains}")
```
## Research Workflow
### 1. Query Analysis
- Classify research type (company, sector, market, news, fundamental)
- Determine appropriate search strategies and sources
- Set persona-specific focus areas and priorities
### 2. Search Execution
- Execute parallel searches across multiple providers
- Apply domain filtering and content type selection
- Handle rate limiting and error recovery
### 3. Content Processing
- Extract and clean content from sources
- Remove duplicates and low-quality sources
- Score sources for credibility and relevance
### 4. Content Analysis
- AI-powered insight extraction
- Sentiment analysis and trend detection
- Theme identification and cross-referencing
### 5. Persona Adaptation
- Filter insights for persona relevance
- Adjust risk assessments and recommendations
- Generate persona-specific action items
### 6. Result Synthesis
- Consolidate findings into coherent analysis
- Generate citations and source references
- Calculate confidence scores and quality metrics
## Persona Behaviors
### Conservative Investor
- **Focus**: Stability, dividends, established companies, risk factors
- **Sources**: Prioritize authoritative financial publications
- **Insights**: Emphasize capital preservation and low-risk opportunities
- **Actions**: More cautious recommendations with detailed risk analysis
### Moderate Investor
- **Focus**: Balanced growth and value, diversification
- **Sources**: Mix of news, analysis, and fundamental reports
- **Insights**: Balanced view of opportunities and risks
- **Actions**: Moderate position sizing with measured recommendations
### Aggressive Investor
- **Focus**: Growth opportunities, momentum, high-return potential
- **Sources**: Include social media sentiment and trending analysis
- **Insights**: Emphasize upside potential and growth catalysts
- **Actions**: Larger position sizing with growth-focused recommendations
### Day Trader
- **Focus**: Short-term catalysts, technical factors, liquidity
- **Sources**: Real-time news, social sentiment, technical analysis
- **Insights**: Immediate trading opportunities and momentum indicators
- **Actions**: Quick-turn recommendations with tight risk controls
## Performance & Caching
### Intelligent Caching
- **Research Results**: 4-hour TTL for comprehensive research
- **Source Content**: 1-hour TTL for raw content
- **Sentiment Analysis**: 30-minute TTL for rapidly changing topics
- **Company Fundamentals**: 24-hour TTL for stable company data
### Rate Limiting
- **Exa AI**: Respects API rate limits with exponential backoff
- **Tavily**: Built-in rate limiting and request queuing
- **Content Analysis**: Batch processing to optimize LLM usage
### Performance Optimization
- **Parallel Search**: Concurrent execution across providers
- **Content Streaming**: Progressive result delivery
- **Circuit Breakers**: Automatic failover on provider issues
- **Connection Pooling**: Efficient network resource usage
## Error Handling
### Circuit Breaker Pattern
- Automatic provider failover on repeated failures
- Graceful degradation with partial results
- Recovery testing and automatic restoration
### Fallback Strategies
- Provider fallback (Exa → Tavily → Basic web search)
- Reduced scope fallback (comprehensive → standard → basic)
- Cached result fallback when live search fails
### Error Types
- `WebSearchError`: Search provider failures
- `ContentAnalysisError`: Content processing failures
- `ResearchError`: General research operation failures
- `CircuitBreakerError`: Circuit breaker activation
## Integration Points
### SupervisorAgent Routing
- Automatic routing for research-related queries
- Intelligent agent selection based on query complexity
- Result synthesis with technical and market analysis
### MCP Server Integration
- RESTful API endpoints for external access
- Standardized request/response formats
- Authentication and rate limiting support
### Database Integration
- Research result caching in PostgreSQL/SQLite
- Source credibility tracking and learning
- Historical research analysis and trends
## Best Practices
### Query Optimization
- Use specific, focused queries for better results
- Include timeframe context for temporal relevance
- Specify research depth based on needs (basic/standard/comprehensive)
### Persona Selection
- Choose persona that matches intended investment style
- Consider persona characteristics in result interpretation
- Use persona-specific insights for decision making
### Result Interpretation
- Review confidence scores and source diversity
- Consider contrarian views alongside consensus
- Validate critical claims through multiple sources
### Performance Tuning
- Adjust max_sources based on speed vs. comprehensiveness needs
- Use appropriate research_depth for the use case
- Monitor cache hit rates and adjust TTL settings
## Troubleshooting
### Common Issues
1. **No Results Found**
- Check API key configuration
- Verify internet connectivity
- Try broader search terms
2. **Low Confidence Scores**
- Increase max_sources parameter
- Use longer timeframe for more data
- Check for topic relevance and specificity
3. **Rate Limiting Errors**
- Review API usage limits
- Implement request spacing
- Consider upgrading API plans
4. **Poor Persona Alignment**
- Review persona characteristics
- Adjust focus areas in research strategy
- Consider custom persona configuration
### Debug Mode
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Enable detailed logging for troubleshooting
research_agent = DeepResearchAgent(
llm=llm,
persona="moderate",
research_depth="comprehensive"
)
```
## Future Enhancements
### Planned Features
- **Multi-language Support**: Research in multiple languages
- **PDF Analysis**: Direct analysis of earnings reports and filings
- **Real-time Alerts**: Research-based alert generation
- **Custom Personas**: User-defined persona characteristics
- **Research Collaboration**: Multi-user research sessions
### API Extensions
- **Batch Research**: Process multiple queries simultaneously
- **Research Templates**: Pre-configured research workflows
- **Historical Analysis**: Time-series research trend analysis
- **Integration APIs**: Third-party platform integrations
---
## Support
For questions, issues, or feature requests related to the DeepResearchAgent:
1. Check the troubleshooting section above
2. Review the example code in `/examples/deep_research_integration.py`
3. Enable debug logging for detailed error information
4. Consider the integration patterns with SupervisorAgent for complex workflows
The DeepResearchAgent is designed to provide institutional-quality research capabilities while maintaining the flexibility and persona-awareness that makes it suitable for individual investors across all experience levels.
```
--------------------------------------------------------------------------------
/maverick_mcp/application/screening/dtos.py:
--------------------------------------------------------------------------------
```python
"""
Screening application DTOs (Data Transfer Objects).
This module contains DTOs for request/response communication
between the API layer and application layer.
"""
from typing import Any
from pydantic import BaseModel, Field, validator
from maverick_mcp.domain.screening.value_objects import ScreeningStrategy
class ScreeningRequestDTO(BaseModel):
"""
DTO for screening requests from the API layer.
This DTO validates and structures incoming screening requests.
"""
strategy: str = Field(
description="Screening strategy to use", example="maverick_bullish"
)
limit: int = Field(
default=20, ge=1, le=100, description="Maximum number of results to return"
)
# Filtering criteria
min_momentum_score: float | None = Field(
default=None, ge=0, le=100, description="Minimum momentum score"
)
max_momentum_score: float | None = Field(
default=None, ge=0, le=100, description="Maximum momentum score"
)
min_volume: int | None = Field(
default=None, ge=0, description="Minimum average daily volume"
)
max_volume: int | None = Field(
default=None, ge=0, description="Maximum average daily volume"
)
min_price: float | None = Field(
default=None, gt=0, description="Minimum stock price"
)
max_price: float | None = Field(
default=None, gt=0, description="Maximum stock price"
)
min_combined_score: int | None = Field(
default=None, ge=0, description="Minimum combined score for bullish screening"
)
min_bear_score: int | None = Field(
default=None, ge=0, description="Minimum bear score for bearish screening"
)
min_adr_percentage: float | None = Field(
default=None, ge=0, description="Minimum average daily range percentage"
)
max_adr_percentage: float | None = Field(
default=None, ge=0, description="Maximum average daily range percentage"
)
# Pattern filters
require_pattern_detected: bool = Field(
default=False, description="Require pattern detection signal"
)
require_squeeze: bool = Field(default=False, description="Require squeeze signal")
require_consolidation: bool = Field(
default=False, description="Require consolidation pattern"
)
require_entry_signal: bool = Field(
default=False, description="Require entry signal"
)
# Moving average filters
require_above_sma50: bool = Field(
default=False, description="Require price above SMA 50"
)
require_above_sma150: bool = Field(
default=False, description="Require price above SMA 150"
)
require_above_sma200: bool = Field(
default=False, description="Require price above SMA 200"
)
require_ma_alignment: bool = Field(
default=False,
description="Require proper moving average alignment (50>150>200)",
)
# Sorting options
sort_field: str | None = Field(
default=None, description="Field to sort by (strategy default if not specified)"
)
sort_descending: bool = Field(default=True, description="Sort in descending order")
@validator("strategy")
def validate_strategy(cls, v):
"""Validate that strategy is a known screening strategy."""
valid_strategies = [s.value for s in ScreeningStrategy]
if v not in valid_strategies:
raise ValueError(f"Invalid strategy. Must be one of: {valid_strategies}")
return v
@validator("max_momentum_score")
def validate_momentum_score_range(cls, v, values):
"""Validate that max_momentum_score >= min_momentum_score if both specified."""
if (
v is not None
and "min_momentum_score" in values
and values["min_momentum_score"] is not None
):
if v < values["min_momentum_score"]:
raise ValueError(
"max_momentum_score cannot be less than min_momentum_score"
)
return v
@validator("max_volume")
def validate_volume_range(cls, v, values):
"""Validate that max_volume >= min_volume if both specified."""
if (
v is not None
and "min_volume" in values
and values["min_volume"] is not None
):
if v < values["min_volume"]:
raise ValueError("max_volume cannot be less than min_volume")
return v
@validator("max_price")
def validate_price_range(cls, v, values):
"""Validate that max_price >= min_price if both specified."""
if v is not None and "min_price" in values and values["min_price"] is not None:
if v < values["min_price"]:
raise ValueError("max_price cannot be less than min_price")
return v
@validator("sort_field")
def validate_sort_field(cls, v):
"""Validate sort field if specified."""
if v is not None:
valid_fields = {
"combined_score",
"bear_score",
"momentum_score",
"close_price",
"volume",
"avg_volume_30d",
"adr_percentage",
"quality_score",
}
if v not in valid_fields:
raise ValueError(f"Invalid sort field. Must be one of: {valid_fields}")
return v
class ScreeningResultDTO(BaseModel):
"""
DTO for individual screening results.
This DTO represents a single stock screening result for API responses.
"""
stock_symbol: str = Field(description="Stock ticker symbol")
screening_date: str = Field(description="Date when screening was performed")
close_price: float = Field(description="Current closing price")
volume: int = Field(description="Current volume")
momentum_score: float = Field(description="Momentum score (0-100)")
adr_percentage: float = Field(description="Average daily range percentage")
# Technical indicators
ema_21: float = Field(description="21-period exponential moving average")
sma_50: float = Field(description="50-period simple moving average")
sma_150: float = Field(description="150-period simple moving average")
sma_200: float = Field(description="200-period simple moving average")
avg_volume_30d: float = Field(description="30-day average volume")
atr: float = Field(description="Average True Range")
# Pattern signals
pattern: str | None = Field(default=None, description="Detected pattern")
squeeze: str | None = Field(default=None, description="Squeeze signal")
consolidation: str | None = Field(
default=None, description="Consolidation pattern signal"
)
entry_signal: str | None = Field(default=None, description="Entry signal")
# Scores
combined_score: int = Field(description="Combined bullish score")
bear_score: int = Field(description="Bearish score")
quality_score: int = Field(description="Overall quality score")
# Business rule indicators
is_bullish: bool = Field(description="Meets bullish setup criteria")
is_bearish: bool = Field(description="Meets bearish setup criteria")
is_trending: bool = Field(description="Meets trending criteria")
risk_reward_ratio: float = Field(description="Calculated risk/reward ratio")
# Bearish-specific fields (optional)
rsi_14: float | None = Field(default=None, description="14-period RSI")
macd: float | None = Field(default=None, description="MACD line")
macd_signal: float | None = Field(default=None, description="MACD signal line")
macd_histogram: float | None = Field(default=None, description="MACD histogram")
distribution_days_20: int | None = Field(
default=None, description="Distribution days in last 20 days"
)
atr_contraction: bool | None = Field(
default=None, description="ATR contraction detected"
)
big_down_volume: bool | None = Field(
default=None, description="Big down volume detected"
)
class ScreeningCollectionDTO(BaseModel):
"""
DTO for screening result collections.
This DTO represents the complete response for a screening operation.
"""
strategy_used: str = Field(description="Screening strategy that was used")
screening_timestamp: str = Field(description="When the screening was performed")
total_candidates_analyzed: int = Field(
description="Total number of candidates analyzed"
)
results_returned: int = Field(description="Number of results returned")
results: list[ScreeningResultDTO] = Field(
description="Individual screening results"
)
# Statistics and metadata
statistics: dict[str, Any] = Field(description="Collection statistics")
applied_filters: dict[str, Any] = Field(description="Filters that were applied")
sorting_applied: dict[str, Any] = Field(description="Sorting configuration used")
# Status information
status: str = Field(default="success", description="Operation status")
execution_time_ms: float | None = Field(
default=None, description="Execution time in milliseconds"
)
warnings: list[str] = Field(
default_factory=list, description="Any warnings during processing"
)
class AllScreeningResultsDTO(BaseModel):
"""
DTO for comprehensive screening results across all strategies.
This DTO represents results from all available screening strategies.
"""
screening_timestamp: str = Field(description="When the screening was performed")
strategies_executed: list[str] = Field(
description="List of strategies that were executed"
)
# Results by strategy
maverick_bullish: ScreeningCollectionDTO | None = Field(
default=None, description="Maverick bullish screening results"
)
maverick_bearish: ScreeningCollectionDTO | None = Field(
default=None, description="Maverick bearish screening results"
)
trending: ScreeningCollectionDTO | None = Field(
default=None, description="Trending screening results"
)
# Cross-strategy analysis
cross_strategy_analysis: dict[str, Any] = Field(
description="Analysis across multiple strategies"
)
# Overall statistics
overall_summary: dict[str, Any] = Field(
description="Summary statistics across all strategies"
)
# Status information
status: str = Field(default="success", description="Operation status")
execution_time_ms: float | None = Field(
default=None, description="Total execution time in milliseconds"
)
errors: list[str] = Field(
default_factory=list, description="Any errors during processing"
)
class ScreeningStatisticsDTO(BaseModel):
"""
DTO for screening statistics and analytics.
This DTO provides comprehensive analytics and business intelligence
for screening operations.
"""
strategy: str | None = Field(
default=None, description="Strategy analyzed (None for all)"
)
timestamp: str = Field(description="When the analysis was performed")
# Single strategy statistics
statistics: dict[str, Any] | None = Field(
default=None, description="Statistics for single strategy analysis"
)
# Multi-strategy statistics
overall_summary: dict[str, Any] | None = Field(
default=None, description="Summary across all strategies"
)
by_strategy: dict[str, dict[str, Any]] | None = Field(
default=None, description="Statistics broken down by strategy"
)
cross_strategy_analysis: dict[str, Any] | None = Field(
default=None, description="Cross-strategy insights and analysis"
)
# Metadata
analysis_scope: str = Field(description="Scope of the analysis (single/all)")
results_analyzed: int = Field(description="Total number of results analyzed")
class ErrorResponseDTO(BaseModel):
"""
DTO for error responses.
This DTO provides standardized error information for API responses.
"""
status: str = Field(default="error", description="Response status")
error_code: str = Field(description="Machine-readable error code")
error_message: str = Field(description="Human-readable error message")
details: dict[str, Any] | None = Field(
default=None, description="Additional error details"
)
timestamp: str = Field(description="When the error occurred")
request_id: str | None = Field(
default=None, description="Request identifier for tracking"
)
```
--------------------------------------------------------------------------------
/tests/test_integration_simple.py:
--------------------------------------------------------------------------------
```python
"""
Simplified Integration Test Suite for MaverickMCP Security System.
This test suite validates that the core security integrations are working:
- API server can start
- Health check endpoints
- Basic authentication flow (if available)
- Security middleware is active
- Performance systems can initialize
This is a lightweight version to validate system integration without
requiring full database or Redis setup.
"""
import os
from unittest.mock import MagicMock, patch
import pytest
from fastapi.testclient import TestClient
from maverick_mcp.api.api_server import create_api_app
@pytest.fixture
def mock_settings():
"""Mock settings for testing."""
with patch.dict(
os.environ,
{
"AUTH_ENABLED": "true",
"ENVIRONMENT": "test",
"DATABASE_URL": "sqlite:///:memory:",
"REDIS_URL": "redis://localhost:6379/15",
},
):
yield
@pytest.fixture
def mock_redis():
"""Mock Redis client."""
mock_redis = MagicMock()
mock_redis.ping.return_value = True
mock_redis.get.return_value = None
mock_redis.setex.return_value = True
mock_redis.delete.return_value = 1
mock_redis.keys.return_value = []
mock_redis.flushdb.return_value = True
mock_redis.close.return_value = None
return mock_redis
@pytest.fixture
def mock_database():
"""Mock database operations."""
from unittest.mock import MagicMock
mock_db = MagicMock()
# Mock SQLAlchemy Session methods
mock_query = MagicMock()
mock_query.filter.return_value.first.return_value = None # No user found
mock_query.filter.return_value.all.return_value = []
mock_db.query.return_value = mock_query
# Mock basic session operations
mock_db.execute.return_value.scalar.return_value = 1
mock_db.execute.return_value.fetchall.return_value = []
mock_db.commit.return_value = None
mock_db.close.return_value = None
mock_db.add.return_value = None
return mock_db
@pytest.fixture
def integrated_app(mock_settings, mock_redis, mock_database):
"""Create integrated app with mocked dependencies."""
# Mock database dependency
def mock_get_db():
yield mock_database
# Mock Redis connection manager
with patch("maverick_mcp.data.performance.redis_manager") as mock_redis_manager:
mock_redis_manager.initialize.return_value = True
mock_redis_manager.get_client.return_value = mock_redis
mock_redis_manager._healthy = True
mock_redis_manager._initialized = True
mock_redis_manager.get_metrics.return_value = {
"healthy": True,
"initialized": True,
"commands_executed": 0,
"errors": 0,
}
# Mock performance systems
with patch(
"maverick_mcp.data.performance.initialize_performance_systems"
) as mock_init:
mock_init.return_value = {"redis_manager": True, "request_cache": True}
# Mock monitoring
with patch("maverick_mcp.utils.monitoring.initialize_monitoring"):
# Create app
app = create_api_app()
# Override database dependencies
from maverick_mcp.data.models import get_async_db, get_db
app.dependency_overrides[get_db] = mock_get_db
# Mock async database dependency
async def mock_get_async_db():
yield mock_database
app.dependency_overrides[get_async_db] = mock_get_async_db
yield app
@pytest.fixture
def client(integrated_app):
"""Create test client."""
return TestClient(integrated_app)
class TestSystemIntegration:
"""Test core system integration."""
def test_api_server_creation(self, integrated_app):
"""Test that API server can be created successfully."""
assert integrated_app is not None
assert hasattr(integrated_app, "router")
assert hasattr(integrated_app, "middleware")
@pytest.mark.skip(reason="Requires Redis and external services not available in CI")
def test_health_check_endpoint(self, client):
"""Test health check endpoint is available."""
response = client.get("/health")
assert response.status_code == 200
health_data = response.json()
assert "status" in health_data
assert "service" in health_data
assert health_data["service"] == "MaverickMCP API"
def test_security_middleware_present(self, integrated_app):
"""Test that security middleware is loaded."""
# FastAPI middleware stack is different, check if the app has middleware
assert hasattr(integrated_app, "middleware_stack") or hasattr(
integrated_app, "middleware"
)
# The actual middleware is added during app creation
# We can verify by checking the app structure
assert integrated_app is not None
def test_cors_configuration(self, integrated_app):
"""Test CORS middleware is configured."""
# CORS middleware is added during app creation
assert integrated_app is not None
def test_api_endpoints_available(self, client):
"""Test that key API endpoints are available."""
# Test root endpoint
response = client.get("/")
assert response.status_code == 200
root_data = response.json()
assert "service" in root_data
assert "endpoints" in root_data
# Verify key endpoints are listed and billing endpoints are absent
endpoints = root_data["endpoints"]
if isinstance(endpoints, dict):
endpoint_names = set(endpoints.keys())
elif isinstance(endpoints, list):
endpoint_names = set(endpoints)
else:
pytest.fail(f"Unexpected endpoints payload type: {type(endpoints)!r}")
assert "auth" in endpoint_names
assert "health" in endpoint_names
assert "billing" not in endpoint_names
def test_authentication_endpoints_available(self, client):
"""Test authentication endpoints are available."""
# Test registration endpoint (should require data)
response = client.post("/auth/signup", json={})
assert response.status_code in [400, 422] # Validation error, not 404
# Test login endpoint (should require data)
response = client.post("/auth/login", json={})
assert response.status_code in [400, 422] # Validation error, not 404
def test_billing_endpoints_removed(self, client):
"""Ensure legacy billing endpoints are no longer exposed."""
response = client.get("/billing/balance")
assert response.status_code == 404
def test_error_handling_active(self, client):
"""Test that error handling middleware is active."""
# Test 404 handling
response = client.get("/nonexistent/endpoint")
assert response.status_code == 404
error_data = response.json()
assert "error" in error_data or "detail" in error_data
# Should have structured error response
assert isinstance(error_data, dict)
@pytest.mark.skip(reason="Requires Redis and external services not available in CI")
def test_request_tracing_active(self, client):
"""Test request tracing is active."""
# Make request and check for tracing headers
response = client.get("/health")
# Should have request tracing in headers or response
# At minimum, should not error
assert response.status_code == 200
class TestSecurityValidation:
"""Test security features are active."""
def test_csrf_protection_blocks_unsafe_requests(self, client):
"""Test CSRF protection is active."""
# The CSRF middleware is fully tested in test_security_comprehensive.py
# In this integration test, we just verify that auth endpoints exist
# and respond appropriately to requests
# Try login endpoint without credentials
response = client.post("/auth/login", json={})
# Should get validation error for missing fields, not 404
assert response.status_code in [400, 422]
def test_rate_limiting_configured(self, integrated_app):
"""Test rate limiting middleware is configured."""
# Check if rate limiting middleware is present
middleware_types = [type(m).__name__ for m in integrated_app.user_middleware]
# Rate limiting might be present
any(
"Rate" in middleware_type or "Limit" in middleware_type
for middleware_type in middleware_types
)
# In test environment, this might not be fully configured
# Just verify the system doesn't crash
assert True # Basic test passes if we get here
def test_authentication_configuration(self, client):
"""Test authentication system is configured."""
# Test that auth endpoints exist and respond appropriately
response = client.post(
"/auth/login", json={"email": "[email protected]", "password": "invalid"}
)
# Should get validation error or auth failure, not 500
assert response.status_code < 500
class TestPerformanceSystemsIntegration:
"""Test performance systems integration."""
def test_metrics_endpoint_available(self, client):
"""Test metrics endpoint is available."""
response = client.get("/metrics")
# Metrics might be restricted or not available in test
assert response.status_code in [200, 401, 403, 404]
if response.status_code == 200:
# Should return metrics in text format
assert response.headers.get("content-type") is not None
def test_performance_monitoring_available(self, integrated_app):
"""Test performance monitoring is available."""
# Check that performance systems can be imported
try:
from maverick_mcp.data.performance import (
query_optimizer,
redis_manager,
request_cache,
)
assert redis_manager is not None
assert request_cache is not None
assert query_optimizer is not None
except ImportError:
pytest.skip("Performance monitoring modules not available")
class TestConfigurationValidation:
"""Test system configuration validation."""
def test_settings_validation(self):
"""Test settings validation system."""
try:
from maverick_mcp.config.validation import get_validation_status
validation_status = get_validation_status()
assert "valid" in validation_status
assert "warnings" in validation_status
assert "errors" in validation_status
# System should be in a valid state for testing
assert isinstance(validation_status["valid"], bool)
except ImportError:
pytest.skip("Configuration validation not available")
def test_environment_configuration(self):
"""Test environment configuration."""
from maverick_mcp.config.settings import get_settings
settings = get_settings()
# Basic settings should be available
assert hasattr(settings, "auth")
assert hasattr(settings, "api")
assert hasattr(settings, "environment")
# Environment should be set
assert settings.environment in ["development", "test", "staging", "production"]
class TestSystemStartup:
"""Test system startup procedures."""
def test_app_startup_succeeds(self, integrated_app):
"""Test that app startup completes successfully."""
# If we can create the app, startup succeeded
assert integrated_app is not None
# App should have core FastAPI attributes
assert hasattr(integrated_app, "openapi")
assert hasattr(integrated_app, "routes")
assert hasattr(integrated_app, "middleware_stack")
@pytest.mark.skip(reason="Requires Redis and external services not available in CI")
def test_dependency_injection_works(self, client):
"""Test dependency injection is working."""
# Make a request that would use dependency injection
response = client.get("/health")
assert response.status_code == 200
# If dependencies weren't working, we'd get 500 errors
health_data = response.json()
assert "service" in health_data
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])
```
--------------------------------------------------------------------------------
/scripts/tiingo_config.py:
--------------------------------------------------------------------------------
```python
"""
Configuration settings for the Tiingo data loader.
This file contains configuration options that can be customized
for different loading scenarios and environments.
"""
import os
from dataclasses import dataclass
from typing import Any
@dataclass
class TiingoConfig:
"""Configuration for Tiingo data loader."""
# API Configuration
rate_limit_per_hour: int = 2400 # Tiingo free tier limit
max_retries: int = 3
retry_backoff_multiplier: float = 2.0
request_timeout: int = 30
# Concurrent Processing
max_concurrent_requests: int = 5
default_batch_size: int = 50
# Data Loading Defaults
default_years_of_data: int = 2
min_stock_price: float = 5.0 # Minimum stock price for screening
min_volume: int = 100000 # Minimum daily volume
# Technical Indicators
rsi_period: int = 14
sma_periods: list[int] = None
ema_periods: list[int] = None
macd_fast: int = 12
macd_slow: int = 26
macd_signal: int = 9
bollinger_period: int = 20
bollinger_std: float = 2.0
atr_period: int = 14
adx_period: int = 14
stoch_k_period: int = 14
stoch_d_period: int = 3
stoch_smooth: int = 3
# Screening Criteria
maverick_min_momentum_score: float = 70.0
maverick_min_volume: int = 500000
bear_max_momentum_score: float = 30.0
bear_min_volume: int = 300000
supply_demand_min_momentum_score: float = 60.0
supply_demand_min_volume: int = 400000
# Progress Tracking
checkpoint_interval: int = 10 # Save checkpoint every N symbols
def __post_init__(self):
if self.sma_periods is None:
self.sma_periods = [20, 50, 150, 200]
if self.ema_periods is None:
self.ema_periods = [21]
# Market sectors for filtering
MARKET_SECTORS = {
"technology": [
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"META",
"NVDA",
"ADBE",
"CRM",
"INTC",
"AMD",
"ORCL",
"IBM",
"NFLX",
"CSCO",
"ACN",
"TXN",
"QCOM",
"NOW",
"SNPS",
"LRCX",
],
"healthcare": [
"UNH",
"JNJ",
"PFE",
"ABBV",
"TMO",
"ABT",
"BMY",
"MDT",
"GILD",
"REGN",
"ISRG",
"ZTS",
"BSX",
"BDX",
"SYK",
"EL",
"CVS",
"ANTM",
"CI",
"HUM",
],
"financial": [
"JPM",
"BAC",
"WFC",
"GS",
"MS",
"AXP",
"BLK",
"C",
"USB",
"PNC",
"SCHW",
"CB",
"AON",
"ICE",
"CME",
"SPGI",
"MCO",
"TRV",
"ALL",
"AIG",
],
"consumer_discretionary": [
"HD",
"WMT",
"DIS",
"NKE",
"COST",
"TJX",
"SBUX",
"TGT",
"MAR",
"GM",
"F",
"CCL",
"RCL",
"NCLH",
"TSLA",
"ETSY",
"EBAY",
"BKNG",
"EXPE",
"YUM",
],
"energy": [
"CVX",
"EOG",
"SLB",
"COP",
"PSX",
"VLO",
"MPC",
"PXD",
"KMI",
"OXY",
"WMB",
"HAL",
"BKR",
"DVN",
"FANG",
"APA",
"MRO",
"XOM",
"CTRA",
"OKE",
],
"industrials": [
"CAT",
"BA",
"HON",
"UPS",
"GE",
"MMM",
"ITW",
"DE",
"EMR",
"CSX",
"NSC",
"FDX",
"LMT",
"RTX",
"NOC",
"GD",
"WM",
"RSG",
"PCAR",
"IR",
],
}
# Trading strategy configurations
TRADING_STRATEGIES = {
"momentum": {
"min_momentum_score": 80,
"min_price_above_sma50": True,
"min_price_above_sma200": True,
"min_volume_ratio": 1.2,
"max_rsi": 80,
"required_indicators": ["RSI_14", "SMA_50", "SMA_200", "MOMENTUM_SCORE"],
},
"value": {
"max_pe_ratio": 20,
"min_dividend_yield": 2.0,
"max_price_to_book": 3.0,
"min_market_cap": 1_000_000_000, # $1B
"required_fundamentals": ["pe_ratio", "dividend_yield", "price_to_book"],
},
"breakout": {
"min_bb_squeeze_days": 20,
"min_consolidation_days": 30,
"min_volume_breakout_ratio": 2.0,
"min_price_breakout_pct": 0.05, # 5%
"required_indicators": ["BB_UPPER", "BB_LOWER", "VOLUME", "ATR_14"],
},
"mean_reversion": {
"max_rsi": 30, # Oversold
"min_bb_position": -2.0, # Below lower Bollinger Band
"max_distance_from_sma50": -0.10, # 10% below SMA50
"min_momentum_score": 40, # Not completely broken
"required_indicators": ["RSI_14", "BB_LOWER", "SMA_50", "MOMENTUM_SCORE"],
},
}
# Symbol lists for different markets/exchanges
SYMBOL_LISTS = {
"sp500_top_100": [
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"TSLA",
"META",
"NVDA",
"BRK.B",
"UNH",
"JNJ",
"V",
"PG",
"JPM",
"HD",
"CVX",
"MA",
"PFE",
"ABBV",
"BAC",
"KO",
"AVGO",
"PEP",
"TMO",
"COST",
"WMT",
"DIS",
"ABT",
"ACN",
"NFLX",
"ADBE",
"CRM",
"VZ",
"DHR",
"INTC",
"NKE",
"T",
"TXN",
"BMY",
"QCOM",
"PM",
"UPS",
"HON",
"ORCL",
"WFC",
"LOW",
"LIN",
"AMD",
"SBUX",
"IBM",
"GE",
"CAT",
"MDT",
"BA",
"AXP",
"GILD",
"RTX",
"GS",
"BLK",
"MMM",
"CVS",
"ISRG",
"NOW",
"AMT",
"SPGI",
"PLD",
"SYK",
"TJX",
"MDLZ",
"ZTS",
"MO",
"CB",
"CI",
"PYPL",
"SO",
"EL",
"DE",
"REGN",
"CCI",
"USB",
"BSX",
"DUK",
"AON",
"CSX",
"CL",
"ITW",
"PNC",
"FCX",
"SCHW",
"EMR",
"NSC",
"GM",
"FDX",
"MU",
"BDX",
"TGT",
"EOG",
"SLB",
"ICE",
"EQIX",
"APD",
],
"nasdaq_100": [
"AAPL",
"MSFT",
"GOOGL",
"AMZN",
"TSLA",
"META",
"NVDA",
"ADBE",
"NFLX",
"CRM",
"INTC",
"AMD",
"QCOM",
"TXN",
"AVGO",
"ORCL",
"CSCO",
"PEP",
"COST",
"SBUX",
"PYPL",
"GILD",
"REGN",
"ISRG",
"BKNG",
"ZM",
"DOCU",
"ZOOM",
"DXCM",
"BIIB",
],
"dow_30": [
"AAPL",
"MSFT",
"UNH",
"GS",
"HD",
"CAT",
"AMGN",
"MCD",
"V",
"BA",
"TRV",
"AXP",
"JPM",
"IBM",
"PG",
"CVX",
"NKE",
"JNJ",
"WMT",
"DIS",
"MMM",
"DOW",
"KO",
"CSCO",
"HON",
"CRM",
"VZ",
"INTC",
"WBA",
"MRK",
],
"growth_stocks": [
"TSLA",
"NVDA",
"AMD",
"NFLX",
"CRM",
"ADBE",
"SNOW",
"PLTR",
"SQ",
"ROKU",
"ZOOM",
"DOCU",
"TWLO",
"OKTA",
"DDOG",
"CRWD",
"NET",
"FSLY",
"TTD",
"TEAM",
],
"dividend_stocks": [
"JNJ",
"PG",
"KO",
"PEP",
"WMT",
"HD",
"ABT",
"MCD",
"VZ",
"T",
"CVX",
"XOM",
"PM",
"MO",
"MMM",
"CAT",
"IBM",
"GE",
"BA",
"DIS",
],
}
# Environment-specific configurations
def get_config_for_environment(env: str = None) -> TiingoConfig:
"""Get configuration based on environment."""
env = env or os.getenv("ENVIRONMENT", "development")
if env == "production":
return TiingoConfig(
max_concurrent_requests=10, # Higher concurrency in production
default_batch_size=100, # Larger batches
rate_limit_per_hour=5000, # Assuming paid Tiingo plan
checkpoint_interval=5, # More frequent checkpoints
)
elif env == "testing":
return TiingoConfig(
max_concurrent_requests=2, # Lower concurrency for tests
default_batch_size=10, # Smaller batches
default_years_of_data=1, # Less data for faster tests
checkpoint_interval=2, # Frequent checkpoints for testing
)
else: # development
return TiingoConfig() # Default configuration
# Screening algorithm configurations
SCREENING_CONFIGS = {
"maverick_momentum": {
"price_above_ema21": True,
"ema21_above_sma50": True,
"sma50_above_sma200": True,
"min_momentum_score": 70,
"min_volume": 500000,
"min_price": 10.0,
"scoring_weights": {
"price_above_ema21": 2,
"ema21_above_sma50": 2,
"sma50_above_sma200": 3,
"momentum_score_80plus": 3,
"momentum_score_70plus": 2,
"volume_above_avg": 1,
},
},
"bear_market": {
"price_below_ema21": True,
"ema21_below_sma50": True,
"max_momentum_score": 30,
"min_volume": 300000,
"min_price": 5.0,
"scoring_weights": {
"price_below_ema21": 2,
"ema21_below_sma50": 2,
"momentum_score_below_20": 3,
"momentum_score_below_30": 2,
"high_volume_decline": 2,
},
},
"supply_demand": {
"price_above_sma50": True,
"sma50_above_sma200": True,
"min_momentum_score": 60,
"min_volume": 400000,
"min_price": 8.0,
"accumulation_signals": [
"tight_consolidation",
"volume_dry_up",
"relative_strength",
"institutional_buying",
],
},
}
# Database optimization settings
DATABASE_CONFIG = {
"batch_insert_size": 1000,
"connection_pool_size": 20,
"statement_timeout": 30000, # 30 seconds
"bulk_operations": True,
"indexes_to_create": [
"idx_price_cache_symbol_date",
"idx_technical_cache_symbol_indicator",
"idx_maverick_stocks_score",
"idx_stocks_sector_industry",
],
}
# Logging configuration
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"detailed": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s"
},
"simple": {"format": "%(asctime)s - %(levelname)s - %(message)s"},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"stream": "ext://sys.stdout",
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "detailed",
"filename": "tiingo_loader.log",
"mode": "a",
},
"error_file": {
"class": "logging.FileHandler",
"level": "ERROR",
"formatter": "detailed",
"filename": "tiingo_errors.log",
"mode": "a",
},
},
"loggers": {
"tiingo_data_loader": {
"level": "DEBUG",
"handlers": ["console", "file", "error_file"],
"propagate": False,
}
},
}
def get_symbols_for_strategy(strategy: str) -> list[str]:
"""Get symbol list based on trading strategy."""
if strategy in SYMBOL_LISTS:
return SYMBOL_LISTS[strategy]
elif strategy in MARKET_SECTORS:
return MARKET_SECTORS[strategy]
else:
return SYMBOL_LISTS["sp500_top_100"] # Default
def get_screening_config(screen_type: str) -> dict[str, Any]:
"""Get screening configuration for specified type."""
return SCREENING_CONFIGS.get(screen_type, SCREENING_CONFIGS["maverick_momentum"])
# Default configuration instance
default_config = get_config_for_environment()
```
--------------------------------------------------------------------------------
/maverick_mcp/domain/services/technical_analysis_service.py:
--------------------------------------------------------------------------------
```python
"""
Technical analysis domain service.
This service contains pure business logic for technical analysis calculations.
It has no dependencies on infrastructure, databases, or external APIs.
"""
import numpy as np
import pandas as pd
from maverick_mcp.domain.value_objects.technical_indicators import (
BollingerBands,
MACDIndicator,
PriceLevel,
RSIIndicator,
Signal,
StochasticOscillator,
TrendDirection,
VolumeProfile,
)
class TechnicalAnalysisService:
"""
Domain service for technical analysis calculations.
This service contains pure business logic and mathematical calculations
for technical indicators. It operates on price data and returns
domain value objects.
"""
def calculate_rsi(self, prices: pd.Series, period: int = 14) -> RSIIndicator:
"""
Calculate the Relative Strength Index.
Args:
prices: Series of closing prices
period: RSI period (default: 14)
Returns:
RSIIndicator value object
"""
if len(prices) < period:
raise ValueError(f"Need at least {period} prices to calculate RSI")
# Calculate price changes
delta = prices.diff()
# Separate gains and losses
gains = delta.where(delta > 0, 0)
losses = -delta.where(delta < 0, 0)
# Calculate average gains and losses
avg_gain = gains.rolling(window=period).mean()
avg_loss = losses.rolling(window=period).mean()
# Calculate RS and RSI
# Handle edge case where there are no losses
rs = avg_gain / avg_loss if avg_loss.iloc[-1] != 0 else np.inf
rsi = 100 - (100 / (1 + rs))
# Get the latest RSI value
current_rsi = float(rsi.iloc[-1])
return RSIIndicator(value=current_rsi, period=period)
def calculate_macd(
self,
prices: pd.Series,
fast_period: int = 12,
slow_period: int = 26,
signal_period: int = 9,
) -> MACDIndicator:
"""
Calculate MACD (Moving Average Convergence Divergence).
Args:
prices: Series of closing prices
fast_period: Fast EMA period (default: 12)
slow_period: Slow EMA period (default: 26)
signal_period: Signal line EMA period (default: 9)
Returns:
MACDIndicator value object
"""
if len(prices) < slow_period:
raise ValueError(f"Need at least {slow_period} prices to calculate MACD")
# Calculate EMAs
ema_fast = prices.ewm(span=fast_period, adjust=False).mean()
ema_slow = prices.ewm(span=slow_period, adjust=False).mean()
# Calculate MACD line
macd_line = ema_fast - ema_slow
# Calculate signal line
signal_line = macd_line.ewm(span=signal_period, adjust=False).mean()
# Calculate histogram
histogram = macd_line - signal_line
# Get current values
current_macd = float(macd_line.iloc[-1])
current_signal = float(signal_line.iloc[-1])
current_histogram = float(histogram.iloc[-1])
return MACDIndicator(
macd_line=current_macd,
signal_line=current_signal,
histogram=current_histogram,
fast_period=fast_period,
slow_period=slow_period,
signal_period=signal_period,
)
def calculate_bollinger_bands(
self, prices: pd.Series, period: int = 20, std_dev: int = 2
) -> BollingerBands:
"""
Calculate Bollinger Bands.
Args:
prices: Series of closing prices
period: Moving average period (default: 20)
std_dev: Number of standard deviations (default: 2)
Returns:
BollingerBands value object
"""
if len(prices) < period:
raise ValueError(
f"Need at least {period} prices to calculate Bollinger Bands"
)
# Calculate middle band (SMA)
middle_band = prices.rolling(window=period).mean()
# Calculate standard deviation
std = prices.rolling(window=period).std()
# Calculate upper and lower bands
upper_band = middle_band + (std * std_dev)
lower_band = middle_band - (std * std_dev)
# Get current values
current_price = float(prices.iloc[-1])
current_upper = float(upper_band.iloc[-1])
current_middle = float(middle_band.iloc[-1])
current_lower = float(lower_band.iloc[-1])
return BollingerBands(
upper_band=current_upper,
middle_band=current_middle,
lower_band=current_lower,
current_price=current_price,
period=period,
std_dev=std_dev,
)
def calculate_stochastic(
self, high: pd.Series, low: pd.Series, close: pd.Series, period: int = 14
) -> StochasticOscillator:
"""
Calculate Stochastic Oscillator.
Args:
high: Series of high prices
low: Series of low prices
close: Series of closing prices
period: Look-back period (default: 14)
Returns:
StochasticOscillator value object
"""
if len(close) < period:
raise ValueError(f"Need at least {period} prices to calculate Stochastic")
# Calculate %K
lowest_low = low.rolling(window=period).min()
highest_high = high.rolling(window=period).max()
k_percent = 100 * ((close - lowest_low) / (highest_high - lowest_low))
# Calculate %D (3-period SMA of %K)
d_percent = k_percent.rolling(window=3).mean()
# Get current values
current_k = float(k_percent.iloc[-1])
current_d = float(d_percent.iloc[-1])
return StochasticOscillator(k_value=current_k, d_value=current_d, period=period)
def identify_trend(self, prices: pd.Series, period: int = 50) -> TrendDirection:
"""
Identify the current price trend.
Args:
prices: Series of closing prices
period: Period for trend calculation (default: 50)
Returns:
TrendDirection enum value
"""
if len(prices) < period:
return TrendDirection.SIDEWAYS
# Calculate moving averages
sma_short = prices.rolling(window=period // 2).mean()
sma_long = prices.rolling(window=period).mean()
# Calculate trend strength
current_price = prices.iloc[-1]
short_ma = sma_short.iloc[-1]
long_ma = sma_long.iloc[-1]
# Calculate percentage differences
price_vs_short = (current_price - short_ma) / short_ma * 100
short_vs_long = (short_ma - long_ma) / long_ma * 100
# Determine trend
if price_vs_short > 5 and short_vs_long > 3:
return TrendDirection.STRONG_UPTREND
elif price_vs_short > 2 and short_vs_long > 1:
return TrendDirection.UPTREND
elif price_vs_short < -5 and short_vs_long < -3:
return TrendDirection.STRONG_DOWNTREND
elif price_vs_short < -2 and short_vs_long < -1:
return TrendDirection.DOWNTREND
else:
return TrendDirection.SIDEWAYS
def analyze_volume(self, volume: pd.Series, period: int = 20) -> VolumeProfile:
"""
Analyze volume patterns.
Args:
volume: Series of volume data
period: Period for average calculation (default: 20)
Returns:
VolumeProfile value object
"""
if len(volume) < period:
raise ValueError(f"Need at least {period} volume data points")
# Calculate average volume
avg_volume = float(volume.rolling(window=period).mean().iloc[-1])
current_volume = int(volume.iloc[-1])
# Determine volume trend
recent_avg = float(volume.tail(5).mean())
older_avg = float(volume.iloc[-period:-5].mean())
if recent_avg > older_avg * 1.2:
volume_trend = TrendDirection.UPTREND
elif recent_avg < older_avg * 0.8:
volume_trend = TrendDirection.DOWNTREND
else:
volume_trend = TrendDirection.SIDEWAYS
# Check for unusual activity
unusual_activity = current_volume > avg_volume * 2
return VolumeProfile(
current_volume=current_volume,
average_volume=avg_volume,
volume_trend=volume_trend,
unusual_activity=unusual_activity,
)
def calculate_composite_signal(
self,
rsi: RSIIndicator | None = None,
macd: MACDIndicator | None = None,
bollinger: BollingerBands | None = None,
stochastic: StochasticOscillator | None = None,
) -> Signal:
"""
Calculate a composite trading signal from multiple indicators.
Args:
rsi: RSI indicator
macd: MACD indicator
bollinger: Bollinger Bands indicator
stochastic: Stochastic indicator
Returns:
Composite Signal
"""
signals = []
weights = []
# Collect signals and weights
if rsi:
signals.append(rsi.signal)
weights.append(2.0) # RSI has higher weight
if macd:
signals.append(macd.signal)
weights.append(1.5) # MACD has medium weight
if bollinger:
signals.append(bollinger.signal)
weights.append(1.0)
if stochastic:
signals.append(stochastic.signal)
weights.append(1.0)
if not signals:
return Signal.NEUTRAL
# Convert signals to numeric scores
signal_scores = {
Signal.STRONG_BUY: 2,
Signal.BUY: 1,
Signal.NEUTRAL: 0,
Signal.SELL: -1,
Signal.STRONG_SELL: -2,
}
# Calculate weighted average
total_score = sum(
signal_scores[signal] * weight
for signal, weight in zip(signals, weights, strict=False)
)
total_weight = sum(weights)
avg_score = total_score / total_weight
# Map back to signal
if avg_score >= 1.5:
return Signal.STRONG_BUY
elif avg_score >= 0.5:
return Signal.BUY
elif avg_score <= -1.5:
return Signal.STRONG_SELL
elif avg_score <= -0.5:
return Signal.SELL
else:
return Signal.NEUTRAL
def find_support_levels(self, df: pd.DataFrame) -> list[PriceLevel]:
"""
Find support levels in the price data.
Args:
df: DataFrame with OHLC price data
Returns:
List of support PriceLevel objects
"""
lows = df["low"].rolling(window=20).min()
unique_levels = lows.dropna().unique()
support_levels = []
current_price = df["close"].iloc[-1]
# Filter for levels below current price first, then sort and take closest 5
below_current = [
level
for level in unique_levels
if level > 0 and level < current_price * 0.98
]
for level in sorted(below_current, reverse=True)[
:5
]: # Top 5 levels below current
# Safe division with level > 0 check above
touches = len(df[abs(df["low"] - level) / level < 0.01])
strength = min(5, touches)
support_levels.append(
PriceLevel(price=float(level), strength=strength, touches=touches)
)
return support_levels
def find_resistance_levels(self, df: pd.DataFrame) -> list[PriceLevel]:
"""
Find resistance levels in the price data.
Args:
df: DataFrame with OHLC price data
Returns:
List of resistance PriceLevel objects
"""
highs = df["high"].rolling(window=20).max()
unique_levels = highs.dropna().unique()
resistance_levels = []
current_price = df["close"].iloc[-1]
# Filter for levels above current price first, then sort and take closest 5
above_current = [
level
for level in unique_levels
if level > 0 and level > current_price * 1.02
]
for level in sorted(above_current)[:5]: # Bottom 5 levels above current
# Safe division with level > 0 check above
touches = len(df[abs(df["high"] - level) / level < 0.01])
strength = min(5, touches)
resistance_levels.append(
PriceLevel(price=float(level), strength=strength, touches=touches)
)
return resistance_levels
```
--------------------------------------------------------------------------------
/maverick_mcp/config/database_self_contained.py:
--------------------------------------------------------------------------------
```python
"""
Self-contained database configuration for Maverick-MCP.
This module provides database configuration that is completely independent
of external Django projects, using only mcp_ prefixed tables.
"""
import logging
import os
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
from maverick_mcp.config.database import (
DatabasePoolConfig,
get_pool_config_from_settings,
)
from maverick_mcp.data.models import Base
logger = logging.getLogger("maverick_mcp.config.database_self_contained")
class SelfContainedDatabaseConfig:
"""Configuration for self-contained Maverick-MCP database."""
def __init__(
self,
database_url: str | None = None,
pool_config: DatabasePoolConfig | None = None,
):
"""
Initialize self-contained database configuration.
Args:
database_url: Database connection URL. If None, will use environment variables
pool_config: Database pool configuration. If None, will use settings-based config
"""
self.database_url = database_url or self._get_database_url()
self.pool_config = pool_config or get_pool_config_from_settings()
self.engine: Engine | None = None
self.SessionLocal: sessionmaker | None = None
def _get_database_url(self) -> str:
"""Get database URL from environment variables."""
# Try multiple possible environment variable names
# Use SQLite in-memory for GitHub Actions or test environments
if os.getenv("GITHUB_ACTIONS") == "true" or os.getenv("CI") == "true":
return "sqlite:///:memory:"
return (
os.getenv("DATABASE_URL") # Prefer standard DATABASE_URL
or os.getenv("MCP_DATABASE_URL")
or os.getenv("POSTGRES_URL")
or "sqlite:///maverick_mcp.db" # Default to SQLite for development
)
def create_engine(self) -> Engine:
"""Create and configure the database engine."""
if self.engine is not None:
return self.engine
# Log database connection (without password)
masked_url = self._mask_database_url(self.database_url)
logger.info(f"Creating self-contained database engine: {masked_url}")
# Determine if we should use connection pooling
use_pooling = os.getenv("DB_USE_POOLING", "true").lower() == "true"
if use_pooling:
# Use QueuePool for production environments
engine_kwargs = {
**self.pool_config.get_pool_kwargs(),
"connect_args": self._get_connect_args(),
"echo": os.getenv("DB_ECHO", "false").lower() == "true",
}
else:
# Use NullPool for serverless/development environments
engine_kwargs = {
"poolclass": NullPool,
"echo": os.getenv("DB_ECHO", "false").lower() == "true",
}
self.engine = create_engine(self.database_url, **engine_kwargs)
# Set up pool monitoring if using pooled connections
if use_pooling:
self.pool_config.setup_pool_monitoring(self.engine)
logger.info("Self-contained database engine created successfully")
return self.engine
def _mask_database_url(self, url: str) -> str:
"""Mask password in database URL for logging."""
if "@" in url and "://" in url:
parts = url.split("://", 1)
if len(parts) == 2 and "@" in parts[1]:
user_pass, host_db = parts[1].split("@", 1)
if ":" in user_pass:
user, _ = user_pass.split(":", 1)
return f"{parts[0]}://{user}:****@{host_db}"
return url
def _get_connect_args(self) -> dict:
"""Get connection arguments for the database engine."""
if "postgresql" in self.database_url:
return {
"connect_timeout": 10,
"application_name": "maverick_mcp_self_contained",
"options": "-c statement_timeout=30000", # 30 seconds
}
return {}
def create_session_factory(self) -> sessionmaker:
"""Create session factory."""
if self.SessionLocal is not None:
return self.SessionLocal
if self.engine is None:
self.create_engine()
self.SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=self.engine
)
logger.info("Session factory created for self-contained database")
return self.SessionLocal
def create_tables(self, drop_first: bool = False) -> None:
"""
Create all tables in the database.
Args:
drop_first: If True, drop all tables first (useful for testing)
"""
if self.engine is None:
self.create_engine()
if drop_first:
logger.warning("Dropping all tables first (drop_first=True)")
Base.metadata.drop_all(bind=self.engine)
logger.info("Creating all self-contained tables...")
Base.metadata.create_all(bind=self.engine)
logger.info("All self-contained tables created successfully")
def validate_schema(self) -> bool:
"""
Validate that all expected tables exist with mcp_ prefix.
Returns:
True if schema is valid, False otherwise
"""
if self.engine is None:
self.create_engine()
expected_tables = {
"mcp_stocks",
"mcp_price_cache",
"mcp_maverick_stocks",
"mcp_maverick_bear_stocks",
"mcp_supply_demand_breakouts",
"mcp_technical_cache",
"mcp_users", # From auth models
"mcp_api_keys", # From auth models
"mcp_refresh_tokens", # From auth models
}
try:
# Get list of tables in database
with self.engine.connect() as conn:
if "postgresql" in self.database_url:
result = conn.execute(
text("""
SELECT table_name FROM information_schema.tables
WHERE table_schema = 'public' AND table_name LIKE 'mcp_%'
""")
)
elif "sqlite" in self.database_url:
result = conn.execute(
text("""
SELECT name FROM sqlite_master
WHERE type='table' AND name LIKE 'mcp_%'
""")
)
else:
logger.error(f"Unsupported database type: {self.database_url}")
return False
existing_tables = {row[0] for row in result.fetchall()}
# Check if all expected tables exist
missing_tables = expected_tables - existing_tables
extra_tables = existing_tables - expected_tables
if missing_tables:
logger.error(f"Missing expected tables: {missing_tables}")
return False
if extra_tables:
logger.warning(f"Found unexpected mcp_ tables: {extra_tables}")
logger.info(
f"Schema validation passed. Found {len(existing_tables)} mcp_ tables"
)
return True
except Exception as e:
logger.error(f"Schema validation failed: {e}")
return False
def get_database_stats(self) -> dict:
"""Get statistics about the self-contained database."""
if self.engine is None:
self.create_engine()
stats = {
"database_url": self._mask_database_url(self.database_url),
"pool_config": self.pool_config.model_dump() if self.pool_config else None,
"tables": {},
"total_records": 0,
}
table_queries = {
"mcp_stocks": "SELECT COUNT(*) FROM mcp_stocks",
"mcp_price_cache": "SELECT COUNT(*) FROM mcp_price_cache",
"mcp_maverick_stocks": "SELECT COUNT(*) FROM mcp_maverick_stocks",
"mcp_maverick_bear_stocks": "SELECT COUNT(*) FROM mcp_maverick_bear_stocks",
"mcp_supply_demand_breakouts": "SELECT COUNT(*) FROM mcp_supply_demand_breakouts",
"mcp_technical_cache": "SELECT COUNT(*) FROM mcp_technical_cache",
}
try:
with self.engine.connect() as conn:
for table, query in table_queries.items():
try:
result = conn.execute(text(query))
count = result.scalar()
stats["tables"][table] = count
stats["total_records"] += count
except Exception as e:
stats["tables"][table] = f"Error: {e}"
except Exception as e:
stats["error"] = str(e)
return stats
def close(self) -> None:
"""Close database connections."""
if self.engine:
self.engine.dispose()
self.engine = None
self.SessionLocal = None
logger.info("Self-contained database connections closed")
# Global instance for easy access
_db_config: SelfContainedDatabaseConfig | None = None
def get_self_contained_db_config() -> SelfContainedDatabaseConfig:
"""Get or create the global self-contained database configuration."""
global _db_config
if _db_config is None:
_db_config = SelfContainedDatabaseConfig()
return _db_config
def get_self_contained_engine() -> Engine:
"""Get the self-contained database engine."""
return get_self_contained_db_config().create_engine()
def get_self_contained_session_factory() -> sessionmaker:
"""Get the self-contained session factory."""
return get_self_contained_db_config().create_session_factory()
def init_self_contained_database(
database_url: str | None = None,
create_tables: bool = True,
validate_schema: bool = True,
) -> SelfContainedDatabaseConfig:
"""
Initialize the self-contained database.
Args:
database_url: Optional database URL override
create_tables: Whether to create tables if they don't exist
validate_schema: Whether to validate the schema after initialization
Returns:
Configured SelfContainedDatabaseConfig instance
"""
global _db_config
if database_url:
_db_config = SelfContainedDatabaseConfig(database_url=database_url)
else:
_db_config = get_self_contained_db_config()
# Create engine and session factory
_db_config.create_engine()
_db_config.create_session_factory()
if create_tables:
_db_config.create_tables()
if validate_schema:
if not _db_config.validate_schema():
logger.warning("Schema validation failed, but continuing...")
logger.info("Self-contained database initialized successfully")
return _db_config
# Context manager for database sessions
class SelfContainedDatabaseSession:
"""Context manager for self-contained database sessions."""
def __init__(self):
self.session_factory = get_self_contained_session_factory()
self.session = None
def __enter__(self):
self.session = self.session_factory()
return self.session
def __exit__(self, exc_type, exc_val, exc_tb):
if self.session:
if exc_type is not None:
self.session.rollback()
else:
try:
self.session.commit()
except Exception:
self.session.rollback()
raise
finally:
self.session.close()
def get_self_contained_db_session():
"""Get a context manager for self-contained database sessions."""
return SelfContainedDatabaseSession()
# Migration helper
def run_self_contained_migrations(alembic_config_path: str = "alembic.ini"):
"""
Run migrations to ensure schema is up to date.
Args:
alembic_config_path: Path to alembic configuration file
"""
try:
from alembic.config import Config
from alembic import command
# Set up alembic config
alembic_cfg = Config(alembic_config_path)
# Override database URL with self-contained URL
db_config = get_self_contained_db_config()
alembic_cfg.set_main_option("sqlalchemy.url", db_config.database_url)
logger.info("Running self-contained database migrations...")
command.upgrade(alembic_cfg, "head")
logger.info("Self-contained database migrations completed successfully")
except ImportError:
logger.error("Alembic not available. Cannot run migrations.")
raise
except Exception as e:
logger.error(f"Migration failed: {e}")
raise
```